Essayer gratuitement
7 min lecture Guide 854 of 877

Patrons et Implémentation d'Event Sourcing

L'event sourcing est un patron architectural puissant qui stocke tous les changements d'état de l'application comme une séquence d'événements, permettant des pistes d'audit fiables, des requêtes temporelles et la reconstruction du système. Cette approche fournit une cohérence de données supérieure et des capacités de débogage comparées aux opérations CRUD traditionnelles.

Concepts Fondamentaux d'Event Sourcing

Architecture de l'Event Store

L'event store est le composant central qui persiste tous les événements de domaine :

Structure de l'Événement :

{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "OrderPlaced",
  "aggregateId": "order-123",
  "aggregateType": "Order",
  "eventVersion": 1,
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "customerId": "cust-456",
    "items": [
      {"productId": "prod-789", "quantity": 2, "price": 29.99}
    ],
    "totalAmount": 59.98
  },
  "metadata": {
    "userId": "user-123",
    "correlationId": "req-abc-123",
    "causationId": "cmd-place-order-456"
  }
}

Opérations de l'Event Store :

Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State

Intégration avec le Patron Aggregate

Les aggregates dans l'event sourcing maintiennent l'état à travers l'application d'événements :

Gestion d'État de l'Aggregate :

class OrderAggregate {
  private state: OrderState;
  private uncommittedEvents: DomainEvent[] = [];

  apply(event: DomainEvent): void {
    this.state = this.applyEvent(this.state, event);
    this.uncommittedEvents.push(event);
  }

  private applyEvent(state: OrderState, event: DomainEvent): OrderState {
    switch (event.eventType) {
      case 'OrderPlaced':
        return { ...state, status: 'PLACED', items: event.payload.items };
      case 'OrderShipped':
        return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
      default:
        return state;
    }
  }
}

Patrons d'Event Sourcing

Replay d'Événements et Snapshots

Processus de Replay d'Événements :

Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Apply E501 to E1000 to current state

Stratégie de Snapshot :

interface Snapshot {
  aggregateId: string;
  version: number;
  state: AggregateState;
  timestamp: Date;
}

class SnapshotStore {
  async save(snapshot: Snapshot): Promise<void> {
    // Persist snapshot for faster loading
  }

  async load(aggregateId: string): Promise<Snapshot | null> {
    // Load most recent snapshot
  }
}

Versionnement et Évolution des Événements

Patron Upcasting :

interface EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean;
  upcast(event: StoredEvent): StoredEvent;
}

class OrderEventUpcaster implements EventUpcaster {
  canUpcast(eventType: string, fromVersion: number): boolean {
    return eventType === 'OrderPlaced' && fromVersion === 1;
  }

  upcast(event: StoredEvent): StoredEvent {
    if (event.payload.totalAmount === undefined) {
      // Calculate total from items if missing
      const total = event.payload.items.reduce(
        (sum, item) => sum + (item.price * item.quantity), 0
      );
      return {
        ...event,
        payload: { ...event.payload, totalAmount: total }
      };
    }
    return event;
  }
}

Intégration avec CQRS

Séparation des Responsabilités Commande et Requête :

Write Side (Commands)          Read Side (Queries)
    │                               │
    ▼                               ▼
Event Store ──► Project Events ──► Read Models
    ▲                               │
    └───────────────────────────────┘
            Eventual Consistency

Projection du Read Model :

class OrderReadModel {
  private orders: Map<string, OrderView> = new Map();

  project(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this.orders.set(event.aggregateId, {
          id: event.aggregateId,
          customerId: event.payload.customerId,
          status: 'PLACED',
          totalAmount: event.payload.totalAmount,
          placedAt: event.timestamp
        });
        break;
      case 'OrderShipped':
        const order = this.orders.get(event.aggregateId);
        if (order) {
          order.status = 'SHIPPED';
          order.shippedAt = event.timestamp;
        }
        break;
    }
  }
}

Gestion de la Cohérence Éventuelle

Patron Saga pour les Transactions Distribuées

Saga Orchstrée :

Order Service ──► Payment Service ──► Inventory Service
     │                    │                    │
     └─ OrderPlaced ──────┼─ PaymentProcessed ─┘
                          │
                          └─ PaymentFailed (compensate)

Saga Chorégraphiée :

