Creating adapters

Build your own source, destination, or processor adapter.

When the built-in adapters do not cover a use case, you can write your own. Adapters are plain TypeScript classes that implement one of three interfaces.

Source

A source produces data and starts the pipeline. Implement the Source interface:

import { type Source } from '@routecraft/routecraft'

class MyQueueAdapter implements Source<Message> {
  readonly adapterId = 'acme.adapter.my-queue'

  async subscribe(context, handler, abort) {
    while (!abort.signal.aborted) {
      const message = await queue.receive()
      await handler(message)
    }
  }
}

Destination

A destination receives the final exchange. Implement the Destination interface:

import { type Destination } from '@routecraft/routecraft'

class MyStorageAdapter implements Destination<Record<string, unknown>, void> {
  readonly adapterId = 'acme.adapter.my-storage'

  async send(exchange) {
    await storage.write(exchange.body)
  }
}

If send returns a value, the exchange body is replaced with it. If it returns nothing, the body is unchanged.

Use a Destination with .enrich() when you need to fetch external data and merge it into the body:

class MyEnricherAdapter implements Destination<InputType, ExtraFields> {
  readonly adapterId = 'acme.adapter.my-enricher'

  async send(exchange) {
    return fetchExtra(exchange.body.id)
  }
}

// The returned value is merged into the body
.enrich(myEnricher({ apiKey: process.env.ENRICH_KEY }))

Processor

A processor sits in the middle of a pipeline and modifies the exchange. Implement the Processor interface. Use this when you need header or context access alongside body reshaping -- for body-only changes, .transform() is the simpler choice:

import { type Processor } from '@routecraft/routecraft'

class MyTransformAdapter implements Processor<InputType, OutputType> {
  readonly adapterId = 'acme.adapter.my-transform'

  async process(exchange) {
    const tenantId = exchange.headers['x-tenant']
    return { ...exchange, body: { ...exchange.body, tenantId } }
  }
}

Factory function

Expose your adapter as a factory function so it reads naturally in the DSL. The recommended pattern is one factory per adapter -- one name, one import:

// adapters/my-storage.ts
export function myStorage(options?: MyStorageOptions) {
  return new MyStorageAdapter(options)
}

// Usage -- destination
.to(myStorage({ bucket: 'uploads' }))
// adapters/my-queue.ts
export function myQueue(options?: MyQueueOptions) {
  return new MyQueueAdapter(options)
}

// Usage -- source
.from(myQueue({ queue: 'orders' }))
// adapters/my-enricher.ts
export function myEnricher(options?: MyEnricherOptions) {
  return new MyEnricherAdapter(options)
}

// Usage -- enricher (merges result into body)
.enrich(myEnricher({ apiKey: process.env.ENRICH_KEY }))

Keeping one factory per adapter makes imports predictable and avoids a proliferation of role-suffixed exports (myQueueSource, myQueueDestination, etc.). The adapter class itself handles the role -- the factory just wires up the options.

An adapter class can implement multiple interfaces when it makes sense. A queue adapter, for example, may work as both a source and a destination:

class MyQueueAdapter implements Source<Message>, Destination<Message, void> {
  readonly adapterId = 'acme.adapter.my-queue'

  async subscribe(context, handler, abort) {
    while (!abort.signal.aborted) {
      const message = await queue.receive(this.options.queue)
      await handler(message)
    }
  }

  async send(exchange) {
    await queue.send(this.options.queue, exchange.body)
  }
}

export function myQueue(options: MyQueueOptions) {
  return new MyQueueAdapter(options)
}

// Same factory, different positions
.from(myQueue({ queue: 'orders' }))
.to(myQueue({ queue: 'results' }))

Sharing state between adapters

Adapters can use the context store to share state, read global configuration set by plugins, or maintain connections across exchanges.

class DbAdapter implements Destination<any, void> {
  async send(exchange) {
    const config = exchange.context.getStore('db.config')
    await db(config.connectionString).insert(exchange.body)
  }
}

See Plugins for how to populate the context store at startup.


Adapters

How adapters work and how to configure them.

Adapters reference

Full catalog with all options and signatures.

Previous
Composing Capabilities