Patrons et Implémentation d'Event Sourcing
L'event sourcing est un patron architectural puissant qui stocke tous les changements d'état de l'application comme une séquence d'événements, permettant des pistes d'audit fiables, des requêtes temporelles et la reconstruction du système. Cette approche fournit une cohérence de données supérieure et des capacités de débogage comparées aux opérations CRUD traditionnelles.
Concepts Fondamentaux d'Event Sourcing
Architecture de l'Event Store
L'event store est le composant central qui persiste tous les événements de domaine :
Structure de l'Événement :
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "OrderPlaced",
"aggregateId": "order-123",
"aggregateType": "Order",
"eventVersion": 1,
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"customerId": "cust-456",
"items": [
{"productId": "prod-789", "quantity": 2, "price": 29.99}
],
"totalAmount": 59.98
},
"metadata": {
"userId": "user-123",
"correlationId": "req-abc-123",
"causationId": "cmd-place-order-456"
}
}
Opérations de l'Event Store :
Append Event ──► Event Store ──► Event Stream
Read Events ──► Event Store ──► Rebuild State
Intégration avec le Patron Aggregate
Les aggregates dans l'event sourcing maintiennent l'état à travers l'application d'événements :
Gestion d'État de l'Aggregate :
class OrderAggregate {
private state: OrderState;
private uncommittedEvents: DomainEvent[] = [];
apply(event: DomainEvent): void {
this.state = this.applyEvent(this.state, event);
this.uncommittedEvents.push(event);
}
private applyEvent(state: OrderState, event: DomainEvent): OrderState {
switch (event.eventType) {
case 'OrderPlaced':
return { ...state, status: 'PLACED', items: event.payload.items };
case 'OrderShipped':
return { ...state, status: 'SHIPPED', shippedAt: event.timestamp };
default:
return state;
}
}
}
Patrons d'Event Sourcing
Replay d'Événements et Snapshots
Processus de Replay d'Événements :
Event Stream: [E1, E2, E3, ..., E1000]
Snapshot: State at E500
Replay: Apply E501 to E1000 to current state
Stratégie de Snapshot :
interface Snapshot {
aggregateId: string;
version: number;
state: AggregateState;
timestamp: Date;
}
class SnapshotStore {
async save(snapshot: Snapshot): Promise<void> {
// Persist snapshot for faster loading
}
async load(aggregateId: string): Promise<Snapshot | null> {
// Load most recent snapshot
}
}
Versionnement et Évolution des Événements
Patron Upcasting :
interface EventUpcaster {
canUpcast(eventType: string, fromVersion: number): boolean;
upcast(event: StoredEvent): StoredEvent;
}
class OrderEventUpcaster implements EventUpcaster {
canUpcast(eventType: string, fromVersion: number): boolean {
return eventType === 'OrderPlaced' && fromVersion === 1;
}
upcast(event: StoredEvent): StoredEvent {
if (event.payload.totalAmount === undefined) {
// Calculate total from items if missing
const total = event.payload.items.reduce(
(sum, item) => sum + (item.price * item.quantity), 0
);
return {
...event,
payload: { ...event.payload, totalAmount: total }
};
}
return event;
}
}
Intégration avec CQRS
Séparation des Responsabilités Commande et Requête :
Write Side (Commands) Read Side (Queries)
│ │
▼ ▼
Event Store ──► Project Events ──► Read Models
▲ │
└───────────────────────────────┘
Eventual Consistency
Projection du Read Model :
class OrderReadModel {
private orders: Map<string, OrderView> = new Map();
project(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this.orders.set(event.aggregateId, {
id: event.aggregateId,
customerId: event.payload.customerId,
status: 'PLACED',
totalAmount: event.payload.totalAmount,
placedAt: event.timestamp
});
break;
case 'OrderShipped':
const order = this.orders.get(event.aggregateId);
if (order) {
order.status = 'SHIPPED';
order.shippedAt = event.timestamp;
}
break;
}
}
}
Gestion de la Cohérence Éventuelle
Patron Saga pour les Transactions Distribuées
Saga Orchstrée :
Order Service ──► Payment Service ──► Inventory Service
│ │ │
└─ OrderPlaced ──────┼─ PaymentProcessed ─┘
│
└─ PaymentFailed (compensate)
Saga Chorégraphiée :
Order Service ──► OrderPlaced Event ──► Payment Service
│
▼
PaymentProcessed Event ──► Inventory Service
Patrons de Cohérence Éventuelle
Read Repair :
Client Request ──► Stale Data Detected ──► Background Update
│ │
└─────────────────────────────────────────┘
Return current data
Write-Ahead Logging :
Write Operation ──► Event Store ──► Background Projection
│ │ │
└──────────────────────┼───────────────┘
Immediate ACK Eventual consistency
Optimisation des Performances
Indexation de l'Event Store
Stratégies d'Indexation :
- Indice d'Aggregate ID : Recherche rapide d'événements pour des aggregates spécifiques
- Indice d'Event Type : Requête d'événements par type dans les aggregates
- Indice de Timestamp : Requêtes d'événements basées sur le temps
- Indice de Correlation ID : Suivi des requêtes entre services
Indices Composés :
-- For efficient event stream queries
CREATE INDEX idx_event_stream
ON events (aggregate_type, aggregate_id, version);
-- For temporal queries
CREATE INDEX idx_event_time
ON events (event_type, timestamp DESC);
Stratégies de Cache
Cache d'Événements :
Event Store ──► Event Cache ──► Application
▲ │
└──────────────┘
Cache Invalidation
Cache de Snapshot :
Aggregate Loader ──► Snapshot Cache ──► Rebuild from Events
│
└─► Cache Miss → Load from Store
Gestion d'Erreurs et Récupération
Fiabilité du Traitement d'Événements
Gestionnaires d'Événements Idempotents :
class IdempotentEventHandler {
private processedEvents: Set<string> = new Set();
async handle(event: DomainEvent): Promise<void> {
if (this.processedEvents.has(event.eventId)) {
return; // Already processed
}
try {
await this.processEvent(event);
this.processedEvents.add(event.eventId);
} catch (error) {
// Log error and retry logic
throw error;
}
}
}
Patron Dead Letter Queue
Gestion d'Événements en Échec :
Event Processor ──► Success ──► Continue
│
└─► Failure ──► Dead Letter Queue ──► Manual Review
Tests de Systèmes Event-Sourced
Tests Basés sur les Événements
Patron Given-When-Then :
describe('Order Aggregate', () => {
it('should handle order placement', () => {
// Given
const order = new OrderAggregate();
// When
order.placeOrder({
customerId: 'cust-123',
items: [{ productId: 'prod-456', quantity: 1 }]
});
// Then
expect(order.getUncommittedEvents()).toContainEqual(
expect.objectContaining({
eventType: 'OrderPlaced',
payload: expect.objectContaining({
customerId: 'cust-123'
})
})
);
});
});
Test d'Event Stream
Vérification d'Event Stream :
it('should maintain event order', async () => {
const eventStore = new InMemoryEventStore();
// Append events
await eventStore.append([
{ eventId: '1', aggregateId: 'order-1', eventType: 'Created' },
{ eventId: '2', aggregateId: 'order-1', eventType: 'Updated' },
{ eventId: '3', aggregateId: 'order-1', eventType: 'Shipped' }
]);
// Verify order
const events = await eventStore.getEvents('order-1');
expect(events.map(e => e.eventType)).toEqual([
'Created', 'Updated', 'Shipped'
]);
});
Monitoring et Observabilité
Métriques de Traitement d'Événements
Métriques Principales à Suivre :
- Latence de traitement d'événements
- Taux de traitement d'événements
- Taux d'erreur par type d'événement
- Temps de reconstruction d'aggregate
- Fréquence de snapshots et taille
Tableau de Bord de Monitoring :
Event Processing Health:
├── Events/sec: 1,250
├── Avg Latency: 45ms
├── Error Rate: 0.1%
├── Dead Letters: 3
└── Snapshots Created: 150/day
Anti-Patrons et Pièges
Erreurs Courantes
Événements Mutables :
❌ Modifying event payload after creation
✅ Immutable events with new versions
Payloads d'Événements Grandes :
❌ Storing entire objects in events
✅ Reference IDs with separate data storage
Traitement Synchrone d'Événements :
❌ Blocking on event processing
✅ Asynchronous event handling
Intégration avec GitScrum
Gestion de Tâches Orientée Événements
Event Sourcing pour les Tâches :
- Suivre tous les changements d'état de tâches comme événements
- Permettre la reconstruction de l'historique des tâches
- Support des requêtes temporelles pour les analyses de projet
Implémentation de Piste d'Audit :
- Historique complet des modifications de tâches
- Attribution d'utilisateur pour tous les changements
- Rapports de conformité
Automatisation de Workflow
Workflows Orientés Événements :
Task Created ──► Assign Reviewer ──► Code Review ──► Merge
│ │ │ │
└────────────────┼─────────────────────┼──────────┘
Event Bus for Coordination