Patrones e Implementación de Event Sourcing
Event sourcing es un patrón arquitectónico poderoso que almacena todos los cambios en el estado de la aplicación como una secuencia de eventos, permitiendo pistas de auditoría confiables, consultas temporales y reconstrucción del sistema. Este enfoque proporciona consistencia de datos superior y capacidades de depuración comparadas con las operaciones CRUD tradicionales.
Conceptos Fundamentales de Event Sourcing
Arquitectura del Event Store
El event store es el componente central que persiste todos los eventos de dominio:
Estructura del Evento:
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "OrderPlaced",
"aggregateId": "order-123",
"aggregateType": "Order",
"eventVersion": 1,
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"customerId": "cust-456",
"items": [
{"productId": "prod-789", "quantity": 2, "price": 29.99}
],
"totalAmount": 59.98
},
"metadata": {
"userId": "user-123",
"correlationId": "req-abc-123",
"causationId": "cmd-place-order-456"
}
}
Operaciones del Event Store:
Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State
Integración con el Patrón Aggregate
Aggregates en event sourcing mantienen estado a través de la aplicación de eventos:
Gestión de Estado del Aggregate:
class OrderAggregate {
private state: OrderState;
private uncommittedEvents: DomainEvent[] = [];
apply(event: DomainEvent): void {
this.state = this.applyEvent(this.state, event);
this.uncommittedEvents.push(event);
}
private applyEvent(state: OrderState, event: DomainEvent): OrderState {
switch (event.eventType) {
case 'OrderPlaced':
return { ...state, status: 'PLACED', items: event.payload.items };
case 'OrderShipped':
return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
default:
return state;
}
}
}
Patrones de Event Sourcing
Replay de Eventos y Snapshots
Proceso de Replay de Eventos:
Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Apply E501 to E1000 to current state
Estrategia de Snapshot:
interface Snapshot {
aggregateId: string;
version: number;
state: AggregateState;
timestamp: Date;
}
class SnapshotStore {
async save(snapshot: Snapshot): Promise<void> {
// Persist snapshot for faster loading
}
async load(aggregateId: string): Promise<Snapshot | null> {
// Load most recent snapshot
}
}
Versionado y Evolución de Eventos
Patrón Upcasting:
interface EventUpcaster {
canUpcast(eventType: string, fromVersion: number): boolean;
upcast(event: StoredEvent): StoredEvent;
}
class OrderEventUpcaster implements EventUpcaster {
canUpcast(eventType: string, fromVersion: number): boolean {
return eventType === 'OrderPlaced' && fromVersion === 1;
}
upcast(event: StoredEvent): StoredEvent {
if (event.payload.totalAmount === undefined) {
// Calculate total from items if missing
const total = event.payload.items.reduce(
(sum, item) => sum + (item.price * item.quantity), 0
);
return {
...event,
payload: { ...event.payload, totalAmount: total }
};
}
return event;
}
}
Integración con CQRS
Separación de Responsabilidades de Comando y Consulta:
Write Side (Commands) Read Side (Queries)
│ │
▼ ▼
Event Store ──► Project Events ──► Read Models
▲ │
└───────────────────────────────┘
Eventual Consistency
Proyección del Read Model:
class OrderReadModel {
private orders: Map<string, OrderView> = new Map();
project(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this.orders.set(event.aggregateId, {
id: event.aggregateId,
customerId: event.payload.customerId,
status: 'PLACED',
totalAmount: event.payload.totalAmount,
placedAt: event.timestamp
});
break;
case 'OrderShipped':
const order = this.orders.get(event.aggregateId);
if (order) {
order.status = 'SHIPPED';
order.shippedAt = event.timestamp;
}
break;
}
}
}
Gestión de Consistencia Eventual
Patrón Saga para Transacciones Distribuidas
Saga Orquestada:
Order Service ──► Payment Service ──► Inventory Service
│ │ │
└─ OrderPlaced ──────┼─ PaymentProcessed ─┘
│
└─ PaymentFailed (compensate)
Saga Coreografiada:
Order Service ──► OrderPlaced Event ──► Payment Service
│
▼
PaymentProcessed Event ──► Inventory Service
Patrones de Consistencia Eventual
Read Repair:
Client Request ──► Stale Data Detected ──► Background Update
│ │
└─────────────────────────────────────────┘
Return current data
Write-Ahead Logging:
Write Operation ──► Event Store ──► Background Projection
│ │ │
└──────────────────────┼───────────────┘
Immediate ACK Eventual consistency
Optimización de Rendimiento
Indexación del Event Store
Estrategias de Indexación:
- Índice de Aggregate ID: Búsqueda rápida de eventos para aggregates específicos
- Índice de Event Type: Consulta de eventos por tipo en aggregates
- Índice de Timestamp: Consultas de eventos basadas en tiempo
- Índice de Correlation ID: Seguimiento de solicitudes entre servicios
Índices Compuestos:
-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);
-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);
Estrategias de Cache
Cache de Eventos:
Event Store ──► Event Cache ──► Application
▲ │
└──────────────┘
Cache Invalidation
Cache de Snapshot:
Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
│
└─► Cache Miss → Load from Store
Manejo de Errores y Recuperación
Confiabilidad del Procesamiento de Eventos
Manejadores de Eventos Idempotentes:
class IdempotentEventHandler {
private processedEvents: Set<string> = new Set();
async handle(event: DomainEvent): Promise<void> {
if (this.processedEvents.has(event.eventId)) {
return; // Already processed
}
try {
await this.processEvent(event);
this.processedEvents.add(event.eventId);
} catch (error) {
// Log error and retry logic
throw error;
}
}
}
Patrón Dead Letter Queue
Manejo de Eventos con Fallo:
Event Processor ──► Success ──► Continue
│
└─► Failure ──► Dead Letter Queue ──► Manual Review
Pruebas de Sistemas Event-Sourced
Pruebas Basadas en Eventos
Patrón Given-When-Then:
describe('Order Aggregate', () => {
it('should handle order placement', () => {
// Given
const order = new OrderAggregate();
// When
order.placeOrder({
customerId: 'cust-123',
items: [{ productId: 'prod-456', quantity: 1 }]
});
// Then
expect(order.getUncommittedEvents()).toContainEqual(
expect.objectContaining({
eventType: 'OrderPlaced',
payload: expect.objectContaining({
customerId: 'cust-123'
})
})
);
});
});
Prueba de Event Stream
Verificación de Event Stream:
it('should maintain event order', async () => {
const eventStore = new InMemoryEventStore();
// Append events
await eventStore.append([
{ eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
{ eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
{ eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
]);
// Verify order
const events = await eventStore.getEvents('order-1');
expect(events.map(e => e.eventType)).toEqual([
'Created', 'Updated', 'Shipped'
]);
});
Monitoreo y Observabilidad
Métricas de Procesamiento de Eventos
Métricas Principales a Rastrear:
- Latencia de procesamiento de eventos
- Tasa de procesamiento de eventos
- Tasas de error por tipo de evento
- Tiempos de reconstrucción de aggregate
- Frecuencia de snapshots y tamaño
Panel de Monitoreo:
Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day
Anti-Patrones y Trampas
Errores Comunes
Eventos Mutables:
❌ Modifying event payload after creation
✅ Immutable events with new versions
Payloads de Eventos Grandes:
❌ Storing entire objects in events
✅ Reference IDs with separate data storage
Procesamiento Síncrono de Eventos:
❌ Blocking on event processing
✅ Asynchronous event handling
Integración con GitScrum
Gestión de Tareas Orientada a Eventos
Event Sourcing para Tareas:
- Rastrear todos los cambios de estado de tareas como eventos
- Permitir reconstrucción del historial de tareas
- Soporte a consultas temporales para análisis de proyecto
Implementación de Pista de Auditoría:
- Historial completo de modificaciones de tareas
- Atribución de usuario para todos los cambios
- Reportes de cumplimiento
Automatización de Workflow
Workflows Orientados a Eventos:
Task Created ──► Assign Reviewer ──► Code Review ──► Merge
│ │ │ │
└────────────────┼─────────────────────┼──────────┘
Event Bus for Coordination