Testar grátis
7 min leitura Guide 854 of 877

Padrões e Implementação de Event Sourcing

Event sourcing é um padrão arquitetural poderoso que armazena todas as mudanças no estado da aplicação como uma sequência de eventos, permitindo trilhas de auditoria confiáveis, consultas temporais e reconstrução do sistema. Esta abordagem fornece consistência de dados superior e capacidades de depuração comparadas às operações CRUD tradicionais.

Conceitos Fundamentais de Event Sourcing

Arquitetura do Event Store

O event store é o componente central que persiste todos os eventos de domínio:

Estrutura do Evento:

{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "OrderPlaced",
  "aggregateId": "order-123",
  "aggregateType": "Order",
  "eventVersion": 1,
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "customerId": "cust-456",
    "items": [
      {"productId": "prod-789", "quantity": 2, "price": 29.99}
    ],
    "totalAmount": 59.98
  },
  "metadata": {
    "userId": "user-123",
    "correlationId": "req-abc-123",
    "causationId": "cmd-place-order-456"
  }
}

Operações do Event Store:

Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State

Integração com o Padrão Aggregate

Aggregates no event sourcing mantêm estado através da aplicação de eventos:

Gerenciamento de Estado do Aggregate:

class OrderAggregate {
  private state: OrderState;
  private uncommittedEvents: DomainEvent[] = [];

  apply(event: DomainEvent): void {
    this.state = this.applyEvent(this.state, event);
    this.uncommittedEvents.push(event);
  }

  private applyEvent(state: OrderState, event: DomainEvent): OrderState {
    switch (event.eventType) {
      case 'OrderPlaced':
        return { ...state, status: 'PLACED', items: event.payload.items };
      case 'OrderShipped':
        return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
      default:
        return state;
    }
  }
}

Padrões de Event Sourcing

Replay de Eventos e Snapshots

Processo de Replay de Eventos:

Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Apply E501 to E1000 to current state

Estratégia de Snapshot:

interface Snapshot {
  aggregateId: string;
  version: number;
  state: AggregateState;
  timestamp: Date;
}

class SnapshotStore {
  async save(snapshot: Snapshot): Promise<void> {
    // Persist snapshot for faster loading
  }

  async load(aggregateId: string): Promise<Snapshot | null> {
    // Load most recent snapshot
  }
}

Versionamento e Evolução de Eventos

Padrão Upcasting:

interface EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean;
  upcast(event: StoredEvent): StoredEvent;
}

class OrderEventUpcaster implements EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean {
    return eventType === 'OrderPlaced' && fromVersion === 1;
  }

  upcast(event: StoredEvent): StoredEvent {
    if (event.payload.totalAmount === undefined) {
      // Calculate total from items if missing
      const total = event.payload.items.reduce(
        (sum, item) => sum + (item.price * item.quantity), 0
      );
      return {
        ...event,
        payload: { ...event.payload, totalAmount: total }
      };
    }
    return event;
  }
}

Integração com CQRS

Responsabilidade de Comando e Consulta Separada:

Write Side (Commands)          Read Side (Queries)
    │                               │
    ▼                               ▼
Event Store ──► Project Events ──► Read Models
    ▲                               │
    └───────────────────────────────┘
            Eventual Consistency

Projeção do Read Model:

class OrderReadModel {
  private orders: Map<string, OrderView> = new Map();

  project(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this.orders.set(event.aggregateId, {
          id: event.aggregateId,
          customerId: event.payload.customerId,
          status: 'PLACED',
          totalAmount: event.payload.totalAmount,
          placedAt: event.timestamp
        });
        break;
      case 'OrderShipped':
        const order = this.orders.get(event.aggregateId);
        if (order) {
          order.status = 'SHIPPED';
          order.shippedAt = event.timestamp;
        }
        break;
    }
  }
}

Gerenciamento de Consistência Eventual

Padrão Saga para Transações Distribuídas

Saga Orquestrada:

Order Service ──► Payment Service ──► Inventory Service
     │                    │                    │
     └─ OrderPlaced ──────┼─ PaymentProcessed ─┘
                          │
                          └─ PaymentFailed (compensate)

Saga Coreografada:

Order Service ──► OrderPlaced Event ──► Payment Service
                                                │
                                                ▼
                                     PaymentProcessed Event ──► Inventory Service

