Padrões e Implementação de Event Sourcing
Event sourcing é um padrão arquitetural poderoso que armazena todas as mudanças no estado da aplicação como uma sequência de eventos, permitindo trilhas de auditoria confiáveis, consultas temporais e reconstrução do sistema. Esta abordagem fornece consistência de dados superior e capacidades de depuração comparadas às operações CRUD tradicionais.
Conceitos Fundamentais de Event Sourcing
Arquitetura do Event Store
O event store é o componente central que persiste todos os eventos de domínio:
Estrutura do Evento:
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "OrderPlaced",
"aggregateId": "order-123",
"aggregateType": "Order",
"eventVersion": 1,
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"customerId": "cust-456",
"items": [
{"productId": "prod-789", "quantity": 2, "price": 29.99}
],
"totalAmount": 59.98
},
"metadata": {
"userId": "user-123",
"correlationId": "req-abc-123",
"causationId": "cmd-place-order-456"
}
}
Operações do Event Store:
Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State
Integração com o Padrão Aggregate
Aggregates no event sourcing mantêm estado através da aplicação de eventos:
Gerenciamento de Estado do Aggregate:
class OrderAggregate {
private state: OrderState;
private uncommittedEvents: DomainEvent[] = [];
apply(event: DomainEvent): void {
this.state = this.applyEvent(this.state, event);
this.uncommittedEvents.push(event);
}
private applyEvent(state: OrderState, event: DomainEvent): OrderState {
switch (event.eventType) {
case 'OrderPlaced':
return { ...state, status: 'PLACED', items: event.payload.items };
case 'OrderShipped':
return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
default:
return state;
}
}
}
Padrões de Event Sourcing
Replay de Eventos e Snapshots
Processo de Replay de Eventos:
Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Apply E501 to E1000 to current state
Estratégia de Snapshot:
interface Snapshot {
aggregateId: string;
version: number;
state: AggregateState;
timestamp: Date;
}
class SnapshotStore {
async save(snapshot: Snapshot): Promise<void> {
// Persist snapshot for faster loading
}
async load(aggregateId: string): Promise<Snapshot | null> {
// Load most recent snapshot
}
}
Versionamento e Evolução de Eventos
Padrão Upcasting:
interface EventUpcaster {
canUpcast(eventType: string, fromVersion: number): boolean;
upcast(event: StoredEvent): StoredEvent;
}
class OrderEventUpcaster implements EventUpcaster {
canUpcast(eventType: string, fromVersion: number): boolean {
return eventType === 'OrderPlaced' && fromVersion === 1;
}
upcast(event: StoredEvent): StoredEvent {
if (event.payload.totalAmount === undefined) {
// Calculate total from items if missing
const total = event.payload.items.reduce(
(sum, item) => sum + (item.price * item.quantity), 0
);
return {
...event,
payload: { ...event.payload, totalAmount: total }
};
}
return event;
}
}
Integração com CQRS
Responsabilidade de Comando e Consulta Separada:
Write Side (Commands) Read Side (Queries)
│ │
▼ ▼
Event Store ──► Project Events ──► Read Models
▲ │
└───────────────────────────────┘
Eventual Consistency
Projeção do Read Model:
class OrderReadModel {
private orders: Map<string, OrderView> = new Map();
project(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this.orders.set(event.aggregateId, {
id: event.aggregateId,
customerId: event.payload.customerId,
status: 'PLACED',
totalAmount: event.payload.totalAmount,
placedAt: event.timestamp
});
break;
case 'OrderShipped':
const order = this.orders.get(event.aggregateId);
if (order) {
order.status = 'SHIPPED';
order.shippedAt = event.timestamp;
}
break;
}
}
}
Gerenciamento de Consistência Eventual
Padrão Saga para Transações Distribuídas
Saga Orquestrada:
Order Service ──► Payment Service ──► Inventory Service
│ │ │
└─ OrderPlaced ──────┼─ PaymentProcessed ─┘
│
└─ PaymentFailed (compensate)
Saga Coreografada:
Order Service ──► OrderPlaced Event ──► Payment Service
│
▼
PaymentProcessed Event ──► Inventory Service
Padrões de Consistência Eventual
Read Repair:
Client Request ──► Stale Data Detected ──► Background Update
│ │
└─────────────────────────────────────────┘
Return current data
Write-Ahead Logging:
Write Operation ──► Event Store ──► Background Projection
│ │ │
└──────────────────────┼───────────────┘
Immediate ACK Eventual consistency
Otimização de Desempenho
Indexação do Event Store
Estratégias de Indexação:
- Índice de Aggregate ID: Pesquisa rápida de eventos para aggregates específicos
- Índice de Event Type: Consulta de eventos por tipo em aggregates
- Índice de Timestamp: Consultas de eventos baseadas em tempo
- Índice de Correlation ID: Rastreamento de solicitações entre serviços
Índices Compostos:
-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);
-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);
Estratégias de Cache
Cache de Eventos:
Event Store ──► Event Cache ──► Application
▲ │
└──────────────┘
Cache Invalidation
Cache de Snapshot:
Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
│
└─► Cache Miss → Load from Store
Tratamento de Erros e Recuperação
Confiabilidade do Processamento de Eventos
Manipuladores de Eventos Idempotentes:
class IdempotentEventHandler {
private processedEvents: Set<string> = new Set();
async handle(event: DomainEvent): Promise<void> {
if (this.processedEvents.has(event.eventId)) {
return; // Already processed
}
try {
await this.processEvent(event);
this.processedEvents.add(event.eventId);
} catch (error) {
// Log error and retry logic
throw error;
}
}
}
Padrão Dead Letter Queue
Tratamento de Eventos com Falha:
Event Processor ──► Success ──► Continue
│
└─► Failure ──► Dead Letter Queue ──► Manual Review
Teste de Sistemas Event-Sourced
Teste Baseado em Eventos
Padrão Given-When-Then:
describe('Order Aggregate', () => {
it('should handle order placement', () => {
// Given
const order = new OrderAggregate();
// When
order.placeOrder({
customerId: 'cust-123',
items: [{ productId: 'prod-456', quantity: 1 }]
});
// Then
expect(order.getUncommittedEvents()).toContainEqual(
expect.objectContaining({
eventType: 'OrderPlaced',
payload: expect.objectContaining({
customerId: 'cust-123'
})
})
);
});
});
Teste de Event Stream
Verificação de Event Stream:
it('should maintain event order', async () => {
const eventStore = new InMemoryEventStore();
// Append events
await eventStore.append([
{ eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
{ eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
{ eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
]);
// Verify order
const events = await eventStore.getEvents('order-1');
expect(events.map(e => e.eventType)).toEqual([
'Created', 'Updated', 'Shipped'
]);
});
Monitoramento e Observabilidade
Métricas de Processamento de Eventos
Métricas Principais a Rastrear:
- Latência de processamento de eventos
- Taxa de processamento de eventos
- Taxas de erro por tipo de evento
- Tempos de reconstrução de aggregate
- Frequência de snapshots e tamanho
Painel de Monitoramento:
Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day
Anti-Padrões e Armadilhas
Erros Comuns
Eventos Mutáveis:
❌ Modifying event payload after creation
✅ Immutable events with new versions
Payloads de Eventos Grandes:
❌ Storing entire objects in events
✅ Reference IDs with separate data storage
Processamento Síncrono de Eventos:
❌ Blocking on event processing
✅ Asynchronous event handling
Integração com GitScrum
Gerenciamento de Tarefas Orientado a Eventos
Event Sourcing para Tarefas:
- Rastrear todas as mudanças de estado de tarefas como eventos
- Permitir reconstrução do histórico de tarefas
- Suporte a consultas temporais para análises de projeto
Implementação de Trilha de Auditoria:
- Histórico completo de modificações de tarefas
- Atribuição de usuário para todas as mudanças
- Relatórios de conformidade
Automação de Workflow
Workflows Orientados a Eventos:
Task Created ──► Assign Reviewer ──► Code Review ──► Merge
│ │ │ │
└────────────────┼─────────────────────┼──────────┘
Event Bus for Coordination