concurrency
concurrency(options: {
max: number
mode?: 'queue' | 'reject'
maxQueue?: number
key?: (exchange: Exchange) => string
maxKeys?: number
label?: string
}): RouteBuilder<Current>
Bound how many exchanges run an operation AT ONCE (a bulkhead). Where .throttle() caps a RATE (calls per time window), .concurrency() caps SIMULTANEITY (how many are in flight at the same instant): protect a connection pool, a memory-bound step, or a downstream with a hard concurrency cap. The two compose but are not substitutes, a 10/sec throttle still allows unbounded simultaneous calls if each is slow.
craft()
.id('reserve-inventory')
.from(source)
.concurrency({ max: 5 }) // at most 5 reservations in flight at once
.to(http({ url: 'https://inventory.internal/reserve' })) // bounded
.transform(formatReceipt) // NOT bounded
Mental model: A pool of max slots. An exchange takes a slot before the wrapped work and frees it the moment the work settles (success, drop, or failure). When every slot is busy:
queue mode (default): wait FIFO for a slot (backpressure), bounded by maxQueue
reject mode: fail fast with RC5026 (no slot, no wait)
Parameters:
max- maximum simultaneous in-flight exchanges. A finite integer >= 1.mode- what to do when all slots are busy."queue"(default) waits FIFO for a slot;"reject"fails fast withRC5026. Mirrors.throttle()'sdelay/reject.maxQueue- queue mode only: cap the wait line. Whenmaxslots are busy ANDmaxQueueexchanges already wait, the next one fails fast withRC5026instead of joining the queue. A finite integer >= 1; omit for an unbounded queue. Passing it in reject mode is a build error (reject ismaxQueue: 0).key- partition the limit so each distinct key gets its own independent pool (per user / tenant / connection pool). The selector runs once per exchange and must return a string; coalesce missing values (?? "anonymous").maxKeys- cap on distinct keys tracked at once whenkeyis set; per-key pools live in a bounded LRU. Default10_000.label- tag carried on this limiter's events so sibling bulkheads can be told apart.
Invalid options are rejected at build time (RC5003).
Dual mode: route scope vs step scope
Like the other resilience wrappers, position decides scope.
Before .from() (route scope): the bulkhead bounds the whole pipeline at the INNERMOST resilience position, inside .retry() and .timeout(). Innermost means a slot is acquired per attempt and released between retry backoffs, so a scarce slot is never held while a retry sleeps.
craft()
.id('bounded-pipeline')
.concurrency({ max: 10 })
.from(queue('jobs'))
.to(db.insert(...))
After .from() (step scope): the bulkhead wraps only the immediately-next step. Later steps run unbounded.
craft()
.id('enrich-order')
.from(direct())
.concurrency({ max: 5, mode: 'reject' })
.to(http({ url: 'https://inventory.api/check' })) // bounded, sheds load
.transform(formatResponse) // NOT bounded
The two compose: a route-scope bulkhead over the whole pipeline plus a tighter step-scope one on a single scarce call. Multiple .concurrency() calls stack and nest (for example a global max plus a per-key max).
State is per route
The slot pool is shared across every exchange on the route, not per exchange, so simultaneity is bounded route-wide. A definition registered into multiple contexts gets an independent pool per route, so the contexts never steal each other's slots. State is in-memory and per instance; sharing a bulkhead across instances is a future addition built on the shared-store abstraction.
Interaction with .error() and .retry()
When the bulkhead rejects (reject mode, or a full maxQueue) it throws RC5026, which flows to a route-scope .error() handler if one is defined, so you can shed load deliberately (for example return a 503):
.error((err) => {
if (err.rc === 'RC5026') throw err // surface backpressure to the caller
throw err
})
RC5026 is retryable: a slot frees as soon as in-flight work completes, so an enclosing .retry() (which sits OUTSIDE the bulkhead) can back off and re-acquire one. That gives a useful composition, "do not queue indefinitely, retry-with-backoff instead":
.retry({ maxAttempts: 4, backoffMs: 50, factor: 2 })
.concurrency({ max: 8, mode: 'reject' })
.to(http({ url }))
This differs from .throttle()'s reject (RC5013), which sits OUTSIDE retry and so can only be handled by .error(), never re-attempted by retry. The difference is a direct consequence of the bulkhead's innermost placement.
.concurrency() vs .throttle()
.concurrency({ max }) | .throttle({ rate, per }) | |
|---|---|---|
| Bounds | Simultaneous in-flight (how many at once) | Rate (how many per time window) |
| Protects | Connection pools, memory, hard concurrency caps | Downstream rate limits, fair pacing |
| Over-limit | Queue (backpressure) or reject (RC5026) | Delay (pace) or reject (RC5013) |
| Chain position | Innermost resilience (inside retry/timeout) | Outermost resilience (#5, outside retry/timeout) |
They compose: rate-limit AND cap simultaneity by declaring both.
Events
The bulkhead emits the route:concurrency:* family. See the events reference for payload shapes. scope is "route" when declared before .from() and "step" for the wrapper after it.
route:concurrency:queued- all slots were busy; the exchange joined the wait queue (queue mode).route:concurrency:acquired- a slot was acquired and the wrapped work began (waitedtells you whether it had to queue first).route:concurrency:released- the held slot was freed when the work settled.route:concurrency:rejected- the exchange was fast-failed withRC5026(reasonis"busy"or"queue-full").