Skip to content

EventStore

API reference for EventStore

@delta-base/server


Implementation of the EventStore interface for DeltaBase

  • EventStore

new EventStore(http, eventStoreId): EventStore

Creates a new EventStore client instance

HttpClient

The HTTP client to use for API requests

string

The ID of the event store to interact with

EventStore

aggregateStream<State, EventType>(streamId, options): Promise<AggregateStreamResult<State>>

Aggregate events from a stream and compute a state

State

EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }>

string

The ID of the stream to aggregate events from

AggregateStreamOptions<State, EventType>

Configuration options for the aggregation process

Promise<AggregateStreamResult<State>>

Promise resolving to the aggregation result with the computed state and stream metadata

// Define your state type and event types
type UserState = { email: string, isVerified: boolean };
type UserEvent =
| { type: 'user.created', data: { email: string } }
| { type: 'user.verified', data: { verifiedAt: string } };
// Aggregate the stream into a state
const result = await eventStore.aggregateStream<UserState, UserEvent>(
'user-123',
{
initialState: () => ({ email: '', isVerified: false }),
evolve: (state, event) => {
switch (event.type) {
case 'user.created':
return { ...state, email: event.data.email };
case 'user.verified':
return { ...state, isVerified: true };
default:
return state;
}
},
read: { from: 0 }
}
);

EventStoreInterface.aggregateStream


appendToStream<EventType>(streamId, events, options?): Promise<AppendToStreamResult>

Append events to a stream

EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }>

string

The ID of the stream to append events to

EventType[]

Array of events to append to the stream

AppendToStreamOptions

Optional parameters for the append operation

Promise<AppendToStreamResult>

Promise resolving to the append result with the next expected version

When expectedStreamVersion doesn’t match current stream version

When request validation fails

When the event store doesn’t exist

When authentication fails

import {
isVersionConflictError,
isValidationError
} from '@delta-base/server';
try {
// Append with optimistic concurrency control
await eventStore.appendToStream(
'user-123',
[{
type: 'user.updated',
data: { email: 'updated@example.com' }
}],
{ expectedStreamVersion: 0n }
);
} catch (error) {
if (isVersionConflictError(error)) {
console.log(`Version conflict: expected ${error.expectedVersion}, got ${error.currentVersion}`);
// Handle concurrency conflict
} else if (isValidationError(error)) {
console.log('Validation errors:', error.validationErrors);
// Handle validation failures
} else {
throw error; // Re-throw unknown errors
}
}

EventStoreInterface.appendToStream


listStreams(options?): Promise<{ streams: string[]; total: number; }>

Get a list of stream IDs in an event store

Optional parameters for listing streams

number

Maximum number of stream IDs to return

number

Number of stream IDs to skip

string

Pattern to match stream IDs (e.g., ‘user-*‘)

Promise<{ streams: string[]; total: number; }>

Promise resolving to an object containing stream IDs and total count

// List all streams
const { streams, total } = await eventStore.listStreams();
// List streams with pagination
const { streams, total } = await eventStore.listStreams({
limit: 50,
offset: 100
});
// List streams matching a pattern
const { streams, total } = await eventStore.listStreams({
pattern: 'user-*'
});

queryEvents<EventType>(options): Promise<QueryEventsResult<EventType>>

Query events with flexible filtering options

EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }> = Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }>

QueryEventsOptions = {}

Query parameters for filtering events

Promise<QueryEventsResult<EventType>>

Promise resolving to the query result with events and pagination info

// Query all events of a specific type
const result = await eventStore.queryEvents({
type: 'user.created'
});
// Query events with pagination
const result = await eventStore.queryEvents({
limit: 20,
offset: 40,
includeCount: true
});
// Query events within a time range
const result = await eventStore.queryEvents({
fromDate: '2023-01-01T00:00:00Z',
toDate: '2023-01-31T23:59:59Z'
});

EventStoreInterface.queryEvents


queryStreamEvents<EventType>(streamId, options): Promise<QueryEventsResult<EventType>>

Query events for a specific stream with filtering options

EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }> = Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }>

string

The ID of the stream to query events from

Omit<QueryEventsOptions, "streamId"> = {}

Query parameters for filtering events

Promise<QueryEventsResult<EventType>>

Promise resolving to the query result with events and pagination info

// Query events for a specific stream
const result = await eventStore.queryStreamEvents('user-123', {
type: 'user.updated'
});

queryStreams(options): Promise<QueryStreamsResult>

Query streams with filtering options

QueryStreamsOptions = {}

Query parameters for filtering streams

Promise<QueryStreamsResult>

Promise resolving to the query result with streams and pagination info

// Query all streams
const result = await eventStore.queryStreams();
// Query streams by type
const result = await eventStore.queryStreams({
streamType: 'user'
});
// Query streams with a pattern match
const result = await eventStore.queryStreams({
streamIdPattern: 'user-%'
});

queryStreamsByType(streamType, options): Promise<QueryStreamsResult>

Query streams of a specific type with filtering options

string

The stream type to filter by

Omit<QueryStreamsOptions, "streamType"> = {}

Query parameters for filtering streams

Promise<QueryStreamsResult>

Promise resolving to the query result with streams and pagination info

// Query all user streams
const result = await eventStore.queryStreamsByType('user');
// Query user streams with pagination
const result = await eventStore.queryStreamsByType('user', {
limit: 20,
offset: 0
});

readStream<EventType>(streamId, options?): Promise<ReadStreamResult<EventType>>

Read events from a stream

EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; type: string; }>

string

The ID of the stream to read events from

ReadStreamOptions

The options for reading events

Promise<ReadStreamResult<EventType>>

Promise resolving to the read result containing events and stream metadata

// Read all events from a stream
const result = await eventStore.readStream('user-123');
// Read events with a specific starting position
const result = await eventStore.readStream('user-123', { from: 5 });
// Read a specific range of events
const result = await eventStore.readStream('user-123', { from: 5, to: 10 });
// Read a limited number of events
const result = await eventStore.readStream('user-123', { maxCount: 100 });

EventStoreInterface.readStream