
Inline Projections

Build read models with read-after-write consistency

This guide walks you through implementing inline projections — projections that run synchronously within the same request as the event append. After reading this, your read models will be up-to-date the moment your command handler returns.


We'll build an order management system where:

  1. Commands append events to the event store
  2. Inline projections update read models in the same request
  3. Queries return consistent data immediately after writes

pnpm add @delta-base/server @delta-base/toolkit

src/core/orders.ts
import type { Event } from '@delta-base/toolkit';

export type OrderEvent =
  | Event<'order.placed', {
      orderId: string;
      customerId: string;
      items: Array<{ productId: string; name: string; quantity: number; price: number }>;
      totalAmount: number;
    }>
  | Event<'order.shipped', {
      orderId: string;
      trackingNumber: string;
      shippedAt: string;
    }>
  | Event<'order.delivered', {
      orderId: string;
      deliveredAt: string;
    }>
  | Event<'order.cancelled', {
      orderId: string;
      reason: string;
    }>;
src/projections/order-summary.ts
export type OrderSummaryReadModel = {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; name: string; quantity: number; price: number }>;
  totalAmount: number;
  status: 'pending' | 'shipped' | 'delivered' | 'cancelled';
  trackingNumber?: string;
  placedAt: string;
  shippedAt?: string;
  deliveredAt?: string;
};

Use singleStreamProjection for one-document-per-stream projections:

src/projections/order-summary.ts
import { singleStreamProjection } from '@delta-base/toolkit';
import type { IReadModelStore } from '@delta-base/toolkit';
import type { OrderEvent } from '../core/orders';

export type OrderSummaryReadModel = {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; name: string; quantity: number; price: number }>;
  totalAmount: number;
  status: 'pending' | 'shipped' | 'delivered' | 'cancelled';
  trackingNumber?: string;
  placedAt: string;
  shippedAt?: string;
  deliveredAt?: string;
};

export function createOrderSummaryProjection(store: IReadModelStore) {
  return singleStreamProjection<OrderSummaryReadModel, OrderEvent>({
    projectionName: 'order-summary',
    store,
    canHandle: ['order.placed', 'order.shipped', 'order.delivered', 'order.cancelled'],
    getDocumentId: (streamId) => `order-summary:${streamId}`,
    initialState: () => ({
      orderId: '',
      customerId: '',
      items: [],
      totalAmount: 0,
      status: 'pending',
      placedAt: '',
    }),
    // evolve must be pure and deterministic -- same input, same output
    evolve: (doc, event) => {
      switch (event.type) {
        case 'order.placed':
          return {
            ...doc,
            orderId: event.data.orderId,
            customerId: event.data.customerId,
            items: event.data.items,
            totalAmount: event.data.totalAmount,
            status: 'pending',
            placedAt: event.createdAt,
          };
        case 'order.shipped':
          return {
            ...doc,
            status: 'shipped',
            trackingNumber: event.data.trackingNumber,
            shippedAt: event.data.shippedAt,
          };
        case 'order.delivered':
          return {
            ...doc,
            status: 'delivered',
            deliveredAt: event.data.deliveredAt,
          };
        case 'order.cancelled':
          return {
            ...doc,
            status: 'cancelled',
          };
        default:
          return doc;
      }
    },
  });
}

Step 4: Register the Projection on the EventStore

src/shared/event-store.ts
import { DeltaBase } from '@delta-base/server';
import { projections, KVReadModelStore } from '@delta-base/toolkit';
import { createOrderSummaryProjection } from '../projections/order-summary';

const deltabase = new DeltaBase({
  apiKey: process.env.DELTABASE_API_KEY,
});

// Use any IReadModelStore implementation
const readModelStore = new KVReadModelStore(env.ORDER_READ_MODELS);

// Create the projection
const orderSummaryProjection = createOrderSummaryProjection(readModelStore);

// Register it -- every appendToStream now runs this projection
export const eventStore = deltabase.getEventStore('orders', {
  projections: projections.inline([orderSummaryProjection]),
});

export { readModelStore };
src/routes/orders.ts
import { Hono } from 'hono';
import { handleCommandWithDecider } from '@delta-base/toolkit';
import { eventStore, readModelStore } from '../shared/event-store';
import { Orders } from '../core/orders';
import type { OrderSummaryReadModel } from '../projections/order-summary';

const app = new Hono();

app.post('/orders/:id/place', async (c) => {
  const orderId = c.req.param('id');
  const body = await c.req.json();

  await handleCommandWithDecider(
    eventStore,
    orderId,
    { type: 'order.place', data: { orderId, ...body } },
    Orders.decider,
  );

  // Read model is already up-to-date!
  const order = await readModelStore.get<OrderSummaryReadModel>(
    `order-summary:${orderId}`
  );
  return c.json(order);
});

app.post('/orders/:id/ship', async (c) => {
  const orderId = c.req.param('id');
  const body = await c.req.json();

  await handleCommandWithDecider(
    eventStore,
    orderId,
    { type: 'order.ship', data: { orderId, ...body } },
    Orders.decider,
  );

  const order = await readModelStore.get<OrderSummaryReadModel>(
    `order-summary:${orderId}`
  );
  return c.json(order);
});

// Queries just read from the store -- no event processing needed
app.get('/orders/:id', async (c) => {
  const order = await readModelStore.get<OrderSummaryReadModel>(
    `order-summary:${c.req.param('id')}`
  );
  if (!order) return c.json({ error: 'Order not found' }, 404);
  return c.json(order);
});

For read models that aggregate across multiple streams, use multiStreamProjection. A common example: a customer dashboard that tracks orders across all order streams.

import { multiStreamProjection } from '@delta-base/toolkit';

type CustomerDashboard = {
  totalOrders: number;
  totalSpent: number;
  lastOrderAt: string | null;
};

const customerDashboard = multiStreamProjection<CustomerDashboard, OrderEvent>({
  projectionName: 'customer-dashboard',
  store: readModelStore,
  canHandle: ['order.placed', 'order.cancelled'],
  // Key derived from the event, not the stream.
  // Note: this requires customerId on every handled event's data,
  // so order.cancelled would need to carry it too.
  getDocumentId: (event) => `customer:${event.data.customerId}`,
  initialState: () => ({ totalOrders: 0, totalSpent: 0, lastOrderAt: null }),
  evolve: (doc, event) => {
    switch (event.type) {
      case 'order.placed':
        return {
          ...doc,
          totalOrders: doc.totalOrders + 1,
          totalSpent: doc.totalSpent + event.data.totalAmount,
          lastOrderAt: event.createdAt,
        };
      case 'order.cancelled':
        return {
          ...doc,
          totalOrders: doc.totalOrders - 1,
        };
      default:
        return doc;
    }
  },
});

Register multiple projections together:

const eventStore = deltabase.getEventStore('orders', {
  projections: projections.inline([
    orderSummaryProjection, // single-stream
    customerDashboard,      // multi-stream
  ]),
});

Declarative projections are great when your read model lives in a KV store. But sometimes you want to write to SQL, use Drizzle, or structure your storage differently. That’s what InlineProjection<T> is for.

Instead of an evolve function, you implement processEvents — the SDK calls it with batches of ReadEvent<T>[] and you do whatever you want with them.

import type { InlineProjection } from '@delta-base/toolkit';
import { eq } from 'drizzle-orm';
import { db, orderSummaries, projectionCursors } from '../db/schema';

const orderProjection: InlineProjection<OrderEvent> = {
  supportedEventTypes: ['order.placed', 'order.shipped', 'order.cancelled'],

  async processEvents(events) {
    // Wrap the whole batch in a transaction
    await db.transaction(async (tx) => {
      for (const event of events) {
        switch (event.type) {
          case 'order.placed':
            await tx.insert(orderSummaries).values({
              orderId: event.data.orderId,
              customerId: event.data.customerId,
              totalAmount: event.data.totalAmount,
              status: 'pending',
            }).onConflictDoNothing();
            break;
          case 'order.shipped':
            await tx.update(orderSummaries)
              .set({ status: 'shipped', trackingNumber: event.data.trackingNumber })
              .where(eq(orderSummaries.orderId, event.data.orderId));
            break;
          case 'order.cancelled':
            await tx.update(orderSummaries)
              .set({ status: 'cancelled' })
              .where(eq(orderSummaries.orderId, event.data.orderId));
            break;
        }
      }

      // Update cursor in the same transaction
      const last = events.at(-1);
      if (last) {
        await tx.insert(projectionCursors)
          .values({ name: 'order-summary', position: last.globalPosition })
          .onConflictDoUpdate({
            target: projectionCursors.name,
            set: { position: last.globalPosition },
          });
      }
    });
  },

  // Tell the SDK where you left off -- it handles the rest
  async lastProcessedPosition() {
    const row = await db.select()
      .from(projectionCursors)
      .where(eq(projectionCursors.name, 'order-summary'))
      .get();
    return row?.position ?? 0;
  },
};

Register it the same way:

const eventStore = deltabase.getEventStore('orders', {
  projections: projections.inline([orderProjection]),
});

When lastProcessedPosition is defined, the SDK automatically:

  1. Reads events from (lastProcessedPosition + 1) in batches of 100
  2. Calls processEvents for each batch (your transaction boundary)
  3. Skips already-processed events in handleAppend (idempotency guard)

The SDK never persists the cursor — that’s your job, inside processEvents. This means your cursor update and your read model update happen in the same transaction. No split-brain.
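The replay behavior described above can be sketched as a simple loop. This is illustrative only: `catchUpLoop`, `StoredEvent`, and `readBatch`-style parameters are hypothetical names for this sketch, not SDK API; only the batch size of 100 and the "start at cursor + 1" rule come from the text above.

```typescript
// Illustrative sketch of the cursor-driven replay loop (not SDK internals).
type StoredEvent = { globalPosition: number; type: string };

async function catchUpLoop(
  allEvents: StoredEvent[],
  lastProcessedPosition: () => Promise<number>,
  processEvents: (batch: StoredEvent[]) => Promise<void>,
  batchSize = 100,
): Promise<void> {
  // Start just after the position the projection reported
  let from = (await lastProcessedPosition()) + 1;
  for (;;) {
    const batch = allEvents
      .filter((e) => e.globalPosition >= from)
      .slice(0, batchSize);
    if (batch.length === 0) break;
    await processEvents(batch); // your transaction boundary
    from = batch[batch.length - 1].globalPosition + 1;
  }
}
```

Because `processEvents` persists the cursor itself, a crash mid-replay simply resumes from the last committed batch on the next run.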

If you need a different replay strategy (e.g. bulk inserts without per-event processing), implement catchUp instead:

const bulkProjection: InlineProjection<OrderEvent> = {
  supportedEventTypes: ['order.placed'],

  async processEvents(events) {
    // Normal per-batch processing for live events
    await db.insert(orderSummaries)
      .values(events.map(toRow))
      .onConflictDoNothing();
  },

  async catchUp(eventStore) {
    // Custom: bulk-load everything in one shot
    const result = await eventStore.queryEvents({
      type: ['order.placed'],
      sortBy: 'globalPosition',
      sortDirection: 'asc',
    });
    await db.insert(orderSummaries)
      .values(result.events.map(toRow))
      .onConflictDoNothing();
  },
};

When catchUp is defined, the SDK delegates replay to it entirely and skips the lastProcessedPosition-based loop; lastProcessedPosition still guards handleAppend for live events.


DCB (Dynamic Consistency Boundary) Example


With DCB, events are appended to a single global stream using append() instead of appendToStream(). Events use tags to identify the entities they belong to. Inline projections work identically — they use multiStreamProjection with getDocumentId deriving keys from event tags.

import { multiStreamProjection, projections } from '@delta-base/toolkit';
import { DeltaBase } from '@delta-base/server';

// DCB events use tags instead of stream IDs
type CourseEvent =
  | { type: 'CourseDefined'; data: { courseId: string; title: string; capacity: number } }
  | { type: 'CourseCapacityChanged'; data: { courseId: string; newCapacity: number } }
  | { type: 'CourseArchived'; data: { courseId: string } };

const courseProjection = multiStreamProjection<
  { exists: boolean; title: string | null; capacity: number },
  CourseEvent
>({
  projectionName: 'course-summary',
  store: readModelStore,
  canHandle: ['CourseDefined', 'CourseCapacityChanged', 'CourseArchived'],
  // Derive document key from event tags (DCB pattern)
  getDocumentId: (event) => {
    const courseTag = event.tags?.find((t) => t.startsWith('course:'));
    return courseTag ?? `course:${event.data.courseId}`;
  },
  initialState: () => ({ exists: false, title: null, capacity: 0 }),
  evolve: (doc, event) => {
    switch (event.type) {
      case 'CourseDefined':
        return { exists: true, title: event.data.title, capacity: event.data.capacity };
      case 'CourseCapacityChanged':
        return { ...doc, capacity: event.data.newCapacity };
      case 'CourseArchived':
        return { ...doc, exists: false };
      default:
        return doc;
    }
  },
});

const eventStore = deltabase.getEventStore('courses', {
  projections: projections.inline([courseProjection]),
});

// DCB command handling
app.post('/courses/:id/change-capacity', async (c) => {
  const courseId = c.req.param('id');
  const { newCapacity } = await c.req.json();

  // 1. Build decision model
  const { state, appendCondition } = await eventStore.buildDecisionModel({
    query: {
      items: [{
        types: ['CourseDefined', 'CourseArchived', 'CourseCapacityChanged'],
        tags: [`course:${courseId}`],
      }],
    },
    initialState: () => ({ exists: false, capacity: 0 }),
    evolve: (state, event) => {
      switch (event.type) {
        case 'CourseDefined':
          return { exists: true, capacity: event.data.capacity };
        case 'CourseArchived':
          return { ...state, exists: false };
        case 'CourseCapacityChanged':
          return { ...state, capacity: event.data.newCapacity };
        default:
          return state;
      }
    },
  });

  // 2. Validate
  if (!state.exists) throw new Error('Course not found');
  if (state.capacity === newCapacity) throw new Error('Capacity unchanged');

  // 3. Append with condition -- projection runs automatically!
  await eventStore.append(
    [{
      type: 'CourseCapacityChanged',
      data: { courseId, newCapacity },
      tags: [`course:${courseId}`],
    }],
    appendCondition,
  );

  // Read model is already consistent
  const course = await readModelStore.get(`course:${courseId}`);
  return c.json(course);
});

When you register a projection on an event store that already has events, the projection needs to process historical events before it’s current. This happens automatically.

Single-stream projections catch up per-stream. When an appendToStream happens for stream order-123:

  1. Check the document’s _version against the first appended event’s streamPosition
  2. If there’s a gap, read missing events for that stream only
  3. Process gap events, then the newly appended events
  4. Update _version
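The per-stream gap check in steps 1–2 can be sketched as follows. Only `_version` and `streamPosition` come from this guide; the function and the surrounding shape are hypothetical.

```typescript
// Illustrative sketch of the per-stream gap check (not SDK internals).
type SummaryDoc = { _version: number };

function missingStreamRange(
  doc: SummaryDoc | null,
  firstAppendedPosition: number,
): { from: number; to: number } | null {
  // A missing document is treated as version 0 (nothing processed yet)
  const version = doc?._version ?? 0;
  // A gap exists when the first newly appended event does not
  // directly follow the document's last processed stream position.
  if (firstAppendedPosition > version + 1) {
    return { from: version + 1, to: firstAppendedPosition - 1 };
  }
  return null;
}
```

If the function returns a range, the SDK reads just those events for that stream before applying the new append, which is what keeps catch-up cheap for single-stream projections.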

Multi-stream projections catch up on the first append call:

  1. Read the cursor from the store (__cursor:{projectionName})
  2. Query historical events from that position, in batches of 100
  3. Process each batch, advancing the cursor
  4. Then process the newly appended events
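The `__cursor:{projectionName}` convention from step 1 can be sketched with a minimal structural store type. The `CursorStore` shape here is an assumption for illustration; the real `IReadModelStore` interface may differ.

```typescript
// Illustrative: reading and advancing a multi-stream projection cursor.
type CursorStore = {
  get<T>(key: string): Promise<T | null>;
  set<T>(key: string, value: T): Promise<void>;
};

const cursorKey = (projectionName: string) => `__cursor:${projectionName}`;

async function readCursor(store: CursorStore, projectionName: string): Promise<number> {
  const doc = await store.get<{ position: number }>(cursorKey(projectionName));
  return doc?.position ?? 0; // fresh projection starts at position 0
}

async function advanceCursor(
  store: CursorStore,
  projectionName: string,
  position: number,
): Promise<void> {
  // Never move the cursor backwards -- replays may redeliver old batches
  if (position > (await readCursor(store, projectionName))) {
    await store.set(cursorKey(projectionName), { position });
  }
}
```

Keeping the cursor in the same store as the read models is what lets the projection resume from the right position after a failure.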

Imperative projections (with lastProcessedPosition) catch up on the first append call:

  1. Call lastProcessedPosition() to find the starting point
  2. Query events from (position + 1) in batches of 100
  3. Call processEvents for each batch
  4. The projection updates its own cursor inside processEvents

For production deployments, trigger catch-up during Worker startup to avoid slow first requests:

// In your Worker's fetch handler or startup logic
const eventStore = deltabase.getEventStore('orders', {
  projections: projections.inline([orderSummaryProjection]),
});

// Pre-warm the projections
await eventStore.catchUpProjections();

Events are facts. They’re never rolled back if a projection fails.

// Default: projection failure throws (fails the request)
projections.inline([orderSummaryProjection], {
  onError: 'throw',
});

// Log and continue: request succeeds, projection catches up next time
projections.inline([orderSummaryProjection], {
  onError: 'log',
});

// Custom handler: report to monitoring, etc.
projections.inline([orderSummaryProjection], {
  onError: (error, projection, events) => {
    monitoring.captureException(error, {
      projection: projection.projectionName,
      eventCount: events.length,
    });
  },
});
Whichever error strategy you choose, when a projection fails:

  1. Events are safely persisted in the event store
  2. The projection’s _version / cursor was NOT advanced
  3. On the next appendToStream call, the projection detects the gap and catches up
  4. No manual intervention needed

Idempotency: Why It Matters and How to Get It Right


In concurrent environments (e.g., multiple Cloudflare Worker isolates), the same event may be processed more than once. Inline projections handle this automatically through version tracking, but your evolve function needs to cooperate.

  1. evolve must be a pure function. Given the same document and event, it must always return the same result.

  2. No side effects in evolve. Don’t call APIs, write logs, or generate IDs inside evolve. All data should come from the event.

  3. Don’t use Date.now() or Math.random(). Use timestamps from the event (event.createdAt) instead.

  4. Prefer set-semantics over increment-semantics when possible. Setting status: 'shipped' is inherently idempotent. Incrementing orderCount += 1 is not — if the same event is processed twice, the count is wrong. The framework’s _version tracking prevents this, but defensive evolve functions make the system more robust.

// Good: Idempotent evolve
evolve: (doc, event) => {
  switch (event.type) {
    case 'order.placed':
      return {
        ...doc,
        orderId: event.data.orderId, // set, not increment
        status: 'pending',           // set, not toggle
        placedAt: event.createdAt,   // from event, not Date.now()
      };
    default:
      return doc;
  }
},

// Avoid: Non-deterministic evolve
evolve: (doc, event) => {
  switch (event.type) {
    case 'order.placed':
      return {
        ...doc,
        orderId: event.data.orderId,
        status: 'pending',
        placedAt: new Date().toISOString(), // Non-deterministic!
        processedBy: getWorkerId(),         // Side effect!
      };
    default:
      return doc;
  }
},

Inline projections work with InMemoryEventStore and InMemoryReadModelStore — no HTTP calls needed:

import { describe, it, expect, beforeEach } from 'vitest';
import {
  InMemoryEventStore,
  InMemoryReadModelStore,
  projections,
} from '@delta-base/toolkit';
import { createOrderSummaryProjection } from '../projections/order-summary';
import type { OrderSummaryReadModel } from '../projections/order-summary';

describe('Order Summary Projection', () => {
  let eventStore: InMemoryEventStore;
  let readModelStore: InMemoryReadModelStore;

  beforeEach(() => {
    readModelStore = new InMemoryReadModelStore();
    const projection = createOrderSummaryProjection(readModelStore);
    eventStore = new InMemoryEventStore({
      projections: projections.inline([projection]),
    });
  });

  it('creates order summary on order.placed', async () => {
    await eventStore.appendToStream('order-1', [{
      type: 'order.placed',
      data: {
        orderId: 'order-1',
        customerId: 'cust-1',
        items: [{ productId: 'p1', name: 'Widget', quantity: 1, price: 9.99 }],
        totalAmount: 9.99,
      },
    }]);

    const summary = await readModelStore.get<OrderSummaryReadModel>(
      'order-summary:order-1'
    );
    expect(summary).toBeDefined();
    expect(summary?.status).toBe('pending');
    expect(summary?.totalAmount).toBe(9.99);
  });

  it('updates status on order.shipped', async () => {
    await eventStore.appendToStream('order-1', [{
      type: 'order.placed',
      data: {
        orderId: 'order-1',
        customerId: 'cust-1',
        items: [{ productId: 'p1', name: 'Widget', quantity: 1, price: 9.99 }],
        totalAmount: 9.99,
      },
    }]);
    await eventStore.appendToStream('order-1', [{
      type: 'order.shipped',
      data: {
        orderId: 'order-1',
        trackingNumber: 'TRK-123',
        shippedAt: '2026-01-15T10:00:00Z',
      },
    }]);

    const summary = await readModelStore.get<OrderSummaryReadModel>(
      'order-summary:order-1'
    );
    expect(summary?.status).toBe('shipped');
    expect(summary?.trackingNumber).toBe('TRK-123');
  });

  it('catches up on gap between projection registration and existing events', async () => {
    // Append events without projection
    const plainStore = new InMemoryEventStore();
    await plainStore.appendToStream('order-1', [{
      type: 'order.placed',
      data: {
        orderId: 'order-1',
        customerId: 'cust-1',
        items: [],
        totalAmount: 0,
      },
    }]);

    // Now create a new event store with the projection.
    // The projection will catch up when the next append happens.
    const projection = createOrderSummaryProjection(readModelStore);
    const storeWithProjection = new InMemoryEventStore({
      projections: projections.inline([projection]),
    });

    // Trigger catch-up explicitly
    await storeWithProjection.catchUpProjections();
  });
});

You can run critical projections inline and keep others as async webhooks. This is the recommended approach for most production systems.

// Inline: user-facing read models that need read-after-write consistency
const eventStore = deltabase.getEventStore('orders', {
  projections: projections.inline([
    orderSummaryProjection,
    customerDashboardProjection,
  ]),
});

// Async: analytics, notifications, cross-service sync
// These still use webhook subscriptions in deltabase.config.ts

The async subscriptions continue to work as before. Inline projections are additive — they don’t replace webhooks.