Introduction
Event-driven architecture (EDA) has become the backbone of modern distributed systems. Unlike traditional request-response patterns where Service A directly calls Service B and waits for a response, EDA enables services to communicate asynchronously through events. This fundamental shift in how services interact leads to better scalability, loose coupling, and resilience.
In this comprehensive deep dive, we'll build a production-ready event-driven system using NestJS and Apache Kafka, implementing patterns like Event Sourcing and CQRS (Command Query Responsibility Segregation). By the end of this article, you'll understand not just the "how" but also the "why" behind each architectural decision.
Why Event-Driven Architecture?
Before diving into implementation, let's understand the problems that event-driven architecture solves. Traditional monolithic and synchronous microservice architectures face several challenges at scale:
The Problems with Synchronous Communication
Tight Coupling: In a synchronous system, when Service A needs data from Service B, it makes a direct HTTP call. This creates a compile-time and runtime dependency. If Service B changes its API, Service A must be updated. If Service B is slow, Service A is slow. If Service B is down, Service A fails.
Cascading Failures: Imagine a checkout flow: Order Service → Payment Service → Inventory Service → Notification Service. If Notification Service is down, the entire checkout fails, even though the payment was successful. This is the infamous "distributed monolith" anti-pattern.
Scaling Limitations: In monolithic systems, you must scale the entire application even if only one component is under load. In synchronous microservices, scaling one service often requires scaling its dependencies too.
Temporal Coupling: Both services must be available at the same time. This seems obvious, but in distributed systems, "at the same time" is harder than it sounds due to network partitions, deployments, and failures.
How Event-Driven Architecture Solves These Problems
Event-driven architecture addresses these challenges by introducing a message broker (like Kafka) between services. Instead of Service A calling Service B directly, Service A publishes an event to a topic and any interested service subscribes to it, reacting on its own schedule.
This simple change has profound implications:
- Loose Coupling: Services don't know about each other; they only know about events
- Resilience: If Service B is down, events queue up and are processed when it recovers
- Scalability: Each service scales independently based on its own load
- Temporal Decoupling: Services don't need to be available simultaneously
A single event from Service A can trigger reactions in multiple downstream services through Kafka, without Service A knowing or caring about who consumes its events.
Understanding Apache Kafka
Before we write code, let's understand Kafka's core concepts that make it ideal for event-driven systems:
Topics: Named channels where events are published. Think of them as categories: "order-events", "user-events", "payment-events".
Partitions: Topics are divided into partitions for parallelism. Events with the same key (e.g., order ID) always go to the same partition, ensuring ordering.
Consumer Groups: Multiple instances of a service form a consumer group. Kafka ensures each partition is consumed by only one instance in the group, enabling horizontal scaling.
Offsets: Each message has an offset (its position within a partition). Consumers track their offsets, which enables replay and at-least-once delivery; exactly-once processing additionally requires idempotent handlers or Kafka transactions.
Retention: Unlike traditional queues, Kafka retains messages for a configurable period (days/weeks). This enables event replay and rebuilding state.
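To make these concepts concrete, here is a minimal kafkajs sketch (broker address and topic name are placeholders): the keyed send pins all events for one order to one partition, and consumers sharing a groupId split the partitions between them.

// kafka-concepts.ts - minimal sketch, not production config
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'demo', brokers: ['localhost:9092'] });

async function demo(): Promise<void> {
  const producer = kafka.producer();
  await producer.connect();
  // Same key => same partition => per-order ordering is preserved
  await producer.send({
    topic: 'order-events',
    messages: [{ key: 'order-123', value: JSON.stringify({ type: 'OrderCreated' }) }],
  });
  await producer.disconnect();

  // Instances sharing a groupId divide the topic's partitions among themselves
  const consumer = kafka.consumer({ groupId: 'order-consumer-group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['order-events'], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log(partition, message.offset, message.value?.toString());
    },
  });
}

demo().catch(console.error);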
Setting Up NestJS with Kafka
First, let's set up our NestJS project with Kafka integration:
npm install @nestjs/microservices kafkajs
Kafka Module Configuration
class="code-comment">// src/kafka/kafka.module.ts
class="code-keyword">import { Module } class="code-keyword">from class="code-string">'@nestjs/common';
class="code-keyword">import { ClientsModule, Transport } class="code-keyword">from class="code-string">'@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: class="code-string">'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: class="code-string">'order-service',
brokers: [class="code-string">'localhost:class="code-number">9092'],
ssl: process.env.NODE_ENV === class="code-string">'production',
sasl: process.env.NODE_ENV === class="code-string">'production' ? {
mechanism: class="code-string">'scram-sha-class="code-number">256',
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
} : undefined,
},
consumer: {
groupId: class="code-string">'order-consumer-group',
sessionTimeout: class="code-number">30000,
heartbeatInterval: class="code-number">3000,
},
producer: {
allowAutoTopicCreation: false,
transactionTimeout: class="code-number">30000,
},
},
},
]),
],
exports: [ClientsModule],
})
class="code-keyword">export class KafkaModule {}
Implementing Event Sourcing
Event Sourcing is a powerful pattern that fundamentally changes how we think about data storage. Instead of storing the current state of an entity (like "Order status = SHIPPED"), we store every change that happened to it as an immutable sequence of events.
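For example, an order's lifecycle might be recorded as three immutable events (the names match the aggregate we build later; payloads are illustrative):

OrderCreated   (version 1)  { customerId: "c-42", items: [...] }
OrderConfirmed (version 2)  { confirmedAt: "2024-01-15T10:03:00Z" }
OrderShipped   (version 3)  { trackingNumber: "TRK-001" }

The current state ("SHIPPED") is never stored directly; it is derived by replaying these events in order.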
Why Event Sourcing?
Complete Audit Trail: You have a complete history of every change. This is crucial for financial systems, healthcare, and any domain requiring compliance.
Temporal Queries: You can answer questions like "What was the order status at 3 PM yesterday?" by replaying events up to that point.
Debugging: When something goes wrong, you can replay events to understand exactly what happened and in what order.
Event Replay: You can rebuild read models, fix bugs in projections, or create entirely new views by replaying the event history.
Domain Modeling: Events naturally align with business language. "OrderShipped" is more meaningful than "UPDATE orders SET status = 'shipped'".
The Trade-offs
Event Sourcing adds complexity. Consider it when you need:
- Complete audit trails (financial, healthcare, legal)
- Temporal queries or "time travel"
- Complex domains where events capture intent
- CQRS (separating read/write models)
Event Store Implementation
class="code-comment">// src/event-store/event-store.service.ts
class="code-keyword">import { Injectable } class="code-keyword">from class="code-string">'@nestjs/common';
class="code-keyword">import { InjectRepository } class="code-keyword">from class="code-string">'@nestjs/typeorm';
class="code-keyword">import { Repository } class="code-keyword">from class="code-string">'typeorm';
class="code-keyword">import { StoredEvent } class="code-keyword">from class="code-string">'./stored-event.entity';
class="code-keyword">export interface DomainEvent {
eventType: string;
aggregateId: string;
aggregateType: string;
payload: Record<string, any>;
metadata: {
correlationId: string;
causationId: string;
userId?: string;
timestamp: Date;
version: number;
};
}
@Injectable()
class="code-keyword">export class EventStoreService {
constructor(
@InjectRepository(StoredEvent)
private readonly eventRepository: Repository<StoredEvent>,
) {}
async append(event: DomainEvent): Promise<StoredEvent> {
class="code-comment">// Optimistic locking - check version
class="code-keyword">const lastEvent = await this.eventRepository.findOne({
where: { aggregateId: event.aggregateId },
order: { version: class="code-string">'DESC' },
});
class="code-keyword">const expectedVersion = lastEvent ? lastEvent.version + class="code-number">1 : class="code-number">1;
class="code-keyword">if (event.metadata.version !== expectedVersion) {
throw new ConcurrencyException(
class="code-string">Expected version ${expectedVersion}, got ${event.metadata.version}
);
}
class="code-keyword">const storedEvent = this.eventRepository.create({
eventType: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
payload: event.payload,
metadata: event.metadata,
version: expectedVersion,
createdAt: new Date(),
});
class="code-keyword">return this.eventRepository.save(storedEvent);
}
async getEvents(
aggregateId: string,
fromVersion?: number
): Promise<StoredEvent[]> {
class="code-keyword">const query = this.eventRepository
.createQueryBuilder(class="code-string">'event')
.where(class="code-string">'event.aggregateId = :aggregateId', { aggregateId })
.orderBy(class="code-string">'event.version', class="code-string">'ASC');
class="code-keyword">if (fromVersion) {
query.andWhere(class="code-string">'event.version > :fromVersion', { fromVersion });
}
class="code-keyword">return query.getMany();
}
async replayEvents<T>(
aggregateId: string,
reducer: (state: T, event: StoredEvent) => T,
initialState: T,
): Promise<T> {
class="code-keyword">const events = await this.getEvents(aggregateId);
class="code-keyword">return events.reduce(reducer, initialState);
}
}
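As a usage sketch, replayEvents can fold an order's history into a small view model; the OrderSummary shape and the aggregate ID are illustrative:

// Illustrative: derive a summary purely from the event history
interface OrderSummary { status: string; itemCount: number; }

async function summarizeOrder(eventStore: EventStoreService): Promise<OrderSummary> {
  return eventStore.replayEvents<OrderSummary>(
    'order-123', // placeholder aggregate ID
    (state, event) => {
      switch (event.eventType) {
        case 'OrderCreated':
          return { status: 'PENDING', itemCount: event.payload.items.length };
        case 'OrderConfirmed':
          return { ...state, status: 'CONFIRMED' };
        default:
          return state;
      }
    },
    { status: 'NONE', itemCount: 0 },
  );
}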
Aggregate Root Pattern
class="code-comment">// src/domain/aggregate-root.ts
class="code-keyword">export abstract class AggregateRoot {
private _uncommittedEvents: DomainEvent[] = [];
private _version: number = class="code-number">0;
get uncommittedEvents(): DomainEvent[] {
class="code-keyword">return [...this._uncommittedEvents];
}
get version(): number {
class="code-keyword">return this._version;
}
protected apply(event: DomainEvent): void {
this.when(event);
this._uncommittedEvents.push(event);
this._version++;
}
protected abstract when(event: DomainEvent): void;
public loadFromHistory(events: DomainEvent[]): void {
events.forEach(event => {
this.when(event);
this._version++;
});
}
public clearUncommittedEvents(): void {
this._uncommittedEvents = [];
}
}
class="code-comment">// src/domain/order/order.aggregate.ts
class="code-keyword">export class OrderAggregate extends AggregateRoot {
private _id: string;
private _status: OrderStatus;
private _items: OrderItem[] = [];
private _totalAmount: number = class="code-number">0;
static create(command: CreateOrderCommand): OrderAggregate {
class="code-keyword">const order = new OrderAggregate();
order.apply({
eventType: class="code-string">'OrderCreated',
aggregateId: command.orderId,
aggregateType: class="code-string">'Order',
payload: {
customerId: command.customerId,
items: command.items,
},
metadata: {
correlationId: command.correlationId,
causationId: command.commandId,
userId: command.userId,
timestamp: new Date(),
version: class="code-number">1,
},
});
class="code-keyword">return order;
}
confirm(): void {
class="code-keyword">if (this._status !== OrderStatus.PENDING) {
throw new InvalidOperationException(class="code-string">'Order must be pending to confirm');
}
this.apply({
eventType: class="code-string">'OrderConfirmed',
aggregateId: this._id,
aggregateType: class="code-string">'Order',
payload: { confirmedAt: new Date() },
metadata: {
correlationId: generateId(),
causationId: generateId(),
timestamp: new Date(),
version: this.version + class="code-number">1,
},
});
}
protected when(event: DomainEvent): void {
switch (event.eventType) {
case class="code-string">'OrderCreated':
this._id = event.aggregateId;
this._status = OrderStatus.PENDING;
this._items = event.payload.items;
this._totalAmount = this.calculateTotal(event.payload.items);
break;
case class="code-string">'OrderConfirmed':
this._status = OrderStatus.CONFIRMED;
break;
case class="code-string">'OrderShipped':
this._status = OrderStatus.SHIPPED;
break;
}
}
}
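Tying the aggregate to the event store: a follow-up command is handled by rehydrating from history, invoking the behavior, and appending the new events. A sketch (the structural cast reflects that StoredEvent rows carry the same fields as DomainEvent here):

// Sketch: rehydrate -> mutate -> persist
import { DomainEvent, EventStoreService } from '../../event-store/event-store.service';

async function confirmOrder(orderId: string, eventStore: EventStoreService): Promise<void> {
  const history = await eventStore.getEvents(orderId);
  const order = new OrderAggregate();
  order.loadFromHistory(history as unknown as DomainEvent[]);

  order.confirm(); // enforces the PENDING -> CONFIRMED rule

  for (const event of order.uncommittedEvents) {
    await eventStore.append(event); // optimistic concurrency check happens here
  }
  order.clearUncommittedEvents();
}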
CQRS Implementation
CQRS (Command Query Responsibility Segregation) is a pattern that separates read and write operations into different models. This might seem like unnecessary complexity at first, but it solves real problems in complex systems.
Why Separate Reads and Writes?
In traditional architectures, the same model serves both reads and writes. This creates several challenges:
Different Optimization Needs: Writes need to enforce business rules, validate data, and maintain consistency. Reads need to be fast, often joining data from multiple entities. Optimizing for one often hurts the other.
Scaling Asymmetry: Most systems have far more reads than writes (often 100:1 or more). With a single model, you can't scale them independently.
Complex Queries: Read models often need denormalized data (user name + order count + last purchase date). Normalizing for writes means expensive JOINs for reads.
Event Sourcing Compatibility: If you're using Event Sourcing, you don't have a traditional database to query. You need projections (read models) built from events.
How CQRS Works
Command Side (Write Model):
- Receives commands (CreateOrder, ConfirmOrder)
- Validates business rules
- Persists events to the event store
- Publishes events to Kafka
Query Side (Read Model):
- Subscribes to events
- Updates denormalized read models (projections)
- Serves fast queries without complex JOINs
Command Side
class="code-comment">// src/order/commands/handlers/create-order.handler.ts
class="code-keyword">import { CommandHandler, ICommandHandler, EventBus } class="code-keyword">from class="code-string">'@nestjs/cqrs';
class="code-keyword">import { Inject } class="code-keyword">from class="code-string">'@nestjs/common';
@CommandHandler(CreateOrderCommand)
class="code-keyword">export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly eventStore: EventStoreService,
="code-keyword">private readonly eventBus: EventBus,
@Inject(class="code-string">'KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
class="code-comment">// Create aggregate
class="code-keyword">const order = OrderAggregate.create(command);
class="code-comment">// Persist events
class="code-keyword">for (class="code-keyword">const event of order.uncommittedEvents) {
await this.eventStore.append(event);
class="code-comment">// Publish to Kafka class="code-keyword">for other services
await this.kafkaClient.emit(class="code-string">'order.events', {
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
class="code-string">'event-type': event.eventType,
class="code-string">'correlation-id': event.metadata.correlationId,
},
});
}
order.clearUncommittedEvents();
class="code-keyword">return order.id;
}
}
Query Side with Projections
class="code-comment">// src/order/projections/order-list.projection.ts
@Injectable()
class="code-keyword">export class OrderListProjection {
constructor(
@InjectRepository(OrderReadModel)
private readonly readRepository: Repository<OrderReadModel>,
) {}
@EventsHandler(OrderCreatedEvent, OrderConfirmedEvent, OrderShippedEvent)
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case class="code-string">'OrderCreated':
await this.readRepository.save({
id: event.aggregateId,
customerId: event.payload.customerId,
status: class="code-string">'PENDING',
totalAmount: this.calculateTotal(event.payload.items),
itemCount: event.payload.items.length,
createdAt: event.metadata.timestamp,
updatedAt: event.metadata.timestamp,
});
break;
case class="code-string">'OrderConfirmed':
await this.readRepository.update(
{ id: event.aggregateId },
{
status: class="code-string">'CONFIRMED',
updatedAt: event.metadata.timestamp
}
);
break;
}
}
}
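The query side then stays trivial: controllers read the denormalized model directly, with no joins and no aggregate rehydration. A sketch, where the entity import path and routes are assumptions:

// src/order/queries/order-query.controller.ts - hedged sketch of the read path
import { Controller, Get, Param, Query } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { OrderReadModel } from '../projections/order-read-model.entity'; // assumed path

@Controller('orders')
export class OrderQueryController {
  constructor(
    @InjectRepository(OrderReadModel)
    private readonly readRepository: Repository<OrderReadModel>,
  ) {}

  @Get(':id')
  findOne(@Param('id') id: string): Promise<OrderReadModel | null> {
    return this.readRepository.findOneBy({ id }); // single-row lookup, no joins
  }

  @Get()
  findByCustomer(@Query('customerId') customerId: string): Promise<OrderReadModel[]> {
    return this.readRepository.find({ where: { customerId } });
  }
}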
Handling Eventual Consistency
In distributed systems, eventual consistency is not just a limitation—it's a fundamental property we must embrace. The CAP theorem tells us we can't have perfect consistency and availability during network partitions. In practice, most systems choose availability and eventual consistency.
What is Eventual Consistency?
When you create an order, the write model is updated immediately. But the read model (the dashboard showing "10 orders today") might take a few hundred milliseconds to reflect the new order. During this window, different parts of your system might show different data.
This isn't a bug—it's a feature. It enables:
- Higher availability (services don't wait for each other)
- Better performance (no distributed locks)
- Simpler scaling (independent services)
Strategies for Handling Eventual Consistency
1. Optimistic UI Updates: Update the UI immediately after a command succeeds, before the read model updates. The user sees instant feedback.
2. Polling/Refresh: For critical operations, poll the read model until it reflects the expected change (see the sketch after this list).
3. Event Subscriptions: Use WebSockets to push updates to the UI when projections update.
4. Compensation: If something goes wrong, publish compensating events to undo the operation.
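As a sketch of strategy 2, a small generic helper can poll the read model until an expected state appears or a timeout elapses; the timings and usage are illustrative:

// Poll a read model until a predicate holds - generic, hedged helper
export async function waitForProjection<T>(
  fetch: () => Promise<T | null>,
  predicate: (value: T) => boolean,
  { intervalMs = 200, timeoutMs = 5000 } = {},
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const value = await fetch();
    if (value !== null && predicate(value)) return value;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error('Read model did not converge within the timeout');
}

// Usage (illustrative): wait until the new order appears as PENDING
// await waitForProjection(() => api.getOrder('order-123'), (o) => o.status === 'PENDING');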
Saga Pattern for Distributed Transactions
The Saga pattern handles distributed transactions across services. Unlike traditional database transactions (BEGIN, COMMIT, ROLLBACK), sagas use a sequence of local transactions with compensating actions for rollback.
Consider an order fulfillment flow: (1) reserve inventory, (2) process the payment, (3) ship the order.
Each step is a local transaction. If step 3 fails, we don't "rollback" in the database sense; we execute compensating transactions (refund the payment, release the inventory) to undo the previous steps.
class="code-comment">// src/sagas/order-fulfillment.saga.ts
@Injectable()
class="code-keyword">export class OrderFulfillmentSaga {
constructor(
private readonly commandBus: CommandBus,
@Inject(class="code-string">'KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
) {}
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
class="code-keyword">return events$.pipe(
ofType(OrderCreatedEvent),
map(event => {
class="code-comment">// Start saga - reserve inventory
class="code-keyword">return new ReserveInventoryCommand({
orderId: event.aggregateId,
items: event.payload.items,
correlationId: event.metadata.correlationId,
});
}),
);
};
@EventPattern(class="code-string">'inventory.reserved')
async onInventoryReserved(data: InventoryReservedEvent): Promise<void> {
class="code-comment">// Continue saga - process payment
await this.commandBus.execute(
new ProcessPaymentCommand({
orderId: data.orderId,
amount: data.totalAmount,
correlationId: data.correlationId,
})
);
}
@EventPattern(class="code-string">'payment.failed')
async onPaymentFailed(data: PaymentFailedEvent): Promise<void> {
class="code-comment">// Compensating transaction - release inventory
await this.commandBus.execute(
new ReleaseInventoryCommand({
orderId: data.orderId,
reason: class="code-string">'Payment failed',
correlationId: data.correlationId,
})
);
class="code-comment">// Mark order as failed
await this.commandBus.execute(
new FailOrderCommand({
orderId: data.orderId,
reason: data.failureReason,
})
);
}
}
Idempotency Handling
class="code-comment">// src/common/decorators/idempotent.decorator.ts
@Injectable()
class="code-keyword">export class IdempotencyGuard implements CanActivate {
constructor(
@InjectRedis() private readonly redis: Redis,
) {}
async canActivate(context: ExecutionContext): Promise<boolean> {
class="code-keyword">const request = context.switchToHttp().getRequest();
class="code-keyword">const idempotencyKey = request.headers[class="code-string">'idempotency-key'];
class="code-keyword">if (!idempotencyKey) {
class="code-keyword">return true; class="code-comment">// No idempotency required
}
class="code-keyword">const existing = await this.redis.get(class="code-string">idempotency:${idempotencyKey});
class="code-keyword">if (existing) {
class="code-comment">// Return cached response
class="code-keyword">const response = context.switchToHttp().getResponse();
response.status(class="code-number">200).json(JSON.parse(existing));
class="code-keyword">return false;
}
class="code-keyword">return true;
}
}
class="code-comment">// Store response after successful operation
@Injectable()
class="code-keyword">export class IdempotencyInterceptor implements NestInterceptor {
constructor(@InjectRedis() private readonly redis: Redis) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
class="code-keyword">const request = context.switchToHttp().getRequest();
class="code-keyword">const idempotencyKey = request.headers[class="code-string">'idempotency-key'];
class="code-keyword">return next.handle().pipe(
tap(async (response) => {
class="code-keyword">if (idempotencyKey) {
await this.redis.setex(
class="code-string">idempotency:${idempotencyKey},
class="code-number">86400, class="code-comment">// class="code-number">24 hours
JSON.stringify(response)
);
}
}),
);
}
}
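Applying both pieces is then a matter of decorators on the endpoint. A hedged sketch; the controller, DTO, and command dispatch are assumptions:

// Sketch: wiring idempotency onto a command endpoint (import paths illustrative)
import { Body, Controller, Post, UseGuards, UseInterceptors } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { IdempotencyGuard, IdempotencyInterceptor } from '../common/idempotency/idempotency.guard';

@Controller('orders')
export class OrderController {
  constructor(private readonly commandBus: CommandBus) {}

  @Post()
  @UseGuards(IdempotencyGuard)             // short-circuits replayed requests
  @UseInterceptors(IdempotencyInterceptor) // caches the first response
  async create(@Body() dto: CreateOrderDto): Promise<{ id: string }> {
    const id = await this.commandBus.execute(new CreateOrderCommand(dto));
    return { id };
  }
}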
Production Considerations
Dead Letter Queue (DLQ)
class="code-comment">// Handle failed messages
@Injectable()
class="code-keyword">export class DeadLetterQueueHandler {
@EventPattern(class="code-string">'order.events.dlq')
async handleDeadLetter(
@Payload() message: any,
@Ctx() context: KafkaContext,
): Promise<void> {
class="code-keyword">const originalTopic = context.getMessage().headers[class="code-string">'original-topic'];
class="code-keyword">const failureReason = context.getMessage().headers[class="code-string">'failure-reason'];
class="code-keyword">const retryCount = parseInt(
context.getMessage().headers[class="code-string">'retry-count'] || class="code-string">'class="code-number">0'
);
class="code-comment">// Log class="code-keyword">for investigation
this.logger.error(class="code-string">'Dead letter received', {
originalTopic,
failureReason,
retryCount,
message,
});
class="code-comment">// Store in database class="code-keyword">for manual review
await this.deadLetterRepository.save({
originalTopic,
message: JSON.stringify(message),
failureReason,
retryCount,
createdAt: new Date(),
});
class="code-comment">// Alert class="code-keyword">if critical
class="code-keyword">if (this.isCriticalEvent(message)) {
await this.alertService.sendAlert({
severity: class="code-string">'high',
message: class="code-string">Critical event failed: ${message.eventType},
});
}
}
}
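How do messages reach the DLQ in the first place? One common approach (a sketch, not the only one) is for the consumer's error path to re-publish the failed message to the matching .dlq topic with diagnostic headers before committing the original offset:

// Sketch: forward a poison message to its DLQ topic (kafkajs types)
import { KafkaMessage, Producer } from 'kafkajs';

export async function sendToDlq(
  producer: Producer, // a connected kafkajs producer
  originalTopic: string,
  message: KafkaMessage,
  error: Error,
  retryCount: number,
): Promise<void> {
  await producer.send({
    topic: `${originalTopic}.dlq`,
    messages: [{
      key: message.key,
      value: message.value,
      headers: {
        ...message.headers,
        'original-topic': originalTopic,
        'failure-reason': error.message,
        'retry-count': String(retryCount),
      },
    }],
  });
}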
Conclusion
Event-driven architecture with NestJS and Kafka provides a robust foundation for building scalable microservices. Let's summarize what we've learned and when to apply these patterns.
Key Takeaways
Event Sourcing gives you a complete audit trail and enables temporal queries. Use it when you need compliance, debugging capabilities, or when your domain is naturally event-oriented. Avoid it for simple CRUD applications.
CQRS allows independent scaling of read and write workloads. Use it when reads and writes have different performance requirements, or when you need complex read models. It pairs naturally with Event Sourcing.
Sagas handle distributed transactions with compensating actions. They're essential for any multi-service workflow where atomicity across services is required. Design your compensating actions carefully—they're as important as the happy path.
Idempotency gives you effectively-once processing on top of Kafka's at-least-once delivery. Always implement idempotency for event handlers: network failures will cause retries, and without it you'll process the same events multiple times.
When to Use Event-Driven Architecture
Good fit:
- High-scale systems with many services
- Systems requiring loose coupling and independent deployability
- Domains with complex business workflows (e-commerce, finance, logistics)
- Systems requiring audit trails and compliance
Poor fit:
- Simple CRUD applications
- Systems requiring strong consistency (banking ledgers)
- Small teams without operational capacity for distributed systems
- Prototypes and MVPs (start simple, evolve later)
Production Checklist
Before going to production, ensure you have:
- Monitoring: Track consumer lag, message throughput, and processing errors
- Alerting: Alert on consumer group lag exceeding thresholds
- Dead Letter Queues: Capture and investigate failed messages
- Schema Registry: Version your event schemas to handle evolution
- Exactly-Once Semantics: Use Kafka transactions for critical workflows (see the sketch below)
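On that last point, kafkajs exposes Kafka transactions through an idempotent producer configured with a transactional ID. A closing sketch with placeholder IDs and topic; the message becomes visible to read-committed consumers only when the transaction commits:

// Sketch: transactional publish with kafkajs
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'] });

async function publishAtomically(): Promise<void> {
  const producer = kafka.producer({
    transactionalId: 'order-service-tx', // placeholder
    idempotent: true,
    maxInFlightRequests: 1,
  });
  await producer.connect();

  const transaction = await producer.transaction();
  try {
    await transaction.send({
      topic: 'order-events',
      messages: [{ key: 'order-123', value: JSON.stringify({ type: 'OrderConfirmed' }) }],
    });
    await transaction.commit(); // visible only after this point
  } catch (err) {
    await transaction.abort(); // nothing is exposed to read-committed consumers
    throw err;
  }
}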