Probar gratis
7 min lectura Guide 854 of 877

Patrones e Implementación de Event Sourcing

Event sourcing es un patrón arquitectónico poderoso que almacena todos los cambios en el estado de la aplicación como una secuencia de eventos, permitiendo pistas de auditoría confiables, consultas temporales y reconstrucción del sistema. Este enfoque proporciona consistencia de datos superior y capacidades de depuración comparadas con las operaciones CRUD tradicionales.

Conceptos Fundamentales de Event Sourcing

Arquitectura del Event Store

El event store es el componente central que persiste todos los eventos de dominio:

Estructura del Evento:

{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "OrderPlaced",
  "aggregateId": "order-123",
  "aggregateType": "Order",
  "eventVersion": 1,
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "customerId": "cust-456",
    "items": [
      {"productId": "prod-789", "quantity": 2, "price": 29.99}
    ],
    "totalAmount": 59.98
  },
  "metadata": {
    "userId": "user-123",
    "correlationId": "req-abc-123",
    "causationId": "cmd-place-order-456"
  }
}

Operaciones del Event Store:

Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State

Integración con el Patrón Aggregate

Aggregates en event sourcing mantienen estado a través de la aplicación de eventos:

Gestión de Estado del Aggregate:

class OrderAggregate {
  private state: OrderState;
  private uncommittedEvents: DomainEvent[] = [];

  apply(event: DomainEvent): void {
    this.state = this.applyEvent(this.state, event);
    this.uncommittedEvents.push(event);
  }

  private applyEvent(state: OrderState, event: DomainEvent): OrderState {
    switch (event.eventType) {
      case 'OrderPlaced':
        return { ...state, status: 'PLACED', items: event.payload.items };
      case 'OrderShipped':
        return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
      default:
        return state;
    }
  }
}

Patrones de Event Sourcing

Replay de Eventos y Snapshots

Proceso de Replay de Eventos:

Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Apply E501 to E1000 to current state

Estrategia de Snapshot:

interface Snapshot {
  aggregateId: string;
  version: number;
  state: AggregateState;
  timestamp: Date;
}

class SnapshotStore {
  async save(snapshot: Snapshot): Promise<void> {
    // Persist snapshot for faster loading
  }

  async load(aggregateId: string): Promise<Snapshot | null> {
    // Load most recent snapshot
  }
}

Versionado y Evolución de Eventos

Patrón Upcasting:

interface EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean;
  upcast(event: StoredEvent): StoredEvent;
}

class OrderEventUpcaster implements EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean {
    return eventType === 'OrderPlaced' && fromVersion === 1;
  }

  upcast(event: StoredEvent): StoredEvent {
    if (event.payload.totalAmount === undefined) {
      // Calculate total from items if missing
      const total = event.payload.items.reduce(
        (sum, item) => sum + (item.price * item.quantity), 0
      );
      return {
        ...event,
        payload: { ...event.payload, totalAmount: total }
      };
    }
    return event;
  }
}

Integración con CQRS

Separación de Responsabilidades de Comando y Consulta:

Write Side (Commands)          Read Side (Queries)
    │                               │
    ▼                               ▼
Event Store ──► Project Events ──► Read Models
    ▲                               │
    └───────────────────────────────┘
            Eventual Consistency

Proyección del Read Model:

class OrderReadModel {
  private orders: Map<string, OrderView> = new Map();

  project(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this.orders.set(event.aggregateId, {
          id: event.aggregateId,
          customerId: event.payload.customerId,
          status: 'PLACED',
          totalAmount: event.payload.totalAmount,
          placedAt: event.timestamp
        });
        break;
      case 'OrderShipped':
        const order = this.orders.get(event.aggregateId);
        if (order) {
          order.status = 'SHIPPED';
          order.shippedAt = event.timestamp;
        }
        break;
    }
  }
}

Gestión de Consistencia Eventual

Patrón Saga para Transacciones Distribuidas

Saga Orquestada:

Order Service ──► Payment Service ──► Inventory Service
     │                    │                    │
     └─ OrderPlaced ──────┼─ PaymentProcessed ─┘
                          │
                          └─ PaymentFailed (compensate)

Saga Coreografiada:

Order Service ──► OrderPlaced Event ──► Payment Service
                                                │
                                                ▼
                                     PaymentProcessed Event ──► Inventory Service

