Reference
Adapters
Full catalog of adapters with signatures and options.
Adapter overview
| Adapter | Category | Description | Types |
|---|---|---|---|
| simple | Core | Static or dynamic data sources | Source |
| log | Core | Console logging for debugging | Destination |
| timer | Core | Scheduled/recurring execution | Source |
| cron | Core | Cron-scheduled execution with timezone support | Source |
| direct | Core | Synchronous inter-route communication | Source, Destination |
| http | Core | Outbound HTTP client requests (inbound/server support planned) | Destination |
| noop | Test | No-operation placeholder | Destination |
| pseudo | Test | Typed placeholder for docs/examples | Source, Destination, Processor |
| spy | Test | Records exchanges for assertions | Destination, Processor |
| file | File | Read/write text files | Source, Destination |
| json | File | JSON file handling with parsing | Source, Destination, Transformer |
| csv | File | CSV file processing | Source, Destination |
| jsonl | File | JSON Lines file processing | Source, Destination |
| html | File | HTML parsing and file handling | Source, Destination, Transformer |
| mail | Messaging | Read email via IMAP or send via SMTP | Source, Destination |
| agentBrowser | Browser | Automate a browser session (navigate, click, snapshot, etc.) | Destination |
| mcp | AI | Expose capabilities as MCP tools or call remote MCP servers | Source, Destination |
| llm | AI | Call a language model and get text or structured output | Destination |
| agent | AI | Run an LLM with a fixed system prompt (inline or registered) | Destination |
| embedding | AI | Generate vector embeddings from text | Destination |
Core adapters
simple
simple<T>(producer: (() => T | Promise<T>) | T): Source<T>
Create a static or dynamic data source. When the producer returns an array, each element becomes a separate exchange processed independently through the pipeline.
// Static value
.id('hello-route')
.from(simple('Hello, World!'))
// Array of values (each becomes a separate exchange)
.id('items-route')
.from(simple(['item1', 'item2', 'item3']))
// Dynamic function
.id('api-route')
.from(simple(async () => {
const response = await fetch('https://api.example.com/data')
return response.json()
}))
// With custom ID
.id('data-loader')
.from(simple(() => loadData()))
Use cases: Testing, static data, API polling, file reading
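The array behavior described above can be pictured with a small helper. This is a minimal sketch of the documented fan-out rule, not Routecraft's implementation; `expandResult` is a hypothetical name introduced only for illustration.

```typescript
// Hypothetical helper: given the value a simple() producer resolved to,
// return the list of exchange bodies that would be emitted.
function expandResult(result: unknown): unknown[] {
  // An array fans out into one body per element; any other value
  // (string, object, undefined, ...) yields exactly one body.
  return Array.isArray(result) ? [...result] : [result];
}

console.log(expandResult(["item1", "item2", "item3"]).length); // 3
console.log(expandResult("Hello, World!").length); // 1
```

To emit an array as a single body under this rule, a producer would presumably need to wrap it in an object, e.g. `{ items: [...] }`.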
log
log<T>(formatter?: (exchange: Exchange<T>) => unknown, options?: LogOptions): Destination<T, void>
Log messages to the console. Can be used as a destination with .to() or for side effects with .tap().
// Log final result (default: logs exchange ID, body, and headers at info level)
.to(log())
// Log intermediate data without changing flow
.tap(log())
// Log with custom formatter function
.tap(log((ex) => `Exchange with id: ${ex.id}`))
.tap(log((ex) => `Body: ${JSON.stringify(ex.body)}`))
.tap(log((ex) => `Exchange with uuid: ${ex.headers.uuid}`))
// Log at different levels
.tap(log(undefined, { level: 'debug' }))
.tap(log((ex) => ex.body, { level: 'warn' }))
.tap(log((ex) => ex.body, { level: 'error' }))
// For debug logging, use the convenience helper
.tap(debug())
.tap(debug((ex) => ex.body))
Log Levels:
- trace: Most verbose
- debug: Development/debugging (use debug() helper)
- info: Default level
- warn: Warnings
- error: Errors
- fatal: Critical failures
Output format:
- Without formatter: Logs exchange ID, body, and headers in a clean format
- With formatter: Logs the value returned by the formatter function
debug
debug<T>(formatter?: (exchange: Exchange<T>) => unknown): Destination<T, void>
Convenience helper for debug-level logging. Equivalent to log(formatter, { level: 'debug' }).
// Log at debug level (default format)
.tap(debug())
// Log with custom formatter at debug level
.tap(debug((ex) => `Debug: ${JSON.stringify(ex.body)}`))
.tap(debug((ex) => ({ id: ex.id, bodySize: JSON.stringify(ex.body).length })))
// Use throughout development workflow
craft()
  .from(source)
  .tap(debug((ex) => `Input: ${ex.body}`))
  .transform(processData)
  .tap(debug((ex) => `Processed: ${ex.body}`))
  .to(destination)
Use cases: Development debugging, verbose logging during troubleshooting
timer
timer(options?: TimerOptions): Source<undefined>
Trigger routes at regular intervals or specific times. Produces undefined as the message body.
// Simple interval (every second)
.id('ticker')
.from(timer({ intervalMs: 1000 }))
// Limited runs (10 times, then stop)
.id('batch-job')
.from(timer({ intervalMs: 5000, repeatCount: 10 }))
// Start with delay
.id('delayed-start')
.from(timer({ intervalMs: 1000, delayMs: 5000 }))
// Daily at specific time
.id('daily-report')
.from(timer({ exactTime: '09:30:00' }))
// Fixed rate (ignore execution time)
.id('heartbeat')
.from(timer({ intervalMs: 1000, fixedRate: true }))
// Add random jitter to prevent synchronized execution
.id('distributed-task')
.from(timer({ intervalMs: 1000, jitterMs: 200 }))
Options:
| Field | Type | Default | Required | Description |
|---|---|---|---|---|
| intervalMs | number | 1000 | No | Time between executions in milliseconds |
| delayMs | number | 0 | No | Delay before first execution in milliseconds |
| repeatCount | number | Infinity | No | Number of executions before stopping |
| fixedRate | boolean | false | No | Execute at exact intervals ignoring processing time |
| exactTime | string | -- | No | Execute daily at time of day HH:mm:ss (fires once/day) |
| timePattern | string | -- | No | Custom date format for execution times |
| jitterMs | number | 0 | No | Random jitter added to each scheduled run |
Headers added: Timer metadata including fired time, counter, period, and next run time
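To make the interaction between intervalMs, delayMs, fixedRate, and jitterMs concrete, here is a rough scheduling sketch. It is not the adapter's code: `startTimes` and `workMs` are hypothetical, and it assumes that with fixedRate: false the interval is counted from the end of the previous run.

```typescript
interface TimerSketchOptions {
  intervalMs?: number;
  delayMs?: number;
  fixedRate?: boolean;
  jitterMs?: number;
}

// Returns the start time (ms after the route starts) of each run, assuming
// every run takes `workMs` to process. `random` is injectable for testing.
function startTimes(
  opts: TimerSketchOptions,
  runs: number,
  workMs: number,
  random: () => number = Math.random,
): number[] {
  const { intervalMs = 1000, delayMs = 0, fixedRate = false, jitterMs = 0 } = opts;
  const times: number[] = [];
  let next = delayMs; // the first run waits for delayMs
  for (let i = 0; i < runs; i++) {
    const start = next + random() * jitterMs; // jitter is added per run
    times.push(start);
    next = fixedRate
      ? delayMs + (i + 1) * intervalMs // fixed rate: ignore processing time
      : start + workMs + intervalMs; // assumed: wait a full interval after the run ends
  }
  return times;
}

// Fixed rate keeps a steady grid even when each run takes 400 ms.
console.log(startTimes({ intervalMs: 1000, fixedRate: true }, 3, 400, () => 0)); // [0, 1000, 2000]
console.log(startTimes({ intervalMs: 1000 }, 3, 400, () => 0)); // [0, 1400, 2800]
```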
cron
cron(expression: string, options?: CronOptions): Source<undefined>
Trigger routes on a cron schedule with timezone support. Produces undefined as the message body. More expressive than timer() for complex recurring schedules.
Supports standard 5-field cron (minute granularity), extended 6-field (second granularity), and nicknames (@daily, @weekly, @hourly, @monthly, @yearly, @annually, @midnight).
// Every 5 minutes
.id('poller')
.from(cron('*/5 * * * *'))
// Weekdays at 9am Eastern
.id('morning-report')
.from(cron('0 9 * * 1-5', { timezone: 'America/New_York' }))
// Daily at midnight (nickname)
.id('nightly-cleanup')
.from(cron('@daily'))
// Every 30 seconds (6-field)
.id('health-check')
.from(cron('*/30 * * * * *'))
// First day of month, limited to 12 fires
.id('monthly-report')
.from(cron('@monthly', { maxFires: 12, name: 'monthly-report' }))
// With jitter to prevent thundering herd
.id('distributed-poll')
.from(cron('*/5 * * * *', { jitterMs: 5000 }))
// Run only during Q1 2026
.id('q1-campaign')
.from(cron('@daily', { startAt: '2026-01-01', stopAt: '2026-04-01' }))
Options:
| Field | Type | Default | Required | Description |
|---|---|---|---|---|
| timezone | string | System local | No | IANA timezone (e.g., "America/New_York", "UTC") |
| maxFires | number | Infinity | No | Maximum number of fires before stopping (delegated to croner's maxRuns) |
| jitterMs | number | 0 | No | Random delay in milliseconds added to each fire |
| name | string | -- | No | Human-readable job name for observability |
| protect | boolean | true | No | Prevents overlapping handler execution when the previous run is still in progress |
| startAt | Date \| string | -- | No | Date or ISO 8601 string at which the cron job should start running |
| stopAt | Date \| string | -- | No | Date or ISO 8601 string at which the cron job should stop running |
Cron expression format:
| Format | Example | Description |
|---|---|---|
| 5-field | */5 * * * * | minute, hour, day-of-month, month, day-of-week |
| 6-field | */30 * * * * * | second, minute, hour, day-of-month, month, day-of-week |
| Nickname | @daily | Predefined schedule |
Supported nicknames: @yearly / @annually, @monthly, @weekly, @daily / @midnight, @hourly
Headers added: Cron metadata including expression, fired time, counter, next run, timezone, and name (via routecraft.cron.* headers)
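The three expression formats can be told apart by shape alone. The sketch below is illustrative only: `classifyCron` is a hypothetical helper, real parsing is delegated to the underlying cron library, and the nickname expansions shown are the conventional cron meanings rather than values taken from Routecraft.

```typescript
// Conventional 5-field equivalents of the supported nicknames (assumed).
const NICKNAMES: Record<string, string> = {
  "@yearly": "0 0 1 1 *",
  "@annually": "0 0 1 1 *",
  "@monthly": "0 0 1 * *",
  "@weekly": "0 0 * * 0",
  "@daily": "0 0 * * *",
  "@midnight": "0 0 * * *",
  "@hourly": "0 * * * *",
};

type CronShape =
  | { kind: "5-field" | "6-field"; fields: string[] }
  | { kind: "nickname"; expandsTo: string };

function classifyCron(expression: string): CronShape {
  const trimmed = expression.trim();
  const nickname = NICKNAMES[trimmed];
  if (nickname !== undefined) return { kind: "nickname", expandsTo: nickname };
  const fields = trimmed.split(/\s+/);
  if (fields.length === 5) return { kind: "5-field", fields }; // minute granularity
  if (fields.length === 6) return { kind: "6-field", fields }; // leading seconds field
  throw new Error(`Unsupported cron expression: "${expression}"`);
}

console.log(classifyCron("*/30 * * * * *").kind); // "6-field"
```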
event
import { event } from '@routecraft/routecraft'
Produce exchanges from framework events. Use as the source with .from(event(filter)); the exchange body is the event payload.
// Single event
craft().from(event('route:started')).to(log())
// Multiple events
craft().from(event(['route:started', 'route:stopped'])).to(log())
Filter (EventFilter): an event name, an array of names, or a wildcard pattern.
- `*` (single-level) matches exactly one colon-separated segment: `route:*` matches `route:started` but not `route:pay:exchange:started`.
- `**` (globstar) matches zero or more segments at any depth: `route:**` matches every route event; `route:*:operation:**` matches operations at any adapter depth.
- `*` on its own matches all events.
Static subscriptions (context:started, route:started, ...) expand wildcards at startup against known event names; hierarchical events (route:<id>:exchange:<phase>) need explicit patterns or ** to match runtime route ids. See the Events reference for the full taxonomy.
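The wildcard rules above can be expressed as a segment-by-segment matcher. This is a sketch of the documented semantics under the assumption that names are split on colons; `matchesEvent` is a hypothetical helper, not part of the Routecraft API.

```typescript
// Returns true when `pattern` matches `eventName` under the documented rules.
function matchesEvent(pattern: string, eventName: string): boolean {
  if (pattern === "*") return true; // a lone "*" matches every event
  return matchSegments(pattern.split(":"), eventName.split(":"));
}

function matchSegments(pattern: string[], name: string[]): boolean {
  if (pattern.length === 0) return name.length === 0;
  const [head, ...rest] = pattern;
  if (head === "**") {
    // Globstar: try consuming 0, 1, 2, ... segments of the event name.
    for (let skip = 0; skip <= name.length; skip++) {
      if (matchSegments(rest, name.slice(skip))) return true;
    }
    return false;
  }
  if (name.length === 0) return false;
  // "*" accepts any single segment; anything else must match literally.
  return (head === "*" || head === name[0]) && matchSegments(rest, name.slice(1));
}

console.log(matchesEvent("route:*", "route:started")); // true
console.log(matchesEvent("route:*", "route:pay:exchange:started")); // false
console.log(matchesEvent("route:**", "route:pay:exchange:started")); // true
```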
direct
// Source (endpoint = route id). Body types are unknown at the adapter
// layer; schemas live on the route builder via `.input()` / `.output()`.
direct(options?: Partial<DirectServerOptions>): Source<unknown>
// Destination (registry-aware: body type resolves from DirectEndpointRegistry when populated)
direct<K extends RegisteredDirectEndpoint>(endpoint: K): Destination<ResolveBody<DirectEndpointRegistry, K>, unknown>
// Destination (names a target route)
direct<T>(endpoint: string | ((exchange: Exchange<T>) => string)): Destination<T, T>
// Destination with explicit input != output (e.g. in-process agent call)
direct<TIn, TOut>(
endpoint: RegisteredDirectEndpoint | ((exchange: Exchange<TIn>) => string),
): Destination<TIn, TOut>
See Type Safety: Registries for how to populate DirectEndpointRegistry.
Enable synchronous inter-route communication. Perfect for composable route architectures where you need request-response patterns. The source form uses the route's .id() as the endpoint name; destinations address the target by id.
Discovery metadata (.title(), .description()) and schemas (.input(), .output()) live on the route builder, not the adapter. The framework validates .input() before the pipeline runs and .output() before the primary destination fires -- any source adapter (direct, mcp, future ones) inherits this validation automatically.
// Producer route that sends to a direct endpoint
craft()
.id('data-producer')
.from(source)
.transform(processData)
.to(direct('processed-data'))
// Consumer route that receives from the endpoint (route id = endpoint)
craft()
.id('processed-data')
.from(direct())
.process(businessLogic)
.to(destination)
// Consumer with framework-enforced validation
craft()
.id('order-processing')
.description('Validate and persist an incoming order')
.input({ body: z.object({ orderId: z.string() }) })
.output({ body: z.object({ status: z.literal('created'), orderId: z.string() }) })
.from(direct())
.process(validateOrder)
.process(saveOrder)
.transform(() => ({ status: 'created', orderId: '12345' }))
// Dynamic endpoint based on message content (destination side)
craft()
.id('dynamic-router')
.from(source)
.to(direct((ex) => `handler-${ex.body.type}`))
// Route messages to different handlers based on priority
craft()
.id('priority-router')
.from(source)
.to(direct((ex) => {
const priority = ex.headers['priority'] || 'normal';
return `processing-${priority}`;
}))
// Consumer routes -- their ids match the dynamic target names
craft()
.id('processing-high')
.from(direct())
.to(urgentProcessor)
craft()
.id('processing-normal')
.from(direct())
.to(standardProcessor)
// Agent-only capability -- no .id() means a UUID endpoint,
// discoverable by agents but not callable from code
craft()
.description('Internal knowledge base lookup')
.input({ body: z.object({ query: z.string() }) })
.from(direct())
.process(fetchSnippets)
// Destination where the callee returns a different body shape than the caller sends.
// Supply two type arguments to express the response shape (e.g. an in-process agent).
craft()
.id('agent-caller')
.from(httpSource)
.transform((body) => ({ name: body.agent, query: body.text }))
.enrich(direct<{ name: string; query: string }, AgentResult>('agent'))
Source options (adapter-specific only):
- channelType: Custom direct channel implementation (default: in-memory). Per-route override of the context-level default.
Route-level metadata lives on the builder: .title('...'), .description('...'), .input({ body, headers }), .output({ body, headers }). .input() and .output() also accept a bare Standard Schema as a body-only shorthand.
Key characteristics:
- Synchronous: Calling route waits for response from the consuming route
- Endpoint = route id: The direct source uses the route's .id() as its endpoint name. Destinations reference the consumer by that id.
- Agent-only capabilities: Omit .id() to register under a UUID the builder generates; agents can still discover the route via the registry, but it cannot be addressed as a string from code.
- Framework-enforced validation: .input() and .output() schemas are validated by the engine, not the adapter. Validation failure emits exchange:dropped (input) or routes to the error handler (output) with RC5002.
- Automatic endpoint name sanitization: URL-unsafe characters in the route id are URL-encoded for collision-free registry keys.
- Dynamic destinations: Destination endpoints can be computed from the exchange; sources always use the route id.
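A toy model of these characteristics may help. It is a sketch under stated assumptions, not the real channel: `DirectChannelSketch` is hypothetical, handlers are kept synchronous for brevity, and `encodeURIComponent` stands in for whatever encoding the framework applies to route ids.

```typescript
type DirectHandler = (body: unknown) => unknown;

class DirectChannelSketch {
  private readonly handlers = new Map<string, DirectHandler>();

  // Registry keys are URL-encoded so unusual route ids map to safe keys.
  private key(endpoint: string): string {
    return encodeURIComponent(endpoint);
  }

  // Source side: the consuming route registers under its own route id.
  subscribe(routeId: string, handler: DirectHandler): void {
    this.handlers.set(this.key(routeId), handler);
  }

  // Destination side: the caller blocks until the consumer returns a result.
  send(endpoint: string, body: unknown): unknown {
    const handler = this.handlers.get(this.key(endpoint));
    if (!handler) throw new Error(`No direct route with id: ${endpoint}`);
    return handler(body);
  }
}

const channelDemo = new DirectChannelSketch();
channelDemo.subscribe("processing-high", (body) => ({ handledBy: "urgent", body }));
// A dynamic destination computes the endpoint name from the message.
const priorityDemo = "high";
console.log(channelDemo.send(`processing-${priorityDemo}`, { orderId: "12345" }));
```

Because `send` returns only after the handler finishes, a sender cannot hand over a second message until the first is fully processed.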
Perfect for:
- Breaking large routes into smaller, composable pieces
- HTTP request-response patterns
- Synchronous business logic orchestration
- Testing individual route segments in isolation
Limitations:
- Not compatible with batch(): Because direct() is synchronous and blocking, each sender waits for the consumer route to fully process the message before the next message can be sent. This prevents the batch consumer from accumulating multiple messages. If you need to batch messages from multiple sources or split branches, use the aggregate() operation instead.
Schema Validation
Direct routes support Standard Schema validation for type safety. How extra fields are handled depends on your schema library.
No Schema (Default)
Without a schema, all data passes through unchanged:
craft()
.id('user-processor')
.from(direct()) // No schema -- all data passes through
.process(processUser)
Zod 4 Object Types
Zod 4 uses different object constructors to control extra field handling:
| Constructor | Extra fields | Use case |
|---|---|---|
| z.object() | Stripped (default) | Strict contracts, clean data |
| z.looseObject() | Preserved | Flexible schemas, passthrough |
| z.strictObject() | Error (RC5002) | Reject unexpected fields |
import { z } from 'zod'
// z.object() - strips extra fields (default behavior)
const strictSchema = z.object({
userId: z.string().uuid(),
action: z.enum(['create', 'update', 'delete'])
})
craft()
.id('user-processor')
.input({ body: strictSchema })
.from(direct())
.process(processUser)
// Passes: { userId: '...', action: 'create' }
// Passes: { userId: '...', action: 'create', extra: 'field' }
// Extra fields silently removed from result
// RC5002: { userId: '...', missing: 'action' }
// z.looseObject() - preserves extra fields
const looseSchema = z.looseObject({
userId: z.string().uuid(),
action: z.enum(['create', 'update'])
})
craft()
.id('user-processor')
.input({ body: looseSchema })
.from(direct())
.process(processUser)
// Passes: { userId: '...', action: 'create', extra: 'field' }
// All fields preserved including extra
// z.strictObject() - rejects extra fields with error
const veryStrictSchema = z.strictObject({
userId: z.string().uuid(),
action: z.enum(['create', 'update'])
})
craft()
.id('user-processor')
.input({ body: veryStrictSchema })
.from(direct())
.process(processUser)
// Passes: { userId: '...', action: 'create' }
// RC5002: { userId: '...', action: 'create', extra: 'field' }
Header Validation
Without input.headers, all headers pass through unchanged. When specified, the same Zod 4 rules apply, with one twist: validated header values are always merged over the original request headers, so caller-supplied pass-through keys survive schemas that would normally strip them.
// No header schema - all headers pass through unchanged
craft()
.id('api-handler')
.input({ body: z.object({ id: z.string() }) })
// input.headers not specified - all headers preserved
.from(direct())
.process(handleRequest)
// z.looseObject() - validate required headers, keep extras
craft()
.id('api-handler')
.input({
headers: z.looseObject({
'x-tenant-id': z.string().uuid(),
'x-trace-id': z.string().optional(),
}),
})
.from(direct())
.process(handleRequest)
// Passes: { 'x-tenant-id': '...', 'x-other': '...' } (validates x-tenant-id, keeps x-other)
// z.object() - validate declared headers; merge preserves pass-through keys
craft()
.id('api-handler')
.input({
headers: z.object({
'x-tenant-id': z.string().uuid(),
}),
})
.from(direct())
.process(handleRequest)
// Passes: { 'x-tenant-id': '...', 'x-other': '...' } (x-other preserved via merge)
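The merge rule for headers amounts to spreading the validated values over the originals. The following is a minimal sketch of that rule only; `mergeValidatedHeaders` and the `validate` callback are hypothetical stand-ins for the engine and a header schema.

```typescript
type HeaderMap = Record<string, unknown>;

// `validate` plays the role of a schema such as z.object(): it returns only
// the keys it declares (possibly transformed) or throws on invalid input.
function mergeValidatedHeaders(
  original: HeaderMap,
  validate: (headers: HeaderMap) => HeaderMap,
): HeaderMap {
  const validated = validate(original);
  // Validated values win; undeclared keys survive from the original headers.
  return { ...original, ...validated };
}

const mergedDemo = mergeValidatedHeaders(
  { "x-tenant-id": " tenant-1 ", "x-other": "kept" },
  (h) => ({ "x-tenant-id": String(h["x-tenant-id"]).trim() }),
);
console.log(mergedDemo); // { 'x-tenant-id': 'tenant-1', 'x-other': 'kept' }
```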
Schema Coercion
The pipeline receives the validated value rather than the raw input, so schemas can coerce or transform data:
const schema = z.object({
userId: z.string(),
createdAt: z.coerce.date() // Transforms string to Date
})
craft()
.id('processor')
.input({ body: schema })
.from(direct())
.process((data) => {
// data.createdAt is Date, not string
console.log(data.createdAt.getFullYear())
})
Validation occurs on the consumer side only. Producers send data unchanged; consumers validate on receive.
Route Registry
Each direct route registers in ADAPTER_DIRECT_REGISTRY so in-process agents can discover and document the routes available in the current context:
import { ADAPTER_DIRECT_REGISTRY } from '@routecraft/routecraft'
craft()
.id('fetch-content')
.title('Fetch content')
.description('Fetch and summarize web content from URL')
.input({ body: z.object({ url: z.string().url() }) })
.output({ body: z.object({ summary: z.string() }) })
.from(direct())
.process(fetchAndSummarize)
// Later, query registered routes from context
const ctx = await new ContextBuilder().routes(...).build()
await ctx.start()
const registry = ctx.getStore(ADAPTER_DIRECT_REGISTRY)
const routes = registry ? Array.from(registry.values()) : []
// [{ endpoint, title?, description?, input?, output? }]
The direct registry stores only the direct adapter's own metadata. Other adapters that expose routes externally (such as mcp() or a future inbound http()) maintain their own parallel registries; they are never written to or read from the direct registry.
http
http<T, R>(options: HttpOptions<T>): Destination<T, HttpResult<R>>
Make HTTP requests. Returns a Destination adapter that works with both .to() and .enrich().
Current support: Routecraft currently exports http() only as an outbound/client adapter for making HTTP requests.
Planned inbound support: Routecraft does not yet ship an inbound HTTP source/server adapter. The planned design is shown in Planned inbound/server HTTP support below and may change before implementation.
With .enrich() (merge result into body):
// Static GET request - result merged into body
.enrich(http({
method: 'GET',
url: 'https://api.example.com/users'
}))
// Dynamic URL based on exchange data
.enrich(http({
method: 'GET',
url: (exchange) => `https://api.example.com/users/${exchange.body.userId}`
}))
// Custom aggregator to control merge behavior
.enrich(
http({ url: 'https://api.example.com/profile' }),
(original, result) => ({
...original,
body: { ...original.body, profileData: result.body }
})
)
With .to() (side-effect or body replacement):
.to(http(...)) always invokes the http() adapter. When the adapter returns an HttpResult, .to() replaces the exchange body with that result. The first example below is a fire-and-forget pattern in intent only (the code does not read the response), but at runtime the body is still replaced by the HttpResult. To merge or preserve the original exchange body, use .enrich() with an aggregator instead of .to(http(...)).
// Fire-and-forget intent (code does not read the response); body is still replaced by HttpResult at runtime
.to(http({
method: 'POST',
url: 'https://api.example.com/webhook',
body: (exchange) => exchange.body
}))
// http() returns HttpResult; .to() replaces exchange body with it
.to(http({
method: 'GET',
url: 'https://api.example.com/transform'
}))
// Body is now the HttpResult (status, headers, body). Use .enrich() with an aggregator to merge or preserve the original body.
// With query parameters
.enrich(http({
url: 'https://api.example.com/search',
query: (exchange) => ({ q: exchange.body.searchTerm, limit: 10 })
}))
Options:
| Field | Type | Default | Required | Description |
|---|---|---|---|---|
| method | HttpMethod | 'GET' | No | HTTP method to use |
| url | string \| (exchange) => string | -- | Yes | Target URL (string or derived from exchange) |
| headers | Record<string,string> \| (exchange) => Record<string,string> | {} | No | Request headers |
| query | Record<string,string \| number \| boolean> \| (exchange) => Query | {} | No | Query string parameters |
| body | unknown \| (exchange) => unknown | -- | No | Request body (JSON serialized when not string/binary) |
| throwOnHttpError | boolean | true | No | Throw when response is non-2xx |
| timeoutMs | number | -- | No | Request timeout in milliseconds |
Returns: HttpResult object with status, headers, body, and url
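As a rough illustration of how the url, query, and throwOnHttpError options might combine, consider the sketch below. `buildUrl` and `assertOk` are hypothetical helpers built on the standard `URL` API; the adapter's actual encoding and error type are not specified here.

```typescript
type QueryValue = string | number | boolean;

// Appends query parameters to a base URL using the standard URL API.
function buildUrl(url: string, query: Record<string, QueryValue> = {}): string {
  const target = new URL(url);
  for (const [key, value] of Object.entries(query)) {
    target.searchParams.set(key, String(value));
  }
  return target.toString();
}

// Mirrors the documented default: non-2xx responses throw unless disabled.
function assertOk(status: number, throwOnHttpError = true): void {
  if (throwOnHttpError && (status < 200 || status >= 300)) {
    throw new Error(`HTTP request failed with status ${status}`);
  }
}

console.log(buildUrl("https://api.example.com/search", { q: "routecraft docs", limit: 10 }));
// https://api.example.com/search?q=routecraft+docs&limit=10
```

With throwOnHttpError: false, a caller would instead inspect the status field of the returned HttpResult.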
Planned inbound/server HTTP support
Tentative source signature: http({ path, method, ...options }).
// Simple webhook endpoint
.id('webhook-receiver')
.from(http({ path: '/webhook', method: 'POST' }))
// Multiple methods on same path
.id('data-api')
.from(http({ path: '/api/data', method: ['GET', 'POST', 'PUT'] }))
| Option | Type | Default | Required | Description |
|---|---|---|---|---|
| path | string | '/' | No | URL path to mount |
| method | HttpMethod \| HttpMethod[] | 'POST' | No | Accepted HTTP methods |
Exchange body: { method, url, headers, body, query, params }. The final exchange becomes the HTTP response; no explicit .to() step is required.
Response behavior:
- The final exchange is returned to the HTTP client. If the final body is an object with optional fields { status?: number, headers?: Record<string,string>, body?: unknown }, those fields are used to build the response.
- If status or headers are not provided, Routecraft returns the body with 200 status and no additional headers.
- For serialization and setting Content-Type, use a formatting step in your capability (e.g., a .transform(...) that sets appropriate headers).
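Since this design is only planned, the following is a speculative sketch of the response rules above rather than shipped behavior. `toResponse` is hypothetical, and the way it decides that a body is a response envelope (a numeric status or an object-valued headers field) is an assumption made for illustration.

```typescript
interface ResponseSketch {
  status: number;
  headers: Record<string, string>;
  body: unknown;
}

function toResponse(finalBody: unknown): ResponseSketch {
  if (typeof finalBody === "object" && finalBody !== null && !Array.isArray(finalBody)) {
    const candidate = finalBody as Record<string, unknown>;
    const status = candidate["status"];
    const headers = candidate["headers"];
    const hasStatus = typeof status === "number";
    const hasHeaders = typeof headers === "object" && headers !== null;
    if (hasStatus || hasHeaders) {
      // Envelope form: use the provided fields, defaulting the rest.
      return {
        status: hasStatus ? (status as number) : 200,
        headers: hasHeaders ? (headers as Record<string, string>) : {},
        body: candidate["body"],
      };
    }
  }
  // Plain body: 200 status and no additional headers.
  return { status: 200, headers: {}, body: finalBody };
}

console.log(toResponse({ status: 201, body: { ok: true } }));
console.log(toResponse("pong"));
```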
Test adapters
noop
noop<T>(): NoopAdapter<T>
A no-operation adapter that discards messages. Useful for testing, development, or conditional routing.
// Conditional destination based on environment
.to(process.env.NODE_ENV === 'production' ? realDestination() : noop())
// Testing placeholder
.to(noop()) // Messages are discarded but logged
spy
spy<T>(): SpyAdapter<T>
Records all exchanges passing through it. Use as a destination, processor, or enricher to capture and assert on pipeline output.
import { spy } from '@routecraft/routecraft'
const spyAdapter = spy()
const route = craft()
.id('my-route')
.from(simple('payload'))
.to(spyAdapter)
const t = await testContext().routes(route).build()
await t.test()
expect(spyAdapter.received).toHaveLength(1)
expect(spyAdapter.received[0].body).toBe('payload')
expect(spyAdapter.calls.send).toBe(1)
Properties:
| Field | Type | Default | Required | Description |
|---|---|---|---|---|
| received | Exchange[] | [] | No | All exchanges recorded |
| calls.send | number | 0 | No | Number of times used as destination |
| calls.process | number | 0 | No | Number of times used as processor |
| calls.enrich | number | 0 | No | Number of times used as enricher |
Methods:
| Method | Returns | Description |
|---|---|---|
| reset() | void | Clear all recorded data |
| lastReceived() | Exchange | Most recent exchange |
| receivedBodies() | unknown[] | Array of just the body values |
See Testing for full usage patterns.
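For intuition about what the properties and methods above track, here is a stripped-down recorder. It is a sketch, not the real SpyAdapter: `SpySketch` and `ExchangeSketch` are hypothetical, only the destination and processor roles are modeled, and returning undefined from lastReceived() when nothing was recorded is an assumption.

```typescript
interface ExchangeSketch<T> {
  id: string;
  body: T;
  headers: Record<string, unknown>;
}

class SpySketch<T> {
  received: ExchangeSketch<T>[] = [];
  calls = { send: 0, process: 0, enrich: 0 };

  // Destination role: record the exchange and count the call.
  send(exchange: ExchangeSketch<T>): void {
    this.calls.send++;
    this.received.push(exchange);
  }

  // Processor role: record, then pass the exchange through unchanged.
  process(exchange: ExchangeSketch<T>): ExchangeSketch<T> {
    this.calls.process++;
    this.received.push(exchange);
    return exchange;
  }

  reset(): void {
    this.received = [];
    this.calls = { send: 0, process: 0, enrich: 0 };
  }

  lastReceived(): ExchangeSketch<T> | undefined {
    return this.received[this.received.length - 1];
  }

  receivedBodies(): T[] {
    return this.received.map((exchange) => exchange.body);
  }
}
```

Calling reset() between test cases keeps assertions on received and calls independent of earlier runs.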
pseudo
pseudo<Opts>(name?: string, options?: PseudoOptions): PseudoFactory<Opts>
pseudo<Opts>(name: string, options: PseudoKeyedOptions): PseudoKeyedFactory<Opts>
Create a typed placeholder adapter that satisfies the DSL at compile time but throws at runtime (or no-ops when runtime: "noop"). Use it to write example routes and documentation that compile without real adapter implementations; later, swap in the real adapter by changing only the import.
The returned factory can be used in .from(), .to(), .enrich(), .tap(), and .process(). Specify the result type with a generic on the call so the route body type flows correctly:
import { craft, timer, log, pseudo } from "@routecraft/routecraft";
// Option types (move to real adapter package later)
interface McpCallOptions {
server: string;
tool: string;
args?: Record<string, unknown>;
}
interface GmailListResult {
messages: { id: string; subject?: string }[];
nextPageToken?: string;
}
const mcp = pseudo<McpCallOptions>("mcp");
// Object-only call: mcp<Result>(options)
craft()
.from(timer({ intervalMs: 60_000 }))
.enrich(
mcp<GmailListResult>({
server: "gmail",
tool: "messages.list",
args: { query: "is:unread" },
}),
)
.split((r) => r.messages)
.tap(log());
Keyed (string-first) signature: use args: "keyed" when the real adapter takes a key then options (e.g. queue name, table name):
const queue = pseudo<{ ttl?: number }>("queue", { args: "keyed" });
craft()
.from(source)
.to(queue<void>("outbound", { ttl: 5000 }));
Options:
| Field | Type | Default | Description |
|---|---|---|---|
| runtime | "throw" or "noop" | "throw" | "throw" (default): throw with adapter name when executed. "noop": resolve without error (for tests). |
| args | "keyed" | -- | Set to "keyed" to get a factory (key: string, opts?) => PseudoAdapter<R>. |
Replacing with a real adapter: keep the same call shape; only the import changes:
// Before (pseudo)
import { pseudo } from "@routecraft/routecraft";
const mcp = pseudo<McpCallOptions>("mcp");
// After (real adapter)
import { mcp } from "@routecraft/mcp-adapter";
// mcp<GmailListResult>({ server, tool, args }) still works
Exported types: PseudoAdapter<R>, PseudoFactory<Opts>, PseudoKeyedFactory<Opts>, PseudoOptions, PseudoKeyedOptions
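The runtime option is easiest to see in a toy version of the factory. This sketch only imitates the documented behavior: `pseudoSketch`, `PseudoAdapterSketch`, the synchronous `send`, and the error message are all hypothetical and do not reflect the real pseudo() internals.

```typescript
type PseudoRuntime = "throw" | "noop";

interface PseudoAdapterSketch<Opts, R> {
  adapterName: string;
  options: Opts;
  send(): R | undefined;
}

function pseudoSketch<Opts>(name: string, config: { runtime?: PseudoRuntime } = {}) {
  const runtime = config.runtime ?? "throw";
  // The result type R is chosen per call, e.g. mcp<GmailListResult>({...}).
  return function factory<R = unknown>(options: Opts): PseudoAdapterSketch<Opts, R> {
    return {
      adapterName: name,
      options,
      send(): R | undefined {
        if (runtime === "throw") {
          throw new Error(`Pseudo adapter "${name}" has no runtime implementation`);
        }
        return undefined; // "noop": complete without doing anything
      },
    };
  };
}

// Compiles like a real adapter call, but does nothing when executed.
const quietQueue = pseudoSketch<{ ttl?: number }>("queue", { runtime: "noop" });
console.log(quietQueue<void>({ ttl: 5000 }).adapterName); // "queue"
```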
Parse error handling
Source adapters that convert raw bytes into a structured body (json, html, csv, jsonl, mail) accept a uniform onParseError option that controls what happens when parsing fails (malformed JSON, structurally-invalid CSV row, broken MIME, etc.). The default is 'fail'.
All three modes are observable on the events bus -- parse failures are never silent.
| Value | Lifecycle events | Use case |
|---|---|---|
| 'fail' (default) | exchange:started -> exchange:failed (or error:caught if .error() recovers) with error.rc === 'RC5016'. Streaming adapters continue to the next item. | Per-item observability with stream continuation. |
| 'abort' | exchange:started -> exchange:failed for the bad item, then the source rejects and context:error fires. | Atomic-load semantics where partial data is unacceptable. |
| 'drop' | exchange:started -> exchange:dropped with reason: 'parse-failed'. No .error() invocation. Streaming adapters continue. | Lossy upstreams (scraping, public feeds) where malformed items are expected but should still be counted. |
// Default: route per-line parse errors through .error(), keep streaming.
craft()
.from(jsonl({ path: './events.jsonl', chunked: true }))
.error((err, exchange) => {
log.warn({ err, line: exchange.headers['routecraft.jsonl.line'] }, 'bad line');
return null;
})
.filter((e) => e.body != null)
.to(db());
// Stop the stream on the first malformed row (atomic-import semantics).
craft().from(csv({ path: './daily.csv', chunked: true, onParseError: 'abort' })).to(load());
// Drop unparseable mail with structured event observability.
craft().from(mail('INBOX', { onParseError: 'drop' })).to(process());
// Subscribe to parse drops across all routes:
ctx.on('route:*:exchange:dropped', ({ details }) => {
if (details.reason === 'parse-failed') metrics.increment('source.parse.dropped');
});
Internally, all three modes defer parsing to a synthetic first pipeline step injected by the runtime, so exchange:started fires before parsing runs. The synthetic step decides per-mode whether to throw ('fail'/'abort') or emit exchange:dropped ('drop').
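The per-mode decision can be simulated in a few lines. This is a simplified model of the behavior described above, not the runtime's synthetic step: `runParseStep` is hypothetical, JSON.parse stands in for any adapter's parser, and events for successfully parsed items are omitted.

```typescript
type OnParseError = "fail" | "abort" | "drop";

interface ParseOutcome {
  events: string[];
  bodies: unknown[];
  aborted: boolean;
}

function runParseStep(rawItems: string[], mode: OnParseError = "fail"): ParseOutcome {
  const outcome: ParseOutcome = { events: [], bodies: [], aborted: false };
  for (const raw of rawItems) {
    outcome.events.push("exchange:started"); // fires before parsing runs
    try {
      outcome.bodies.push(JSON.parse(raw));
    } catch {
      if (mode === "drop") {
        outcome.events.push("exchange:dropped"); // reason: 'parse-failed'
        continue; // keep streaming
      }
      outcome.events.push("exchange:failed"); // error.rc === 'RC5016'
      if (mode === "abort") {
        outcome.events.push("context:error"); // the source rejects
        outcome.aborted = true;
        break; // nothing after the bad item is read
      }
    }
  }
  return outcome;
}

const linesDemo = ['{"a":1}', "oops", '{"b":2}'];
console.log(runParseStep(linesDemo, "fail").bodies.length); // 2
console.log(runParseStep(linesDemo, "abort").bodies.length); // 1
```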
File adapters
file
file(options: FileOptions & { chunked: true }): Source<string>
file(options: FileOptions): FileAdapter // Source<string> & Destination<unknown, void>
Read and write plain text files. For structured data, use the json or csv adapters.
Source mode (reads files):
// Read file once
.from(file({ path: './input.txt' }))
// Custom encoding
.from(file({ path: './data.txt', encoding: 'latin1' }))
Destination mode (writes files):
// Write to file (overwrite)
.to(file({ path: './output.txt', mode: 'write' }))
// Append to file
.to(file({ path: './log.txt', mode: 'append' }))
// Dynamic file paths with directory creation
.to(file({
path: (exchange) => `./data/${exchange.body.date}.txt`,
mode: 'write',
createDirs: true
}))
Options:
| Option | Type | Default | Description |
|---|---|---|---|
| path | string \| (exchange) => string | Required | File path (static or dynamic function) |
| mode | 'read' \| 'write' \| 'append' | 'read' for source, 'write' for destination | File operation mode |
| encoding | BufferEncoding | 'utf-8' | Text encoding |
| createDirs | boolean | false | Create parent directories (destination only) |
| chunked | boolean | false | Emit one exchange per line instead of entire file (source only) |
Chunked mode: When chunked: true, the file source emits one exchange per line. Each exchange includes FILE_LINE (1-based line number) and FILE_PATH headers. When chunked, the adapter returns Source only (no Destination).
// Per-line emission
.from(file({ path: './big.txt', chunked: true }))
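Per-line emission can be approximated as follows. This is a sketch only: `chunkLines` is hypothetical, the `line` and `path` header keys stand in for the FILE_LINE and FILE_PATH constants (whose actual string values are not shown here), and dropping the empty entry after a trailing newline is an assumption.

```typescript
interface LineExchangeSketch {
  body: string;
  headers: { line: number; path: string };
}

function chunkLines(path: string, contents: string): LineExchangeSketch[] {
  const lines = contents.split(/\r?\n/);
  // A trailing newline would otherwise produce a final empty "line".
  if (lines.length > 0 && lines[lines.length - 1] === "") lines.pop();
  // Line numbers are 1-based, matching the FILE_LINE description.
  return lines.map((body, index) => ({ body, headers: { line: index + 1, path } }));
}

console.log(chunkLines("./big.txt", "first\nsecond\n"));
```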
Exported types: FileAdapter, FileOptions
json
json(options?: JsonOptions): JsonAdapter | JsonFileAdapter
Parse and format JSON data, or read/write JSON files.
Transformer mode (in-memory JSON parsing):
// Parse JSON string from body
.transform(json())
// Extract nested data using dot notation
.transform(json({ path: 'data.items' }))
// Custom parsing with getValue
.transform(json({
from: (b) => b.rawJson,
getValue: (parsed) => parsed as User[]
}))
// Write to custom field
.transform(json({
to: (body, result) => ({ ...body, parsed: result })
}))
Source mode (read JSON files):
// Read and parse JSON file
.from(json({ path: './data.json' }))
// With custom reviver
.from(json({
path: './data.json',
reviver: (key, value) => {
if (key === 'date') return new Date(value);
return value;
}
}))
Destination mode (write JSON files):
// Write with formatting
.to(json({
path: './output.json',
indent: 2
}))
// Dynamic paths with directory creation
.to(json({
path: (exchange) => `./exports/${exchange.body.id}.json`,
createDirs: true
}))
// With custom replacer
.to(json({
path: './filtered.json',
replacer: (key, value) => {
if (key.startsWith('_')) return undefined;
return value;
}
}))
Transformer Options (when no path provided):
| Option | Type | Default | Description |
|---|---|---|---|
| path | string | -- | Dot-notation path to extract (e.g., "data.items[0]") |
| from | (body) => string | Uses body or body.body | Extract JSON string from exchange |
| getValue | (parsed) => V | -- | Transform parsed value |
| to | (body, result) => R | Replaces body | Where to put result |
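The dot-notation path option behaves roughly like the lookup below. This is an illustrative sketch: `getPath` and `parseAndExtract` are hypothetical, and returning undefined for a missing segment is an assumption rather than documented behavior.

```typescript
// Walks a parsed value along a path such as "data.items[0]".
function getPath(value: unknown, path: string): unknown {
  // "data.items[0]" -> ["data", "items", "0"]
  const keys = path
    .replace(/\[(\d+)\]/g, ".$1")
    .split(".")
    .filter((key) => key.length > 0);
  let current: unknown = value;
  for (const key of keys) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Parse a JSON string, then optionally extract a nested value.
function parseAndExtract(json: string, path?: string): unknown {
  const parsed: unknown = JSON.parse(json);
  return path ? getPath(parsed, path) : parsed;
}

const rawDemo = '{"data":{"items":[{"id":1},{"id":2}]}}';
console.log(parseAndExtract(rawDemo, "data.items[0]")); // { id: 1 }
```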
File Options (when path is a file path):
| Option | Type | Default | Description |
|---|---|---|---|
path | string | (exchange) => string | Required | File path (static or dynamic) |
mode | 'read' | 'write' | 'append' | 'read' for source, 'write' for destination | File operation mode |
encoding | BufferEncoding | 'utf-8' | Text encoding |
createDirs | boolean | false | Create parent directories (destination only) |
indent / space | number | 0 | JSON formatting spaces (destination only) |
reviver | (key, value) => unknown | -- | JSON.parse reviver (source only) |
replacer | (key, value) => unknown | -- | JSON.stringify replacer (destination only) |
onParseError | 'fail' | 'abort' | 'drop' | 'fail' | How to handle a parse failure (source only). See parse error handling. |
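The `reviver`, `replacer`, and `indent` options are described as passing through to `JSON.parse` and `JSON.stringify`. The sketch below uses those standard functions directly, without the adapter, to show what each callback does to the data; the sample document is made up for the illustration.

```typescript
// Reviver: turn the "date" field into a Date while parsing (source side).
const reviver = (key: string, value: unknown): unknown =>
  key === "date" && typeof value === "string" ? new Date(value) : value;

// Replacer: drop keys starting with "_" while serializing (destination side).
const replacer = (key: string, value: unknown): unknown =>
  key.startsWith("_") ? undefined : value;

const revived = JSON.parse(
  '{"id":7,"date":"2024-01-02T00:00:00.000Z","_internal":true}',
  reviver,
) as { id: number; date: Date; _internal: boolean };

// The third argument plays the role of the `indent` option.
const written = JSON.stringify(revived, replacer, 2);
```

Note that `JSON.stringify` converts the `Date` back to an ISO string before the replacer sees it, so a round trip through both callbacks preserves the original text of the `date` field.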
Exported types: JsonAdapter, JsonFileAdapter, JsonOptions, JsonTransformerOptions, JsonFileOptions
csv
csv(options: CsvOptions & { chunked: true }): Source<CsvRow>
csv(options: CsvOptions): CsvAdapter // Source<CsvData> & Destination<unknown, void>
Read and write CSV files with automatic parsing/formatting. Requires papaparse as a peer dependency.
npm install papaparse
Source mode (read CSV files):
// Read CSV with headers
.from(csv({ path: './data.csv', header: true }))
// Emits array of objects: [{ name: 'Alice', age: '30' }, ...]
// Read CSV without headers
.from(csv({ path: './data.csv', header: false }))
// Emits array of arrays: [['Alice', '30'], ['Bob', '25'], ...]
// Custom delimiter and encoding
.from(csv({
path: './data.csv',
delimiter: ';',
encoding: 'latin1',
header: true
}))
Destination mode (write CSV files):
// Write array of objects to CSV
.to(csv({
path: './output.csv',
header: true
}))
// Automatically includes headers from object keys
// Write to tab-separated file
.to(csv({
path: './data.tsv',
delimiter: '\t',
header: true
}))
// Dynamic paths with directory creation
.to(csv({
path: (exchange) => `./reports/${exchange.body.reportDate}.csv`,
createDirs: true,
header: true
}))
// Append to existing CSV (skips header if file exists)
.to(csv({
path: './log.csv',
mode: 'append',
header: true
}))
Options:
| Option | Type | Default | Description |
|---|---|---|---|
path | string | (exchange) => string | Required | File path (static or dynamic) |
header | boolean | true | Use first row as headers (source), include headers (destination) |
delimiter | string | ',' | Field separator |
quoteChar | string | '"' | Quote character |
skipEmptyLines | boolean | true | Skip empty lines during parsing |
encoding | BufferEncoding | 'utf-8' | Text encoding |
mode | 'write' | 'append' | 'write' | File operation mode (destination only) |
createDirs | boolean | false | Create parent directories (destination only) |
chunked | boolean | false | Emit one exchange per row instead of entire array (source only) |
onParseError | 'fail' | 'abort' | 'drop' | 'fail' | How to handle a row parse failure (source only). See parse error handling. |
Behavior:
- Source (default): Emits the entire CSV as an array of records (objects if header: true, arrays if header: false).
- Source (chunked: true): Emits one exchange per row with CSV_ROW (1-based row number) and CSV_PATH headers. Returns Source only (no Destination). With onParseError: 'fail' (default), malformed rows are routed through the route's .error() handler and the stream continues; 'abort' reverts to fail-fast on the first bad row; 'drop' emits exchange:dropped with reason: 'parse-failed'.
- Destination: Writes the exchange body (array of objects/arrays) as CSV. For mode: 'append', skips the header row if the file exists.
// Per-row emission
.from(csv({ path: './big.csv', chunked: true }))
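The source examples above show every field arriving as a string (for instance `age: '30'`), so a downstream step usually has to coerce values. The following is a minimal sketch of such a step; `HeaderRow`, `Person`, and `toPeople` are hypothetical names for this illustration and are not the exported `CsvRow` or `CsvData` types.

```typescript
// Local stand-in for a parsed row when `header: true` (all values are strings).
type HeaderRow = Record<string, string>;

interface Person {
  name: string;
  age: number;
}

// Hypothetical converter: coerce string fields and reject rows that do not fit.
function toPeople(rows: HeaderRow[]): Person[] {
  return rows.map((row, index) => {
    const personName = row["name"];
    const rawAge = row["age"];
    // Treat a missing or empty age as invalid instead of coercing it to 0.
    const age = rawAge ? Number(rawAge) : Number.NaN;
    if (!personName || Number.isNaN(age)) {
      // 1-based position within the array passed in.
      throw new Error(`Row ${index + 1} is not a valid person`);
    }
    return { name: personName, age };
  });
}

const people = toPeople([
  { name: "Alice", age: "30" },
  { name: "Bob", age: "25" },
]);
```

A function like this could plausibly be passed to `.transform()` after a non-chunked `csv()` source, assuming the body is the array of row objects described above.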
Peer dependency: Requires papaparse to be installed separately.
Exported types: CsvAdapter, CsvOptions, CsvRow, CsvData
jsonl
jsonl<T>(options: JsonlSourceOptions & { chunked: true }): Source<T>
jsonl<T>(options: JsonlCombinedOptions): Source<T[]> & Destination<unknown, void>
jsonl(options: JsonlDestinationOptions): Destination<unknown, void>
Read and write JSON Lines files (one JSON object per line).
Source mode (read JSONL files):
// Read all lines as array
.from(jsonl({ path: './events.jsonl' }))
// Emits: [{ type: 'click', ts: 1 }, { type: 'view', ts: 2 }, ...]
// Per-line emission (chunked)
.from(jsonl({ path: './events.jsonl', chunked: true }))
// Emits one exchange per line with JSONL_LINE and JSONL_PATH headers
// Custom reviver
.from(jsonl({
path: './data.jsonl',
reviver: (key, value) => key === 'date' ? new Date(value) : value
}))
Destination mode (write JSONL files):
// Append to JSONL file (default)
.to(jsonl({ path: './output.jsonl' }))
// Overwrite file
.to(jsonl({ path: './output.jsonl', mode: 'write' }))
// Dynamic path with directory creation
.to(jsonl({
path: (exchange) => `./logs/${exchange.body.date}.jsonl`,
createDirs: true
}))
// Custom replacer (omit sensitive fields)
.to(jsonl({
path: './output.jsonl',
replacer: (key, value) => key === 'secret' ? undefined : value
}))
Source options (JsonlSourceOptions):
| Option | Type | Default | Description |
|---|---|---|---|
path | string | Required | File path to the JSONL file |
encoding | BufferEncoding | 'utf-8' | Text encoding |
chunked | boolean | false | Emit one exchange per line instead of a single array |
reviver | (key, value) => unknown | - | Reviver function passed to JSON.parse |
onParseError | 'fail' | 'abort' | 'drop' | 'fail' | How to handle a line parse failure. See parse error handling. |
Destination options (JsonlDestinationOptions):
| Option | Type | Default | Description |
|---|---|---|---|
path | string | (exchange) => string | Required | File path (static or dynamic) |
encoding | BufferEncoding | 'utf-8' | Text encoding |
mode | 'write' | 'append' | 'append' | File operation mode |
createDirs | boolean | false | Create parent directories |
replacer | ((key, value) => unknown) | Array<string | number> | null | - | Replacer passed to JSON.stringify |
Behavior:
- Source (default): Reads the file, splits lines, parses each as JSON, and emits a T[] array. Empty lines are skipped.
- Source (chunked: true): Emits one T exchange per line with JSONL_LINE (1-based) and JSONL_PATH headers. Returns Source only (no Destination). With onParseError: 'fail' (default), malformed lines are routed through the route's .error() handler and the stream continues; 'abort' aborts on the first bad line; 'drop' emits exchange:dropped with reason: 'parse-failed'.
- Destination: Stringifies the body to JSON.stringify(body) + '\n'. Array bodies write one line per element. Default mode is append.
Chunked headers:
| Header | Type | Description |
|---|---|---|
JSONL_LINE | number | 1-based line number in the source file |
JSONL_PATH | string | Path of the source file |
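The serialization and parsing rules in the Behavior notes can be sketched with plain string handling. `toJsonl` and `parseJsonl` below are hypothetical helpers, not the adapter's implementation; in particular, counting blank lines toward the line number is an assumption of this sketch.

```typescript
// One JSON document per line; array bodies produce one line per element.
function toJsonl(body: unknown): string {
  const items: unknown[] = Array.isArray(body) ? body : [body];
  return items.map((item) => JSON.stringify(item) + "\n").join("");
}

// Parse JSONL text, skipping empty lines and recording the 1-based line
// number of each record (assumption: blank lines still advance the counter).
function parseJsonl<T>(text: string): Array<{ line: number; value: T }> {
  const records: Array<{ line: number; value: T }> = [];
  text.split(/\r?\n/).forEach((raw, index) => {
    if (raw.trim() === "") return; // empty lines are skipped
    records.push({ line: index + 1, value: JSON.parse(raw) as T });
  });
  return records;
}

const jsonlText = toJsonl([
  { type: "click", ts: 1 },
  { type: "view", ts: 2 },
]);
const records = parseJsonl<{ type: string; ts: number }>(
  jsonlText + "\n" + '{"type":"scroll","ts":3}\n',
);
```

Because every record ends with a newline, appending to an existing file never needs to inspect the previous contents, which fits append being the default destination mode.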
Exported types: JsonlSourceOptions, JsonlDestinationOptions, JsonlCombinedOptions, JsonlOptions
html
html(options: HtmlOptions): HtmlAdapter
Extract data from HTML using CSS selectors (powered by cheerio), or read/write HTML files.
Transformer mode (in-memory HTML parsing):
// Extract text from title
.transform(html({ selector: 'title', extract: 'text' }))
// Extract multiple elements (returns array)
.transform(html({ selector: 'h2', extract: 'text' }))
// Result: ['First Heading', 'Second Heading', ...]
// Extract HTML content
.transform(html({ selector: '.content', extract: 'html' }))
// Extract attribute value
.transform(html({ selector: 'a', extract: 'attr', attr: 'href' }))
// Extract outer HTML (including element tag)
.transform(html({ selector: 'article', extract: 'outerHtml' }))
// Custom parsing from sub-field
.transform(html({
selector: 'p',
extract: 'text',
from: (body) => body.htmlContent,
to: (body, result) => ({ ...body, paragraphs: result })
}))
Source mode (read HTML files and extract):
// Read HTML file and extract title
.from(html({
path: './page.html',
selector: 'title',
extract: 'text'
}))
// Extract multiple links from file
.from(html({
path: './page.html',
selector: 'a',
extract: 'attr',
attr: 'href'
}))
// Emits array: ['https://example.com', '/about', ...]
Destination mode (write HTML files):
// Write HTML string to file
.to(html({ path: './output.html' }))
// Dynamic paths with directory creation
.to(html({
path: (exchange) => `./pages/${exchange.body.slug}.html`,
createDirs: true
}))
// Append to HTML file
.to(html({
path: './log.html',
mode: 'append'
}))
Transformer Options (when no path provided):
| Option | Type | Default | Description |
|---|---|---|---|
selector | string | Required | CSS selector to match elements |
extract | 'text' | 'html' | 'attr' | 'outerHtml' | 'innerText' | 'textContent' | 'text' | What to extract from matched elements |
attr | string | -- | Attribute name (required when extract: 'attr') |
from | (body) => string | Uses body or body.body | Extract HTML string from exchange |
to | (body, result) => R | Replaces body | Where to put extracted result |
File Options (when path is provided):
All transformer options above, plus:
| Option | Type | Default | Description |
|---|---|---|---|
path | string | (exchange) => string | Required | File path (static or dynamic) |
mode | 'read' | 'write' | 'append' | 'read' for source, 'write' for destination | File operation mode |
encoding | BufferEncoding | 'utf-8' | Text encoding |
createDirs | boolean | false | Create parent directories (destination only) |
onParseError | 'fail' | 'abort' | 'drop' | 'fail' | How to handle an extraction failure (source only). See parse error handling. |
Extract types:
- text / innerText / textContent: Plain text content (strips HTML tags, removes <style> and <script>)
- html: Inner HTML content
- outerHtml: Element including its tag
- attr: Attribute value (requires attr option)
Behavior:
- Single match: Returns string
- Multiple matches: Returns array of strings
- No matches: Returns empty string
- Source mode: Reads HTML file and extracts data using selector
- Destination mode: Writes HTML string (from exchange.body or exchange.body.body) to file
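Because the extraction result changes shape with the number of matches (a string, an array of strings, or an empty string), downstream steps often normalize it first. This is a minimal sketch; `ExtractResult` is a local stand-in rather than the exported `HtmlResult` type, and `toMatches` is a hypothetical helper.

```typescript
// Local stand-in for the result described above: a string for a single match
// (or "" for no match), an array of strings for several matches.
type ExtractResult = string | string[];

// Hypothetical helper: always hand downstream steps an array.
function toMatches(result: ExtractResult): string[] {
  if (Array.isArray(result)) return result;
  return result === "" ? [] : [result];
}

const noMatches = toMatches("");
const oneMatch = toMatches("First Heading");
const manyMatches = toMatches(["First Heading", "Second Heading"]);
```

One limitation of this approach: a single matched element whose text is empty cannot be told apart from no match at all, since both arrive as an empty string.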
Exported types: HtmlAdapter, HtmlOptions, HtmlResult
Messaging adapters
mail
mail(folder: string, options: Partial<MailServerOptions>): Source<MailMessage>
mail(folder: string): Destination<unknown, MailFetchResult>
mail(options: Partial<MailServerOptions>): Destination<unknown, MailFetchResult>
mail(action: MailAction): Destination<unknown, void>
mail(options?: Partial<MailClientOptions>): Destination<MailSendPayload, MailSendResult>
Read email via IMAP, send via SMTP, or perform IMAP operations. The adapter has four modes determined by the arguments you pass.
Source mode (IMAP push): Pass a folder and options to receive new messages via IMAP IDLE or polling. Each new email becomes a separate exchange.
craft()
.id('inbox-watcher')
.from(mail('INBOX', { markSeen: true }))
.to(log())
Source delivery modes: the source runs in one of two modes.
- IDLE (default): the server pushes notifications when new mail arrives. The \Seen flag is the cross-cycle dedupe state, so each message is delivered exactly once per subscription. IDLE is the right default for "process each new email once" workloads. If the IMAP connection drops mid-subscription the source reconnects automatically with exponential backoff; auth failures stop the subscription immediately.
- Poll (opt-in): set pollIntervalMs to fetch on a cadence instead of IDLE. Required whenever you opt out of the \Seen dedupe model (markSeen: false or unseen: false), for example to re-evaluate the inbox on every cycle and rely on a folder move as the done-signal. IDLE has no cycle boundary, so combining it with those overrides would refetch the entire folder on every inbound message; the source throws RC5003 at startup to prevent this footgun.
// Re-evaluate the inbox every minute; archive a message to mark it done.
// If you later extend `matchesCriteria`, previously-unmatched mail that is
// still in INBOX is picked up on the next cycle.
craft()
.id('inbox-processor')
.from(mail('INBOX', {
pollIntervalMs: 60_000,
markSeen: false,
unseen: false,
}))
.filter(matchesCriteria)
.process(processMessage)
.to(mail({ action: 'move', folder: 'Archive' }))
The \Seen flag is written per-message after the handler resolves successfully, so a downstream failure leaves the message un-Seen and it is retried on the next cycle. limit combined with IDLE is a latency trap (backlog beyond the limit only drains when new mail arrives) and emits a warning at subscribe time.
Fetch destination (IMAP pull): Pass a folder string or server options to fetch messages. Use with .enrich() to pull mail on demand.
craft()
.id('check-inbox')
.from(cron('0 */5 * * * *'))
.enrich(mail('INBOX'))
.to(log())
Send destination (SMTP): Call with no arguments or client options to send email. The exchange body must be a MailSendPayload.
craft()
.id('outbound')
.from(direct())
.to(mail())
Combined read and send:
// Forward unread mail to a different address
craft()
.id('mail-forwarder')
.from(mail('INBOX', { unseen: true, markSeen: true }))
.transform((msg) => ({
to: 'team@example.com',
subject: `Fwd: ${msg.subject}`,
text: msg.body.text ?? '',
}))
.to(mail())
IMAP operations: Call with a MailAction object to move, copy, delete, flag, unflag, or append messages.
// Archive after processing
craft()
.id('archive-processed')
.from(mail('INBOX', { unseen: true }))
.tap(processMessage)
.to(mail({ action: 'move', folder: 'Archive' }))
// Flag important messages
craft()
.id('flag-important')
.from(mail('INBOX', { subject: 'URGENT' }))
.to(mail({ action: 'flag', flags: '\\Flagged' }))
Configuration via named accounts:
Mail connection details are set once in your craft.config.ts so individual routes do not need to repeat them. Each capability file re-exports the config:
// craft.config.ts
import type { CraftConfig } from '@routecraft/routecraft'
export const craftConfig: CraftConfig = {
mail: {
accounts: {
default: {
imap: {
host: 'imap.gmail.com',
auth: { user: process.env.MAIL_USER!, pass: process.env.MAIL_APP_PASSWORD! },
},
smtp: {
host: 'smtp.gmail.com',
auth: { user: process.env.MAIL_USER!, pass: process.env.MAIL_APP_PASSWORD! },
from: process.env.MAIL_USER!,
},
},
},
},
}
// capabilities/inbox-watcher.ts
export { craftConfig } from '../craft.config'
import { craft, mail, log } from '@routecraft/routecraft'
export default craft()
.id('inbox-watcher')
.from(mail('INBOX', { markSeen: true }))
.to(log())
When multiple accounts are configured, select one per adapter call with the account option:
.from(mail('INBOX', { account: 'support' }))
.to(mail({ account: 'notifications' }))
Server options (MailServerOptions):
| Option | Type | Default | Description |
|---|---|---|---|
host | string | -- | IMAP host (e.g. 'imap.gmail.com') |
port | number | 993 | IMAP port |
secure | boolean | true | Use TLS |
auth | MailAuth | -- | { user, pass } credentials |
folder | string | 'INBOX' | IMAP mailbox folder |
markSeen | boolean | true | Mark fetched messages as seen |
since | Date | -- | Only fetch messages since this date |
unseen | boolean | true | Only fetch unseen messages |
from | string | string[] | -- | Filter by sender (IMAP FROM search). Array = OR |
to | string | string[] | -- | Filter by recipient (IMAP TO search). Array = OR |
subject | string | string[] | -- | Filter by subject text (IMAP SUBJECT search). Array = OR |
body | string | string[] | -- | Filter by body text (IMAP TEXT search). Array = OR |
header | Record<string, string | string[]> | -- | Filter by arbitrary IMAP headers. Array values = OR |
includeHeaders | true | string[] | -- | Raw headers to include on fetched messages. true = all |
verify | 'off' | 'headers' | 'strict' | 'headers' | Sender analysis. 'headers' reads Authentication-Results/ARC/List-Id the receiving server wrote (no network). 'strict' additionally runs cryptographic verification via optional mailauth (DNS lookups). 'off' skips analysis. |
limit | number | -- | Maximum messages per fetch |
pollIntervalMs | number | -- | Poll interval in ms (default: IMAP IDLE) |
account | string | -- | Named account from context config (uses default if omitted) |
onParseError | 'fail' | 'abort' | 'drop' | 'fail' | How to handle a per-message MIME parse failure. See parse error handling. All three modes mark the malformed message Seen so it does not refetch forever. 'fail' routes the failure through the route's .error() handler (or exchange:failed if no handler is set). 'drop' does NOT invoke .error(); it emits exchange:dropped with reason: 'parse-failed' so subscribers can count parse drops as a structured event without scraping logs. Pre-#187 behaviour was equivalent to a silent 'drop' (logged at debug, no event); set onParseError: 'drop' to keep lossy-ingest semantics with structured observability. |
Client options (MailClientOptions):
| Option | Type | Default | Description |
|---|---|---|---|
host | string | -- | SMTP host (e.g. 'smtp.gmail.com') |
port | number | 465 | SMTP port |
secure | boolean | true | Use TLS |
auth | MailAuth | -- | { user, pass } credentials |
from | string | -- | Default sender address |
replyTo | string | -- | Default reply-to address |
cc | string | string[] | -- | Default CC recipients |
bcc | string | string[] | -- | Default BCC recipients |
account | string | -- | Named account from context config (uses default if omitted) |
MailMessage (exchange body in source/fetch modes):
| Field | Type | Description |
|---|---|---|
uid | number | IMAP UID |
messageId | string | Message-ID header |
from | string | Literal From: header. For mailing-list forwards this is the rewritten list address; use sender.address for the real sender. |
to | string | string[] | Recipient address(es) |
subject | string | Subject line |
date | Date | Date sent |
body | { text?: string; html?: string } | Message body. Both, either, or neither may be populated depending on what the sender composed (multipart/alternative vs single-part). |
cc | string[]? | CC recipients |
bcc | string[]? | BCC recipients |
replyTo | string? | Reply-to address |
attachments | MailAttachment[]? | File attachments |
rawHeaders | Record<string, string | string[]>? | Raw email headers (when includeHeaders is set) |
flags | Set<string> | IMAP flags (e.g. \Seen, \Flagged) |
folder | string | The IMAP folder this message was fetched from |
sender | MailSender? | Computed effective sender and forward chain (see below). Omitted when verify: 'off'. |
MailSender (on MailMessage.sender):
Resolves the real sender of mailing-list and auto-forwarded messages, so apps can gate on origin without re-parsing headers. For a Google Groups forward, sender.address is the original sender and from is the rewritten list address.
| Field | Type | Description |
|---|---|---|
address | string | Effective sender address, after unwinding list / auto-forward rewrites. |
name | string? | Display name, when present. |
domain | string | Domain portion of address. |
forwardType | 'direct' | 'auto-forward' | 'mailing-list' | How the message reached the recipient. |
forwardChain | ForwardHop[] | Hops between original sender and final recipient, nearest hop first. Empty for direct mail. |
trust | 'verified' | 'unverified' | 'failed' | Trust state. Direct mail is verified when dmarc=pass; forwarded mail is verified when ARC cv=pass. |
reason | string | Machine-readable slug (e.g. 'list-forward-arc-verified', 'direct-dmarc-aligned'). |
authentication | { dkim, spf, dmarc, arc } | Per-method verdicts (pass / fail / neutral / none; ARC is pass / fail / none). |
headerFrom | EmailAddress? | Literal From: header, only set when it differs from the effective sender. |
Filter on the effective sender:
craft()
.from(mail('INBOX'))
.filter((ex) => {
const s = ex.body.sender;
if (s?.address === 'alice@allowed.com' && s.trust === 'verified') {
return true;
}
return { reason: s?.reason ?? 'no sender info' };
})
.to(log())
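The same idea extends from a single address to a domain allow-list. The sketch below is a plain predicate over a subset of the `MailSender` fields from the table above; `SenderSketch` and `isAllowedSender` are hypothetical names for this illustration, and the policy itself (verified senders from listed domains only) is just one possible choice.

```typescript
// Local stand-in for the MailSender fields used here (see the table above).
interface SenderSketch {
  address: string;
  domain: string;
  trust: "verified" | "unverified" | "failed";
  reason: string;
}

// Hypothetical policy: accept only verified senders from allow-listed domains.
function isAllowedSender(
  sender: SenderSketch | undefined,
  allowedDomains: ReadonlySet<string>,
): boolean {
  if (!sender) return false; // e.g. verify: 'off' omits sender entirely
  return (
    sender.trust === "verified" &&
    allowedDomains.has(sender.domain.toLowerCase())
  );
}

const allowedDomains = new Set(["allowed.com"]);
const verifiedAlice: SenderSketch = {
  address: "alice@allowed.com",
  domain: "allowed.com",
  trust: "verified",
  reason: "direct-dmarc-aligned",
};
const spoofedAlice: SenderSketch = { ...verifiedAlice, trust: "failed" };
```

Gating on `sender.domain` rather than the literal `from` header matters for list traffic, since `from` may hold the rewritten list address.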
MailSendPayload (exchange body for .to(mail())):
| Field | Type | Description |
|---|---|---|
to | string | string[] | Recipient address(es) |
subject | string | Subject line |
text | string? | Plain text body |
html | string? | HTML body |
cc | string | string[]? | CC recipients |
bcc | string | string[]? | BCC recipients |
from | string? | Sender (overrides option-level from) |
replyTo | string? | Reply-to (overrides option-level replyTo) |
attachments | Array<{ filename, content, contentType? }>? | File attachments |
MailSendResult:
| Field | Type | Description |
|---|---|---|
messageId | string | Message-ID of the sent email |
accepted | string[] | Accepted recipient addresses |
rejected | string[] | Rejected recipient addresses |
response | string | SMTP server response string |
Exported types: MailAuth, MailServerOptions, MailClientOptions, MailOptions, MailMessage, MailAttachment, MailSendPayload, MailSendResult, MailFetchResult, MailContextConfig, MailAccountConfig, MailAction, MailSender, EmailAddress, ForwardHop, ForwardType, TrustLevel, MailClientManager, MAIL_CLIENT_MANAGER. Helpers: analyzeHeaders, parseAuthResults.
Browser adapters
agentBrowser
import { agentBrowser } from '@routecraft/browser'
Automate a browser session using the agent-browser library. Each exchange gets an isolated session (derived from exchange.id), so split()/aggregate() flows work correctly. Use with .to(), .enrich(), or .tap(). Requires agent-browser as a peer dependency.
Navigate and take a snapshot:
import { agentBrowser } from '@routecraft/browser'
craft()
.id('scrape-page')
.from(simple({ url: 'https://example.com' }))
.to(agentBrowser('open', { url: (ex) => ex.body.url }))
.enrich(agentBrowser('snapshot', { json: true }))
.to(log())
// Result merged into body: { stdout: '...', parsed: { snapshot: '...', refs: {...} }, exitCode: 0 }
Click an element and get text:
craft()
.id('click-and-read')
.from(source)
.to(agentBrowser('click', { selector: '#submit-btn' }))
.enrich(agentBrowser('get', { info: 'text', selector: '.result' }))
.to(log())
Dynamic URL from exchange body:
craft()
.id('dynamic-browse')
.from(simple({ link: 'https://example.com/page' }))
.enrich(agentBrowser('open', { url: (ex) => ex.body.link }))
.enrich(agentBrowser('snapshot'))
.to(log())
Close the session explicitly:
.to(agentBrowser('close'))
Commands:
| Command | Required Options | Description |
|---|---|---|
open | url | Navigate to a URL |
click | selector | Click an element (optional newTab) |
dblclick | selector | Double-click an element |
fill | selector, value | Clear and fill a form field |
type | selector, value | Type text into a focused element |
press | key | Press a keyboard key |
hover | selector | Hover over an element |
focus | selector | Focus an element |
select | selector, value | Select a dropdown option |
check | selector | Check a checkbox |
uncheck | selector | Uncheck a checkbox |
scroll | direction | Scroll the page (up, down, left, right; optional pixels) |
snapshot | -- | Take an accessibility snapshot (optional interactive) |
screenshot | -- | Take a screenshot (optional path, full, annotate) |
eval | js | Evaluate JavaScript in the page |
get | info | Get page info: text, html, value, title, url, count, attr, box, styles (optional selector, attr) |
wait | -- | Wait for a selector or timeout (optional selector, ms) |
close | -- | Close the browser session |
back | -- | Navigate back |
forward | -- | Navigate forward |
reload | -- | Reload the page |
tab | -- | Manage tabs (optional action: new, close, list; index; url) |
Command-specific option values that accept Resolvable<T, V> can be a static value or a function (exchange) => value for dynamic resolution.
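The static-or-function pattern can be sketched as follows. `ResolvableSketch`, `resolveOption`, and `ExchangeSketch` are hypothetical names for this illustration; the real `Resolvable<T, V>` type may be declared differently, including the order and meaning of its type parameters.

```typescript
// Illustrative shape only: either a plain value or a function of the exchange.
type ResolvableSketch<E, V> = V | ((exchange: E) => V);

// Resolve an option against the current exchange.
function resolveOption<E, V>(option: ResolvableSketch<E, V>, exchange: E): V {
  return typeof option === "function"
    ? (option as (exchange: E) => V)(exchange)
    : option;
}

interface ExchangeSketch {
  id: string;
  body: { url: string };
}

const sampleExchange: ExchangeSketch = {
  id: "ex-1",
  body: { url: "https://example.com" },
};

const staticUrl = resolveOption<ExchangeSketch, string>(
  "https://example.org",
  sampleExchange,
);
const dynamicUrl = resolveOption<ExchangeSketch, string>(
  (ex) => ex.body.url,
  sampleExchange,
);
```

A consequence of this design is that an option whose value is itself a function cannot be passed statically, because it would be called as a resolver.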
Base options (available on every command):
| Option | Type | Default | Description |
|---|---|---|---|
session | string | (exchange) => string | exchange.id | Override auto-session derived from exchange ID |
headed | boolean | false | Run browser in headed mode (show window) |
json | boolean | false | Parse command output into result.parsed |
args | string[] | -- | Extra CLI flags (ignored in library mode) |
Result shape (AgentBrowserResult):
| Field | Type | Description |
|---|---|---|
stdout | string | Text output from the command |
parsed | unknown | Parsed JSON output (only when json: true) |
exitCode | number | 0 for success, 1 for failure |
AI adapters
mcp
import { mcp } from '@routecraft/ai'
Expose capabilities as MCP tools or call remote MCP servers. Requires mcpPlugin() in your context plugins when used as a source.
Source mode -- define a discoverable MCP tool:
The tool name is the route id; the tool's title, description, and schemas live on the route builder (enforced framework-wide). Only MCP-protocol extras (annotations, icons) remain on mcp() itself.
import { mcp } from '@routecraft/ai'
import { z } from 'zod'
craft()
.id('fetch-webpage')
.title('Fetch webpage')
.description('Fetch the content of a webpage')
.input({ body: z.object({ url: z.string().url() }) })
.output({ body: z.object({ content: z.string() }) })
.from(mcp({ annotations: { readOnlyHint: true, openWorldHint: true } }))
.transform(async ({ url }) => {
const res = await fetch(url)
return { content: await res.text() }
})
A non-empty .description() is required for every MCP source route (surfaced as the tool description in tools/list); the route fails to subscribe otherwise. The tool name (route id) is validated against the MCP interop regex ^[A-Za-z0-9_-]{1,64}$.
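Since an invalid route id only surfaces when the route subscribes, it can be convenient to check ids earlier, for example in a unit test. The sketch below applies the pattern quoted above; `isValidToolName` is a hypothetical helper, not a library export.

```typescript
// Pattern quoted in the text above for MCP tool names (route ids).
const MCP_TOOL_NAME = /^[A-Za-z0-9_-]{1,64}$/;

// Hypothetical helper: true when the id is usable as an MCP tool name.
function isValidToolName(routeId: string): boolean {
  return MCP_TOOL_NAME.test(routeId);
}

const nameChecks = [
  "fetch-webpage", // letters and hyphen: valid
  "fetch webpage", // contains a space: invalid
  "fetch.webpage", // contains a dot: invalid
  "a".repeat(65), // longer than 64 characters: invalid
].map((id) => ({ id, valid: isValidToolName(id) }));
```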
Destination mode -- call a remote MCP tool:
// Recommended: by server id registered in mcpPlugin({ clients }).
// Auth is inherited from the client config automatically.
.enrich(mcp('browser:browser_navigate', { args: (ex) => ({ url: ex.body.url }) }))
// By URL and tool name (use inline auth if needed)
.enrich(mcp({ url: 'http://127.0.0.1:8089/mcp', tool: 'browser_navigate' }, { args: (ex) => ({ url: ex.body.url }) }))
When using the serverId path (recommended), auth configured on the client in mcpPlugin({ clients }) flows to the destination automatically. Inline auth on McpClientOptions is available as an escape hatch for the raw url path or to override registered config, but prefer centralizing credentials in the plugin config.
Options (McpServerOptions -- source, protocol extras only):
| Option | Type | Required | Description |
|---|---|---|---|
annotations | McpToolAnnotations | No | Behavior hints forwarded to MCP clients in the tools/list response |
icons | McpToolIcon[] | No | Icons forwarded on tools/list per the MCP spec |
All other tool metadata (title, description, input / output schemas) comes from the route builder and is enforced framework-wide:
| Builder method | Maps to | Notes |
|---|---|---|
.id('tool-name') | tool.name | Validated against ^[A-Za-z0-9_-]{1,64}$ at subscribe |
.title('...') | tool.title | Optional display title |
.description('...') | tool.description | Required for MCP source routes |
.input({ body, headers }) | tool.inputSchema + runtime check | body validation is framework-enforced; headers validated values merge over the originals |
.output({ body, headers }) | tool.outputSchema + runtime check | Framework-enforced before the primary destination fires |
McpToolAnnotations (optional hint fields, all booleans unless noted):
These mirror the MCP specification (2025-03-26) ToolAnnotations shape. They are hints only; clients must not rely on them for correctness or safety.
| Field | Type | Description |
|---|---|---|
title | string | Human-readable title for the tool (used for display in UIs). |
readOnlyHint | boolean | When true, the tool does not modify any state. Clients assume false when omitted. |
destructiveHint | boolean | When true, the tool may perform destructive operations. Clients assume true when omitted. |
idempotentHint | boolean | When true, calling the tool repeatedly with the same arguments has no additional effect. Clients assume false when omitted. |
openWorldHint | boolean | When true, the tool may interact with external systems (network, filesystem, etc.). Clients assume true when omitted. |
Options (McpClientOptions -- destination):
| Option | Type | Required | Description |
|---|---|---|---|
url | string | One of url/serverId | Direct HTTP URL of the remote MCP server |
serverId | string | One of url/serverId | Named server registered via mcpPlugin({ clients }) |
tool | string | No | Tool name to invoke (or set exchange.body.tool) |
args | (exchange) => Record<string, unknown> | No | Extractor for tool arguments; defaults to exchange.body |
auth | McpClientAuthOptions | No | Auth credentials for HTTP requests. Auto-inherited from mcpPlugin({ clients }) when using serverId; use to override or for inline url connections |
McpClientAuthOptions:
| Field | Type | Description |
|---|---|---|
token | string | string[] | (() => string | Promise<string>) | Bearer token, array of tokens (round-robin), or provider function called per request |
headers | Record<string, string> | Additional request headers; overrides token if Authorization is set |
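The three `token` forms and the header precedence can be sketched as below. `createTokenPicker` and `buildHeaders` are hypothetical helpers, not the client's implementation; the round-robin order, the empty-array check, and the case-sensitive header comparison are all assumptions of this sketch.

```typescript
type TokenSource = string | string[] | (() => string | Promise<string>);

// Hypothetical helper: returns a function yielding the token for each request.
function createTokenPicker(
  source: TokenSource,
): () => string | Promise<string> {
  if (typeof source === "string") return () => source;
  if (typeof source === "function") return source; // provider called per request
  const tokens: string[] = source;
  if (tokens.length === 0) throw new Error("token array must not be empty");
  let cursor = 0; // round-robin position
  return () => {
    const token = tokens[cursor % tokens.length] as string;
    cursor += 1;
    return token;
  };
}

// Extra headers are spread last, so a custom Authorization header replaces
// the bearer token (names are compared case-sensitively in this sketch).
function buildHeaders(
  token: string,
  extra: Record<string, string> = {},
): Record<string, string> {
  return { Authorization: `Bearer ${token}`, ...extra };
}

const pickToken = createTokenPicker(["token-a", "token-b"]);
const tokenSequence = [pickToken(), pickToken(), pickToken()];
const overriddenHeaders = buildHeaders("token-a", {
  Authorization: "Basic abc",
  "X-Trace": "1",
});
```

A provider function may return a promise, so a real caller would `await` the picked token before building headers.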
Tool Registry
Each .from(mcp(...)) route registers in MCP_LOCAL_TOOL_REGISTRY so the MCP server can list and invoke it via the MCP protocol:
import { MCP_LOCAL_TOOL_REGISTRY } from '@routecraft/ai'
const ctx = await new ContextBuilder().routes(...).build()
await ctx.start()
const registry = ctx.getStore(MCP_LOCAL_TOOL_REGISTRY)
const tools = registry ? Array.from(registry.values()) : []
// [{ endpoint, title?, description, input?, output?, annotations?, icons?, handler }]
mcp() and direct() maintain separate, fully isolated registries. An MCP route with .id('foo').from(mcp()) and a direct route with .id('bar').from(direct()) both register by their own ids in their own stores; direct routes never appear in the MCP tools/list response.
See Expose as MCP and Call an MCP.
llm
import { llm } from '@routecraft/ai'
Call a language model and get text or structured output. Requires llmPlugin() in your context plugins.
import { llm } from '@routecraft/ai'
// Text output
craft()
.id('summarise')
.from(source)
.enrich(llm('anthropic:claude-haiku-4-5-20251001', {
system: 'Summarise the following in one sentence.',
user: (ex) => ex.body.content,
}))
.to(log())
// Result merged into body: { ..., text: '...', usage: { inputTokens, outputTokens, totalTokens } }
// Structured output with Zod schema
import { z } from 'zod'
const sentimentSchema = z.object({
sentiment: z.enum(['positive', 'neutral', 'negative']),
confidence: z.number(),
})
craft()
.id('classify')
.from(source)
.enrich(llm('openai:gpt-4o', {
system: 'Classify the sentiment of the text.',
user: (ex) => ex.body.text,
output: sentimentSchema,
}))
.to(log())
// result.output is typed as { sentiment: 'positive' | 'neutral' | 'negative'; confidence: number }
Model ID format: "provider:model-name" (e.g., "ollama:llama3.2", "anthropic:claude-sonnet-4-6").
Supported providers: openai, anthropic, ollama, openrouter, gemini
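The "provider:model-name" format can be checked before a route runs. The following is a minimal sketch, assuming the id is split at the first colon so that model names may themselves contain colons; `parseModelId` is a hypothetical helper and `ollama:llama3.2:latest` is a made-up id used only to exercise that assumption.

```typescript
// Provider names taken from the "Supported providers" list above.
const SUPPORTED_PROVIDERS = [
  "openai",
  "anthropic",
  "ollama",
  "openrouter",
  "gemini",
] as const;
type Provider = (typeof SUPPORTED_PROVIDERS)[number];

// Hypothetical helper: split at the first colon and validate the provider.
function parseModelId(modelId: string): { provider: Provider; model: string } {
  const separator = modelId.indexOf(":");
  if (separator <= 0 || separator === modelId.length - 1) {
    throw new Error(`Expected "provider:model-name", got "${modelId}"`);
  }
  const provider = modelId.slice(0, separator);
  if (!(SUPPORTED_PROVIDERS as readonly string[]).includes(provider)) {
    throw new Error(`Unsupported provider "${provider}"`);
  }
  return {
    provider: provider as Provider,
    model: modelId.slice(separator + 1),
  };
}

const haiku = parseModelId("anthropic:claude-haiku-4-5-20251001");
const taggedModel = parseModelId("ollama:llama3.2:latest");
```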
Options:
| Option | Type | Default | Description |
|---|---|---|---|
system | string | (exchange) => string | -- | System prompt (static or derived from exchange) |
user | string | (exchange) => string | -- | User prompt (static or derived from exchange) |
output | StandardSchemaV1 | -- | Zod/Valibot/ArkType schema for structured output |
temperature | number | -- | Sampling temperature |
maxTokens | number | -- | Maximum tokens to generate |
topP | number | -- | Top-p sampling |
frequencyPenalty | number | -- | Frequency penalty |
presencePenalty | number | -- | Presence penalty |
Result shape (merged into body by .enrich()):
| Field | Type | Description |
|---|---|---|
text | string | Raw model output |
output | T | Parsed structured output (only when an output schema was supplied) |
usage.inputTokens | number | Input token count |
usage.outputTokens | number | Output token count |
usage.totalTokens | number | Total token count |
Provider credentials are configured once in llmPlugin() and shared across all llm() calls. See Plugins reference.
agent
import { agent } from '@routecraft/ai'
Run an LLM with a fixed system prompt on each incoming exchange. Replaces the body with AgentResult { text, usage? }. Two forms:
- Inline (agent({ model, system, user? })) -- identity and description come from the enclosing route (.id(), .description()). Suitable when the route is the agent.
- By name (agent("summariser")) -- resolves a registered agent from the context. Register agents via agentPlugin({ agents: { name: {...} } }) (Plugins reference).
import { agent, agentPlugin } from '@routecraft/ai'
import { craft, direct, log, timer } from '@routecraft/routecraft'
import { readFileSync } from 'node:fs'
// Inline: the route IS the agent. Other routes call it via direct("zoe").
craft()
  .id('zoe')
  .description('Internal ops assistant')
  .from(direct())
  .to(agent({
    model: 'anthropic:claude-opus-4-7',
    // The system prompt is a plain string; read it from disk yourself
    system: readFileSync('./prompts/zoe.md', 'utf-8'),
  }))
  .to(direct('reply'))
// By name: register once, use from any route in the context. Per-agent
// fields can be omitted when defaultOptions supplies them.
agentPlugin({
  defaultOptions: {
    model: 'anthropic:claude-opus-4-7',
  },
  agents: {
    summariser: {
      description: 'Summarises documents into bullet points',
      system: 'Be concise.',
      // model inherited from defaultOptions
    },
  },
})
craft()
  .id('periodic-summary')
  .from(timer({ intervalMs: 60_000 }))
  .to(agent('summariser'))
  .to(log())
Model ID format: "provider:model-name" (same as llm()). The provider must be registered via llmPlugin({ providers: {...} }). There is no inline-credentials escape hatch on agent({...}); centralised wiring via llmPlugin is the only path.
Supported providers: openai, anthropic, ollama, openrouter, gemini
AgentOptions (inline form):
| Option | Type | Default | Required | Description |
|---|---|---|---|---|
model | LlmModelId | -- | No* | "provider:model" string resolved via llmPlugin. Required unless defaultOptions.model supplies a fallback; otherwise dispatch throws RC5003 |
system | string | -- | Yes | System prompt. Load from disk yourself when sourcing from a file |
user | (exchange) => string | body as-is / JSON | No | Override for deriving the user prompt. Defaults to body (string as-is, JSON for objects) |
tools | ToolSelection | -- | No | Tool whitelist built via tools([...]). Inherits defaultOptions.tools when omitted; an explicit value replaces the default entirely |
principal | boolean | (principal, exchange) => string | false | No | When true, append a built-in ## Caller section to the system prompt describing exchange.principal (identity + roles), or stating the request is unauthenticated. Pass a function to render the section yourself. See Telling the agent who the caller is |
output | StandardSchemaV1 | -- | No | Schema for structured output. Validated and parsed onto AgentResult.output after dispatch. Runtime support ships in a follow-up release |
AgentRegisteredOptions (entries in agentPlugin({ agents: {...} }), for by-name reuse): same as AgentOptions plus:
| Option | Type | Default | Required | Description |
|---|---|---|---|---|
description | string | -- | Yes | Human-readable description. Surfaces in observability and is used as the tool description when the agent is exposed to other agents |
The id is the record key in agentPlugin({ agents: { [id]: {...} } }).
Result shape (body is replaced by .to()):
| Field | Type | Description |
|---|---|---|
text | string | Generated text from the model |
output | T | Parsed structured output (only when an output schema was supplied; runtime support ships in a follow-up release) |
usage.inputTokens | number | Input token count (when reported) |
usage.outputTokens | number | Output token count (when reported) |
usage.totalTokens | number | Total token count (when reported) |
Resolution semantics:
- agent("name") only resolves registered agents. To call a route-backed agent from another route, use .to(direct("route-id")). direct runs the full pipeline of the target route; agent("name") runs the registered agent's LLM call inline.
- Model resolution at dispatch is instance value > defaultOptions.model > throw RC5003.
- Duplicate registered agent ids, missing description, malformed model string when present, or a non-ToolSelection tools value fail at context init with RC5003 (Adapter misconfigured).
- Referencing an unknown registered agent name fails at dispatch with RC5004 (No handler available).
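The model resolution order above can be illustrated with a small standalone function. This is a sketch only, not Routecraft source code: resolveModel, ModelOptions, and AdapterError are hypothetical names, and only the precedence (instance value, then defaultOptions.model, then RC5003) is taken from the text.

```typescript
// Hypothetical sketch of the documented precedence; not the library's implementation.
type LlmModelId = `${string}:${string}`;

interface ModelOptions {
  model?: LlmModelId;
}

// Hypothetical error type that carries the documented error code.
class AdapterError extends Error {
  readonly code: string;
  constructor(code: string, message: string) {
    super(`${code}: ${message}`);
    this.code = code;
  }
}

// The instance value wins, then the plugin default, otherwise RC5003 is thrown.
function resolveModel(instance: ModelOptions, defaults: ModelOptions = {}): LlmModelId {
  const model = instance.model ?? defaults.model;
  if (model === undefined) {
    throw new AdapterError("RC5003", "no model on the agent and no defaultOptions.model");
  }
  return model;
}
```

Because the fallback is only consulted when the instance value is absent, a per-agent model always overrides the context-wide default.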
Provider credentials are configured once in llmPlugin() and shared across all agent() calls. See Plugins reference.
Telling the agent who the caller is
By default, the only part of the exchange that reaches the model is the body (as the user prompt). The authenticated caller (exchange.principal) is not in the prompt, so the model does not know who it is serving unless you put it there yourself.
Set principal: true to append a ## Caller section to the system prompt. It is appended after your own prompt and any skills, and it covers the unauthenticated case explicitly so the model never invents an identity:
agent({
  model: 'anthropic:claude-opus-4-7',
  system: 'You are a support assistant.',
  principal: true,
});
When the request is authenticated, the model sees:
## Caller
The current request is authenticated.
- Name: Jane Doe
- Email: jane@example.com
- Subject: user_2a9f
- Roles: admin, editor
When there is no principal:
## Caller
The current request is not authenticated. No verified user identity is
available. Do not assume, infer, or invent the caller's name, email, or
permissions.
Only the loggable identity fields (name, email, subject) and roles are surfaced; fields that are absent on the principal are omitted, and interpolated values have newlines collapsed so a subject-controlled field (a self-service display name, say) cannot forge prompt structure. Scopes, claims, userinfoClaims, and the bearer token are never injected. The block is informational context only: authorization is still enforced by .authorize() and tool guards, never by the model.
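The rules in the paragraph above (omit absent fields, collapse newlines, surface only identity fields and roles) can be approximated with a standalone renderer. This is a hedged sketch, not the built-in implementation: PrincipalLike, oneLine, and renderCaller are hypothetical names, and the exact whitespace handling is an assumption.

```typescript
// Hypothetical sketch of a "## Caller" renderer; only the field list and the
// output wording come from the documentation above.
interface PrincipalLike {
  name?: string;
  email?: string;
  subject?: string;
  roles?: string[];
}

// Collapse each run of newlines (plus adjacent whitespace) into one space so a
// value can never start a new line in the prompt.
const oneLine = (value: string): string =>
  value.replace(/\s*[\r\n]+\s*/g, " ").trim();

function renderCaller(principal?: PrincipalLike): string {
  if (!principal) {
    return [
      "## Caller",
      "The current request is not authenticated. No verified user identity is",
      "available. Do not assume, infer, or invent the caller's name, email, or",
      "permissions.",
    ].join("\n");
  }
  const lines = ["## Caller", "The current request is authenticated."];
  // Absent fields are skipped rather than rendered as empty bullets.
  if (principal.name) lines.push(`- Name: ${oneLine(principal.name)}`);
  if (principal.email) lines.push(`- Email: ${oneLine(principal.email)}`);
  if (principal.subject) lines.push(`- Subject: ${oneLine(principal.subject)}`);
  if (principal.roles?.length) {
    lines.push(`- Roles: ${principal.roles.map(oneLine).join(", ")}`);
  }
  return lines.join("\n");
}
```

A display name such as "Eve\n## System" is flattened to a single bullet line, so it cannot introduce a heading of its own.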
To control the wording or which fields are shown, pass a function instead of true. It receives the principal (undefined when unauthenticated) and the exchange, and returns the markdown to append (return '' to append nothing). Your renderer owns its own escaping and the same field exclusions apply:
agent({
  model: 'anthropic:claude-opus-4-7',
  system: 'You are a support assistant.',
  principal: (p) =>
    p ? `## Caller\n\nYou are assisting ${p.name ?? p.subject}.` : '',
});
To opt every agent in a context into caller-awareness at once, set principal on agentPlugin({ defaultOptions }); a per-agent principal (including false) overrides it.
Inside a tool handler, the same principal is available as ctx.principal (a deep-frozen, read-only snapshot).
embedding
import { embedding } from '@routecraft/ai'
Generate vector embeddings from text. Requires embeddingPlugin() in your context plugins.
import { embedding } from '@routecraft/ai'
import { craft } from '@routecraft/routecraft'
// `source` and `vectorStore` are placeholders for your own adapters
craft()
  .id('embed-document')
  .from(source)
  .enrich(embedding('openai:text-embedding-3-small', {
    using: (ex) => ex.body.content,
  }))
  .to(vectorStore)
// Result merged into body: { ..., embedding: [0.123, -0.456, ...] }
// Embed a combination of fields
.enrich(embedding('ollama:nomic-embed-text', {
  using: (ex) => `${ex.body.title} ${ex.body.description}`,
}))
Model ID format: "provider:model-name" (e.g., "huggingface:all-MiniLM-L6-v2", "ollama:nomic-embed-text").
Supported providers: huggingface (local ONNX, no API key), ollama, openai, mock (deterministic test vectors)
Options:
| Option | Type | Required | Description |
|---|---|---|---|
using | (exchange) => string | string[] | Yes | Extract the text to embed from the exchange |
Result shape (merged into body by .enrich()):
| Field | Type | Description |
|---|---|---|
embedding | number[] | Vector representation of the input text |
Provider credentials are configured once in embeddingPlugin() and shared across all embedding() calls. See Plugins reference.
Clustering adapters
group
import { group } from '@routecraft/routecraft'
Transformer that groups an array into clusters using a comparator. Use with .transform(group(options)). By default it reads the body as the array and replaces the body with the array of clusters; use from / to to read and write sub-fields, and map to shape each cluster.
.transform(group({
  comparator: cosine({ field: 'embedding', threshold: 0.82 }),
  from: (body) => body.items,
  map: (cluster) => ({ size: cluster.length, first: cluster[0] }),
}))
Options (GroupOptions):
| Option | Type | Required | Description |
|---|---|---|---|
comparator | Comparator<T> | Yes | Decides whether two items belong in the same cluster (e.g. from cosine()) |
from | (body) => T[] | No | Read the array to cluster (default: the body itself) |
map | (cluster: T[]) => R | No | Shape each resulting cluster (default: the raw cluster) |
to | (body, result: R[]) => unknown | No | Write the clusters back (default: replace the body) |
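The way from, map, and to fit around the clustering step can be sketched in plain TypeScript. Everything here is illustrative: groupSketch and GroupSketchOptions are hypothetical names, the comparator is assumed to be a two-argument predicate, and the greedy first-match strategy is an assumption rather than the documented algorithm.

```typescript
// Hypothetical sketch of the from -> cluster -> map -> to flow.
// ASSUMPTION: a comparator is a predicate over two items, and items join the
// first cluster whose first member matches. Routecraft may cluster differently.
type Comparator<T> = (a: T, b: T) => boolean;

interface GroupSketchOptions<B, T, R> {
  comparator: Comparator<T>;
  from: (body: B) => T[];
  map: (cluster: T[]) => R;
  to: (body: B, result: R[]) => unknown;
}

function groupSketch<B, T, R>(body: B, options: GroupSketchOptions<B, T, R>): unknown {
  const clusters: T[][] = [];
  for (const item of options.from(body)) {
    // Join the first matching cluster; otherwise start a new one.
    const home = clusters.find((cluster) => options.comparator(cluster[0], item));
    if (home) home.push(item);
    else clusters.push([item]);
  }
  // Shape each cluster, then let `to` decide where the result lands in the body.
  return options.to(body, clusters.map(options.map));
}
```

Supplying a to function that spreads the original body keeps the sibling fields intact, which matters when from reads only a sub-field.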
cosine
import { cosine } from '@routecraft/routecraft'
Comparator that groups items by cosine similarity of a numeric vector field. Pass it to group({ comparator: cosine(options) }).
.transform(group({
  comparator: cosine({ field: 'embedding', threshold: 0.85 }),
  from: (body) => body.items,
}))
Options (CosineOptions):
| Option | Type | Required | Description |
|---|---|---|---|
field | string | Yes | Property on each item holding the embedding vector (number[]) |
threshold | number | No | Items cluster when their cosine similarity is strictly greater than this value (default: 0.82) |
Items whose field is not an array never match.
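For readers who want to see the comparison itself, the following sketch computes cosine similarity and applies the two rules stated above: a strict greater-than test against the threshold, and no match when the field is not an array. cosineSimilarity and similar are hypothetical helpers, and returning 0 for mismatched lengths or zero vectors is an assumption, not documented behaviour.

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|).
// ASSUMPTION: mismatched lengths, empty vectors, and zero vectors yield 0.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length || a.length === 0) return 0;
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Hypothetical stand-in for the comparator returned by cosine().
function similar(
  x: Record<string, unknown>,
  y: Record<string, unknown>,
  field: string,
  threshold = 0.82,
): boolean {
  const a = x[field];
  const b = y[field];
  // Items whose field is not an array never match.
  if (!Array.isArray(a) || !Array.isArray(b)) return false;
  // Strictly greater than: a similarity equal to the threshold does not cluster.
  return cosineSimilarity(a as number[], b as number[]) > threshold;
}
```

Because the comparison is strict, two identical vectors do not match at a threshold of 1.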
Related
Adapters
How adapters work and how to configure them.
Creating adapters
Build your own source, destination, or processor adapter.
Testing
Test your capabilities with testContext() and the spy() adapter.