Routes
Build focused data processing pipelines with a fluent DSL.
What are routes?
Routes are isolated data processing pipelines that flow from a source through processing steps to one or more destinations, with the final exchange returned to the source. Each route has a single responsibility and runs independently from other routes.
import { craft, http, log } from '@routecraft/routecraft'
export default craft()
.id('user-processor')
.from(http({ path: '/users', method: 'POST' }))
.process(request => ({
id: Date.now(),
name: request.name,
email: request.email
}))
.tap(log())
Route anatomy
Every route consists of three main parts:
1. Route configuration
Configure the route before defining data flow:
craft()
.id('data-pipeline') // Unique identifier
.batch({ size: 100 }) // Batch processing (optional)
2. Source definition
Define where data comes from:
.from(timer({ intervalMs: 5000 })) // Timer source
.from(http({ path: '/webhook', method: 'POST' })) // HTTP endpoint
.from(direct('jobs')) // Channel source
3. Processing pipeline
Transform, filter, and route data:
.filter(row => row.status === 'active') // Filter items
.transform(row => ({ name: row.name })) // Transform data
.sample({ every: 10 }) // Sample data
.tap(log()) // Side effects
.to(fetch({ url: 'https://api.com' })) // Final output
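To make the meaning of these steps concrete, the sketch below models filter, transform, and sample over a plain array without using RouteCraft at all. The sampleEvery helper is hypothetical, and its "keep the first item of every group" behavior is an assumption; the real .sample() operation may choose items differently.

```typescript
// Library-free illustration; none of these names are RouteCraft APIs.
type Row = { name: string; status: string };

// Keep one item out of every `every` items (assumed: the first of each group).
function sampleEvery<T>(items: T[], every: number): T[] {
  return items.filter((_, index) => index % every === 0);
}

// Build 25 rows; every fifth row is inactive.
const rows: Row[] = [];
for (let i = 0; i < 25; i++) {
  rows.push({ name: "user-" + i, status: i % 5 === 0 ? "inactive" : "active" });
}

const active = rows.filter((row) => row.status === "active"); // like .filter()
const names = active.map((row) => ({ name: row.name }));       // like .transform()
const sampled = sampleEvery(names, 10);                        // like .sample({ every: 10 })

console.log(sampled.length + " of " + rows.length + " rows reach the destination");
```

Placing the filter before the sample means the sampling rate applies only to rows that survive the filter, which is usually what you want when the goal is to reduce output volume.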
Route lifecycle
Routes follow a predictable lifecycle within a context:
- Registration - Route definitions are added to context
- Validation - Routes are checked for valid configuration
- Starting - Sources begin producing data
- Processing - Exchanges flow through the pipeline
- Response - Final exchange is returned to the source
- Stopping - Sources are shut down gracefully
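One way to picture the lifecycle is as a small state machine. The sketch below is a hypothetical model, not RouteCraft's internal implementation: the phase names and the RouteLifecycle class are assumptions, and the Processing and Response steps are treated as activity that happens while a route is in the started phase.

```typescript
// Hypothetical lifecycle model; phase names loosely mirror the list above.
type Phase = "registered" | "validated" | "started" | "stopped";

// Each phase may only move forward to the phases listed here.
const allowed: Record<Phase, Phase[]> = {
  registered: ["validated"],
  validated: ["started"],
  started: ["stopped"],
  stopped: [],
};

class RouteLifecycle {
  phase: Phase = "registered";

  advance(next: Phase): void {
    if (allowed[this.phase].indexOf(next) === -1) {
      throw new Error("Cannot go from " + this.phase + " to " + next);
    }
    this.phase = next;
  }
}

const lifecycle = new RouteLifecycle();
lifecycle.advance("validated");
lifecycle.advance("started"); // exchanges are processed while in this phase
lifecycle.advance("stopped");
console.log(lifecycle.phase);
```

Modeling the phases as one-way transitions makes it an error to start a route that has not been validated, which matches the ordering described above.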
Exchange flow pattern
The key to understanding RouteCraft routes is the exchange flow pattern: data flows from source → processing → destinations → back to source.
Source → Operations → Destination → back to Source
This pattern is especially important for request-response sources like HTTP servers, where the final exchange becomes the response sent back to the client.
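The flow can be sketched without the library as a list of steps applied in order, with the last result handed back to whoever started the run. Everything here is a simplified assumption for illustration: the Exchange shape, the Step type, and the runRoute function are hypothetical and do not reflect RouteCraft's actual types.

```typescript
// Hypothetical model of an exchange and a route; not the RouteCraft API.
interface Exchange {
  body: unknown;
}

type Step = (exchange: Exchange) => Exchange;

// Run every step in order and return the final exchange to the caller,
// which plays the role of the source.
function runRoute(steps: Step[], incoming: Exchange): Exchange {
  let current = incoming;
  for (const step of steps) {
    current = step(current);
  }
  return current;
}

const delivered: unknown[] = [];

const steps: Step[] = [
  // Operation: build a new body from the incoming one.
  (exchange) => {
    const request = exchange.body as { name: string };
    return { body: { greeting: "Hello, " + request.name } };
  },
  // Destination: record the body, then pass the exchange along unchanged.
  (exchange) => {
    delivered.push(exchange.body);
    return exchange;
  },
];

// A request-response source would send this body back as its reply.
const finalExchange = runRoute(steps, { body: { name: "Ada" } });
console.log(finalExchange.body);
```

In this model the destination both receives the exchange and passes it on, so the caller still gets a final exchange even when the last step is a side effect.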
Source types and final exchange handling
Different source types handle the final exchange differently:
- HTTP routes (pathful) - Accept requests at a path and method. The final exchange body becomes the HTTP response.
  - http({ path, method })
- Pathless routes - Triggered by timers, channels, watchers, or jobs. They do not return a response to a caller.
  - timer() - scheduled jobs
  - direct(name) - inter-route messaging
  - file watchers, queues, or custom sources
- Subscription sources (long-running connections):
  - queue() - Maintains a connection to the message queue until shutdown
  - websocket() - Keeps the connection open until the client disconnects
  - Custom streaming sources
// HTTP route: final exchange becomes response
craft()
.id('api-endpoint')
.from(http({ path: '/users', method: 'GET' }))
.process(() => ({ users: [...] }))
// Timer route: scheduled job, no response
craft()
.id('periodic-task')
.from(timer({ intervalMs: 60000 }))
.process(processData)
.to(log())
// Channel route: message-driven, runs until shutdown
craft()
.id('message-processor')
.from(direct('tasks'))
.process(async (task) => await processTask(task))
.to(log())
Route execution patterns
Understanding how different sources behave is crucial for route design:
One-Shot Execution (process once and complete):
- simple() with static data
- Custom sources that produce finite data
Request-Driven Execution (process per request):
- http() server endpoints
Scheduled Execution (process on schedule):
- timer() with intervals or exact times
- Custom time-based triggers
Continuous Execution (process until shutdown):
- direct() consumers
- WebSocket or queue consumers (custom adapters)
// One-shot: Processes once then stops
craft()
.id('data-import')
.from(simple(['item1', 'item2', 'item3']))
.transform(item => ({ name: item }))
.to(json({ path: './output.json' }))
// Completes after processing all 3 items
// Scheduled: Fires on an interval and runs until manually stopped
craft()
.id('live-updates')
.from(timer({ intervalMs: 5000 }))
.process(() => ({ timestamp: Date.now() }))
.to(json({ path: './metrics.json', mode: 'append' }))
// Keeps running every 5 seconds until context stops
// Routes start and stop together with the context that holds them
const ctx = context()
.routes([userRoute, orderRoute, notificationRoute])
.build()
// Start all routes
await ctx.start()
// Routes are now processing data...
// Stop all routes gracefully
await ctx.stop()
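The difference between a source that completes and one that runs until stopped can be modeled in a few lines. This is a library-free sketch: the Source interface, staticSource, tickingSource, and drain are hypothetical names, and real RouteCraft sources are likely asynchronous rather than polled like this.

```typescript
// Hypothetical source model: next() returns undefined once the source is done.
interface Source<T> {
  next(): T | undefined;
}

// One-shot: yields each static item once, then completes on its own.
function staticSource<T>(items: T[]): Source<T> {
  let index = 0;
  return {
    next: () => (index < items.length ? items[index++] : undefined),
  };
}

// Continuous: yields a new tick on every call until stop() is invoked.
function tickingSource(): Source<number> & { stop(): void } {
  let running = true;
  let tick = 0;
  return {
    next: () => (running ? tick++ : undefined),
    stop: () => {
      running = false;
    },
  };
}

// Pull values until the source completes or the safety limit is reached.
function drain<T>(source: Source<T>, limit: number): T[] {
  const out: T[] = [];
  for (let i = 0; i < limit; i++) {
    const value = source.next();
    if (value === undefined) break;
    out.push(value);
  }
  return out;
}

const imported = drain(staticSource(["item1", "item2", "item3"]), 100);
const ticker = tickingSource();
const beforeStop = drain(ticker, 5); // would keep going without the limit
ticker.stop();
const afterStop = drain(ticker, 5);
console.log(imported.length, beforeStop.length, afterStop.length);
```

The one-shot source ends because its data runs out, while the continuous source ends only because something outside it calls stop(), which is the role the context plays when it shuts down.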
Route patterns
RouteCraft supports several common integration patterns:
- Data transformation - ETL pipelines that transform data formats
- HTTP APIs - Request-response endpoints with validation and processing
- Scheduled jobs - Time-based automation and reporting
- Event processing - Message routing based on content or conditions
- Batch processing - High-throughput bulk operations
For complete working examples of these patterns, see:
- File to HTTP - Read CSV file and send rows to API
- Sample Metrics - Collect metrics with sampling to reduce storage
- Webhook Router - Route webhook events to different destinations
- HTTP Server - Simple REST API endpoint
- Batch Processing - Process items in groups for efficiency
Best practices
Route organization
- One concern per route - Keep routes focused on a single responsibility
- Meaningful IDs - Use descriptive route identifiers like user-processor or daily-report
- File naming - Use the .route.ts suffix for route files
// ✅ Good: Focused responsibility
src/routes/user-registration.route.ts
src/routes/order-fulfillment.route.ts
src/routes/email-notifications.route.ts
// ❌ Bad: Mixed responsibilities
src/routes/everything.route.ts
Error handling
- Validate early - Use .validate() to catch invalid data at the source
- Handle failures gracefully - Use .onError() for error recovery
- Monitor with events - Subscribe to error events for alerting
craft()
.id('resilient-processor')
.from(source)
.validate(inputSchema) // Validate early
.onError(handleError) // Handle errors
.retry(3) // Retry failures
.transform(processData)
.to(destination)
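The ideas behind early validation and retries can be illustrated with plain functions. This sketch does not use RouteCraft: validateUser and withRetry are hypothetical helpers, and it assumes a retry count of 3 means three attempts in total, which may not match how .retry(3) counts.

```typescript
// Hypothetical helpers; RouteCraft's .validate() and .retry() may behave differently.

// Reject bad input before any further processing happens.
function validateUser(input: unknown): { email: string } {
  const email = (input as { email?: unknown } | null)?.email;
  if (typeof email !== "string" || email.indexOf("@") === -1) {
    throw new Error("Invalid user: a valid email is required");
  }
  return { email };
}

// Call the operation up to maxAttempts times and rethrow the last error.
function withRetry<T>(operation: () => T, maxAttempts: number): T {
  let lastError: unknown = new Error("maxAttempts must be at least 1");
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return operation();
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

// Simulate a step that fails twice before succeeding.
let calls = 0;
const result = withRetry(() => {
  calls++;
  if (calls < 3) throw new Error("temporary failure");
  return validateUser({ email: "ada@example.com" });
}, 3);

console.log(result.email, "after", calls, "attempts");
```

Retrying only makes sense for failures that might go away, so in practice you would let validation errors fail immediately and reserve retries for transient problems such as network timeouts.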
Performance considerations
- Use batch processing - For high-throughput scenarios
- Implement backpressure - Don't overwhelm downstream systems
- Monitor resource usage - Track memory and CPU usage
// High-throughput route
craft()
.id('high-volume-processor')
.batch({ size: 100 })
.from(queue({ name: 'high-volume-queue' }))
.throttle({ requestsPerSecond: 50 })
.transform(processItems)
.to(destination)
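Batching itself is simple to reason about in isolation. The sketch below groups items into fixed-size chunks; toBatches is a hypothetical helper written for illustration and says nothing about how .batch() is implemented or when it flushes a partial batch.

```typescript
// Hypothetical helper: split a list into consecutive batches of at most `size` items.
function toBatches<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error("size must be positive");
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}

const items: number[] = [];
for (let i = 0; i < 250; i++) {
  items.push(i);
}

const batches = toBatches(items, 100);
console.log(batches.map((batch) => batch.length));
```

With 250 items and a batch size of 100, downstream steps are invoked three times instead of 250, which is where the throughput gain comes from; the final batch is smaller than the rest.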
Inter-route communication
- Use channels - For decoupled communication between routes
- Avoid shared state - Keep routes independent
- Design for failure - Routes should work even if others fail
// Producer route
craft()
.id('data-producer')
.from(source)
.transform(processData)
.to(direct('processed-data'))
// Consumer route
craft()
.id('data-consumer')
.from(direct('processed-data'))
.transform(enrichData)
.to(destination)
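The decoupling that channels provide can be pictured as a named in-memory publish/subscribe registry. This is a hypothetical model only: subscribe, publish, and the synchronous delivery shown here are assumptions and may not match how direct(name) delivers messages.

```typescript
// Hypothetical in-memory channel registry; illustrates the idea behind named channels.
type Listener = (message: unknown) => void;

const channels: Record<string, Listener[]> = {};

// A consumer registers interest in a channel by name.
function subscribe(name: string, listener: Listener): void {
  if (!channels[name]) channels[name] = [];
  channels[name].push(listener);
}

// A producer sends to the name without knowing who is listening.
function publish(name: string, message: unknown): void {
  const listeners = channels[name] || [];
  for (const listener of listeners) {
    try {
      listener(message);
    } catch {
      // One failing consumer must not break the producer or other consumers.
    }
  }
}

const enriched: unknown[] = [];

subscribe("processed-data", () => {
  throw new Error("this consumer is broken");
});
subscribe("processed-data", (message) => {
  enriched.push({ payload: message, enriched: true });
});

publish("processed-data", { id: 1 });
console.log(enriched.length);
```

The producer only knows the channel name, so consumers can be added, removed, or fail without any change to the producing route.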
Testing routes
Routes are designed to be easily testable with standard testing frameworks. Use the spy() adapter to record interactions and assert behavior throughout your route pipeline.
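The recording idea behind a spy can be shown with a small stand-in. The createSpy function and its received and send members are hypothetical and written only for this sketch; the actual spy() adapter may expose a different shape, so check its reference before relying on any of these names.

```typescript
// Hypothetical spy: records every value it is sent so a test can inspect them later.
function createSpy<T>() {
  const received: T[] = [];
  return {
    received,
    send(value: T): void {
      received.push(value);
    },
  };
}

// The logic under test: keep active rows and project their names.
function processRows(
  rows: { name: string; status: string }[],
  destination: { send(value: { name: string }): void }
): void {
  for (const row of rows) {
    if (row.status !== "active") continue;
    destination.send({ name: row.name });
  }
}

const spy = createSpy<{ name: string }>();
processRows(
  [
    { name: "Ada", status: "active" },
    { name: "Bob", status: "inactive" },
    { name: "Cy", status: "active" },
  ],
  spy
);

console.log(spy.received.map((item) => item.name));
```

Because the spy stands in for the real destination, the test can assert on what would have been sent without touching a network or file system.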