Patrones de Consistencia Eventual

Read Repair:

Client Request ──► Stale Data Detected ──► Background Update
     │                                         │
     └─────────────────────────────────────────┘
                  Return current data

Write-Ahead Logging:

Write Operation ──► Event Store ──► Background Projection
     │                      │               │
     └──────────────────────┼───────────────┘
            Immediate ACK       Eventual consistency

Optimización de Rendimiento

Indexación del Event Store

Estrategias de Indexación:

  • Índice de Aggregate ID: Búsqueda rápida de eventos para aggregates específicos
  • Índice de Event Type: Consulta de eventos por tipo en aggregates
  • Índice de Timestamp: Consultas de eventos basadas en tiempo
  • Índice de Correlation ID: Seguimiento de solicitudes entre servicios

Índices Compuestos:

-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);

-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);

Estrategias de Cache

Cache de Eventos:

Event Store ──► Event Cache ──► Application
     ▲              │
     └──────────────┘
     Cache Invalidation

Cache de Snapshot:

Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
                        │
                        └─► Cache Miss → Load from Store

Manejo de Errores y Recuperación

Confiabilidad del Procesamiento de Eventos

Manejadores de Eventos Idempotentes:

class IdempotentEventHandler {
  private processedEvents: Set<string> = new Set();

  async handle(event: DomainEvent): Promise<void> {
    if (this.processedEvents.has(event.eventId)) {
      return; // Already processed
    }

    try {
      await this.processEvent(event);
      this.processedEvents.add(event.eventId);
    } catch (error) {
      // Log error and retry logic
      throw error;
    }
  }
}

Patrón Dead Letter Queue

Manejo de Eventos con Fallo:

Event Processor ──► Success ──► Continue
     │
     └─► Failure ──► Dead Letter Queue ──► Manual Review

Pruebas de Sistemas Event-Sourced

Pruebas Basadas en Eventos

Patrón Given-When-Then:

describe('Order Aggregate', () => {
  it('should handle order placement', () => {
    // Given
    const order = new OrderAggregate();

    // When
    order.placeOrder({
      customerId: 'cust-123',
      items: [{ productId: 'prod-456', quantity: 1 }]
    });

    // Then
    expect(order.getUncommittedEvents()).toContainEqual(
      expect.objectContaining({
        eventType: 'OrderPlaced',
        payload: expect.objectContaining({
          customerId: 'cust-123'
        })
      })
    );
  });
});

Prueba de Event Stream

Verificación de Event Stream:

it('should maintain event order', async () => {
  const eventStore = new InMemoryEventStore();

  // Append events
  await eventStore.append([
    { eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
    { eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
    { eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
  ]);

  // Verify order
  const events = await eventStore.getEvents('order-1');
  expect(events.map(e => e.eventType)).toEqual([
    'Created', 'Updated', 'Shipped'
  ]);
});

Monitoreo y Observabilidad

Métricas de Procesamiento de Eventos

Métricas Principales a Rastrear:

  • Latencia de procesamiento de eventos
  • Tasa de procesamiento de eventos
  • Tasas de error por tipo de evento
  • Tiempos de reconstrucción de aggregate
  • Frecuencia de snapshots y tamaño

Panel de Monitoreo:

Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day

Anti-Patrones y Trampas

Errores Comunes

Eventos Mutables:

❌ Modifying event payload after creation
✅ Immutable events with new versions

Payloads de Eventos Grandes:

❌ Storing entire objects in events
✅ Reference IDs with separate data storage

Procesamiento Síncrono de Eventos:

❌ Blocking on event processing
✅ Asynchronous event handling

Integración con GitScrum

Gestión de Tareas Orientada a Eventos

Event Sourcing para Tareas:

  • Rastrear todos los cambios de estado de tareas como eventos
  • Permitir reconstrucción del historial de tareas
  • Soporte a consultas temporales para análisis de proyecto

Implementación de Pista de Auditoría:

  • Historial completo de modificaciones de tareas
  • Atribución de usuario para todos los cambios
  • Reportes de cumplimiento

Automatización de Workflow

Workflows Orientados a Eventos:

Task Created ──► Assign Reviewer ──► Code Review ──► Merge
     │                │                     │          │
     └────────────────┼─────────────────────┼──────────┘
              Event Bus for Coordination

Soluciones Relacionadas