concurrency

← All operations

concurrency(options: {
  max: number
  mode?: 'queue' | 'reject'
  maxQueue?: number
  key?: (exchange: Exchange) => string
  maxKeys?: number
  label?: string
}): RouteBuilder<Current>

Bound how many exchanges run an operation AT ONCE (a bulkhead). Where .throttle() caps a RATE (calls per time window), .concurrency() caps SIMULTANEITY (how many are in flight at the same instant): protect a connection pool, a memory-bound step, or a downstream with a hard concurrency cap. The two compose but are not substitutes, a 10/sec throttle still allows unbounded simultaneous calls if each is slow.

craft()
  .id('reserve-inventory')
  .from(source)
  .concurrency({ max: 5 }) // at most 5 reservations in flight at once
  .to(http({ url: 'https://inventory.internal/reserve' })) // bounded
  .transform(formatReceipt) // NOT bounded

Mental model: A pool of max slots. An exchange takes a slot before the wrapped work and frees it the moment the work settles (success, drop, or failure). When every slot is busy:

queue mode (default):  wait FIFO for a slot (backpressure), bounded by maxQueue
reject mode:           fail fast with RC5026 (no slot, no wait)

Parameters:

  • max - maximum simultaneous in-flight exchanges. A finite integer >= 1.
  • mode - what to do when all slots are busy. "queue" (default) waits FIFO for a slot; "reject" fails fast with RC5026. Mirrors .throttle()'s delay / reject.
  • maxQueue - queue mode only: cap the wait line. When max slots are busy AND maxQueue exchanges already wait, the next one fails fast with RC5026 instead of joining the queue. A finite integer >= 1; omit for an unbounded queue. Passing it in reject mode is a build error (reject is maxQueue: 0).
  • key - partition the limit so each distinct key gets its own independent pool (per user / tenant / connection pool). The selector runs once per exchange and must return a string; coalesce missing values (?? "anonymous").
  • maxKeys - cap on distinct keys tracked at once when key is set; per-key pools live in a bounded LRU. Default 10_000.
  • label - tag carried on this limiter's events so sibling bulkheads can be told apart.

Invalid options are rejected at build time (RC5003).

Dual mode: route scope vs step scope

Like the other resilience wrappers, position decides scope.

Before .from() (route scope): the bulkhead bounds the whole pipeline at the INNERMOST resilience position, inside .retry() and .timeout(). Innermost means a slot is acquired per attempt and released between retry backoffs, so a scarce slot is never held while a retry sleeps.

craft()
  .id('bounded-pipeline')
  .concurrency({ max: 10 })
  .from(queue('jobs'))
  .to(db.insert(...))

After .from() (step scope): the bulkhead wraps only the immediately-next step. Later steps run unbounded.

craft()
  .id('enrich-order')
  .from(direct())
  .concurrency({ max: 5, mode: 'reject' })
  .to(http({ url: 'https://inventory.api/check' })) // bounded, sheds load
  .transform(formatResponse) // NOT bounded

The two compose: a route-scope bulkhead over the whole pipeline plus a tighter step-scope one on a single scarce call. Multiple .concurrency() calls stack and nest (for example a global max plus a per-key max).

State is per route

The slot pool is shared across every exchange on the route, not per exchange, so simultaneity is bounded route-wide. A definition registered into multiple contexts gets an independent pool per route, so the contexts never steal each other's slots. State is in-memory and per instance; sharing a bulkhead across instances is a future addition built on the shared-store abstraction.

Interaction with .error() and .retry()

When the bulkhead rejects (reject mode, or a full maxQueue) it throws RC5026, which flows to a route-scope .error() handler if one is defined, so you can shed load deliberately (for example return a 503):

.error((err) => {
  if (err.rc === 'RC5026') throw err // surface backpressure to the caller
  throw err
})

RC5026 is retryable: a slot frees as soon as in-flight work completes, so an enclosing .retry() (which sits OUTSIDE the bulkhead) can back off and re-acquire one. That gives a useful composition, "do not queue indefinitely, retry-with-backoff instead":

.retry({ maxAttempts: 4, backoffMs: 50, factor: 2 })
.concurrency({ max: 8, mode: 'reject' })
.to(http({ url }))

This differs from .throttle()'s reject (RC5013), which sits OUTSIDE retry and so can only be handled by .error(), never re-attempted by retry. The difference is a direct consequence of the bulkhead's innermost placement.

.concurrency() vs .throttle()

.concurrency({ max }).throttle({ rate, per })
BoundsSimultaneous in-flight (how many at once)Rate (how many per time window)
ProtectsConnection pools, memory, hard concurrency capsDownstream rate limits, fair pacing
Over-limitQueue (backpressure) or reject (RC5026)Delay (pace) or reject (RC5013)
Chain positionInnermost resilience (inside retry/timeout)Outermost resilience (#5, outside retry/timeout)

They compose: rate-limit AND cap simultaneity by declaring both.

Events

The bulkhead emits the route:concurrency:* family. See the events reference for payload shapes. scope is "route" when declared before .from() and "step" for the wrapper after it.

  • route:concurrency:queued - all slots were busy; the exchange joined the wait queue (queue mode).
  • route:concurrency:acquired - a slot was acquired and the wrapped work began (waited tells you whether it had to queue first).
  • route:concurrency:released - the held slot was freed when the work settled.
  • route:concurrency:rejected - the exchange was fast-failed with RC5026 (reason is "busy" or "queue-full").