Composing Capabilities
Connect capabilities together to build multi-stage pipelines.
The direct() adapter is an in-process channel that lets one capability hand off data to another. Each capability stays focused on a single concern; direct() connects them without coupling the files.
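To make the hand-off idea concrete, here is a minimal sketch of what an in-process channel could look like. It is not RouteCraft's implementation: `ChannelRegistry`, `subscribe`, and `send` are hypothetical names used only for illustration, and the sketch assumes one consumer per channel name.

```typescript
// Hypothetical sketch of an in-process channel registry (not RouteCraft's internals).
type Handler<T> = (body: T) => void;

class ChannelRegistry {
  private handlers = new Map<string, Handler<unknown>>();

  // A consumer registers itself under a channel name.
  subscribe<T>(channel: string, handler: Handler<T>): void {
    this.handlers.set(channel, handler as Handler<unknown>);
  }

  // A producer hands a body to whichever consumer owns the channel.
  send<T>(channel: string, body: T): void {
    const handler = this.handlers.get(channel);
    if (!handler) {
      throw new Error(`No consumer registered for channel "${channel}"`);
    }
    handler(body);
  }
}

const registry = new ChannelRegistry();
const processed: string[] = [];

// Consumer side: knows only the channel name, not who produces for it.
registry.subscribe<{ orderId: string }>('orders.process', (order) => {
  processed.push(order.orderId);
});

// Producer side: knows only the channel name, not who consumes from it.
registry.send('orders.process', { orderId: 'A-1' });
```

The only thing the two sides share is the string `'orders.process'`, which is the sense in which the files stay uncoupled.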
Linear chain
The simplest pattern: one capability fetches data, passes it to a processor, which passes it to a notifier.
// capabilities/fetch-orders.ts
export default craft()
  .id('orders.fetch')
  .from(timer({ intervalMs: 300_000 }))
  .transform(fetchNewOrders)
  .to(direct('orders.process'))

// capabilities/process-orders.ts
export default craft()
  .id('orders.process')
  .from(direct('orders.process', {}))
  .transform(fulfillOrder)
  .to(direct('orders.notify'))

// capabilities/notify-orders.ts
export default craft()
  .id('orders.notify')
  .from(direct('orders.notify', {}))
  .to(http({ method: 'POST', path: '/notifications' }))
The channel name is just a string. Use a namespaced convention (e.g. domain.stage) to keep channel names readable as the project grows.
Fan-out
To send to multiple downstream capabilities, use .tap() for all but the primary output. .tap() is fire-and-forget and does not alter the exchange.
// capabilities/ingest-event.ts
export default craft()
  .id('events.ingest')
  .from(http({ path: '/events', method: 'POST' }))
  .tap(direct('events.audit'))
  .tap(direct('events.metrics'))
  .to(direct('events.process'))

// capabilities/audit-event.ts
export default craft()
  .id('events.audit')
  .from(direct('events.audit', {}))
  .to(json({ path: './logs/audit.jsonl' }))

// capabilities/metrics-event.ts
export default craft()
  .id('events.metrics')
  .from(direct('events.metrics', {}))
  .transform(({ type }) => ({ counter: type }))
  .to(http({ method: 'POST', path: '/metrics' }))
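The fire-and-forget behavior of .tap() can be pictured with a small standalone sketch. This is an illustration under assumptions, not RouteCraft's code: the `Exchange` and `Sink` types and the `tap` helper are hypothetical, and the copy step assumes bodies are JSON-serializable.

```typescript
// Hypothetical sketch of tap semantics: side outputs get a copy, the main flow is untouched.
type Exchange<T> = { body: T; headers: Record<string, string> };
type Sink<T> = (exchange: Exchange<T>) => void | Promise<void>;

function tap<T>(exchange: Exchange<T>, sink: Sink<T>): Exchange<T> {
  // Give the sink its own copy so it cannot mutate the original exchange.
  // Assumption: the exchange is JSON-serializable.
  const copy = JSON.parse(JSON.stringify(exchange)) as Exchange<T>;

  // Fire-and-forget: schedule the sink, do not await it, and swallow its failures.
  Promise.resolve()
    .then(() => sink(copy))
    .catch(() => {
      /* a failing tap must not break the main flow */
    });

  // The caller continues with the very same exchange object.
  return exchange;
}

const seen: string[] = [];
const original: Exchange<{ type: string }> = { body: { type: 'signup' }, headers: {} };

const afterTaps = tap(
  tap(original, (ex) => {
    seen.push(`audit:${ex.body.type}`);
  }),
  () => {
    throw new Error('metrics backend down');
  },
);
```

In this sketch the second sink fails, yet `afterTaps` is still the original exchange, which mirrors the guarantee that a tap does not alter what the primary output receives.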
Dynamic routing
The destination channel can be resolved at runtime from the exchange body or headers. This lets a single capability route to different consumers without knowing them all in advance.
// capabilities/route-by-priority.ts
export default craft()
  .id('jobs.route')
  .from(http({ path: '/jobs', method: 'POST' }))
  .to(direct((exchange) => `jobs.${exchange.body.priority}`))

// capabilities/high-priority.ts
export default craft()
  .id('jobs.high')
  .from(direct('jobs.high', {}))
  .transform(processUrgent)
  .to(log())

// capabilities/normal-priority.ts
export default craft()
  .id('jobs.normal')
  .from(direct('jobs.normal', {}))
  .transform(processNormal)
  .to(log())
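One way to think about a runtime-resolved destination is as a value that is either a fixed name or a function of the exchange. The sketch below illustrates that idea only; `ChannelTarget` and `resolveChannel` are hypothetical names, not part of RouteCraft's API.

```typescript
// Hypothetical sketch: a destination is either a fixed name or computed per exchange.
type Exchange<T> = { body: T; headers: Record<string, string> };
type ChannelTarget<T> = string | ((exchange: Exchange<T>) => string);

function resolveChannel<T>(target: ChannelTarget<T>, exchange: Exchange<T>): string {
  // Call the resolver when one is given; otherwise use the literal name.
  return typeof target === 'function' ? target(exchange) : target;
}

type Job = { priority: 'high' | 'normal' };

const byPriority: ChannelTarget<Job> = (exchange) => `jobs.${exchange.body.priority}`;

const urgentJob: Exchange<Job> = { body: { priority: 'high' }, headers: {} };

const dynamicName = resolveChannel<Job>(byPriority, urgentJob);
const fixedName = resolveChannel<Job>('jobs.normal', urgentJob);
```

Because the name is built from request data, it is worth constraining the input to known values (here the `'high' | 'normal'` union) so that a request cannot resolve to a channel nobody consumes.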
Schema validation on receive
The source side of direct() accepts a schema option. RouteCraft validates the incoming body before the capability runs and throws RC5002 if validation fails.
import { z } from 'zod'

export default craft()
  .id('orders.process')
  .from(direct('orders.process', {
    schema: z.object({
      orderId: z.string(),
      items: z.array(z.string()),
    }),
  }))
  .transform(fulfillOrder)
  .to(log())
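The validate-before-run order can be sketched without any library. In the illustration below, `receive`, `Validator`, and `ChannelValidationError` are hypothetical; only the code RC5002 comes from the text above, and the real error shape in RouteCraft may differ.

```typescript
// Hypothetical sketch of validate-before-run; not RouteCraft's actual error type.
type Validator<T> = (input: unknown) => input is T;

class ChannelValidationError extends Error {
  // Code taken from the documentation text; the class itself is illustrative.
  readonly code = 'RC5002';

  constructor(channel: string) {
    super(`Body sent to "${channel}" failed schema validation`);
    this.name = 'ChannelValidationError';
  }
}

// Wrap a handler so it only ever runs with a body that passed validation.
function receive<T, R>(channel: string, isValid: Validator<T>, handler: (body: T) => R) {
  return (body: unknown): R => {
    if (!isValid(body)) {
      throw new ChannelValidationError(channel);
    }
    return handler(body);
  };
}

type Order = { orderId: string; items: string[] };

// Hand-written stand-in for the zod schema shown above.
const isOrder: Validator<Order> = (input: unknown): input is Order => {
  if (typeof input !== 'object' || input === null) return false;
  const candidate = input as Record<string, unknown>;
  return (
    typeof candidate.orderId === 'string' &&
    Array.isArray(candidate.items) &&
    candidate.items.every((item) => typeof item === 'string')
  );
};

const processOrder = receive('orders.process', isOrder, (order) => order.items.length);
```

The handler never sees a malformed body: the wrapper throws before calling it, so downstream code can rely on the validated shape.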
How direct() knows its role
direct() is overloaded -- the number of arguments determines whether it acts as a source or destination:
direct('channel', options) -- two arguments, acts as a source (.from())
direct('channel') -- one argument, acts as a destination (.to(), .tap())
One channel name, one import, two roles.
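Role selection by argument count can be expressed with TypeScript overloads. The sketch below is a standalone illustration, not RouteCraft's typings: the `DirectSource`, `DirectDestination`, and `SourceOptions` types are hypothetical, and this local `direct` only mimics the behavior described above.

```typescript
// Hypothetical sketch of role selection by argument count; not RouteCraft's actual typings.
type SourceOptions = { schema?: unknown };
type DirectSource = { role: 'source'; channel: string; options: SourceOptions };
type DirectDestination = { role: 'destination'; channel: string };

function direct(channel: string): DirectDestination;
function direct(channel: string, options: SourceOptions): DirectSource;
function direct(
  ...args: [string] | [string, SourceOptions]
): DirectSource | DirectDestination {
  // Two arguments, even with an empty options object, mean "source".
  if (args.length === 2) {
    const [channel, options] = args;
    return { role: 'source', channel, options };
  }
  // One argument means "destination".
  return { role: 'destination', channel: args[0] };
}

const destination = direct('orders.process'); // typed as DirectDestination
const source = direct('orders.process', {}); // typed as DirectSource
```

Under this reading, the empty `{}` passed in the consumer examples on this page is what marks those calls as sources.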
Related
Capabilities
Author small, focused capabilities using the DSL.
Adapters reference
Full catalog with all options and signatures.