Building Scalable Event-Driven Architecture with NestJS and Kafka
Tags: NestJS, Kafka, Microservices, Event Sourcing, CQRS

Bao Trong
January 20, 2026

Introduction

Event-driven architecture (EDA) has become the backbone of modern distributed systems. Unlike traditional request-response patterns where Service A directly calls Service B and waits for a response, EDA enables services to communicate asynchronously through events. This fundamental shift in how services interact leads to better scalability, loose coupling, and resilience.

In this comprehensive deep dive, we'll build a production-ready event-driven system using NestJS and Apache Kafka, implementing patterns like Event Sourcing and CQRS (Command Query Responsibility Segregation). By the end of this article, you'll understand not just the "how" but also the "why" behind each architectural decision.

Why Event-Driven Architecture?

Before diving into implementation, let's understand the problems that event-driven architecture solves. Traditional monolithic and synchronous microservice architectures face several challenges at scale:

The Problems with Synchronous Communication

Tight Coupling: In a synchronous system, when Service A needs data from Service B, it makes a direct HTTP call. This couples Service A to both Service B's API contract and its availability. If Service B changes its API, Service A must be updated. If Service B is slow, Service A is slow. If Service B is down, Service A fails.

Cascading Failures: Imagine a checkout flow: Order Service → Payment Service → Inventory Service → Notification Service. If Notification Service is down, the entire checkout fails, even though the payment was successful. This is the infamous "distributed monolith" anti-pattern.

Scaling Limitations: In monolithic systems, you must scale the entire application even if only one component is under load. In synchronous microservices, scaling one service often requires scaling its dependencies too.

Temporal Coupling: Both services must be available at the same time. This seems obvious, but in distributed systems, "at the same time" is harder than it sounds due to network partitions, deployments, and failures.

How Event-Driven Architecture Solves These Problems

Event-driven architecture addresses these challenges by introducing a message broker (like Kafka) between services. Instead of Service A calling Service B directly:

  • Service A publishes an event to the broker: "OrderCreated"
  • The broker stores and distributes this event
  • Service B (and C, D, E...) subscribe to this event and react asynchronously

    This simple change has profound implications:

    • Loose Coupling: Services don't know about each other; they only know about events
    • Resilience: If Service B is down, events queue up and are processed when it recovers
    • Scalability: Each service scales independently based on its own load
    • Temporal Decoupling: Services don't need to be available simultaneously

    [Figure: Event-Driven Architecture Flow]

    The diagram above shows how a single event from Service A can trigger reactions in multiple downstream services through Kafka, without Service A knowing or caring about who consumes its events.
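
As a rough illustration of the flow described above, here is a minimal in-memory sketch of publish/subscribe. `InMemoryBroker` is a hypothetical stand-in, not Kafka: it delivers synchronously and stores nothing, but it shows that the publisher names only a topic and never a consumer.

```typescript
// In-memory stand-in for a broker. All names here are illustrative.
type Handler = (payload: unknown) => void;

class InMemoryBroker {
  private readonly subscribers = new Map<string, Handler[]>();

  subscribe(topic: string, handler: Handler): void {
    const handlers = this.subscribers.get(topic) ?? [];
    handlers.push(handler);
    this.subscribers.set(topic, handlers);
  }

  // The publisher only names the topic; it never references a consumer.
  // Returns how many subscribers received the event.
  publish(topic: string, payload: unknown): number {
    const handlers = this.subscribers.get(topic) ?? [];
    handlers.forEach((handler) => handler(payload));
    return handlers.length;
  }
}

const broker = new InMemoryBroker();
const reactions: string[] = [];

// Three independent services react to the same event.
broker.subscribe("OrderCreated", () => reactions.push("payment"));
broker.subscribe("OrderCreated", () => reactions.push("inventory"));
broker.subscribe("OrderCreated", () => reactions.push("notification"));

const delivered = broker.publish("OrderCreated", { orderId: "order-1" });
```

Adding a fourth consumer would require no change to the publishing code, which is the loose coupling the list above describes.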

    Understanding Apache Kafka

    Before we write code, let's understand Kafka's core concepts that make it ideal for event-driven systems:

    Topics: Named channels where events are published. Think of them as categories: "order-events", "user-events", "payment-events".

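    Partitions: Topics are divided into partitions for parallelism. With the default partitioner, events with the same key (e.g., order ID) go to the same partition, which preserves their relative order. Ordering is guaranteed only within a partition, not across the whole topic.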
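
The key-to-partition idea can be sketched as a hash followed by a modulo. This is an illustration only: `hashKey` is a simple made-up string hash, and Kafka's own partitioner uses a different hash function, so real partition numbers will not match.

```typescript
// Simple string hash for illustration only. Kafka's default partitioner
// uses a different hash function, so real partition numbers will differ.
function hashKey(key: string): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0; // keep an unsigned 32-bit value
  }
  return hash;
}

// The same key always maps to the same partition for a fixed partition count.
function partitionFor(key: string, partitionCount: number): number {
  return hashKey(key) % partitionCount;
}

const first = partitionFor("order-42", 6);
const second = partitionFor("order-42", 6);
```

Note that the mapping depends on `partitionCount`, which is why adding partitions to an existing topic can move a key to a different partition.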

    Consumer Groups: Multiple instances of a service form a consumer group. Kafka ensures each partition is consumed by only one instance in the group, enabling horizontal scaling.

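    Offsets: Each message has an offset (position) in its partition. Consumers commit the offsets they have processed, which lets them resume after a restart and replay older messages. On their own, committed offsets give at-least-once delivery; exactly-once processing additionally requires Kafka transactions or idempotent consumers.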

    Retention: Unlike traditional queues, Kafka retains messages for a configurable period (days/weeks). This enables event replay and rebuilding state.
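
To make offsets and replay concrete, here is a small in-memory model, assuming a single partition represented as an array. `PartitionLog` and `OffsetTrackingConsumer` are hypothetical names for this sketch and are not part of any Kafka client.

```typescript
// A single partition modelled as an append-only array; the index is the offset.
class PartitionLog<T> {
  private readonly records: T[] = [];

  append(record: T): number {
    this.records.push(record);
    return this.records.length - 1; // offset of the new record
  }

  readFrom(offset: number): T[] {
    return this.records.slice(offset);
  }
}

// A consumer remembers the next offset it should read.
class OffsetTrackingConsumer<T> {
  private readonly log: PartitionLog<T>;
  private nextOffset = 0;
  readonly seen: T[] = [];

  constructor(log: PartitionLog<T>) {
    this.log = log;
  }

  poll(): void {
    const batch = this.log.readFrom(this.nextOffset);
    this.seen.push(...batch);
    this.nextOffset += batch.length;
  }

  // Rewinding the offset replays records that are still retained.
  seek(offset: number): void {
    this.nextOffset = offset;
  }
}

const partitionLog = new PartitionLog<string>();
partitionLog.append("OrderCreated");
partitionLog.append("OrderConfirmed");

const consumer = new OffsetTrackingConsumer(partitionLog);
consumer.poll();  // reads offsets 0 and 1
consumer.seek(0); // rewind to the beginning
consumer.poll();  // reads offsets 0 and 1 again
```

Because the log keeps records after they are read, rewinding is enough to rebuild state; a traditional queue that deletes on consumption cannot do this.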

    Setting Up NestJS with Kafka

    First, let's set up our NestJS project with Kafka integration:

    npm install @nestjs/microservices kafkajs
    bash

    Kafka Module Configuration

    // src/kafka/kafka.module.ts
    import { Module } from '@nestjs/common';
    import { ClientsModule, Transport } from '@nestjs/microservices';

    @Module({
      imports: [
        ClientsModule.register([
          {
            name: 'KAFKA_SERVICE',
            transport: Transport.KAFKA,
            options: {
              client: {
                clientId: 'order-service',
                brokers: ['localhost:9092'],
                // TLS and SASL/SCRAM authentication are enabled only in production
                ssl: process.env.NODE_ENV === 'production',
                sasl: process.env.NODE_ENV === 'production'
                  ? {
                      mechanism: 'scram-sha-256',
                      username: process.env.KAFKA_USERNAME,
                      password: process.env.KAFKA_PASSWORD,
                    }
                  : undefined,
              },
              consumer: {
                groupId: 'order-consumer-group',
                sessionTimeout: 30000,
                heartbeatInterval: 3000,
              },
              producer: {
                allowAutoTopicCreation: false,
                transactionTimeout: 30000,
              },
            },
          },
        ]),
      ],
      exports: [ClientsModule],
    })
    export class KafkaModule {}

    typescript
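
The module above hardcodes a single local broker. One possible way to make the broker list configurable is a small parsing helper like the sketch below. The helper name `parseBrokers` and the `KAFKA_BROKERS` variable are assumptions for illustration, not something the configuration above defines.

```typescript
// Hypothetical helper: turns a comma-separated string into a broker list.
// KAFKA_BROKERS is an assumed variable name, not one defined by the article.
function parseBrokers(
  raw: string | undefined,
  fallback: string[] = ["localhost:9092"],
): string[] {
  if (!raw) return fallback;
  const brokers = raw
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

const parsedBrokers = parseBrokers("kafka-1:9092, kafka-2:9092");
const defaultBrokers = parseBrokers(undefined);
```

In the module, this could be wired in as `brokers: parseBrokers(process.env.KAFKA_BROKERS)`.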

    Implementing Event Sourcing

    Event Sourcing is a powerful pattern that fundamentally changes how we think about data storage. Instead of storing the current state of an entity (like "Order status = SHIPPED"), we store every change that happened to it as an immutable sequence of events:

  • OrderCreated (items: [...], customer: "John")
  • OrderConfirmed (confirmedAt: "2024-01-20")
  • PaymentReceived (amount: 150.00)
  • OrderShipped (trackingNumber: "ABC123")

    Why Event Sourcing?

    Complete Audit Trail: You have a complete history of every change. This is crucial for financial systems, healthcare, and any domain requiring compliance.

    Temporal Queries: You can answer questions like "What was the order status at 3 PM yesterday?" by replaying events up to that point.

    Debugging: When something goes wrong, you can replay events to understand exactly what happened and in what order.

    Event Replay: You can rebuild read models, fix bugs in projections, or create entirely new views by replaying the event history.

    Domain Modeling: Events naturally align with business language. "OrderShipped" is more meaningful than "UPDATE orders SET status = 'shipped'".
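
The temporal query mentioned above can be sketched as a fold over the events recorded up to a point in time. This is a simplified illustration: the `OrderEvent` shape and the timestamps are made up for the example and are smaller than the `DomainEvent` used later in this article.

```typescript
type OrderStatus = "PENDING" | "CONFIRMED" | "SHIPPED";

interface OrderEvent {
  eventType: "OrderCreated" | "OrderConfirmed" | "OrderShipped";
  timestamp: Date;
}

// Pure reducer: the current status is a fold over the event history.
function applyEvent(
  current: OrderStatus | undefined,
  event: OrderEvent,
): OrderStatus | undefined {
  switch (event.eventType) {
    case "OrderCreated":
      return "PENDING";
    case "OrderConfirmed":
      return "CONFIRMED";
    case "OrderShipped":
      return "SHIPPED";
    default:
      return current;
  }
}

// Replays only the events that happened at or before the given instant.
function statusAt(events: OrderEvent[], pointInTime: Date): OrderStatus | undefined {
  return events
    .filter((e) => e.timestamp.getTime() <= pointInTime.getTime())
    .reduce(applyEvent, undefined as OrderStatus | undefined);
}

const orderEvents: OrderEvent[] = [
  { eventType: "OrderCreated", timestamp: new Date("2024-01-20T10:00:00Z") },
  { eventType: "OrderConfirmed", timestamp: new Date("2024-01-20T12:00:00Z") },
  { eventType: "OrderShipped", timestamp: new Date("2024-01-21T09:00:00Z") },
];

// Status at 3 PM on the 20th: created and confirmed, but not yet shipped.
const statusAtThreePm = statusAt(orderEvents, new Date("2024-01-20T15:00:00Z"));
```

The same reducer rebuilds the latest state when no cutoff is applied, which is what makes replay and new projections possible.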

    The Trade-offs

    Event Sourcing adds complexity. Consider it when you need:

    • Complete audit trails (financial, healthcare, legal)
    • Temporal queries or "time travel"
    • Complex domains where events capture intent
    • CQRS (separating read/write models)

    Avoid it for simple CRUD applications where the additional complexity isn't justified.

    Event Store Implementation

    // src/event-store/event-store.service.ts
    import { Injectable } from '@nestjs/common';
    import { InjectRepository } from '@nestjs/typeorm';
    import { Repository } from 'typeorm';
    import { StoredEvent } from './stored-event.entity';

    export interface DomainEvent {
      eventType: string;
      aggregateId: string;
      aggregateType: string;
      payload: Record<string, any>;
      metadata: {
        correlationId: string;
        causationId: string;
        userId?: string;
        timestamp: Date;
        version: number;
      };
    }

    // Thrown when an append targets a stale aggregate version.
    // Defined inline because the original snippet did not show where it lives.
    export class ConcurrencyException extends Error {}

    @Injectable()
    export class EventStoreService {
      constructor(
        @InjectRepository(StoredEvent)
        private readonly eventRepository: Repository<StoredEvent>,
      ) {}

      async append(event: DomainEvent): Promise<StoredEvent> {
        // Optimistic locking: the event must carry the next version for its aggregate.
        // A unique index on (aggregateId, version) is still needed to reject
        // concurrent appends that both pass this check.
        const lastEvent = await this.eventRepository.findOne({
          where: { aggregateId: event.aggregateId },
          order: { version: 'DESC' },
        });

        const expectedVersion = lastEvent ? lastEvent.version + 1 : 1;

        if (event.metadata.version !== expectedVersion) {
          throw new ConcurrencyException(
            `Expected version ${expectedVersion}, got ${event.metadata.version}`,
          );
        }

        const storedEvent = this.eventRepository.create({
          eventType: event.eventType,
          aggregateId: event.aggregateId,
          aggregateType: event.aggregateType,
          payload: event.payload,
          metadata: event.metadata,
          version: expectedVersion,
          createdAt: new Date(),
        });

        return this.eventRepository.save(storedEvent);
      }

      async getEvents(
        aggregateId: string,
        fromVersion?: number,
      ): Promise<StoredEvent[]> {
        const query = this.eventRepository
          .createQueryBuilder('event')
          .where('event.aggregateId = :aggregateId', { aggregateId })
          .orderBy('event.version', 'ASC');

        // Compare against undefined so that fromVersion = 0 is still honored
        if (fromVersion !== undefined) {
          query.andWhere('event.version > :fromVersion', { fromVersion });
        }

        return query.getMany();
      }

      // Rebuilds state by folding the aggregate's full event history
      async replayEvents<T>(
        aggregateId: string,
        reducer: (state: T, event: StoredEvent) => T,
        initialState: T,
      ): Promise<T> {
        const events = await this.getEvents(aggregateId);
        return events.reduce(reducer, initialState);
      }
    }

    typescript

    Aggregate Root Pattern

    // src/domain/aggregate-root.ts
    import { DomainEvent } from '../event-store/event-store.service';

    export abstract class AggregateRoot {
      private _uncommittedEvents: DomainEvent[] = [];
      private _version: number = 0;

      get uncommittedEvents(): DomainEvent[] {
        return [...this._uncommittedEvents];
      }

      get version(): number {
        return this._version;
      }

      // Applies a new event: mutate state, then queue it for persistence
      protected apply(event: DomainEvent): void {
        this.when(event);
        this._uncommittedEvents.push(event);
        this._version++;
      }

      protected abstract when(event: DomainEvent): void;

      // Rehydrates state from stored events without queueing them again
      public loadFromHistory(events: DomainEvent[]): void {
        events.forEach(event => {
          this.when(event);
          this._version++;
        });
      }

      public clearUncommittedEvents(): void {
        this._uncommittedEvents = [];
      }
    }

    // src/domain/order/order.aggregate.ts
    // OrderStatus, OrderItem, CreateOrderCommand, InvalidOperationException,
    // generateId, and the calculateTotal helper are assumed to be defined
    // elsewhere in the project; they are not shown in this article.
    export class OrderAggregate extends AggregateRoot {
      private _id!: string;
      private _status!: OrderStatus;
      private _items: OrderItem[] = [];
      private _totalAmount: number = 0;

      // Exposed because the command handler returns order.id
      get id(): string {
        return this._id;
      }

      static create(command: CreateOrderCommand): OrderAggregate {
        const order = new OrderAggregate();
        order.apply({
          eventType: 'OrderCreated',
          aggregateId: command.orderId,
          aggregateType: 'Order',
          payload: {
            customerId: command.customerId,
            items: command.items,
          },
          metadata: {
            correlationId: command.correlationId,
            causationId: command.commandId,
            userId: command.userId,
            timestamp: new Date(),
            version: 1,
          },
        });
        return order;
      }

      confirm(): void {
        if (this._status !== OrderStatus.PENDING) {
          throw new InvalidOperationException('Order must be pending to confirm');
        }

        this.apply({
          eventType: 'OrderConfirmed',
          aggregateId: this._id,
          aggregateType: 'Order',
          payload: { confirmedAt: new Date() },
          metadata: {
            correlationId: generateId(),
            causationId: generateId(),
            timestamp: new Date(),
            version: this.version + 1,
          },
        });
      }

      protected when(event: DomainEvent): void {
        switch (event.eventType) {
          case 'OrderCreated':
            this._id = event.aggregateId;
            this._status = OrderStatus.PENDING;
            this._items = event.payload.items;
            this._totalAmount = this.calculateTotal(event.payload.items);
            break;
          case 'OrderConfirmed':
            this._status = OrderStatus.CONFIRMED;
            break;
          case 'OrderShipped':
            this._status = OrderStatus.SHIPPED;
            break;
        }
      }
    }

    typescript

    CQRS Implementation

    CQRS (Command Query Responsibility Segregation) is a pattern that separates read and write operations into different models. This might seem like unnecessary complexity at first, but it solves real problems in complex systems.

    Why Separate Reads and Writes?

    In traditional architectures, the same model serves both reads and writes. This creates several challenges:

    Different Optimization Needs: Writes need to enforce business rules, validate data, and maintain consistency. Reads need to be fast, often joining data from multiple entities. Optimizing for one often hurts the other.

    Scaling Asymmetry: Most systems have far more reads than writes (often 100:1 or more). With a single model, you can't scale them independently.

    Complex Queries: Read models often need denormalized data (user name + order count + last purchase date). Normalizing for writes means expensive JOINs for reads.

    Event Sourcing Compatibility: If you're using Event Sourcing, the event store holds a history of changes rather than current state, so it cannot be queried like a conventional table. You need projections (read models) built from events.

    How CQRS Works

    Command Side (Write Model):

    • Receives commands (CreateOrder, ConfirmOrder)
    • Validates business rules
    • Persists events to the event store
    • Publishes events to Kafka

    Query Side (Read Model):

    • Subscribes to events
    • Updates denormalized read models (projections)
    • Serves fast queries without complex JOINs

    The key insight is that the read model is eventually consistent with the write model. After a command is processed, there's a brief delay before the read model reflects the change. For most applications, this delay (typically milliseconds to seconds) is acceptable.
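
Before looking at the NestJS version, the split can be sketched with plain in-memory structures. This is a simplified illustration with made-up names (`createOrder`, `project`, `getOrderSummary`); it calls the projection by hand to make the lag between the write side and the read side visible.

```typescript
interface OrderCreated {
  eventType: "OrderCreated";
  orderId: string;
  customerId: string;
  itemCount: number;
}

interface OrderSummary {
  orderId: string;
  customerId: string;
  itemCount: number;
  status: string;
}

const eventLog: OrderCreated[] = [];                    // write side: append-only events
const orderSummaries = new Map<string, OrderSummary>(); // read side: denormalized view

// Command side: validate business rules, then record what happened.
function createOrder(orderId: string, customerId: string, itemCount: number): OrderCreated {
  if (itemCount <= 0) throw new Error("An order needs at least one item");
  const created: OrderCreated = { eventType: "OrderCreated", orderId, customerId, itemCount };
  eventLog.push(created);
  return created;
}

// Query side: a projection folds each event into the read model.
function project(created: OrderCreated): void {
  orderSummaries.set(created.orderId, {
    orderId: created.orderId,
    customerId: created.customerId,
    itemCount: created.itemCount,
    status: "PENDING",
  });
}

function getOrderSummary(orderId: string): OrderSummary | undefined {
  return orderSummaries.get(orderId);
}

const createdEvent = createOrder("order-1", "customer-7", 2);
const beforeProjection = getOrderSummary("order-1"); // read model has not caught up yet
project(createdEvent);
const afterProjection = getOrderSummary("order-1");
```

The gap between `beforeProjection` and `afterProjection` is the eventual consistency window described above.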

    Command Side

    // src/order/commands/handlers/create-order.handler.ts
    import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
    import { Inject } from '@nestjs/common';
    import { ClientKafka } from '@nestjs/microservices';
    import { lastValueFrom } from 'rxjs';
    // CreateOrderCommand, OrderAggregate, and EventStoreService come from the
    // project's own modules; their import paths are omitted here.

    @CommandHandler(CreateOrderCommand)
    export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
      constructor(
        private readonly eventStore: EventStoreService,
        private readonly eventBus: EventBus,
        @Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
      ) {}

      async execute(command: CreateOrderCommand): Promise<string> {
        // Create aggregate
        const order = OrderAggregate.create(command);

        for (const event of order.uncommittedEvents) {
          // Persist events
          await this.eventStore.append(event);

          // Publish to Kafka for other services. emit() returns an Observable,
          // so convert it to a Promise to actually wait for the send.
          await lastValueFrom(
            this.kafkaClient.emit('order.events', {
              key: event.aggregateId,
              value: JSON.stringify(event),
              headers: {
                'event-type': event.eventType,
                'correlation-id': event.metadata.correlationId,
              },
            }),
          );
        }

        order.clearUncommittedEvents();
        return order.id;
      }
    }

    typescript

    Query Side with Projections

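    // src/order/projections/order-list.projection.ts
    import { Injectable } from '@nestjs/common';
    import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
    import { InjectRepository } from '@nestjs/typeorm';
    import { Repository } from 'typeorm';
    // Event classes, OrderReadModel, DomainEvent, and the calculateTotal helper
    // are assumed to come from the project's own modules.

    // @EventsHandler is a class-level decorator in @nestjs/cqrs
    @Injectable()
    @EventsHandler(OrderCreatedEvent, OrderConfirmedEvent, OrderShippedEvent)
    export class OrderListProjection implements IEventHandler<DomainEvent> {
      constructor(
        @InjectRepository(OrderReadModel)
        private readonly readRepository: Repository<OrderReadModel>,
      ) {}

      async handle(event: DomainEvent): Promise<void> {
        switch (event.eventType) {
          case 'OrderCreated':
            await this.readRepository.save({
              id: event.aggregateId,
              customerId: event.payload.customerId,
              status: 'PENDING',
              totalAmount: this.calculateTotal(event.payload.items),
              itemCount: event.payload.items.length,
              createdAt: event.metadata.timestamp,
              updatedAt: event.metadata.timestamp,
            });
            break;

          case 'OrderConfirmed':
            await this.readRepository.update(
              { id: event.aggregateId },
              { status: 'CONFIRMED', updatedAt: event.metadata.timestamp },
            );
            break;

          // The handler subscribes to OrderShippedEvent, so reflect it as well
          case 'OrderShipped':
            await this.readRepository.update(
              { id: event.aggregateId },
              { status: 'SHIPPED', updatedAt: event.metadata.timestamp },
            );
            break;
        }
      }
    }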

    typescript

    Handling Eventual Consistency

    In distributed systems, eventual consistency is not just a limitation; it is a fundamental property we must design for. The CAP theorem tells us that during a network partition a system cannot guarantee both consistency and availability. In practice, many event-driven systems choose availability and accept eventual consistency.

    What is Eventual Consistency?

    When you create an order, the write model is updated immediately. But the read model (the dashboard showing "10 orders today") might take a few hundred milliseconds to reflect the new order. During this window, different parts of your system might show different data.

    This is a deliberate trade-off rather than a bug. It enables:

    • Higher availability (services don't wait for each other)
    • Better performance (no distributed locks)
    • Simpler scaling (independent services)

    Strategies for Handling Eventual Consistency

    1. Optimistic UI Updates: Update the UI immediately after a command succeeds, before the read model updates. The user sees instant feedback.

    2. Polling/Refresh: For critical operations, poll the read model until it reflects the expected change.

    3. Event Subscriptions: Use WebSockets to push updates to the UI when projections update.

    4. Compensation: If something goes wrong, publish compensating events to undo the operation.
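
The first strategy, optimistic UI updates, might look like the following client-side sketch. `OptimisticOrderStore` and the `version` field on the view are assumptions made for this example: the store shows the locally expected state until a projection update with the same or a newer version arrives.

```typescript
interface OrderView {
  id: string;
  status: string;
  version: number;
}

class OptimisticOrderStore {
  private readonly confirmed = new Map<string, OrderView>();
  private readonly pending = new Map<string, OrderView>();

  // Called right after a command succeeds, before the read model updates.
  applyOptimistic(view: OrderView): void {
    this.pending.set(view.id, view);
  }

  // Called when the read model pushes or returns its latest state.
  onProjectionUpdate(view: OrderView): void {
    this.confirmed.set(view.id, view);
    const optimistic = this.pending.get(view.id);
    if (optimistic && view.version >= optimistic.version) {
      this.pending.delete(view.id); // the read model has caught up
    }
  }

  hasPending(id: string): boolean {
    return this.pending.has(id);
  }

  // Prefer the optimistic view while the read model is still behind.
  get(id: string): OrderView | undefined {
    return this.pending.get(id) ?? this.confirmed.get(id);
  }
}

const store = new OptimisticOrderStore();
store.onProjectionUpdate({ id: "order-1", status: "PENDING", version: 1 });
store.applyOptimistic({ id: "order-1", status: "CONFIRMED", version: 2 });
const shownWhileLagging = store.get("order-1")?.status;
store.onProjectionUpdate({ id: "order-1", status: "CONFIRMED", version: 2 });
const shownAfterCatchUp = store.get("order-1")?.status;
```

A real client would also need to drop the optimistic entry if the command is later rejected, which is where compensation comes in.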

    Saga Pattern for Distributed Transactions

    The Saga pattern handles distributed transactions across services. Unlike traditional database transactions (BEGIN, COMMIT, ROLLBACK), sagas use a sequence of local transactions with compensating actions for rollback.

    Consider an order fulfillment flow:

    1. Reserve Inventory → If fails, stop
    2. Process Payment → If fails, release inventory
    3. Ship Order → If fails, refund payment and release inventory
    4. Send Notification → If fails, log and retry (non-critical)

    Each step is a local transaction. If step 3 fails, we don't "rollback" in the database sense. Instead, we execute compensating transactions (refund, release) to undo the previous steps.
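
The compensation logic in the steps above can be sketched independently of NestJS. This is a minimal, synchronous orchestrator written for illustration: `runSaga` and the step names are hypothetical, and a real saga would be asynchronous and persist its progress.

```typescript
interface SagaStep {
  name: string;
  action: () => void;      // throws on failure
  compensate?: () => void; // undoes a completed action
}

function runSaga(steps: SagaStep[]): { completed: boolean; failedStep?: string } {
  const done: SagaStep[] = [];
  for (const step of steps) {
    try {
      step.action();
      done.push(step);
    } catch {
      // Undo the completed steps in reverse order.
      for (const finished of done.reverse()) {
        finished.compensate?.();
      }
      return { completed: false, failedStep: step.name };
    }
  }
  return { completed: true };
}

const trace: string[] = [];

const outcome = runSaga([
  {
    name: "reserveInventory",
    action: () => trace.push("reserved"),
    compensate: () => trace.push("released"),
  },
  {
    name: "processPayment",
    action: () => trace.push("charged"),
    compensate: () => trace.push("refunded"),
  },
  {
    name: "shipOrder",
    action: () => {
      throw new Error("carrier unavailable");
    },
  },
]);
```

Here shipping fails, so the payment is refunded first and the inventory is released second, mirroring step 3 in the list above.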

    // src/sagas/order-fulfillment.saga.ts
    import { Controller, Injectable } from '@nestjs/common';
    import { CommandBus, ICommand, Saga, ofType } from '@nestjs/cqrs';
    import { EventPattern, Payload } from '@nestjs/microservices';
    import { Observable } from 'rxjs';
    import { map } from 'rxjs/operators';
    // Command and event classes are assumed to come from the project's own modules.

    // Sagas are discovered on providers, so the @Saga stream lives in an injectable.
    @Injectable()
    export class OrderFulfillmentSaga {
      @Saga()
      orderCreated = (events$: Observable<any>): Observable<ICommand> => {
        return events$.pipe(
          ofType(OrderCreatedEvent),
          map(event => {
            // Start saga - reserve inventory
            return new ReserveInventoryCommand({
              orderId: event.aggregateId,
              items: event.payload.items,
              correlationId: event.metadata.correlationId,
            });
          }),
        );
      };
    }

    // @EventPattern handlers are only registered on controllers, so the Kafka
    // listeners live in a separate class (the class name here is illustrative).
    @Controller()
    export class OrderFulfillmentEventsController {
      constructor(private readonly commandBus: CommandBus) {}

      @EventPattern('inventory.reserved')
      async onInventoryReserved(@Payload() data: InventoryReservedEvent): Promise<void> {
        // Continue saga - process payment
        await this.commandBus.execute(
          new ProcessPaymentCommand({
            orderId: data.orderId,
            amount: data.totalAmount,
            correlationId: data.correlationId,
          }),
        );
      }

      @EventPattern('payment.failed')
      async onPaymentFailed(@Payload() data: PaymentFailedEvent): Promise<void> {
        // Compensating transaction - release inventory
        await this.commandBus.execute(
          new ReleaseInventoryCommand({
            orderId: data.orderId,
            reason: 'Payment failed',
            correlationId: data.correlationId,
          }),
        );

        // Mark order as failed
        await this.commandBus.execute(
          new FailOrderCommand({
            orderId: data.orderId,
            reason: data.failureReason,
          }),
        );
      }
    }

    typescript

    Idempotency Handling

    // src/common/decorators/idempotent.decorator.ts
    import {
      CallHandler,
      CanActivate,
      ExecutionContext,
      Injectable,
      NestInterceptor,
    } from '@nestjs/common';
    import { Redis } from 'ioredis';
    import { Observable } from 'rxjs';
    import { mergeMap } from 'rxjs/operators';
    // InjectRedis comes from the Redis integration package in use
    // (for example @nestjs-modules/ioredis); the article does not name it.

    @Injectable()
    export class IdempotencyGuard implements CanActivate {
      constructor(
        @InjectRedis() private readonly redis: Redis,
      ) {}

      async canActivate(context: ExecutionContext): Promise<boolean> {
        const request = context.switchToHttp().getRequest();
        const idempotencyKey = request.headers['idempotency-key'];

        if (!idempotencyKey) {
          return true; // No idempotency required
        }

        const existing = await this.redis.get(`idempotency:${idempotencyKey}`);

        if (existing) {
          // Return cached response
          const response = context.switchToHttp().getResponse();
          response.status(200).json(JSON.parse(existing));
          return false;
        }

        return true;
      }
    }

    // Store response after successful operation
    @Injectable()
    export class IdempotencyInterceptor implements NestInterceptor {
      constructor(@InjectRedis() private readonly redis: Redis) {}

      intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
        const request = context.switchToHttp().getRequest();
        const idempotencyKey = request.headers['idempotency-key'];

        return next.handle().pipe(
          // mergeMap waits for the Redis write; an async callback inside tap
          // would run unawaited and swallow its errors
          mergeMap(async (response) => {
            if (idempotencyKey) {
              await this.redis.setex(
                `idempotency:${idempotencyKey}`,
                86400, // 24 hours
                JSON.stringify(response),
              );
            }
            return response;
          }),
        );
      }
    }

    typescript
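
The guard above protects HTTP endpoints. The same idea applies to Kafka consumers, which can receive an event more than once. The sketch below is a simplified, in-memory illustration: the `eventId` field and `IdempotentPaymentHandler` are assumptions for this example, and a real handler would store processed IDs durably, ideally in the same transaction as the side effect.

```typescript
interface IncomingEvent {
  eventId: string; // assumed unique per event; not part of the DomainEvent shown earlier
  orderId: string;
  amount: number;
}

class IdempotentPaymentHandler {
  private readonly processed = new Set<string>();
  totalCharged = 0;

  // Returns true if the event was applied, false if it was a duplicate delivery.
  handle(event: IncomingEvent): boolean {
    if (this.processed.has(event.eventId)) {
      return false;
    }
    this.totalCharged += event.amount;
    this.processed.add(event.eventId);
    return true;
  }
}

const handler = new IdempotentPaymentHandler();
const payment: IncomingEvent = { eventId: "evt-1", orderId: "order-1", amount: 150 };

const firstDelivery = handler.handle(payment);  // applied
const secondDelivery = handler.handle(payment); // duplicate, skipped
```

With this check in place, a redelivered event leaves `totalCharged` unchanged instead of charging the customer twice.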

    Production Considerations

    Dead Letter Queue (DLQ)

    // Handle failed messages
    import { Controller, Logger } from '@nestjs/common';
    import { Ctx, EventPattern, KafkaContext, Payload } from '@nestjs/microservices';
    import { InjectRepository } from '@nestjs/typeorm';
    import { Repository } from 'typeorm';
    // DeadLetter (entity), AlertService, and isCriticalEvent are assumed to be
    // defined elsewhere in the project; they are not shown in this article.

    // @EventPattern handlers must live in a controller to be registered
    @Controller()
    export class DeadLetterQueueHandler {
      private readonly logger = new Logger(DeadLetterQueueHandler.name);

      constructor(
        @InjectRepository(DeadLetter)
        private readonly deadLetterRepository: Repository<DeadLetter>,
        private readonly alertService: AlertService,
      ) {}

      @EventPattern('order.events.dlq')
      async handleDeadLetter(
        @Payload() message: any,
        @Ctx() context: KafkaContext,
      ): Promise<void> {
        // Kafka header values may arrive as Buffers, so normalize to strings
        const headers = context.getMessage().headers ?? {};
        const originalTopic = headers['original-topic']?.toString();
        const failureReason = headers['failure-reason']?.toString();
        const retryCount = parseInt(headers['retry-count']?.toString() ?? '0', 10);

        // Log for investigation
        this.logger.error('Dead letter received', {
          originalTopic,
          failureReason,
          retryCount,
          message,
        });

        // Store in database for manual review
        await this.deadLetterRepository.save({
          originalTopic,
          message: JSON.stringify(message),
          failureReason,
          retryCount,
          createdAt: new Date(),
        });

        // Alert if critical
        if (this.isCriticalEvent(message)) {
          await this.alertService.sendAlert({
            severity: 'high',
            message: `Critical event failed: ${message.eventType}`,
          });
        }
      }
    }

    typescript
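
The handler above consumes the dead letter topic but does not show how messages get there. One common approach is to retry a bounded number of times and then route the message aside with the failure details. The sketch below is a synchronous, in-memory illustration: `processWithRetry` and the `DeadLetter` shape are hypothetical, with fields that mirror the `original-topic`, `failure-reason`, and `retry-count` headers read above.

```typescript
interface DeadLetter {
  originalTopic: string;
  failureReason: string;
  retryCount: number;
  payload: string;
}

// Tries the handler once plus up to maxRetries retries, then dead-letters the message.
function processWithRetry(
  topic: string,
  payload: string,
  handler: (payload: string) => void,
  maxRetries: number,
  deadLetters: DeadLetter[],
): boolean {
  let lastError = "unknown";
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      handler(payload);
      return true;
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error);
    }
  }
  deadLetters.push({
    originalTopic: topic,
    failureReason: lastError,
    retryCount: maxRetries,
    payload,
  });
  return false;
}

let attempts = 0;
const deadLetters: DeadLetter[] = [];

const succeeded = processWithRetry(
  "order.events",
  '{"eventType":"OrderCreated"}',
  () => {
    attempts++;
    throw new Error("invalid payload");
  },
  3,
  deadLetters,
);
```

A production version would usually add a delay between attempts and publish the dead letter to a topic such as `order.events.dlq` rather than an array.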

    Conclusion

    Event-driven architecture with NestJS and Kafka provides a robust foundation for building scalable microservices. Let's summarize what we've learned and when to apply these patterns.

    Key Takeaways

    Event Sourcing gives you a complete audit trail and enables temporal queries. Use it when you need compliance, debugging capabilities, or when your domain is naturally event-oriented. Avoid it for simple CRUD applications.

    CQRS allows independent scaling of read and write workloads. Use it when reads and writes have different performance requirements, or when you need complex read models. It pairs naturally with Event Sourcing.

    Sagas coordinate multi-service workflows with compensating actions. They do not provide atomicity across services; instead, each step commits locally and failures are undone by compensation. Design your compensating actions carefully, because they are as important as the happy path.

    Idempotency makes at-least-once delivery safe. Network failures and consumer rebalances cause retries, so the same event can arrive more than once. Always make event handlers idempotent; combined with at-least-once delivery, this gives effectively-once processing.

    When to Use Event-Driven Architecture

    Good fit:

    • High-scale systems with many services
    • Systems requiring loose coupling and independent deployability
    • Domains with complex business workflows (e-commerce, finance, logistics)
    • Systems requiring audit trails and compliance

    Not ideal for:

    • Simple CRUD applications
    • Systems requiring strong consistency (banking ledgers)
    • Small teams without operational capacity for distributed systems
    • Prototypes and MVPs (start simple, evolve later)

    Production Checklist

    Before going to production, ensure you have:

    • Monitoring: Track consumer lag, message throughput, and processing errors
    • Alerting: Alert on consumer group lag exceeding thresholds
    • Dead Letter Queues: Capture and investigate failed messages
    • Schema Registry: Version your event schemas to handle evolution
    • Exactly-Once Semantics: Use Kafka transactions for critical workflows

    The initial complexity pays off with better scalability, maintainability, and resilience in production systems. Start with a single service and event store, prove the pattern works for your domain, then expand incrementally.
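
For the monitoring and alerting items in the checklist, consumer lag is the gap between the newest offset in a partition and the offset the consumer group has committed. The sketch below shows the arithmetic only: the `PartitionOffsets` shape and the sample numbers are made up, and in practice the offsets would come from your Kafka admin tooling or metrics exporter.

```typescript
interface PartitionOffsets {
  partition: number;
  logEndOffset: number;    // offset of the next message to be written
  committedOffset: number; // next offset the consumer group will read
}

// Total lag is the sum of per-partition gaps, never negative.
function totalLag(offsets: PartitionOffsets[]): number {
  return offsets.reduce(
    (sum, p) => sum + Math.max(0, p.logEndOffset - p.committedOffset),
    0,
  );
}

function lagExceeds(offsets: PartitionOffsets[], threshold: number): boolean {
  return totalLag(offsets) > threshold;
}

const sampleOffsets: PartitionOffsets[] = [
  { partition: 0, logEndOffset: 1200, committedOffset: 1150 },
  { partition: 1, logEndOffset: 900, committedOffset: 900 },
  { partition: 2, logEndOffset: 500, committedOffset: 380 },
];

const lag = totalLag(sampleOffsets);              // 50 + 0 + 120 = 170
const shouldAlert = lagExceeds(sampleOffsets, 100);
```

A lag that keeps growing usually means consumers are slower than producers, which is the signal to scale the consumer group or investigate a stuck handler.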