Event Sourcing Patterns and Implementation
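Event sourcing is an architectural pattern that stores every change to application state as an append-only sequence of events. Because the event log is the source of truth, it provides a complete audit trail, supports temporal queries (what was the state at a given time?), and lets current state be rebuilt by replaying events. Compared with CRUD-style persistence, which overwrites previous values, the full history of each change stays available for debugging, at the cost of extra work such as event schema evolution and eventually consistent read models.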
Core Event Sourcing Concepts
Event Store Architecture
The event store is the central component that persists all domain events:
Event Structure:
{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "OrderPlaced",
  "aggregateId": "order-123",
  "aggregateType": "Order",
  "eventVersion": 1,
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "customerId": "cust-456",
    "items": [
      {"productId": "prod-789", "quantity": 2, "price": 29.99}
    ],
    "totalAmount": 59.98
  },
  "metadata": {
    "userId": "user-123",
    "correlationId": "req-abc-123",
    "causationId": "cmd-place-order-456"
  }
}
Event Store Operations:
Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State
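The two operations above can be sketched as a small in-memory store. This is a minimal illustration rather than a production design: the names `EventRecord`, `NewEvent`, `SketchEventStore`, and the `sequence` field are assumptions made for this example, and the `expectedVersion` check is one common way to reject concurrent writers, not something the structure above prescribes.

```typescript
// Hypothetical event shape, trimmed from the structure shown above.
interface EventRecord {
  eventId: string;
  eventType: string;
  aggregateId: string;
  sequence: number; // 1-based position within the aggregate's stream (assumed field)
  payload: Record<string, unknown>;
}

// What a writer supplies; the store assigns aggregateId and sequence.
type NewEvent = Pick<EventRecord, 'eventId' | 'eventType' | 'payload'>;

class SketchEventStore {
  // One append-only array per aggregate stream.
  private streams = new Map<string, EventRecord[]>();

  // Append succeeds only if the stream is still at the version the writer read.
  append(aggregateId: string, expectedVersion: number, newEvents: NewEvent[]): void {
    const stream = this.streams.get(aggregateId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new Error(
        `Conflict on ${aggregateId}: expected ${expectedVersion}, found ${stream.length}`
      );
    }
    const stamped = newEvents.map((e, i) => ({
      ...e,
      aggregateId,
      sequence: expectedVersion + i + 1,
    }));
    this.streams.set(aggregateId, [...stream, ...stamped]);
  }

  // Read a stream in append order, optionally skipping events up to a sequence number.
  readStream(aggregateId: string, afterSequence = 0): EventRecord[] {
    return (this.streams.get(aggregateId) ?? []).filter(e => e.sequence > afterSequence);
  }
}
```

A writer reads the stream, decides on new events, and appends with the stream length it saw; a stale writer gets an error and can reload and retry.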
Aggregate Pattern Integration
Aggregates in event sourcing maintain state through event application:
Aggregate State Management:
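class OrderAggregate {
  // Assumed empty starting state; adjust to the real OrderState shape.
  private state: OrderState = { status: 'NEW', items: [] };
  private uncommittedEvents: DomainEvent[] = [];

  // Record a new event: update state and queue the event for persistence.
  apply(event: DomainEvent): void {
    this.state = this.applyEvent(this.state, event);
    this.uncommittedEvents.push(event);
  }

  // Rebuild state from stored events without queueing them again.
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.state = this.applyEvent(this.state, event);
    }
  }

  // Pure transition: (state, event) -> next state. Unknown events are ignored.
  private applyEvent(state: OrderState, event: DomainEvent): OrderState {
    switch (event.eventType) {
      case 'OrderPlaced':
        return { ...state, status: 'PLACED', items: event.payload.items };
      case 'OrderShipped':
        return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
      default:
        return state;
    }
  }
}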
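The aggregate above covers how events change state. The other half is the command side: a method validates a request against current state and, if it is allowed, raises an event. The sketch below shows that split under simplified assumptions; `LineItem`, `OrderFact`, `OrderLifecycle`, `evolveOrder`, and `OrderCommandSide` are hypothetical names, and the business rules are illustrative only.

```typescript
interface LineItem { productId: string; quantity: number; }

// Simplified events for this sketch (no metadata or timestamps).
type OrderFact =
  | { eventType: 'OrderPlaced'; customerId: string; items: LineItem[] }
  | { eventType: 'OrderShipped' };

interface OrderLifecycle { status: 'NEW' | 'PLACED' | 'SHIPPED'; items: LineItem[]; }

// Pure state transition, shared by new events and replay.
function evolveOrder(state: OrderLifecycle, fact: OrderFact): OrderLifecycle {
  switch (fact.eventType) {
    case 'OrderPlaced':
      return { status: 'PLACED', items: fact.items };
    case 'OrderShipped':
      return { ...state, status: 'SHIPPED' };
    default:
      return state;
  }
}

class OrderCommandSide {
  private state: OrderLifecycle = { status: 'NEW', items: [] };
  private uncommitted: OrderFact[] = [];

  // Commands check invariants first, then raise an event; they never mutate state directly.
  placeOrder(customerId: string, items: LineItem[]): void {
    if (this.state.status !== 'NEW') throw new Error('Order was already placed');
    if (items.length === 0) throw new Error('An order needs at least one item');
    this.raise({ eventType: 'OrderPlaced', customerId, items });
  }

  ship(): void {
    if (this.state.status !== 'PLACED') throw new Error('Only a placed order can ship');
    this.raise({ eventType: 'OrderShipped' });
  }

  getUncommittedEvents(): OrderFact[] {
    return [...this.uncommitted];
  }

  private raise(fact: OrderFact): void {
    this.state = evolveOrder(this.state, fact);
    this.uncommitted.push(fact);
  }
}
```

A rejected command raises no event, so invalid requests never reach the event store.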
Event Sourcing Patterns
Event Replay and Snapshots
Event Replay Process:
Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Load the snapshot at E500, then apply E501 through E1000 to reach the current state
Snapshot Strategy:
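interface Snapshot {
  aggregateId: string;
  version: number; // version of the last event included in the state
  state: AggregateState;
  timestamp: Date;
}
class SnapshotStore {
  // In-memory stand-in for illustration; a real store would use durable storage.
  private snapshots = new Map<string, Snapshot>();

  async save(snapshot: Snapshot): Promise<void> {
    // Keep the most recent snapshot per aggregate for faster loading
    this.snapshots.set(snapshot.aggregateId, snapshot);
  }
  async load(aggregateId: string): Promise<Snapshot | null> {
    // Return the most recent snapshot, or null if none exists
    return this.snapshots.get(aggregateId) ?? null;
  }
}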
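The replay process and snapshot strategy above can be combined into one loading routine: start from the snapshot if there is one, then fold only the later events into it. The following is a minimal sketch with hypothetical names (`SequencedEvent`, `StateSnapshot`, `rehydrate`, `shouldSnapshot`); the threshold of 500 events is an arbitrary value taken from the example stream, not a recommendation.

```typescript
interface SequencedEvent<P> { version: number; payload: P; }
interface StateSnapshot<S> { version: number; state: S; }

// Rebuild state from an optional snapshot plus the events that follow it.
function rehydrate<S, P>(
  initial: S,
  snapshot: StateSnapshot<S> | null,
  stream: SequencedEvent<P>[],
  evolve: (state: S, payload: P) => S
): StateSnapshot<S> {
  let state = snapshot ? snapshot.state : initial;
  let version = snapshot ? snapshot.version : 0;
  for (const ev of stream) {
    if (ev.version <= version) continue; // already reflected in the snapshot
    state = evolve(state, ev.payload);
    version = ev.version;
  }
  return { version, state };
}

// Decide whether enough events have accumulated since the last snapshot.
function shouldSnapshot(version: number, lastSnapshotVersion: number, every = 500): boolean {
  return version - lastSnapshotVersion >= every;
}
```

In practice the store would be asked only for events after the snapshot version, so the skip inside the loop is a safety net rather than the main filter. Replaying from the snapshot and replaying from scratch should always produce the same state; comparing the two is a useful consistency test.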
Event Versioning and Schema Evolution
Upcasting Pattern:
interface EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean;
  upcast(event: StoredEvent): StoredEvent;
}
class OrderEventUpcaster implements EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean {
    return eventType === 'OrderPlaced' && fromVersion === 1;
  }
  upcast(event: StoredEvent): StoredEvent {
    let totalAmount = event.payload.totalAmount;
    if (totalAmount === undefined) {
      // Calculate total from items if missing
      totalAmount = event.payload.items.reduce(
        (sum: number, item: { price: number; quantity: number }) =>
          sum + item.price * item.quantity,
        0
      );
    }
    // Return a new event marked as version 2 (assumes eventVersion is the
    // schema version) so this upcaster does not match it again
    return {
      ...event,
      eventVersion: 2,
      payload: { ...event.payload, totalAmount }
    };
  }
}
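When several schema versions accumulate, upcasters are usually applied in sequence at read time, so stored events are never rewritten. The sketch below shows one possible chain. `VersionedRecord`, `PayloadUpcaster`, `UpcasterChain`, and the `schemaVersion` field are assumptions for this example, and the version 3 `currency` field is invented purely to show a second step.

```typescript
interface VersionedRecord {
  eventType: string;
  schemaVersion: number;
  payload: Record<string, unknown>;
}

// Each upcaster lifts one event type from one version to the next.
interface PayloadUpcaster {
  eventType: string;
  fromVersion: number;
  upcast(payload: Record<string, unknown>): Record<string, unknown>;
}

class UpcasterChain {
  private upcasters: PayloadUpcaster[];

  constructor(upcasters: PayloadUpcaster[]) {
    this.upcasters = upcasters;
  }

  // Apply matching upcasters one version at a time until none applies.
  toLatest(record: VersionedRecord): VersionedRecord {
    let current = record;
    for (;;) {
      const step = this.upcasters.find(
        u => u.eventType === current.eventType && u.fromVersion === current.schemaVersion
      );
      if (!step) return current;
      current = {
        ...current,
        schemaVersion: current.schemaVersion + 1,
        payload: step.upcast(current.payload),
      };
    }
  }
}

const orderUpcasters: PayloadUpcaster[] = [
  {
    // v1 -> v2: derive totalAmount from the line items
    eventType: 'OrderPlaced',
    fromVersion: 1,
    upcast: p => ({
      ...p,
      totalAmount: (p['items'] as { price: number; quantity: number }[]).reduce(
        (sum, item) => sum + item.price * item.quantity,
        0
      ),
    }),
  },
  {
    // v2 -> v3: hypothetical new field with a default value
    eventType: 'OrderPlaced',
    fromVersion: 2,
    upcast: p => ({ ...p, currency: 'USD' }),
  },
];
```

Because every step raises the version by one, a version 1 event passes through both upcasters while a version 2 event passes through only the second.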
CQRS Integration
Command Query Responsibility Segregation:
Write Side (Commands)                      Read Side (Queries)
        │                                          │
        ▼                                          ▼
   Event Store ──► Project Events ──►         Read Models
        ▲                                          │
        └──────────────────────────────────────────┘
                   Eventual Consistency
Read Model Projection:
class OrderReadModel {
  private orders: Map<string, OrderView> = new Map();

  // Update the query-side view in response to a single event.
  project(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this.orders.set(event.aggregateId, {
          id: event.aggregateId,
          customerId: event.payload.customerId,
          status: 'PLACED',
          totalAmount: event.payload.totalAmount,
          placedAt: event.timestamp
        });
        break;
      case 'OrderShipped': {
        // Braces scope the declaration to this case
        const order = this.orders.get(event.aggregateId);
        if (order) {
          order.status = 'SHIPPED';
          order.shippedAt = event.timestamp;
        }
        break;
      }
    }
  }
}
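Projections typically receive events at least once, and a read model sometimes has to be rebuilt from scratch. One way to support both is to track a checkpoint, the position of the last event applied. The sketch below assumes the event store exposes a global, strictly increasing `position` per event; that field and the names `PositionedEvent`, `OrderStatusRow`, and `CheckpointedProjector` are hypothetical.

```typescript
interface PositionedEvent {
  position: number; // global, strictly increasing log position (assumed)
  eventType: 'OrderPlaced' | 'OrderShipped';
  aggregateId: string;
}

interface OrderStatusRow { id: string; status: 'PLACED' | 'SHIPPED'; }

class CheckpointedProjector {
  private rows = new Map<string, OrderStatusRow>();
  private checkpoint = 0;

  // Apply a batch; events at or below the checkpoint were already applied and are skipped.
  catchUp(batch: PositionedEvent[]): void {
    for (const ev of batch) {
      if (ev.position <= this.checkpoint) continue;
      this.projectOne(ev);
      this.checkpoint = ev.position;
    }
  }

  // Discard the read model and replay the whole log.
  rebuild(fullLog: PositionedEvent[]): void {
    this.rows.clear();
    this.checkpoint = 0;
    this.catchUp(fullLog);
  }

  getRow(id: string): OrderStatusRow | undefined {
    return this.rows.get(id);
  }

  getCheckpoint(): number {
    return this.checkpoint;
  }

  private projectOne(ev: PositionedEvent): void {
    if (ev.eventType === 'OrderPlaced') {
      this.rows.set(ev.aggregateId, { id: ev.aggregateId, status: 'PLACED' });
    } else {
      const row = this.rows.get(ev.aggregateId);
      if (row) row.status = 'SHIPPED';
    }
  }
}
```

In a durable implementation the checkpoint would be stored together with the read model, ideally in the same transaction, so the two cannot drift apart after a crash.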
Eventual Consistency Management
Saga Pattern for Distributed Transactions
Orchestration-Based Saga:
Order Service ──► Payment Service ──► Inventory Service
│ │ │
└─ OrderPlaced ──────┼─ PaymentProcessed ─┘
│
└─ PaymentFailed (compensate)
Choreography-Based Saga:
Order Service ──► OrderPlaced Event ──► Payment Service
│
▼
PaymentProcessed Event ──► Inventory Service
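The compensation path in the orchestration diagram can be illustrated with a small coordinator that runs steps in order and, when one fails, undoes the completed steps in reverse. This is a simplified, synchronous sketch; a real saga would call remote services asynchronously and persist its progress. `SagaStep` and `runSaga` are hypothetical names.

```typescript
interface SagaStep {
  name: string;
  action: () => void;     // forward operation; throws on failure
  compensate: () => void; // undo operation, run if a later step fails
}

function runSaga(steps: SagaStep[]): { completed: boolean; log: string[] } {
  const log: string[] = [];
  const done: SagaStep[] = [];
  for (const step of steps) {
    try {
      step.action();
      done.push(step);
      log.push(`${step.name}: ok`);
    } catch {
      log.push(`${step.name}: failed`);
      // Undo completed steps, most recent first.
      for (const prev of [...done].reverse()) {
        prev.compensate();
        log.push(`${prev.name}: compensated`);
      }
      return { completed: false, log };
    }
  }
  return { completed: true, log };
}
```

With steps for placing the order, charging payment, and reserving inventory, a payment failure would cancel the order and never attempt the inventory step.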
Eventual Consistency Patterns
Read Repair:
Client Request ──► Stale Data Detected ──► Background Update
│ │
└─────────────────────────────────────────┘
Return current data
Write-Ahead Logging:
Write Operation ──► Event Store ──► Background Projection
│ │ │
└──────────────────────┼───────────────┘
Immediate ACK Eventual consistency
Performance Optimization
Event Store Indexing
Indexing Strategies:
- Aggregate ID Index: Fast lookup of events for specific aggregates
- Event Type Index: Query events by type across aggregates
- Timestamp Index: Time-based event queries
- Correlation ID Index: Trace requests across services
Composite Indexes:
-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);
-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);
Caching Strategies
Event Cache:
Event Store ──► Event Cache ──► Application
▲ │
└──────────────┘
Cache Invalidation
Snapshot Cache:
Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
│
└─► Cache Miss → Load from Store
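A cache-aside loader is one way to realize the snapshot cache flow above: serve aggregate state from memory when present, rebuild it on a miss, and drop the entry when new events are appended. The sketch below is illustrative only; `CachedAggregate`, `SnapshotCache`, and the injected `rebuild` function are hypothetical, and eviction policy is left out.

```typescript
interface CachedAggregate<S> { version: number; state: S; }

class SnapshotCache<S> {
  private entries = new Map<string, CachedAggregate<S>>();
  private rebuild: (aggregateId: string) => CachedAggregate<S>;
  hits = 0;
  misses = 0;

  // `rebuild` loads a snapshot and replays events from the store (supplied by the caller).
  constructor(rebuild: (aggregateId: string) => CachedAggregate<S>) {
    this.rebuild = rebuild;
  }

  load(aggregateId: string): CachedAggregate<S> {
    const cached = this.entries.get(aggregateId);
    if (cached) {
      this.hits++;
      return cached;
    }
    this.misses++;
    const fresh = this.rebuild(aggregateId);
    this.entries.set(aggregateId, fresh);
    return fresh;
  }

  // Call after appending events so the next load reflects them.
  invalidate(aggregateId: string): void {
    this.entries.delete(aggregateId);
  }
}
```

Stored events never change, so invalidation is only needed for derived state such as rebuilt aggregates, not for the events themselves.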
Error Handling and Recovery
Event Processing Reliability
Idempotent Event Handlers:
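abstract class IdempotentEventHandler {
  // In-memory for illustration; persist processed IDs so deduplication survives restarts.
  private processedEvents: Set<string> = new Set();

  // Subclasses implement the actual side effect.
  protected abstract processEvent(event: DomainEvent): Promise<void>;

  async handle(event: DomainEvent): Promise<void> {
    if (this.processedEvents.has(event.eventId)) {
      return; // Already processed
    }
    try {
      await this.processEvent(event);
      this.processedEvents.add(event.eventId);
    } catch (error) {
      // Log the error here; the event stays unmarked so a retry can process it
      throw error;
    }
  }
}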
Dead Letter Queue Pattern
Failed Event Handling:
Event Processor ──► Success ──► Continue
│
└─► Failure ──► Dead Letter Queue ──► Manual Review
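The failure path above is often combined with a bounded number of retries before an event is parked. The following sketch shows that flow with an in-memory dead letter list; `QueuedEvent`, `FailedDelivery`, `RetryingProcessor`, and the default of three attempts are assumptions for the example, and backoff between attempts is omitted.

```typescript
interface QueuedEvent { eventId: string; eventType: string; }

interface FailedDelivery {
  failedEvent: QueuedEvent;
  attempts: number;
  reason: string; // message from the last error, kept for manual review
}

class RetryingProcessor {
  readonly deadLetters: FailedDelivery[] = [];
  private handler: (ev: QueuedEvent) => void;
  private maxAttempts: number;

  constructor(handler: (ev: QueuedEvent) => void, maxAttempts = 3) {
    this.handler = handler;
    this.maxAttempts = maxAttempts;
  }

  // Returns true on success; after maxAttempts failures the event is dead-lettered.
  process(ev: QueuedEvent): boolean {
    let reason = '';
    for (let attempt = 1; attempt <= this.maxAttempts; attempt++) {
      try {
        this.handler(ev);
        return true;
      } catch (err) {
        reason = err instanceof Error ? err.message : String(err);
      }
    }
    this.deadLetters.push({ failedEvent: ev, attempts: this.maxAttempts, reason });
    return false;
  }
}
```

Parking the failed event lets the processor continue with the rest of the stream, at the cost of processing later events before the failed one is resolved.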
Testing Event-Sourced Systems
Event-Based Testing
Given-When-Then Pattern:
describe('Order Aggregate', () => {
  it('should handle order placement', () => {
    // Given
    const order = new OrderAggregate();
    // When
    order.placeOrder({
      customerId: 'cust-123',
      items: [{ productId: 'prod-456', quantity: 1 }]
    });
    // Then
    expect(order.getUncommittedEvents()).toContainEqual(
      expect.objectContaining({
        eventType: 'OrderPlaced',
        payload: expect.objectContaining({
          customerId: 'cust-123'
        })
      })
    );
  });
});
Event Stream Testing
Event Stream Verification:
it('should maintain event order', async () => {
  const eventStore = new InMemoryEventStore();
  // Append events
  await eventStore.append([
    { eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
    { eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
    { eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
  ]);
  // Verify order
  const events = await eventStore.getEvents('order-1');
  expect(events.map(e => e.eventType)).toEqual([
    'Created', 'Updated', 'Shipped'
  ]);
});
Monitoring and Observability
Event Processing Metrics
Key Metrics to Track:
- Event processing latency
- Event processing throughput
- Error rates by event type
- Aggregate rebuild times
- Snapshot frequency and size
Monitoring Dashboard:
Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day
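Several of the metrics listed above can be derived from per-event processing samples. The sketch below computes throughput, average latency, and error rate by event type over a time window. `ProcessingSample`, `ProcessingSummary`, and `summarizeProcessing` are hypothetical names; a production system would more likely use a metrics library than hand-rolled aggregation.

```typescript
interface ProcessingSample {
  eventType: string;
  durationMs: number;
  succeeded: boolean;
}

interface ProcessingSummary {
  eventsPerSecond: number;
  avgLatencyMs: number;
  errorRateByType: Record<string, number>; // fraction of failed events, 0 to 1
}

function summarizeProcessing(samples: ProcessingSample[], windowSeconds: number): ProcessingSummary {
  const totals = new Map<string, { seen: number; failed: number }>();
  let latencySum = 0;
  for (const s of samples) {
    latencySum += s.durationMs;
    const t = totals.get(s.eventType) ?? { seen: 0, failed: 0 };
    t.seen += 1;
    if (!s.succeeded) t.failed += 1;
    totals.set(s.eventType, t);
  }
  const errorRateByType: Record<string, number> = {};
  totals.forEach((t, eventType) => {
    errorRateByType[eventType] = t.failed / t.seen;
  });
  return {
    eventsPerSecond: windowSeconds > 0 ? samples.length / windowSeconds : 0,
    avgLatencyMs: samples.length > 0 ? latencySum / samples.length : 0,
    errorRateByType,
  };
}
```

Averages hide outliers, so latency percentiles are usually worth tracking alongside the mean.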
Anti-Patterns and Pitfalls
Common Mistakes
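Mutable Events:
❌ Modifying an event's payload after it has been stored
✅ Keep stored events immutable; record corrections as new events
Large Event Payloads:
❌ Embedding entire objects or large blobs in events
✅ Store the changed fields plus reference IDs, with bulky data kept in separate storage
Synchronous Event Processing:
❌ Blocking the write path while handlers and projections run
✅ Handle events asynchronously after the append is acknowledged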
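For the first pitfall, immutability can be enforced at runtime as well as by convention. The sketch below freezes an event recursively using the standard `Object.freeze`; `deepFreeze` is a hypothetical helper written for this example. Note that TypeScript's `Readonly<T>` return type only protects top-level properties at compile time, which is why the runtime freeze is applied to nested objects as well.

```typescript
// Recursively freeze an object so neither it nor its nested values can be changed.
function deepFreeze<T>(value: T): Readonly<T> {
  if (typeof value === 'object' && value !== null && !Object.isFrozen(value)) {
    Object.freeze(value);
    const fields = value as unknown as Record<string, unknown>;
    for (const key of Object.keys(fields)) {
      deepFreeze(fields[key]);
    }
  }
  return value;
}

const placedEvent = deepFreeze({
  eventId: 'evt-1',
  eventType: 'OrderPlaced',
  payload: { items: [{ productId: 'prod-789', quantity: 2 }] },
});

// A correction is expressed as a new event that refers to the original,
// not as an edit to the stored one (event type and fields are illustrative).
const correctionEvent = deepFreeze({
  eventId: 'evt-2',
  eventType: 'OrderQuantityCorrected',
  payload: { correctsEventId: placedEvent.eventId, productId: 'prod-789', quantity: 3 },
});
```

Writes to a frozen object throw a `TypeError` in strict mode and are silently ignored otherwise, so the stored event keeps its original content either way.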
GitScrum Integration
Event-Driven Task Management
Event Sourcing for Tasks:
- Track all task state changes as events
- Enable task history reconstruction
- Support temporal queries for project analytics
Audit Trail Implementation:
- Complete history of task modifications
- User attribution for all changes
- Compliance reporting capabilities
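History reconstruction and temporal queries both come down to replaying a task's events up to a chosen point in time. The sketch below illustrates the idea with invented event types; `TaskChange`, `TaskView`, `taskStateAt`, and `auditTrail` are hypothetical and do not represent GitScrum's actual API or data model. It assumes events are supplied in chronological order with ISO-8601 UTC timestamps.

```typescript
interface TaskChange {
  taskId: string;
  eventType: 'TaskCreated' | 'TaskAssigned' | 'TaskStatusChanged';
  occurredAt: string; // ISO-8601 UTC timestamp
  userId: string;     // who made the change
  value: string;      // title, assignee, or new status, depending on eventType
}

interface TaskView { title: string; assignee: string | null; status: string; }

// Temporal query: the task's state as of a given moment, or null if it did not exist yet.
function taskStateAt(changes: TaskChange[], taskId: string, asOf: string): TaskView | null {
  const cutoff = Date.parse(asOf);
  let view: TaskView | null = null;
  for (const change of changes) {
    if (change.taskId !== taskId || Date.parse(change.occurredAt) > cutoff) continue;
    if (change.eventType === 'TaskCreated') {
      view = { title: change.value, assignee: null, status: 'todo' };
    } else if (view && change.eventType === 'TaskAssigned') {
      view = { ...view, assignee: change.value };
    } else if (view) {
      view = { ...view, status: change.value };
    }
  }
  return view;
}

// Audit trail: one line per change, with user attribution.
function auditTrail(changes: TaskChange[], taskId: string): string[] {
  return changes
    .filter(c => c.taskId === taskId)
    .map(c => `${c.occurredAt} ${c.userId} ${c.eventType} ${c.value}`);
}
```

Because the events are never overwritten, the same log answers both "what does the task look like now" and "what did it look like last Tuesday".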
Workflow Automation
Event-Driven Workflows:
Task Created ──► Assign Reviewer ──► Code Review ──► Merge
│ │ │ │
└────────────────┼─────────────────────┼──────────┘
Event Bus for Coordination
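The coordination role of the event bus in the diagram above can be illustrated with a minimal in-process publish/subscribe sketch, where each workflow step reacts to one event and publishes the next. `WorkflowBus`, the event names, and the handlers are hypothetical and are not part of any GitScrum API; a real deployment would use a message broker with asynchronous delivery.

```typescript
type WorkflowPayload = { taskId: string };
type WorkflowHandler = (payload: WorkflowPayload) => void;

class WorkflowBus {
  private handlers = new Map<string, WorkflowHandler[]>();

  subscribe(eventName: string, handler: WorkflowHandler): void {
    const list = this.handlers.get(eventName) ?? [];
    list.push(handler);
    this.handlers.set(eventName, list);
  }

  // Deliver the event synchronously to every subscriber, in subscription order.
  publish(eventName: string, payload: WorkflowPayload): void {
    for (const handler of this.handlers.get(eventName) ?? []) {
      handler(payload);
    }
  }
}

// Wire the workflow: each step listens for one event and emits the next.
const workflowTrail: string[] = [];
const workflowBus = new WorkflowBus();

workflowBus.subscribe('TaskCreated', p => {
  workflowTrail.push(`assign reviewer for ${p.taskId}`);
  workflowBus.publish('ReviewerAssigned', p);
});
workflowBus.subscribe('ReviewerAssigned', p => {
  workflowTrail.push(`start code review for ${p.taskId}`);
  workflowBus.publish('ReviewApproved', p);
});
workflowBus.subscribe('ReviewApproved', p => {
  workflowTrail.push(`merge ${p.taskId}`);
});

workflowBus.publish('TaskCreated', { taskId: 'task-1' });
```

Because steps only know about event names, a new step such as a notification can subscribe to an existing event without changing the other handlers.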