Operations

DSL operators with signatures and examples.

.id('my-route')
.batch()
.from(simple('x'))
.retry(3)
.transform((s) => s + '!')
.throttle({ requestsPerSecond: 10 })
.to(log())

Operations overview

Operation  Category      Description
id         Route         Set the unique identifier for the route
batch      Route         Process exchanges in batches instead of one at a time
from       Source        Define the source of data for the route
retry      Wrapper       Retry the next operation on failure
throttle   Wrapper       Rate limit the next operation
sample     Transform     Take every Nth exchange or time-based sampling
debounce   Transform     Only pass exchanges after a quiet period
timeout    Wrapper       Add timeout to the next operation
delay      Wrapper       Add delay before the next operation
onError    Wrapper       Handle errors from the next operation
transform  Transform     Transform data using a function (body only)
map        Transform     Map fields from source to target object
process    Transform     Process data with full exchange access
header     Transform     Set or override an exchange header
enrich     Transform     Add additional data to current data
filter     Flow Control  Filter data based on predicate
validate   Flow Control  Validate data against schema
choice     Flow Control  Route to different paths based on conditions
split      Flow Control  Split arrays into individual items
aggregate  Flow Control  Combine multiple items into single result
multicast  Flow Control  Send exchange to multiple destinations
loop       Flow Control  Repeat operations while condition is true
tap        Side Effect   Execute side effects without changing data
to         Destination   Send data to destination

Operation scope and ordering

  • Route operations (e.g. id, batch) configure the route itself and apply to the entire route. They configure the next route created by from().

    • Place them before from().
    • If called after a route already exists in the chain, they are staged and will apply to the next from() (they do not change the current route).
  • Wrapper operations (e.g. retry, throttle, timeout, delay, onError) wrap the next operation only.

    • Place them immediately before the operation they should affect.
    • Multiple wrappers can be stacked; they will all apply to the next single operation.
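
The staging and wrapping rules above can be modelled with a toy builder. This is a minimal sketch rather than the RouteCraft builder itself: `MiniBuilder`, `RouteSpec`, `StepSpec`, the generic `wrap()` and `step()` methods, and the counter-based fallback ids are all hypothetical and exist only to illustrate the ordering rules.

```typescript
// Toy model of staging (route operations) and wrapping (wrapper operations).
// All names here are hypothetical; this is not the RouteCraft builder.
interface StepSpec {
  name: string;
  wrappers: string[]; // wrappers that apply to this step only
}

interface RouteSpec {
  id: string;
  steps: StepSpec[];
}

class MiniBuilder {
  readonly routes: RouteSpec[] = [];
  private stagedId: string | undefined;
  private pendingWrappers: string[] = [];
  private autoId = 0;

  // Route operation: staged until the next from().
  id(routeId: string): this {
    this.stagedId = routeId;
    return this;
  }

  // Creates a new route and consumes whatever was staged.
  from(source: string): this {
    const id = this.stagedId ?? `auto-${++this.autoId}`;
    this.stagedId = undefined;
    this.routes.push({ id, steps: [{ name: `from:${source}`, wrappers: [] }] });
    return this;
  }

  // Wrapper operation: remembered until the next step is added.
  wrap(name: string): this {
    this.pendingWrappers.push(name);
    return this;
  }

  // Any ordinary step: takes all pending wrappers, then clears them.
  step(name: string): this {
    const current = this.routes[this.routes.length - 1];
    if (!current) throw new Error("step() called before from()");
    current.steps.push({ name, wrappers: this.pendingWrappers });
    this.pendingWrappers = [];
    return this;
  }
}

const built = new MiniBuilder()
  .from("a")
  .id("second") // staged: does not rename the first route
  .wrap("retry")
  .wrap("timeout")
  .step("transform") // both wrappers apply here only
  .step("to")
  .from("b"); // picks up the staged id "second"
```

In this model the first route keeps its generated id, the stacked wrappers attach to `transform` alone, and the staged id is consumed by the second `from()`.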

Route operations

Route operations configure the route itself and apply to the entire route. They configure the next route created by from(). Place them before from(). If called after an existing route, they are staged for the next from().

id

id(routeId: string): RouteBuilder<Current>

Set the unique identifier for the next route. Place before from(). If called after a route already exists, it is staged and applies to the next from() (it does not rename the current route).

craft()
  .id('data-processor')
  .from(source)
  .to(destination)

// If called after an existing route, id() is staged for the next route
// (does not change the current route)
craft()
  .from(source)
  .id('next-route-id')
  .from(otherSource)
  .to(destination)

If no ID is specified, a random UUID will be generated automatically.

batch

batch(options?: { size?: number; flushIntervalMs?: number }): RouteBuilder<Current>

Process exchanges in batches instead of one at a time. Useful for bulk operations like database inserts or API batch requests.

craft()
  .id('bulk-processor')
  .batch({ size: 50, flushIntervalMs: 5000 })
  .from(timer({ intervalMs: 1000 }))
  .to(database({ operation: 'bulkInsert' }))

Options:

  • size - Maximum exchanges per batch (default: 100)
  • flushIntervalMs - Maximum wait time before flushing partial batch (default: 5000ms)
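
How the two options interact can be sketched in a few lines. The `Batcher` class below is a hypothetical helper, not the RouteCraft implementation. It assumes the flush timer starts when the first item of a partial batch arrives, which the text above does not specify.

```typescript
// Minimal batching sketch: flush when `size` is reached, or when
// `flushIntervalMs` elapses after the first item of a partial batch.
// Hypothetical helper; the timer start point is an assumption.
class Batcher<T> {
  private items: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;

  constructor(
    private readonly onFlush: (batch: T[]) => void,
    private readonly size = 100,
    private readonly flushIntervalMs = 5000,
  ) {}

  add(item: T): void {
    this.items.push(item);
    if (this.items.length >= this.size) {
      this.flush(); // full batch: flush immediately
    } else if (this.timer === undefined) {
      // first item of a partial batch: start the flush timer
      this.timer = setTimeout(() => this.flush(), this.flushIntervalMs);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.onFlush(batch);
  }
}

const flushed: number[][] = [];
const batcher = new Batcher<number>((batch) => flushed.push(batch), 2, 5000);
[1, 2, 3].forEach((n) => batcher.add(n)); // [1, 2] flushes on size
batcher.flush(); // force out the partial batch [3]
```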

Linting: route-level positioning

Use the ESLint rule @routecraft/routecraft/batch-before-from to ensure batch() is placed before .from(). See Linting Rules.

Incompatible with synchronous sources

The batch() operation only works with asynchronous message sources like timer(). It cannot be used with direct() sources because direct endpoints are synchronous and blocking: each sender waits for the consumer to fully process a message before the next can be sent, which prevents messages from accumulating.

If you need to combine multiple messages from split branches, use the aggregate() operation instead.

Wrapper operations

Wrapper operations modify the behavior of the next operation in the chain. They create a wrapper around the subsequent step to add cross-cutting concerns.

retry wip

retry(attempts: number, options?: { backoffMs?: number; exponential?: boolean }): RouteBuilder<Current>

Retry the next operation on failure. The retry logic wraps whatever operation comes next.

craft()
  .id('resilient-processor')
  .from(source)
  .retry(3, { backoffMs: 1000, exponential: true })
  .transform(unreliableTransformation) // This transform will be retried
  .to(destination)

Parameters:

  • attempts - Maximum retry attempts
  • backoffMs - Base delay between retries (default: 1000ms)
  • exponential - Use exponential backoff (default: false)
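
The parameters above map onto a small retry loop. The sketch below is illustrative only: `withRetry`, `sleep`, and `flaky` are hypothetical names, and it assumes `attempts` counts retries after the first call, which the text does not state explicitly.

```typescript
// Retry sketch with optional exponential backoff (hypothetical helper).
interface RetryOptions {
  backoffMs?: number;
  exponential?: boolean;
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(
  attempts: number,
  fn: () => Promise<T>,
  { backoffMs = 1000, exponential = false }: RetryOptions = {},
): Promise<T> {
  let lastError: unknown;
  // Assumption: `attempts` counts retries, so fn runs up to attempts + 1 times.
  for (let attempt = 0; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt === attempts) break; // no delay after the final failure
      // Exponential mode doubles the wait each time: 1x, 2x, 4x, ...
      const wait = exponential ? backoffMs * 2 ** attempt : backoffMs;
      await sleep(wait);
    }
  }
  throw lastError;
}

// Fails twice, then succeeds on the third call.
let calls = 0;
const flaky = async (): Promise<string> => {
  calls++;
  if (calls < 3) throw new Error(`failure ${calls}`);
  return "ok";
};

const retryResult = withRetry(3, flaky, { backoffMs: 1, exponential: true });
```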

throttle wip

throttle(options: { requestsPerSecond: number } | { requestsPerMinute: number }): RouteBuilder<Current>

Rate limit the next operation to prevent overwhelming downstream systems.

craft()
  .id('rate-limited-api')
  .from(source)
  .throttle({ requestsPerSecond: 10 })
  .process(apiCall) // API calls will be throttled to 10/second
  .to(destination)

timeout wip

timeout(timeoutMs: number): RouteBuilder<Current>

Add a timeout to the next operation. If the operation takes longer than specified, it will be cancelled.

craft()
  .id('timeout-protected')
  .from(source)
  .timeout(5000)
  .process(slowOperation) // Operation will timeout after 5 seconds
  .to(destination)
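
One common way to build this kind of guard is to race the work against a timer. The sketch below uses a hypothetical `withTimeout` helper and is not the RouteCraft implementation. It rejects the caller once the limit passes, but it does not abort the underlying work, so true cancellation would need extra plumbing such as an AbortSignal.

```typescript
// Timeout sketch using Promise.race (hypothetical helper).
// This rejects after timeoutMs but does not stop the underlying work.
function withTimeout<T>(timeoutMs: number, work: Promise<T>): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });
  // Whichever settles first wins; clear the timer on either outcome.
  return Promise.race([work, timeout]).then(
    (value) => {
      clearTimeout(timer);
      return value;
    },
    (error) => {
      clearTimeout(timer);
      throw error;
    },
  );
}

const slow = new Promise<string>((resolve) =>
  setTimeout(() => resolve("late"), 50),
);
const fast = Promise.resolve("done");

// The slow promise loses the race; the fast one wins it.
const outcomes = Promise.all([
  withTimeout(10, slow).catch((e: Error) => e.message),
  withTimeout(10, fast),
]);
```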

delay wip

delay(delayMs: number): RouteBuilder<Current>

Add a fixed delay before executing the next operation. Useful for rate limiting or adding processing delays.

craft()
  .id('delayed-processor')
  .from(source)
  .delay(1000)
  .process(operation) // Operation will execute after 1 second delay
  .to(destination)

onError wip

onError(handler: (error: Error, exchange: Exchange<Current>) => Exchange<Current> | void): RouteBuilder<Current>

Handle errors from the next operation. If the next operation fails, the error handler is invoked.

craft()
  .id('error-resilient')
  .from(source)
  .onError((error, exchange) => {
    logger.warn('Operation failed, using fallback', { error })
    return { ...exchange, body: { fallback: true } }
  })
  .transform(riskyOperation) // Errors from this transform will be handled
  .to(destination)

Source operations

from

from<T>(src: Source<T> | CallableSource<T>): RouteBuilder<T>

Define the source of data for the route. This operation creates the route and must come after any route configuration operations.

// Simple source
.id('timer-route')
.from(timer({ intervalMs: 1000 }))

// HTTP server source
.id('webhook-handler')
.from(httpServer({ port: 3000 }))

// Callable source
.id('data-fetcher')
.from(async () => await fetchData())

Transform operations

transform

transform<Next>(fn: Transformer<Current, Next> | CallableTransformer<Current, Next>): RouteBuilder<Next>

Transform the exchange body using a function. The function receives only the body and returns the new body.

.transform((body: string) => body.toUpperCase())
.transform(async (user) => await enrichUserData(user))

header

header(key: string, valueOrFn: HeaderValue | ((exchange: Exchange<Current>) => HeaderValue | Promise<HeaderValue>)): RouteBuilder<Current>

Set or override a header on the exchange. The body remains unchanged.

// Static header
.header('x-env', 'prod')

// Derived from body
.header('user.id', (exchange) => exchange.body.id)

// Derived from headers
.header('correlation', (exchange) => exchange.headers['x-request-id'])

// Async derived value
.header('request.trace', async (exchange) => await computeTrace(exchange.body))

// Override an existing header later in the chain
.header('x-env', 'staging')

map

map<Return>(fieldMappings: Record<keyof Return, (src: Current) => Return[keyof Return]>): RouteBuilder<Return>

Map fields from the current data to create a new object of a specified type. This is a specialized transformer that creates a new object by mapping fields from the source object.

// Map from API response to database model
.map<DbUser>({
  id: (apiUser) => apiUser.userId,
  name: (apiUser) => apiUser.fullName,
  email: (apiUser) => apiUser.emailAddress
})

// Transform with computed fields
.map<Summary>({
  fullName: (user) => `${user.firstName} ${user.lastName}`,
  isActive: (user) => user.status === 'active',
  displayEmail: (user) => user.email.toLowerCase()
})

// Map complex nested data
.map<OrderSummary>({
  orderId: (order) => order.id,
  customerName: (order) => order.customer.name,
  totalAmount: (order) => order.items.reduce((sum, item) => sum + item.price, 0),
  itemCount: (order) => order.items.length
})
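
The idea behind field mapping can be shown without the framework. The sketch below is a stand-in, not the RouteCraft implementation: `mapFields`, `FieldMappings`, `ApiUser`, and `DbUser` are hypothetical. It runs one mapper per target key and assembles the results into a new object.

```typescript
// Field-mapping sketch: build a target object by running one mapper per key.
// `mapFields` is a hypothetical stand-in for what map() does conceptually.
type FieldMappings<Src, Target> = {
  [K in keyof Target]: (src: Src) => Target[K];
};

function mapFields<Src, Target>(
  src: Src,
  mappings: FieldMappings<Src, Target>,
): Target {
  const result = {} as Target;
  for (const key of Object.keys(mappings) as Array<keyof Target>) {
    result[key] = mappings[key](src); // each target field comes from its mapper
  }
  return result;
}

interface ApiUser {
  userId: number;
  fullName: string;
  emailAddress: string;
}

interface DbUser {
  id: number;
  name: string;
  email: string;
}

const dbUser = mapFields<ApiUser, DbUser>(
  { userId: 7, fullName: "Ada Lovelace", emailAddress: "ADA@EXAMPLE.COM" },
  {
    id: (u) => u.userId,
    name: (u) => u.fullName,
    email: (u) => u.emailAddress.toLowerCase(),
  },
);
```

Typing the mappings as a mapped type over the target means a missing or misspelled target field is a compile-time error.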

process

process<Next = Current>(fn: Processor<Current, Next> | CallableProcessor<Current, Next>): RouteBuilder<Next>

Process the exchange with full access to headers, body, and context. Use when you need more control than transform.

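.process((exchange) => {
  // Headers are read with bracket access, as in the header() and filter() examples
  const userId = exchange.headers['user-id']
  // Return the full exchange with an updated body
  return {
    ...exchange,
    body: {
      ...exchange.body,
      processedBy: userId,
      timestamp: new Date().toISOString()
    }
  }
})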

enrich

enrich<R = Current>(enricher: Enricher<Current, Partial<R>> | CallableEnricher<Current, Partial<R>>, aggregator?: EnrichAggregator<Current, Partial<R>>): RouteBuilder<R>

Add additional data to the current exchange body by calling an enricher function.

.enrich(async (user) => ({
  profile: await fetchUserProfile(user.id),
  permissions: await getUserPermissions(user.id)
}))

Flow control operations

filter

filter(fn: Filter<Current> | CallableFilter<Current>): RouteBuilder<Current>

Filter exchanges based on a predicate. The predicate receives the full Exchange object, allowing you to filter based on headers, body, or other exchange properties. Only exchanges for which the predicate returns true continue through the route.

// Filter based on body properties
.filter((exchange) => exchange.body.isActive)
.filter(async (exchange) => await isValidOrder(exchange.body))

// Filter based on headers
.filter((exchange) => exchange.headers['x-priority'] === 'high')
.filter((exchange) => exchange.headers['user-role'] === 'admin')

// Filter based on multiple criteria
.filter((exchange) => 
  exchange.body.status === 'active' && 
  exchange.headers['x-environment'] === 'production'
)

Filter vs Transform

Unlike .transform(), which receives only the body, .filter() receives the full Exchange object. This allows filtering based on headers, correlation IDs, or other exchange metadata, not just the message body. This aligns with Apache Camel's Filter EIP behavior.

validate

validate(schema: StandardSchemaV1): RouteBuilder<Current>

Validate the exchange body against a schema. Invalid exchanges will cause the route to emit an error event.

import { z } from 'zod'

const userSchema = z.object({
  id: z.string(),
  email: z.string().email(),
  age: z.number().min(0)
})

.validate(userSchema)

choice wip

choice<T = Current>(routes: Array<{ when: (body: Current) => boolean; then: RouteBuilder<T> }>): RouteBuilder<T>

Route exchanges to different processing paths based on conditions. Like a switch statement for data flows.

.choice([
  {
    when: (order) => order.priority === 'urgent',
    then: craft().transform(priorityProcessing).to(urgentQueue)
  },
  {
    when: (order) => order.amount > 1000,
    then: craft().transform(highValueProcessing).to(reviewQueue)
  },
  {
    when: () => true, // default case
    then: craft().to(standardQueue)
  }
])
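
The switch-like behaviour can be illustrated with a plain function. This is a sketch under assumptions, not the RouteCraft implementation: `choose`, `Branch`, and `Order` are hypothetical, branches are assumed to be tested in order with the first match winning, and throwing when nothing matches is a choice made for this example only.

```typescript
// First-match routing sketch (hypothetical helper, not RouteCraft's choice()).
interface Branch<In, Out> {
  when: (body: In) => boolean;
  then: (body: In) => Out;
}

function choose<In, Out>(body: In, branches: Branch<In, Out>[]): Out {
  // Branches are tested in order; the first matching predicate wins.
  for (const branch of branches) {
    if (branch.when(body)) return branch.then(body);
  }
  // Assumption for this sketch: an unmatched body is an error.
  throw new Error("No branch matched; add a `() => true` default");
}

interface Order {
  priority: string;
  amount: number;
}

const branches: Branch<Order, string>[] = [
  { when: (o) => o.priority === "urgent", then: () => "urgentQueue" },
  { when: (o) => o.amount > 1000, then: () => "reviewQueue" },
  { when: () => true, then: () => "standardQueue" }, // default case
];

const routedTo = [
  choose({ priority: "urgent", amount: 5000 }, branches),
  choose({ priority: "normal", amount: 5000 }, branches),
  choose({ priority: "normal", amount: 10 }, branches),
];
```

Because evaluation stops at the first match, an urgent order over 1000 goes to the urgent queue, and the catch-all branch must come last.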

split

split<Item = Current extends Array<infer U> ? U : never>(fn?: (body: Current) => Item[]): RouteBuilder<Item>

Split arrays into individual items. Each item becomes a separate exchange with a new UUID and copied headers from the original exchange.

Similar to Apache Camel's Splitter EIP, the split function receives the message body and returns an array of items. The framework automatically creates exchanges for each item.

// Split array automatically
.split() // [1, 2, 3] becomes three exchanges: 1, 2, 3

// Extract nested array
.split((body) => body.items)

// Split string by delimiter
.split((body) => body.split(","))

// Transform items during split
.split((body) => body.users.map(u => u.id))

Key behaviors:

  • Each split item gets a new exchange with a unique UUID
  • Headers from the original exchange are copied to all split exchanges
  • Split hierarchy is tracked automatically for aggregation
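
The first two behaviors can be sketched as follows. `Exchange`, `splitExchange`, and `newId` are hypothetical, simplified stand-ins. The counter-based ids replace the UUIDs the framework assigns, and split hierarchy tracking is left out because the text does not describe how it works.

```typescript
// Split sketch: one input exchange becomes one exchange per item.
// Simplified, hypothetical types; real exchanges get UUIDs, not counters.
interface Exchange<T> {
  id: string;
  headers: Record<string, string>;
  body: T;
}

let nextId = 0;
const newId = (): string => `ex-${++nextId}`;

function splitExchange<T, Item>(
  exchange: Exchange<T>,
  fn: (body: T) => Item[],
): Exchange<Item>[] {
  return fn(exchange.body).map((item) => ({
    id: newId(), // each item gets its own id
    headers: { ...exchange.headers }, // headers are copied, not shared
    body: item,
  }));
}

const parent: Exchange<string> = {
  id: newId(),
  headers: { "x-env": "prod" },
  body: "a,b,c",
};

const children = splitExchange(parent, (body) => body.split(","));
```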

aggregate

aggregate<R>(fn?: Aggregator<Current, R> | CallableAggregator<Current, R>): RouteBuilder<R>

Combine multiple exchanges into a single result. Useful after split to recombine processed items.

If no aggregator is provided, exchange bodies are collected into an array. Array bodies are flattened, so array items and scalar bodies end up together in a single flat array.

// Automatically collect bodies into an array
.split()
.process((exchange) => ({ ...exchange, body: exchange.body * 2 }))
.aggregate() // Returns array of processed items: [2, 4, 6]

// Arrays are automatically flattened
// Input: [1, [2, 3], 4, [5, 6]]
// Output: [1, 2, 3, 4, 5, 6] (flattened)

// Mixed arrays and scalars are combined
// Input: [[1, 2], 3, [4, 5]]
// Output: [1, 2, 3, 4, 5] (arrays flattened, scalars added)

// Custom aggregation logic
.aggregate((items) => ({
  totalCount: items.length,
  processedAt: new Date().toISOString(),
  items
}))
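
The default collect-and-flatten behavior described above can be reproduced with a short helper. `collectBodies` is hypothetical and only a sketch. The text does not say whether nested arrays are flattened more deeply, so this version flattens one level.

```typescript
// Default-aggregation sketch: collect bodies, flattening array bodies by
// one level. Hypothetical helper; deeper flattening is not assumed.
function collectBodies<T>(bodies: Array<T | T[]>): T[] {
  const out: T[] = [];
  for (const body of bodies) {
    if (Array.isArray(body)) out.push(...body); // array body: add its items
    else out.push(body); // scalar body: add as-is
  }
  return out;
}

const flattened = collectBodies<number>([1, [2, 3], 4, [5, 6]]);
// flattened is [1, 2, 3, 4, 5, 6]

const mixed = collectBodies<number>([[1, 2], 3, [4, 5]]);
// mixed is [1, 2, 3, 4, 5]
```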

multicast wip

multicast(destinations: Array<RouteBuilder<any>>): RouteBuilder<Current>

Send the same exchange to multiple destinations simultaneously. Each destination receives a copy of the exchange.

.multicast([
  craft().to(database),
  craft().to(auditLog),
  craft().transform(formatForAnalytics).to(analyticsService)
])

loop wip

loop(condition: (body: Current, iteration: number) => boolean, maxIterations?: number): RouteBuilder<Current>

Repeat the subsequent operations while the condition remains true. Includes safeguards to prevent infinite loops.

.loop(
  (data, iteration) => data.hasMore && iteration < 10,
  10 // max iterations safeguard
)
.transform(processPage)
.process(fetchNextPage)

sample wip

sample(options: { every?: number; intervalMs?: number }): RouteBuilder<Current>

Take every Nth exchange or sample at time intervals. Useful for reducing data volume while maintaining representativeness.

// Take every 5th exchange
.sample({ every: 5 })

// Sample every 10 seconds (first exchange in each window)
.sample({ intervalMs: 10000 })

// Typical use: Reduce high-frequency data
.id('metrics-sampling')
.from(direct('high-frequency-metrics'))
.sample({ every: 100 }) // Only process 1% of metrics
.to(database({ operation: 'save' }))
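
Both sampling modes reduce to simple selection rules. The helpers below, `sampleEvery` and `sampleByInterval`, are hypothetical sketches and not the RouteCraft implementation. The count-based version assumes the Nth, 2Nth, ... items are the ones that pass. The time-based version takes explicit timestamps, here called `atMs`, so that it stays deterministic.

```typescript
// Sampling sketch (hypothetical helpers).
// Count-based: assumption is that the Nth, 2Nth, ... items pass.
function sampleEvery<T>(items: T[], every: number): T[] {
  return items.filter((_, index) => (index + 1) % every === 0);
}

// Time-based: pass the first item seen in each intervalMs window.
function sampleByInterval<T>(
  items: Array<{ atMs: number; value: T }>,
  intervalMs: number,
): T[] {
  const out: T[] = [];
  let windowEnd = -Infinity;
  for (const { atMs, value } of items) {
    if (atMs >= windowEnd) {
      out.push(value);
      windowEnd = atMs + intervalMs; // window starts at the accepted item
    }
  }
  return out;
}

const everyFifth = sampleEvery([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 5);
// everyFifth is [5, 10]

const perWindow = sampleByInterval(
  [
    { atMs: 0, value: "a" },
    { atMs: 4000, value: "b" },
    { atMs: 10000, value: "c" },
    { atMs: 12000, value: "d" },
  ],
  10000,
);
// perWindow is ["a", "c"]
```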

debounce wip

debounce(options: { quietMs: number }): RouteBuilder<Current>

Only pass exchanges after a specified quiet period with no new exchanges. Useful for handling bursts of similar events.

// Wait for 1 second of quiet before processing
.debounce({ quietMs: 1000 })

// Typical use: Batch file system changes
.id('file-watcher')
.from(file({ path: './config', watch: true }))
.debounce({ quietMs: 500 }) // Wait for editing to finish
.process(reloadConfig)
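
The quiet-period rule is the classic debounce pattern. The sketch below uses a hypothetical `createDebouncer` helper and is not the RouteCraft implementation. It assumes the most recent value in a burst is the one that passes, which the text does not spell out.

```typescript
// Debounce sketch: emit only the latest value once quietMs passes with no
// new input. Hypothetical helper; "latest value wins" is an assumption.
function createDebouncer<T>(quietMs: number, emit: (value: T) => void) {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return (value: T): void => {
    if (timer !== undefined) clearTimeout(timer); // burst continues: reset
    timer = setTimeout(() => {
      timer = undefined;
      emit(value);
    }, quietMs);
  };
}

const emitted: string[] = [];
const push = createDebouncer<string>(20, (v) => emitted.push(v));

// Three rapid calls form one burst; earlier values are discarded.
push("edit-1");
push("edit-2");
push("edit-3"); // emitted once 20 ms pass without further calls
```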

Side effect operations

tap

tap(fn: Tap<Current> | CallableTap<Current> | RouteBuilder<any>): RouteBuilder<Current>

Execute side effects without changing the exchange. The tap operation receives a deep copy of the exchange and runs without blocking the main route, so errors in the tap do not affect it. This makes it well suited to logging, auditing, notifications, and other fire-and-forget side effects.

// Simple function-based tapping
.tap(log()) // Built-in logging
.tap((exchange) => console.log('Processing:', exchange.body))
.tap(async (exchange) => await sendNotification(exchange.body))

// Multiple taps for different concerns
.tap(analytics())
.tap(monitoring())
.to(primaryDestination)

Key behaviors:

  • Non-blocking: Main route doesn't wait for tap to complete and continues even if tap fails
  • Copy semantics: Tap receives a deep copy, not the original exchange
  • Error isolation: Errors in tap don't affect the main route (they're logged but not thrown)
  • Perfect for: Logging, auditing, notifications, analytics, monitoring
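
The copy, non-blocking, and error-isolation behaviors can be sketched together. The `tap` function, the simplified `Exchange` type, and the `tapErrors` array are hypothetical. JSON round-tripping stands in for a real deep copy and only handles JSON-safe data, so this shows the semantics rather than how RouteCraft implements them.

```typescript
// Tap sketch: run a side effect on a copy, never blocking or failing the
// main flow. Hypothetical helper; JSON round-tripping is a simplified copy.
interface Exchange<T> {
  headers: Record<string, string>;
  body: T;
}

const tapErrors: string[] = []; // stands in for a logger

function tap<T>(
  exchange: Exchange<T>,
  fn: (copy: Exchange<T>) => void | Promise<void>,
): Exchange<T> {
  const copy = JSON.parse(JSON.stringify(exchange)) as Exchange<T>;
  // Fire and forget: errors are recorded, not rethrown.
  Promise.resolve()
    .then(() => fn(copy))
    .catch((error: unknown) => tapErrors.push(String(error)));
  return exchange; // the original continues unchanged
}

const original: Exchange<{ count: number }> = {
  headers: {},
  body: { count: 1 },
};

const afterTap = tap(original, (copy) => {
  copy.body.count = 99; // mutates the copy only
  throw new Error("tap failed"); // isolated from the main flow
});
```

Here `afterTap` is the same object as `original`, its body is untouched, and the thrown error ends up in `tapErrors` instead of propagating.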

Destination operations

to

to(dest: Destination<Current> | CallableDestination<Current>): RouteBuilder<Current>

Send the exchange to a destination. This is typically the final operation in a route.

.to(log()) // Log the final result
.to(database.insert()) // Insert into database
.to(async (data) => await sendToWebhook(data))