Creating adapters
Build your own source, destination, or processor adapter.
When the built-in adapters do not cover a use case, you can write your own. Adapters are plain TypeScript classes that implement one or more of three interfaces: Source, Destination, and Processor.
Source
A source produces data and starts the pipeline. Implement the Source interface:
import { type Source } from '@routecraft/routecraft'

class MyQueueAdapter implements Source<Message> {
  readonly adapterId = 'acme.adapter.my-queue'

  // `queue` stands in for your own queue client; it is not part of the framework.
  async subscribe(context, handler, abort) {
    // Keep receiving until the abort signal fires.
    while (!abort.signal.aborted) {
      const message = await queue.receive()
      await handler(message)
    }
  }
}
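The loop above reads the abort signal only between messages. The sketch below exercises that behaviour with an in-memory source so it can run without a real queue. `InMemorySource`, `Handler`, and `runDemo` are hypothetical names, the `subscribe` signature mirrors the example rather than the library's published types, and the class deliberately omits `implements Source` so the block has no dependency on `@routecraft/routecraft`.

```typescript
// Hypothetical handler type; the real one comes from the framework.
type Handler<T> = (message: T) => Promise<void>

class InMemorySource {
  readonly adapterId = 'acme.adapter.in-memory'

  constructor(private readonly items: string[]) {}

  // Same parameter order as the example above: context, handler, abort.
  async subscribe(_context: unknown, handler: Handler<string>, abort: AbortController): Promise<void> {
    for (const item of this.items) {
      // Stop as soon as the signal has been aborted.
      if (abort.signal.aborted) break
      await handler(item)
    }
  }
}

async function runDemo(): Promise<string[]> {
  const received: string[] = []
  const abort = new AbortController()
  const source = new InMemorySource(['a', 'b', 'c'])

  await source.subscribe(
    undefined,
    async (message) => {
      received.push(message)
      // Abort after the second message; 'c' is never delivered.
      if (message === 'b') abort.abort()
    },
    abort,
  )
  return received // ['a', 'b']
}
```

Because the signal is only checked between iterations, a `receive()` call that blocks for a long time would delay shutdown. A real queue client may need its own timeout or cancellation hook.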
Destination
A destination receives the final exchange. Implement the Destination interface:
import { type Destination } from '@routecraft/routecraft'

class MyStorageAdapter implements Destination<Record<string, unknown>, void> {
  readonly adapterId = 'acme.adapter.my-storage'

  async send(exchange) {
    await storage.write(exchange.body)
  }
}
If send returns a value, the exchange body is replaced with it. If it returns nothing, the body is unchanged.
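The return-value rule can be pictured with a small sketch. This is not the framework's implementation: `Exchange` is a hypothetical stand-in that carries only a body, `applySendResult` is an illustrative helper, and "returns nothing" is assumed to mean the result is `undefined`.

```typescript
// Hypothetical stand-in for the framework's exchange; only the body matters here.
interface Exchange {
  body: unknown
}

// Sketch of the rule stated above.
// Assumption: "returns nothing" means the result is undefined.
function applySendResult(exchange: Exchange, result: unknown): Exchange {
  if (result === undefined) {
    return exchange // body left as it was
  }
  return { ...exchange, body: result } // body replaced by the returned value
}

const original: Exchange = { body: { id: 1 } }
const unchanged = applySendResult(original, undefined)
const replaced = applySendResult(original, { stored: true })

console.log(unchanged.body) // { id: 1 }
console.log(replaced.body) // { stored: true }
```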
Use a Destination with .enrich() when you need to fetch external data and merge it into the body:
class MyEnricherAdapter implements Destination<InputType, ExtraFields> {
  readonly adapterId = 'acme.adapter.my-enricher'

  async send(exchange) {
    return fetchExtra(exchange.body.id)
  }
}

// The returned value is merged into the body
.enrich(myEnricher({ apiKey: process.env.ENRICH_KEY }))
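To make "merged into the body" concrete, here is a minimal sketch of one plausible merge. It assumes a shallow merge in which the enrichment fields win when keys clash; the page does not state the actual strategy, so treat `mergeEnrichment` as a hypothetical illustration rather than the library's behaviour.

```typescript
// Assumption: a shallow merge in which enrichment fields win on key clashes.
// The framework's actual merge strategy may differ.
function mergeEnrichment<B extends object, E extends object>(body: B, extra: E): B & E {
  return { ...body, ...extra }
}

const enriched = mergeEnrichment({ id: 'u1', name: 'Ada' }, { plan: 'pro' })
console.log(enriched) // { id: 'u1', name: 'Ada', plan: 'pro' }
```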
Processor
A processor sits in the middle of a pipeline and modifies the exchange. Implement the Processor interface when you need access to headers or context while reshaping the body; for body-only changes, .transform() is the simpler choice:
import { type Processor } from '@routecraft/routecraft'

class MyTransformAdapter implements Processor<InputType, OutputType> {
  readonly adapterId = 'acme.adapter.my-transform'

  async process(exchange) {
    const tenantId = exchange.headers['x-tenant']
    return { ...exchange, body: { ...exchange.body, tenantId } }
  }
}
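The core of `process` can be pulled into a pure helper, which makes the header-to-body copy easy to test and gives a place to handle a missing header. The sketch below assumes a simplified exchange shape; `Exchange`, `Order`, `TenantOrder`, and `withTenant` are hypothetical names, and throwing on a missing header is one possible policy, not something the framework prescribes.

```typescript
// Hypothetical stand-in for the framework's exchange type.
interface Exchange<T> {
  headers: Record<string, string | undefined>
  body: T
}

interface Order {
  id: string
}

interface TenantOrder extends Order {
  tenantId: string
}

// Copies the x-tenant header into the body without mutating the input exchange.
function withTenant(exchange: Exchange<Order>): Exchange<TenantOrder> {
  const tenantId = exchange.headers['x-tenant']
  if (tenantId === undefined) {
    // Assumed policy: reject exchanges that carry no tenant header.
    throw new Error('missing x-tenant header')
  }
  return { ...exchange, body: { ...exchange.body, tenantId } }
}

const input: Exchange<Order> = { headers: { 'x-tenant': 'acme' }, body: { id: 'o-1' } }
const output = withTenant(input)

console.log(output.body) // { id: 'o-1', tenantId: 'acme' }
console.log(input.body) // { id: 'o-1' }
```

An adapter's `process` method could then simply return `withTenant(exchange)`.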
Factory function
Expose your adapter as a factory function so it reads naturally in the DSL. The recommended pattern is one factory per adapter, with one name and one import:
// adapters/my-storage.ts
export function myStorage(options?: MyStorageOptions) {
  return new MyStorageAdapter(options)
}

// Usage -- destination
.to(myStorage({ bucket: 'uploads' }))

// adapters/my-queue.ts
export function myQueue(options?: MyQueueOptions) {
  return new MyQueueAdapter(options)
}

// Usage -- source
.from(myQueue({ queue: 'orders' }))

// adapters/my-enricher.ts
export function myEnricher(options?: MyEnricherOptions) {
  return new MyEnricherAdapter(options)
}

// Usage -- enricher (merges result into body)
.enrich(myEnricher({ apiKey: process.env.ENRICH_KEY }))
Keeping one factory per adapter makes imports predictable and avoids a proliferation of role-suffixed exports (myQueueSource, myQueueDestination, etc.). The adapter class itself handles the role; the factory just wires up the options.
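The factories above pass options to a constructor that the earlier class examples leave out. A minimal sketch of that wiring follows. The `prefix` option and both default values are hypothetical and exist only for illustration, and the class omits `implements Destination` so the block runs without the framework installed.

```typescript
// Hypothetical option shape; only `bucket` appears in the usage above.
interface MyStorageOptions {
  bucket?: string
  prefix?: string
}

class MyStorageAdapter {
  readonly adapterId = 'acme.adapter.my-storage'
  readonly bucket: string
  readonly prefix: string

  constructor(options: MyStorageOptions = {}) {
    // Defaults are illustrative values, not library defaults.
    this.bucket = options.bucket ?? 'default-bucket'
    this.prefix = options.prefix ?? ''
  }
}

// One factory per adapter: callers do not need to reference the class directly.
function myStorage(options?: MyStorageOptions): MyStorageAdapter {
  return new MyStorageAdapter(options)
}

const uploads = myStorage({ bucket: 'uploads' })
console.log(uploads.bucket, JSON.stringify(uploads.prefix)) // uploads ""
```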
An adapter class can implement multiple interfaces when it makes sense. A queue adapter, for example, may work as both a source and a destination:
class MyQueueAdapter implements Source<Message>, Destination<Message, void> {
  readonly adapterId = 'acme.adapter.my-queue'

  // Store the options so subscribe() and send() can read this.options.queue.
  constructor(private readonly options: MyQueueOptions) {}

  async subscribe(context, handler, abort) {
    while (!abort.signal.aborted) {
      const message = await queue.receive(this.options.queue)
      await handler(message)
    }
  }

  async send(exchange) {
    await queue.send(this.options.queue, exchange.body)
  }
}

export function myQueue(options: MyQueueOptions) {
  return new MyQueueAdapter(options)
}

// Same factory, different positions
.from(myQueue({ queue: 'orders' }))
.to(myQueue({ queue: 'results' }))
Sharing state between adapters
Adapters can use the context store to share state, read global configuration set by plugins, or maintain connections across exchanges.
class DbAdapter implements Destination<any, void> {
  async send(exchange) {
    const config = exchange.context.getStore('db.config')
    await db(config.connectionString).insert(exchange.body)
  }
}
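The example above assumes that 'db.config' is already in the store. If no plugin has populated it, `config.connectionString` fails with an unhelpful error, so a small guard can be worthwhile. The sketch below is an assumption-laden illustration: `StoreContext`, `DbConfig`, `isDbConfig`, and `requireDbConfig` are hypothetical names, and `getStore` is assumed to return `undefined` for a missing key.

```typescript
// Hypothetical stand-ins: the real context type comes from the framework.
interface StoreContext {
  getStore(key: string): unknown
}

interface DbConfig {
  connectionString: string
}

// Type guard that checks the stored value has the shape the adapter expects.
function isDbConfig(value: unknown): value is DbConfig {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { connectionString?: unknown }).connectionString === 'string'
  )
}

// Fails fast with a readable message when the key is missing or malformed.
function requireDbConfig(context: StoreContext): DbConfig {
  const value = context.getStore('db.config')
  if (!isDbConfig(value)) {
    throw new Error("context store has no valid 'db.config' entry")
  }
  return value
}

// A Map-backed context, used here only to exercise the helper.
const store = new Map<string, unknown>([
  ['db.config', { connectionString: 'postgres://localhost/app' }],
])
const context: StoreContext = { getStore: (key) => store.get(key) }

console.log(requireDbConfig(context).connectionString) // postgres://localhost/app
```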
See Plugins for how to populate the context store at startup.
Related
Adapters
How adapters work and how to configure them.
Adapters reference
Full catalog with all options and signatures.