Inline Projections
Build read models with read-after-write consistency
This guide walks you through implementing inline projections — projections that run synchronously within the same request as the event append. After reading this, your read models will be up-to-date the moment your command handler returns.
What You’ll Build
Section titled “What You’ll Build”An order management system where:
- Commands append events to the event store
- Inline projections update read models in the same request
- Queries return consistent data immediately after writes
Prerequisites
Section titled “Prerequisites”pnpm add @delta-base/server @delta-base/toolkitStep 1: Define Your Events
Section titled “Step 1: Define Your Events”import type { Event } from '@delta-base/toolkit';
export type OrderEvent = | Event<'order.placed', { orderId: string; customerId: string; items: Array<{ productId: string; name: string; quantity: number; price: number }>; totalAmount: number; }> | Event<'order.shipped', { orderId: string; trackingNumber: string; shippedAt: string; }> | Event<'order.delivered', { orderId: string; deliveredAt: string; }> | Event<'order.cancelled', { orderId: string; reason: string; }>;Step 2: Define Your Read Model
Section titled “Step 2: Define Your Read Model”export type OrderSummaryReadModel = { orderId: string; customerId: string; items: Array<{ productId: string; name: string; quantity: number; price: number }>; totalAmount: number; status: 'pending' | 'shipped' | 'delivered' | 'cancelled'; trackingNumber?: string; placedAt: string; shippedAt?: string; deliveredAt?: string;};Step 3: Create an Inline Projection
Section titled “Step 3: Create an Inline Projection”Use singleStreamProjection for one-document-per-stream projections:
import { singleStreamProjection } from '@delta-base/toolkit';import type { IReadModelStore } from '@delta-base/toolkit';import type { OrderEvent } from '../core/orders';
export type OrderSummaryReadModel = { orderId: string; customerId: string; items: Array<{ productId: string; name: string; quantity: number; price: number }>; totalAmount: number; status: 'pending' | 'shipped' | 'delivered' | 'cancelled'; trackingNumber?: string; placedAt: string; shippedAt?: string; deliveredAt?: string;};
export function createOrderSummaryProjection(store: IReadModelStore) { return singleStreamProjection<OrderSummaryReadModel, OrderEvent>({ projectionName: 'order-summary', store, canHandle: ['order.placed', 'order.shipped', 'order.delivered', 'order.cancelled'], getDocumentId: (streamId) => `order-summary:${streamId}`,
initialState: () => ({ orderId: '', customerId: '', items: [], totalAmount: 0, status: 'pending', placedAt: '', }),
// evolve must be pure and deterministic -- same input, same output evolve: (doc, event) => { switch (event.type) { case 'order.placed': return { ...doc, orderId: event.data.orderId, customerId: event.data.customerId, items: event.data.items, totalAmount: event.data.totalAmount, status: 'pending', placedAt: event.createdAt, }; case 'order.shipped': return { ...doc, status: 'shipped', trackingNumber: event.data.trackingNumber, shippedAt: event.data.shippedAt, }; case 'order.delivered': return { ...doc, status: 'delivered', deliveredAt: event.data.deliveredAt, }; case 'order.cancelled': return { ...doc, status: 'cancelled', }; default: return doc; } }, });}Step 4: Register the Projection on the EventStore
Section titled “Step 4: Register the Projection on the EventStore”import { DeltaBase } from '@delta-base/server';import { projections, KVReadModelStore } from '@delta-base/toolkit';import { createOrderSummaryProjection } from '../projections/order-summary';
const deltabase = new DeltaBase({ apiKey: process.env.DELTABASE_API_KEY,});
// Use any IReadModelStore implementationconst readModelStore = new KVReadModelStore(env.ORDER_READ_MODELS);
// Create the projectionconst orderSummaryProjection = createOrderSummaryProjection(readModelStore);
// Register it -- every appendToStream now runs this projectionexport const eventStore = deltabase.getEventStore('orders', { projections: projections.inline([orderSummaryProjection]),});
export { readModelStore };Step 5: Use in a Command Handler
Section titled “Step 5: Use in a Command Handler”import { Hono } from 'hono';import { handleCommandWithDecider } from '@delta-base/toolkit';import { eventStore, readModelStore } from '../shared/event-store';import { Orders } from '../core/orders';import type { OrderSummaryReadModel } from '../projections/order-summary';
const app = new Hono();
app.post('/orders/:id/place', async (c) => { const orderId = c.req.param('id'); const body = await c.req.json();
await handleCommandWithDecider( eventStore, orderId, { type: 'order.place', data: { orderId, ...body } }, Orders.decider, );
// Read model is already up-to-date! const order = await readModelStore.get<OrderSummaryReadModel>( `order-summary:${orderId}` );
return c.json(order);});
app.post('/orders/:id/ship', async (c) => { const orderId = c.req.param('id'); const body = await c.req.json();
await handleCommandWithDecider( eventStore, orderId, { type: 'order.ship', data: { orderId, ...body } }, Orders.decider, );
const order = await readModelStore.get<OrderSummaryReadModel>( `order-summary:${orderId}` );
return c.json(order);});
// Queries just read from the store -- no event processing neededapp.get('/orders/:id', async (c) => { const order = await readModelStore.get<OrderSummaryReadModel>( `order-summary:${c.req.param('id')}` );
if (!order) return c.json({ error: 'Order not found' }, 404); return c.json(order);});Multi-Stream Projections
Section titled “Multi-Stream Projections”For read models that aggregate across multiple streams, use multiStreamProjection. A common example: a customer dashboard that tracks orders across all order streams.
import { multiStreamProjection } from '@delta-base/toolkit';
type CustomerDashboard = { totalOrders: number; totalSpent: number; lastOrderAt: string | null;};
const customerDashboard = multiStreamProjection<CustomerDashboard, OrderEvent>({ projectionName: 'customer-dashboard', store: readModelStore, canHandle: ['order.placed', 'order.cancelled'],
// Key derived from the event, not the stream getDocumentId: (event) => `customer:${event.data.customerId}`,
initialState: () => ({ totalOrders: 0, totalSpent: 0, lastOrderAt: null }),
evolve: (doc, event) => { switch (event.type) { case 'order.placed': return { ...doc, totalOrders: doc.totalOrders + 1, totalSpent: doc.totalSpent + event.data.totalAmount, lastOrderAt: event.createdAt, }; case 'order.cancelled': return { ...doc, totalOrders: doc.totalOrders - 1, }; default: return doc; } },});Register multiple projections together:
const eventStore = deltabase.getEventStore('orders', { projections: projections.inline([ orderSummaryProjection, // single-stream customerDashboard, // multi-stream ]),});Imperative Projections
Section titled “Imperative Projections”Declarative projections are great when your read model lives in a KV store. But sometimes you want to write to SQL, use Drizzle, or structure your storage differently. That’s what InlineProjection<T> is for.
Instead of an evolve function, you implement processEvents — the SDK calls it with batches of ReadEvent<T>[] and you do whatever you want with them.
import type { InlineProjection } from '@delta-base/toolkit';import { eq } from 'drizzle-orm';import { db, orderSummaries, projectionCursors } from '../db/schema';
const orderProjection: InlineProjection<OrderEvent> = { supportedEventTypes: ['order.placed', 'order.shipped', 'order.cancelled'],
async processEvents(events) { // Wrap the whole batch in a transaction await db.transaction(async (tx) => { for (const event of events) { switch (event.type) { case 'order.placed': await tx.insert(orderSummaries).values({ orderId: event.data.orderId, customerId: event.data.customerId, totalAmount: event.data.totalAmount, status: 'pending', }).onConflictDoNothing(); break; case 'order.shipped': await tx.update(orderSummaries) .set({ status: 'shipped', trackingNumber: event.data.trackingNumber }) .where(eq(orderSummaries.orderId, event.data.orderId)); break; case 'order.cancelled': await tx.update(orderSummaries) .set({ status: 'cancelled' }) .where(eq(orderSummaries.orderId, event.data.orderId)); break; } }
// Update cursor in the same transaction const last = events.at(-1); if (last) { await tx.insert(projectionCursors) .values({ name: 'order-summary', position: last.globalPosition }) .onConflictDoUpdate({ target: projectionCursors.name, set: { position: last.globalPosition }, }); } }); },
// Tell the SDK where you left off -- it handles the rest async lastProcessedPosition() { const row = await db.select() .from(projectionCursors) .where(eq(projectionCursors.name, 'order-summary')) .get(); return row?.position ?? 0; },};Register it the same way:
const eventStore = deltabase.getEventStore('orders', { projections: projections.inline([orderProjection]),});How catch-up works
Section titled “How catch-up works”When lastProcessedPosition is defined, the SDK automatically:
- Reads events from
(lastProcessedPosition + 1)in batches of 100 - Calls
processEventsfor each batch (your transaction boundary) - Skips already-processed events in
handleAppend(idempotency guard)
The SDK never persists the cursor — that’s your job, inside processEvents. This means your cursor update and your read model update happen in the same transaction. No split-brain.
Custom catch-up
Section titled “Custom catch-up”If you need a different replay strategy (e.g. bulk inserts without per-event processing), implement catchUp instead:
const bulkProjection: InlineProjection<OrderEvent> = { supportedEventTypes: ['order.placed'],
async processEvents(events) { // Normal per-batch processing for live events await db.insert(orderSummaries) .values(events.map(toRow)) .onConflictDoNothing(); },
async catchUp(eventStore) { // Custom: bulk-load everything in one shot const result = await eventStore.queryEvents({ type: ['order.placed'], sortBy: 'globalPosition', sortDirection: 'asc', }); await db.insert(orderSummaries) .values(result.events.map(toRow)) .onConflictDoNothing(); },};When catchUp is defined, the SDK delegates to it entirely and doesn’t use lastProcessedPosition for the replay loop. lastProcessedPosition still guards handleAppend for live events though.
DCB (Dynamic Consistency Boundary) Example
Section titled “DCB (Dynamic Consistency Boundary) Example”With DCB, events are appended to a single global stream using append() instead of appendToStream(). Events use tags to identify the entities they belong to. Inline projections work identically — they use multiStreamProjection with getDocumentId deriving keys from event tags.
import { multiStreamProjection, projections } from '@delta-base/toolkit';import { DeltaBase } from '@delta-base/server';
// DCB events use tags instead of stream IDstype CourseEvent = | { type: 'CourseDefined'; data: { courseId: string; title: string; capacity: number } } | { type: 'CourseCapacityChanged'; data: { courseId: string; newCapacity: number } } | { type: 'CourseArchived'; data: { courseId: string } };
const courseProjection = multiStreamProjection< { exists: boolean; title: string | null; capacity: number }, CourseEvent>({ projectionName: 'course-summary', store: readModelStore, canHandle: ['CourseDefined', 'CourseCapacityChanged', 'CourseArchived'],
// Derive document key from event tags (DCB pattern) getDocumentId: (event) => { const courseTag = event.tags?.find((t) => t.startsWith('course:')); return courseTag ?? `course:${event.data.courseId}`; },
initialState: () => ({ exists: false, title: null, capacity: 0 }),
evolve: (doc, event) => { switch (event.type) { case 'CourseDefined': return { exists: true, title: event.data.title, capacity: event.data.capacity }; case 'CourseCapacityChanged': return { ...doc, capacity: event.data.newCapacity }; case 'CourseArchived': return { ...doc, exists: false }; default: return doc; } },});
const eventStore = deltabase.getEventStore('courses', { projections: projections.inline([courseProjection]),});
// DCB command handlingapp.post('/courses/:id/change-capacity', async (c) => { const courseId = c.req.param('id'); const { newCapacity } = await c.req.json();
// 1. Build decision model const { state, appendCondition } = await eventStore.buildDecisionModel({ query: { items: [{ types: ['CourseDefined', 'CourseArchived', 'CourseCapacityChanged'], tags: [`course:${courseId}`], }], }, initialState: () => ({ exists: false, capacity: 0 }), evolve: (state, event) => { switch (event.type) { case 'CourseDefined': return { exists: true, capacity: event.data.capacity }; case 'CourseArchived': return { ...state, exists: false }; case 'CourseCapacityChanged': return { ...state, capacity: event.data.newCapacity }; default: return state; } }, });
// 2. Validate if (!state.exists) throw new Error('Course not found'); if (state.capacity === newCapacity) throw new Error('Capacity unchanged');
// 3. Append with condition -- projection runs automatically! await eventStore.append( [{ type: 'CourseCapacityChanged', data: { courseId, newCapacity }, tags: [`course:${courseId}`], }], appendCondition, );
// Read model is already consistent const course = await readModelStore.get(`course:${courseId}`); return c.json(course);});Catch-Up: Handling Existing Events
Section titled “Catch-Up: Handling Existing Events”When you register a projection on an event store that already has events, the projection needs to process historical events before it’s current. This happens automatically.
How It Works
Section titled “How It Works”Single-stream projections catch up per-stream. When an appendToStream happens for stream order-123:
- Check the document’s
_versionagainst the first appended event’sstreamPosition - If there’s a gap, read missing events for that stream only
- Process gap events, then the newly appended events
- Update
_version
Multi-stream projections catch up on the first append call:
- Read the cursor from the store (
__cursor:{projectionName}) - Query historical events from that position, in batches of 100
- Process each batch, advancing the cursor
- Then process the newly appended events
Imperative projections (with lastProcessedPosition) catch up on the first append call:
- Call
lastProcessedPosition()to find the starting point - Query events from
(position + 1)in batches of 100 - Call
processEventsfor each batch - The projection updates its own cursor inside
processEvents
Explicit Catch-Up
Section titled “Explicit Catch-Up”For production deployments, trigger catch-up during Worker startup to avoid slow first requests:
// In your Worker's fetch handler or startup logicconst eventStore = deltabase.getEventStore('orders', { projections: projections.inline([orderSummaryProjection]),});
// Pre-warm the projectionsawait eventStore.catchUpProjections();Error Handling
Section titled “Error Handling”Events are facts. They’re never rolled back if a projection fails.
Error Modes
Section titled “Error Modes”// Default: projection failure throws (fails the request)projections.inline([orderSummaryProjection], { onError: 'throw',});
// Log and continue: request succeeds, projection catches up next timeprojections.inline([orderSummaryProjection], { onError: 'log',});
// Custom handler: report to monitoring, etc.projections.inline([orderSummaryProjection], { onError: (error, projection, events) => { monitoring.captureException(error, { projection: projection.projectionName, eventCount: events.length, }); },});What Happens After a Failure
Section titled “What Happens After a Failure”- Events are safely persisted in the event store
- The projection’s
_version/ cursor was NOT advanced - On the next
appendToStreamcall, the projection detects the gap and catches up - No manual intervention needed
Idempotency: Why It Matters and How to Get It Right
Section titled “Idempotency: Why It Matters and How to Get It Right”In concurrent environments (e.g., multiple Cloudflare Worker isolates), the same event may be processed more than once. Inline projections handle this automatically through version tracking, but your evolve function needs to cooperate.
The Rules
Section titled “The Rules”-
evolvemust be a pure function. Given the same document and event, it must always return the same result. -
No side effects in
evolve. Don’t call APIs, write logs, or generate IDs insideevolve. All data should come from the event. -
Don’t use
Date.now()orMath.random(). Use timestamps from the event (event.createdAt) instead. -
Prefer set-semantics over increment-semantics when possible. Setting
status: 'shipped'is inherently idempotent. IncrementingorderCount += 1is not — if the same event is processed twice, the count is wrong. The framework’s_versiontracking prevents this, but defensive evolve functions make the system more robust.
// Good: Idempotent evolveevolve: (doc, event) => { switch (event.type) { case 'order.placed': return { ...doc, orderId: event.data.orderId, // set, not increment status: 'pending', // set, not toggle placedAt: event.createdAt, // from event, not Date.now() }; default: return doc; }},// Avoid: Non-deterministic evolveevolve: (doc, event) => { switch (event.type) { case 'order.placed': return { ...doc, orderId: event.data.orderId, status: 'pending', placedAt: new Date().toISOString(), // Non-deterministic! processedBy: getWorkerId(), // Side effect! }; default: return doc; }},Testing
Section titled “Testing”Inline projections work with InMemoryEventStore and InMemoryReadModelStore — no HTTP calls needed:
import { describe, it, expect, beforeEach } from 'vitest';import { InMemoryEventStore, InMemoryReadModelStore, projections,} from '@delta-base/toolkit';import { createOrderSummaryProjection } from '../projections/order-summary';import type { OrderSummaryReadModel } from '../projections/order-summary';
describe('Order Summary Projection', () => { let eventStore: InMemoryEventStore; let readModelStore: InMemoryReadModelStore;
beforeEach(() => { readModelStore = new InMemoryReadModelStore(); const projection = createOrderSummaryProjection(readModelStore);
eventStore = new InMemoryEventStore({ projections: projections.inline([projection]), }); });
it('creates order summary on order.placed', async () => { await eventStore.appendToStream('order-1', [{ type: 'order.placed', data: { orderId: 'order-1', customerId: 'cust-1', items: [{ productId: 'p1', name: 'Widget', quantity: 1, price: 9.99 }], totalAmount: 9.99, }, }]);
const summary = await readModelStore.get<OrderSummaryReadModel>( 'order-summary:order-1' );
expect(summary).toBeDefined(); expect(summary?.status).toBe('pending'); expect(summary?.totalAmount).toBe(9.99); });
it('updates status on order.shipped', async () => { await eventStore.appendToStream('order-1', [{ type: 'order.placed', data: { orderId: 'order-1', customerId: 'cust-1', items: [{ productId: 'p1', name: 'Widget', quantity: 1, price: 9.99 }], totalAmount: 9.99, }, }]);
await eventStore.appendToStream('order-1', [{ type: 'order.shipped', data: { orderId: 'order-1', trackingNumber: 'TRK-123', shippedAt: '2026-01-15T10:00:00Z', }, }]);
const summary = await readModelStore.get<OrderSummaryReadModel>( 'order-summary:order-1' );
expect(summary?.status).toBe('shipped'); expect(summary?.trackingNumber).toBe('TRK-123'); });
it('catches up on gap between projection registration and existing events', async () => { // Append events without projection const plainStore = new InMemoryEventStore(); await plainStore.appendToStream('order-1', [{ type: 'order.placed', data: { orderId: 'order-1', customerId: 'cust-1', items: [], totalAmount: 0, }, }]);
// Now create a new event store with the projection // The projection will catch up when the next append happens const projection = createOrderSummaryProjection(readModelStore); const storeWithProjection = new InMemoryEventStore({ projections: projections.inline([projection]), });
// Trigger catch-up explicitly await storeWithProjection.catchUpProjections(); });});Hybrid: Inline + Async Together
Section titled “Hybrid: Inline + Async Together”You can run critical projections inline and keep others as async webhooks. This is the recommended approach for most production systems.
// Inline: user-facing read models that need read-after-write consistencyconst eventStore = deltabase.getEventStore('orders', { projections: projections.inline([ orderSummaryProjection, customerDashboardProjection, ]),});
// Async: analytics, notifications, cross-service sync// These still use webhook subscriptions in deltabase.config.tsThe async subscriptions continue to work as before. Inline projections are additive — they don’t replace webhooks.
What’s Next?
Section titled “What’s Next?”- Inline Projections Concepts — How inline projections work under the hood
- Projections & Read Models — General projection concepts
- CQRS Implementation — End-to-end CQRS walkthrough with command handling
- DCB Unique Constraints — Using Dynamic Consistency Boundary for uniqueness checks