Order Service ──► OrderPlaced Event ──► Payment Service
                                                │
                                                ▼
                                     PaymentProcessed Event ──► Inventory Service

Patrons de Cohérence Éventuelle

Read Repair :

Client Request ──► Stale Data Detected ──► Background Update
     │                                         │
     └─────────────────────────────────────────┘
                  Return current data

Write-Ahead Logging :

Write Operation ──► Event Store ──► Background Projection
     │                      │               │
     └──────────────────────┼───────────────┘
            Immediate ACK       Eventual consistency

Optimisation des Performances

Indexation de l'Event Store

Stratégies d'Indexation :

  • Indice d'Aggregate ID : Recherche rapide d'événements pour des aggregates spécifiques
  • Indice d'Event Type : Requête d'événements par type dans les aggregates
  • Indice de Timestamp : Requêtes d'événements basées sur le temps
  • Indice de Correlation ID : Suivi des requêtes entre services

Indices Composés :

-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);

-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);

Stratégies de Cache

Cache d'Événements :

Event Store ──► Event Cache ──► Application
     ▲              │
     └──────────────┘
     Cache Invalidation

Cache de Snapshot :

Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
                        │
                        └─► Cache Miss → Load from Store

Gestion d'Erreurs et Récupération

Fiabilité du Traitement d'Événements

Gestionnaires d'Événements Idempotents :

class IdempotentEventHandler {
  private processedEvents: Set<string> = new Set();

  async handle(event: DomainEvent): Promise<void> {
    if (this.processedEvents.has(event.eventId)) {
      return; // Already processed
    }

    try {
      await this.processEvent(event);
      this.processedEvents.add(event.eventId);
    } catch (error) {
      // Log error and retry logic
      throw error;
    }
  }
}

Patron Dead Letter Queue

Gestion d'Événements en Échec :

Event Processor ──► Success ──► Continue
     │
     └─► Failure ──► Dead Letter Queue ──► Manual Review

Tests de Systèmes Event-Sourced

Tests Basés sur les Événements

Patron Given-When-Then :

describe('Order Aggregate', () => {
  it('should handle order placement', () => {
    // Given
    const order = new OrderAggregate();

    // When
    order.placeOrder({
      customerId: 'cust-123',
      items: [{ productId: 'prod-456', quantity: 1 }]
    });

    // Then
    expect(order.getUncommittedEvents()).toContainEqual(
      expect.objectContaining({
        eventType: 'OrderPlaced',
        payload: expect.objectContaining({
          customerId: 'cust-123'
        })
      })
    );
  });
});

Test d'Event Stream

Vérification d'Event Stream :

it('should maintain event order', async () => {
  const eventStore = new InMemoryEventStore();

  // Append events
  await eventStore.append([
    { eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
    { eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
    { eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
  ]);

  // Verify order
  const events = await eventStore.getEvents('order-1');
  expect(events.map(e => e.eventType)).toEqual([
    'Created', 'Updated', 'Shipped'
  ]);
});

Monitoring et Observabilité

Métriques de Traitement d'Événements

Métriques Principales à Suivre :

  • Latence de traitement d'événements
  • Taux de traitement d'événements
  • Taux d'erreur par type d'événement
  • Temps de reconstruction d'aggregate
  • Fréquence de snapshots et taille

Tableau de Bord de Monitoring :

Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day

Anti-Patrons et Pièges

Erreurs Courantes

Événements Mutables :

❌ Modifying event payload after creation
✅ Immutable events with new versions

Payloads d'Événements Grandes :

❌ Storing entire objects in events
✅ Reference IDs with separate data storage

Traitement Synchrone d'Événements :

❌ Blocking on event processing
✅ Asynchronous event handling

Intégration avec GitScrum

Gestion de Tâches Orientée Événements

Event Sourcing pour les Tâches :

  • Suivre tous les changements d'état de tâches comme événements
  • Permettre la reconstruction de l'historique des tâches
  • Support des requêtes temporelles pour les analyses de projet

Implémentation de Piste d'Audit :

  • Historique complet des modifications de tâches
  • Attribution d'utilisateur pour tous les changements
  • Rapports de conformité

Automatisation de Workflow

Workflows Orientés Événements :

Task Created ──► Assign Reviewer ──► Code Review ──► Merge
     │                │                     │          │
     └────────────────┼─────────────────────┼──────────┘
              Event Bus for Coordination

Solutions Connexes