Padrões de Consistência Eventual

Read Repair:

Client Request ──► Stale Data Detected ──► Background Update
     │                                         │
     └─────────────────────────────────────────┘
                  Return current data

Write-Ahead Logging:

Write Operation ──► Event Store ──► Background Projection
     │                      │               │
     └──────────────────────┼───────────────┘
            Immediate ACK       Eventual consistency

Otimização de Desempenho

Indexação do Event Store

Estratégias de Indexação:

  • Índice de Aggregate ID: Pesquisa rápida de eventos para aggregates específicos
  • Índice de Event Type: Consulta de eventos por tipo em aggregates
  • Índice de Timestamp: Consultas de eventos baseadas em tempo
  • Índice de Correlation ID: Rastreamento de solicitações entre serviços

Índices Compostos:

-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);

-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);

Estratégias de Cache

Cache de Eventos:

Event Store ──► Event Cache ──► Application
     ▲              │
     └──────────────┘
     Cache Invalidation

Cache de Snapshot:

Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
                        │
                        └─► Cache Miss → Load from Store

Tratamento de Erros e Recuperação

Confiabilidade do Processamento de Eventos

Manipuladores de Eventos Idempotentes:

class IdempotentEventHandler {
  private processedEvents: Set<string> = new Set();

  async handle(event: DomainEvent): Promise<void> {
    if (this.processedEvents.has(event.eventId)) {
      return; // Already processed
    }

    try {
      await this.processEvent(event);
      this.processedEvents.add(event.eventId);
    } catch (error) {
      // Log error and retry logic
      throw error;
    }
  }
}

Padrão Dead Letter Queue

Tratamento de Eventos com Falha:

Event Processor ──► Success ──► Continue
     │
     └─► Failure ──► Dead Letter Queue ──► Manual Review

Teste de Sistemas Event-Sourced

Teste Baseado em Eventos

Padrão Given-When-Then:

describe('Order Aggregate', () => {
  it('should handle order placement', () => {
    // Given
    const order = new OrderAggregate();

    // When
    order.placeOrder({
      customerId: 'cust-123',
      items: [{ productId: 'prod-456', quantity: 1 }]
    });

    // Then
    expect(order.getUncommittedEvents()).toContainEqual(
      expect.objectContaining({
        eventType: 'OrderPlaced',
        payload: expect.objectContaining({
          customerId: 'cust-123'
        })
      })
    );
  });
});

Teste de Event Stream

Verificação de Event Stream:

it('should maintain event order', async () => {
  const eventStore = new InMemoryEventStore();

  // Append events
  await eventStore.append([
    { eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
    { eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
    { eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
  ]);

  // Verify order
  const events = await eventStore.getEvents('order-1');
  expect(events.map(e => e.eventType)).toEqual([
    'Created', 'Updated', 'Shipped'
  ]);
});

Monitoramento e Observabilidade

Métricas de Processamento de Eventos

Métricas Principais a Rastrear:

  • Latência de processamento de eventos
  • Taxa de processamento de eventos
  • Taxas de erro por tipo de evento
  • Tempos de reconstrução de aggregate
  • Frequência de snapshots e tamanho

Painel de Monitoramento:

Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day

Anti-Padrões e Armadilhas

Erros Comuns

Eventos Mutáveis:

❌ Modifying event payload after creation
✅ Immutable events with new versions

Payloads de Eventos Grandes:

❌ Storing entire objects in events
✅ Reference IDs with separate data storage

Processamento Síncrono de Eventos:

❌ Blocking on event processing
✅ Asynchronous event handling

Integração com GitScrum

Gerenciamento de Tarefas Orientado a Eventos

Event Sourcing para Tarefas:

  • Rastrear todas as mudanças de estado de tarefas como eventos
  • Permitir reconstrução do histórico de tarefas
  • Suporte a consultas temporais para análises de projeto

Implementação de Trilha de Auditoria:

  • Histórico completo de modificações de tarefas
  • Atribuição de usuário para todas as mudanças
  • Relatórios de conformidade

Automação de Workflow

Workflows Orientados a Eventos:

Task Created ──► Assign Reviewer ──► Code Review ──► Merge
     │                │                     │          │
     └────────────────┼─────────────────────┼──────────┘
              Event Bus for Coordination

Soluções Relacionadas