CQRS and Event Sourcing Patterns

Command Query Responsibility Segregation and Event Sourcing

1. Introduction to CQRS & Event Sourcing

CQRS (Command Query Responsibility Segregation) and Event Sourcing are complementary architectural patterns for building scalable, maintainable, and auditable systems. CQRS separates the write path from the read path, and Event Sourcing records every state change as an event.

What is CQRS?

CQRS separates reading and writing operations into different models. This segregation allows each side to be scaled, optimized, and evolved independently.

Key Principles:

  • Separation of Concerns: Commands (writes) and Queries (reads) use different models
  • Independent Scaling: Read and write sides can scale separately
  • Optimized Models: Each model can be optimized for its specific purpose
  • Evolution: Models can evolve independently over time

What is Event Sourcing?

Event Sourcing persists the state of a business entity as a sequence of state-changing events. Instead of storing just the current state, we store the series of events that led to that state.

Key Benefits:

  • Audit Trail: Complete history of all changes
  • Temporal Queries: Ability to query state at any point in time
  • Reproducibility: Recreate state by replaying events
  • Event-Driven: Natural fit for event-driven architectures
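
To make the replay and temporal-query benefits concrete, here is a minimal sketch that is independent of the classes introduced later in this article. The names `ReplaySketch` and `LedgerEvent` are hypothetical, and an event is reduced to a signed amount plus a timestamp.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

final class ReplaySketch {
    // Hypothetical, simplified event: a signed amount recorded at a point in time.
    static final class LedgerEvent {
        final Instant occurredAt;
        final BigDecimal delta; // positive = deposit, negative = withdrawal

        LedgerEvent(Instant occurredAt, BigDecimal delta) {
            this.occurredAt = occurredAt;
            this.delta = delta;
        }
    }

    // Current state is the fold of every event in the stream.
    static BigDecimal currentBalance(List<LedgerEvent> stream) {
        return balanceAt(stream, Instant.MAX);
    }

    // Temporal query: replay only the events recorded at or before pointInTime.
    static BigDecimal balanceAt(List<LedgerEvent> stream, Instant pointInTime) {
        BigDecimal balance = BigDecimal.ZERO;
        for (LedgerEvent event : stream) {
            if (!event.occurredAt.isAfter(pointInTime)) {
                balance = balance.add(event.delta);
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        List<LedgerEvent> stream = Arrays.asList(
            new LedgerEvent(t0, new BigDecimal("1000.00")),
            new LedgerEvent(t0.plusSeconds(60), new BigDecimal("-250.00")));
        System.out.println(balanceAt(stream, t0));  // state after the first event only
        System.out.println(currentBalance(stream)); // state after all events
    }
}
```

Because nothing is overwritten, the same stream answers both "what is the balance now?" and "what was the balance at time t?" without any extra bookkeeping.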

2. CQRS Pattern

Traditional vs CQRS Architecture

graph TD
    A[Traditional CRUD] --> B[Single Model]
    B --> C[Database]
    C --> B
    D[CQRS Architecture] --> E[Write Model]
    D --> F[Read Model]
    E --> G[Write Database]
    F --> H[Read Database]
    G -.->|Events| I[Event Store]
    I -.->|Projection| H

CQRS Components

graph LR
    A[Client] --> B[Command]
    A --> C[Query]
    B --> D[Command Handler]
    C --> E[Query Handler]
    D --> F[Write Model]
    F --> G[Event Store]
    E --> H[Read Model]
    G --> I[Read Model Projection]
    I --> H

CQRS Flow Explanation

Write Side (Command):

  1. Client sends command to change state
  2. Command handler validates and processes command
  3. Write model generates events
  4. Events are persisted to event store

Read Side (Query):

  1. Event projections keep the read model up to date as new events arrive (often asynchronously, so reads may briefly lag behind writes)
  2. Client sends query to read data
  3. Query handler reads from the optimized read model
  4. Query results are returned to client
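
The two flows above can be sketched in a few lines. This is a deliberately tiny, single-class illustration under stated assumptions: `CqrsFlowSketch` and `ItemRenamed` are hypothetical names, the event store is a plain list, and the projection runs synchronously in the same thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CqrsFlowSketch {
    // Hypothetical event: an item received a new name.
    static final class ItemRenamed {
        final String itemId;
        final String newName;

        ItemRenamed(String itemId, String newName) {
            this.itemId = itemId;
            this.newName = newName;
        }
    }

    private final List<ItemRenamed> eventStore = new ArrayList<>(); // write side: append-only log
    private final Map<String, String> namesById = new HashMap<>();  // read side: query-optimized view

    // Write side: validate the command, produce an event, persist it, then project it.
    void handleRename(String itemId, String newName) {
        if (newName == null || newName.trim().isEmpty()) {
            throw new IllegalArgumentException("Name must not be blank");
        }
        ItemRenamed event = new ItemRenamed(itemId, newName);
        eventStore.add(event);
        project(event); // synchronous here; frequently asynchronous in real systems
    }

    // Projection: keeps the read model in step with the event log.
    private void project(ItemRenamed event) {
        namesById.put(event.itemId, event.newName);
    }

    // Read side: answers queries from the read model only, never from the event log.
    String queryName(String itemId) {
        return namesById.get(itemId);
    }

    int eventCount() {
        return eventStore.size();
    }
}
```

Note that the query path never touches the event list: the read model could be dropped and rebuilt at any time by projecting the stored events again.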

3. Event Sourcing

Event Sourcing Architecture

graph TD
    A[Command] --> B[Aggregate]
    B --> C[Generate Events]
    C --> D[Event Store]
    D --> E[Event Stream]
    E --> F[Read Model Projections]
    F --> G[Read Database]
    D --> H[Replay Events]
    H --> B

Event Structure

package com.example.cqrs.event;

import java.time.Instant;
import java.util.UUID;

public abstract class DomainEvent {
    private final UUID eventId;
    private final String aggregateId;
    private final Instant timestamp;
    private final int version;
    
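    // Creates a new event with a fresh ID and the current time.
    // The version is always 0 here; nothing in this example assigns a per-stream sequence number.
    protected DomainEvent(String aggregateId) {
        this.eventId = UUID.randomUUID();
        this.aggregateId = aggregateId;
        this.timestamp = Instant.now();
        this.version = 0;
    }
    
    // Recreates an event from previously stored values (used when replaying)
    protected DomainEvent(UUID eventId, String aggregateId, Instant timestamp, int version) {
        this.eventId = eventId;
        this.aggregateId = aggregateId;
        this.timestamp = timestamp;
        this.version = version;
    }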
    
    // Getters
    public UUID getEventId() { return eventId; }
    public String getAggregateId() { return aggregateId; }
    public Instant getTimestamp() { return timestamp; }
    public int getVersion() { return version; }
}

Sample Events

package com.example.cqrs.event;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;

public class AccountCreatedEvent extends DomainEvent {
    private final String accountHolder;
    private final String accountNumber;
    
    public AccountCreatedEvent(String aggregateId, String accountHolder, String accountNumber) {
        super(aggregateId);
        this.accountHolder = accountHolder;
        this.accountNumber = accountNumber;
    }
    
    // Constructor for event replay
    public AccountCreatedEvent(UUID eventId, String aggregateId, Instant timestamp, 
                              int version, String accountHolder, String accountNumber) {
        super(eventId, aggregateId, timestamp, version);
        this.accountHolder = accountHolder;
        this.accountNumber = accountNumber;
    }
    
    // Getters
    public String getAccountHolder() { return accountHolder; }
    public String getAccountNumber() { return accountNumber; }
}

public class MoneyDepositedEvent extends DomainEvent {
    private final BigDecimal amount;
    private final String description;
    
    public MoneyDepositedEvent(String aggregateId, BigDecimal amount, String description) {
        super(aggregateId);
        this.amount = amount;
        this.description = description;
    }
    
    // Constructor for event replay
    public MoneyDepositedEvent(UUID eventId, String aggregateId, Instant timestamp, 
                              int version, BigDecimal amount, String description) {
        super(eventId, aggregateId, timestamp, version);
        this.amount = amount;
        this.description = description;
    }
    
    // Getters
    public BigDecimal getAmount() { return amount; }
    public String getDescription() { return description; }
}

public class MoneyWithdrawnEvent extends DomainEvent {
    private final BigDecimal amount;
    private final String description;
    
    public MoneyWithdrawnEvent(String aggregateId, BigDecimal amount, String description) {
        super(aggregateId);
        this.amount = amount;
        this.description = description;
    }
    
    // Constructor for event replay
    public MoneyWithdrawnEvent(UUID eventId, String aggregateId, Instant timestamp, 
                              int version, BigDecimal amount, String description) {
        super(eventId, aggregateId, timestamp, version);
        this.amount = amount;
        this.description = description;
    }
    
    // Getters
    public BigDecimal getAmount() { return amount; }
    public String getDescription() { return description; }
}

4. Banking System Example

Implementing a banking system using CQRS and Event Sourcing patterns.

Use Case Description

Scenario:

A bank needs to manage customer accounts with operations like creating accounts, depositing money, withdrawing money, and generating statements. The system must provide audit trails and support complex queries.

Commands: CreateAccount, DepositMoney, WithdrawMoney

Queries: GetAccountBalance, GetAccountStatement, GetAllAccounts

Aggregate Root

package com.example.cqrs.bank.aggregate;

import com.example.cqrs.event.AccountCreatedEvent;
import com.example.cqrs.event.DomainEvent;
import com.example.cqrs.event.MoneyDepositedEvent;
import com.example.cqrs.event.MoneyWithdrawnEvent;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class BankAccount {
    private String accountId;
    private String accountHolder;
    private String accountNumber;
    private BigDecimal balance;
    private List<DomainEvent> changes;
    
    public BankAccount() {
        this.balance = BigDecimal.ZERO;
        this.changes = new ArrayList<>();
    }
    
    // Constructor for reconstruction from events
    public BankAccount(List<DomainEvent> events) {
        this();
        for (DomainEvent event : events) {
            applyEvent(event);
        }
    }
    
    // Command methods
    public void createAccount(String accountId, String accountHolder, String accountNumber) {
        if (this.accountId != null) {
            throw new IllegalStateException("Account already exists");
        }
        AccountCreatedEvent event = new AccountCreatedEvent(accountId, accountHolder, accountNumber);
        apply(event);
    }
    
    public void deposit(BigDecimal amount, String description) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Deposit amount must be positive");
        }
        MoneyDepositedEvent event = new MoneyDepositedEvent(accountId, amount, description);
        apply(event);
    }
    
    public void withdraw(BigDecimal amount, String description) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Withdrawal amount must be positive");
        }
        if (balance.compareTo(amount) < 0) {
            throw new IllegalStateException("Insufficient funds");
        }
        MoneyWithdrawnEvent event = new MoneyWithdrawnEvent(accountId, amount, description);
        apply(event);
    }
    
    // Apply events to update state
    private void apply(DomainEvent event) {
        applyEvent(event);
        changes.add(event);
    }
    
    private void applyEvent(DomainEvent event) {
        if (event instanceof AccountCreatedEvent) {
            AccountCreatedEvent createdEvent = (AccountCreatedEvent) event;
            this.accountId = createdEvent.getAggregateId();
            this.accountHolder = createdEvent.getAccountHolder();
            this.accountNumber = createdEvent.getAccountNumber();
        } else if (event instanceof MoneyDepositedEvent) {
            MoneyDepositedEvent depositedEvent = (MoneyDepositedEvent) event;
            this.balance = this.balance.add(depositedEvent.getAmount());
        } else if (event instanceof MoneyWithdrawnEvent) {
            MoneyWithdrawnEvent withdrawnEvent = (MoneyWithdrawnEvent) event;
            this.balance = this.balance.subtract(withdrawnEvent.getAmount());
        }
    }
    
    // Get uncommitted changes
    public List<DomainEvent> getUncommittedChanges() {
        return new ArrayList<>(changes);
    }
    
    // Mark changes as committed
    public void markChangesAsCommitted() {
        changes.clear();
    }
    
    // Getters
    public String getAccountId() { return accountId; }
    public String getAccountHolder() { return accountHolder; }
    public String getAccountNumber() { return accountNumber; }
    public BigDecimal getBalance() { return balance; }
}

Commands

package com.example.cqrs.bank.command;

import java.math.BigDecimal;

public class CreateAccountCommand {
    private final String accountId;
    private final String accountHolder;
    private final String accountNumber;
    
    public CreateAccountCommand(String accountId, String accountHolder, String accountNumber) {
        this.accountId = accountId;
        this.accountHolder = accountHolder;
        this.accountNumber = accountNumber;
    }
    
    // Getters
    public String getAccountId() { return accountId; }
    public String getAccountHolder() { return accountHolder; }
    public String getAccountNumber() { return accountNumber; }
}

public class DepositMoneyCommand {
    private final String accountId;
    private final BigDecimal amount;
    private final String description;
    
    public DepositMoneyCommand(String accountId, BigDecimal amount, String description) {
        this.accountId = accountId;
        this.amount = amount;
        this.description = description;
    }
    
    // Getters
    public String getAccountId() { return accountId; }
    public BigDecimal getAmount() { return amount; }
    public String getDescription() { return description; }
}

public class WithdrawMoneyCommand {
    private final String accountId;
    private final BigDecimal amount;
    private final String description;
    
    public WithdrawMoneyCommand(String accountId, BigDecimal amount, String description) {
        this.accountId = accountId;
        this.amount = amount;
        this.description = description;
    }
    
    // Getters
    public String getAccountId() { return accountId; }
    public BigDecimal getAmount() { return amount; }
    public String getDescription() { return description; }
}

Command Handler

package com.example.cqrs.bank.handler;

import com.example.cqrs.bank.aggregate.BankAccount;
import com.example.cqrs.bank.command.CreateAccountCommand;
import com.example.cqrs.bank.command.DepositMoneyCommand;
import com.example.cqrs.bank.command.WithdrawMoneyCommand;
import com.example.cqrs.event.DomainEvent;
import com.example.cqrs.infrastructure.SimpleEventBus;
import com.example.cqrs.repository.EventStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class BankAccountCommandHandler {
    
    @Autowired
    private EventStore eventStore;
    
    @Autowired
    private SimpleEventBus eventBus;
    
    public void handle(CreateAccountCommand command) {
        BankAccount account = new BankAccount();
        account.createAccount(command.getAccountId(), command.getAccountHolder(), command.getAccountNumber());
        saveEvents(account);
    }
    
    public void handle(DepositMoneyCommand command) {
        // Rebuild the aggregate from its history before applying the command
        List<DomainEvent> events = eventStore.getEventsForAggregate(command.getAccountId());
        BankAccount account = new BankAccount(events);
        account.deposit(command.getAmount(), command.getDescription());
        saveEvents(account);
    }
    
    public void handle(WithdrawMoneyCommand command) {
        // Rebuild the aggregate from its history before applying the command
        List<DomainEvent> events = eventStore.getEventsForAggregate(command.getAccountId());
        BankAccount account = new BankAccount(events);
        account.withdraw(command.getAmount(), command.getDescription());
        saveEvents(account);
    }
    
    private void saveEvents(BankAccount account) {
        List<DomainEvent> newEvents = account.getUncommittedChanges();
        eventStore.saveEvents(account.getAccountId(), newEvents);
        account.markChangesAsCommitted();
        // Publish only after the events are stored, so the read model projections can update
        newEvents.forEach(eventBus::publish);
    }
}

Read Models

package com.example.cqrs.bank.readmodel;

import java.math.BigDecimal;
import java.time.Instant;

public class AccountSummary {
    private String accountId;
    private String accountHolder;
    private String accountNumber;
    private BigDecimal balance;
    private Instant lastUpdated;
    
    // Constructors
    public AccountSummary() {}
    
    public AccountSummary(String accountId, String accountHolder, String accountNumber, 
                         BigDecimal balance, Instant lastUpdated) {
        this.accountId = accountId;
        this.accountHolder = accountHolder;
        this.accountNumber = accountNumber;
        this.balance = balance;
        this.lastUpdated = lastUpdated;
    }
    
    // Getters and setters
    public String getAccountId() { return accountId; }
    public void setAccountId(String accountId) { this.accountId = accountId; }
    
    public String getAccountHolder() { return accountHolder; }
    public void setAccountHolder(String accountHolder) { this.accountHolder = accountHolder; }
    
    public String getAccountNumber() { return accountNumber; }
    public void setAccountNumber(String accountNumber) { this.accountNumber = accountNumber; }
    
    public BigDecimal getBalance() { return balance; }
    public void setBalance(BigDecimal balance) { this.balance = balance; }
    
    public Instant getLastUpdated() { return lastUpdated; }
    public void setLastUpdated(Instant lastUpdated) { this.lastUpdated = lastUpdated; }
}

public class AccountStatementEntry {
    private String accountId;
    private BigDecimal amount;
    private String description;
    private Instant timestamp;
    private String type; // DEPOSIT or WITHDRAWAL
    
    // Constructors and getters/setters...
}

Event Handlers for Read Model Projections

package com.example.cqrs.bank.projector;

import com.example.cqrs.event.AccountCreatedEvent;
import com.example.cqrs.event.MoneyDepositedEvent;
import com.example.cqrs.event.MoneyWithdrawnEvent;
import com.example.cqrs.bank.readmodel.AccountSummary;
import com.example.cqrs.repository.ReadModelRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;

@Component
public class AccountSummaryProjector {
    
    @Autowired
    private ReadModelRepository readModelRepository;
    
    // Creates the summary entry when an account is opened
    @EventListener
    public void on(AccountCreatedEvent event) {
        AccountSummary summary = new AccountSummary(
            event.getAggregateId(),
            event.getAccountHolder(),
            event.getAccountNumber(),
            BigDecimal.ZERO,
            event.getTimestamp()
        );
        readModelRepository.saveAccountSummary(summary);
    }
    
    // Adds the deposited amount to the projected balance
    @EventListener
    public void on(MoneyDepositedEvent event) {
        AccountSummary summary = readModelRepository.findAccountSummary(event.getAggregateId());
        if (summary != null) {
            BigDecimal newBalance = summary.getBalance().add(event.getAmount());
            summary.setBalance(newBalance);
            summary.setLastUpdated(event.getTimestamp());
            readModelRepository.saveAccountSummary(summary);
        }
    }
    
    // Subtracts the withdrawn amount from the projected balance
    @EventListener
    public void on(MoneyWithdrawnEvent event) {
        AccountSummary summary = readModelRepository.findAccountSummary(event.getAggregateId());
        if (summary != null) {
            BigDecimal newBalance = summary.getBalance().subtract(event.getAmount());
            summary.setBalance(newBalance);
            summary.setLastUpdated(event.getTimestamp());
            readModelRepository.saveAccountSummary(summary);
        }
    }
}

Queries

package com.example.cqrs.bank.query;

import com.example.cqrs.bank.readmodel.AccountSummary;
import com.example.cqrs.bank.readmodel.AccountStatementEntry;
import com.example.cqrs.repository.ReadModelRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.List;

@Service
public class BankAccountQueryHandler {
    
    @Autowired
    private ReadModelRepository readModelRepository;
    
    public AccountSummary handle(GetAccountQuery query) {
        return readModelRepository.findAccountSummary(query.getAccountId());
    }
    
    public List<AccountSummary> handle(GetAllAccountsQuery query) {
        return readModelRepository.findAllAccounts();
    }
    
    public BigDecimal handle(GetAccountBalanceQuery query) {
        AccountSummary summary = readModelRepository.findAccountSummary(query.getAccountId());
        return summary != null ? summary.getBalance() : BigDecimal.ZERO;
    }
    
    public List<AccountStatementEntry> handle(GetAccountStatementQuery query) {
        return readModelRepository.findAccountStatement(query.getAccountId(), 
                                                       query.getFromDate(), 
                                                       query.getToDate());
    }
}

// Query classes (public so that callers outside this package can create them;
// in a real project each one lives in its own file)
public class GetAccountQuery {
    private final String accountId;
    public GetAccountQuery(String accountId) { this.accountId = accountId; }
    public String getAccountId() { return accountId; }
}

public class GetAllAccountsQuery {
    public GetAllAccountsQuery() {}
}

public class GetAccountBalanceQuery {
    private final String accountId;
    public GetAccountBalanceQuery(String accountId) { this.accountId = accountId; }
    public String getAccountId() { return accountId; }
}

public class GetAccountStatementQuery {
    private final String accountId;
    private final java.time.Instant fromDate;
    private final java.time.Instant toDate;
    
    public GetAccountStatementQuery(String accountId, java.time.Instant fromDate, java.time.Instant toDate) {
        this.accountId = accountId;
        this.fromDate = fromDate;
        this.toDate = toDate;
    }
    
    // Getters used by BankAccountQueryHandler
    public String getAccountId() { return accountId; }
    public java.time.Instant getFromDate() { return fromDate; }
    public java.time.Instant getToDate() { return toDate; }
}

5. E-commerce System Example

Implementing an e-commerce order management system using CQRS and Event Sourcing.

Use Case Description

Scenario:

An e-commerce platform needs to manage orders through their lifecycle: creation, payment processing, shipping, and fulfillment. The system must provide real-time inventory updates and complex reporting.

Commands: CreateOrder, ProcessPayment, ShipOrder, CancelOrder

Queries: GetOrderDetails, GetOrderHistory, GetSalesReport

E-commerce Events

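package com.example.cqrs.ecommerce.event;

import com.example.cqrs.event.DomainEvent;
import java.math.BigDecimal;
import java.util.List;

// Note: OrderItem is not shown in this article; the Order aggregate below
// relies on it exposing getPrice() and getQuantity().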

public class OrderCreatedEvent extends DomainEvent {
    private final String customerId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    
    public OrderCreatedEvent(String aggregateId, String customerId, List<OrderItem> items, BigDecimal totalAmount) {
        super(aggregateId);
        this.customerId = customerId;
        this.items = items;
        this.totalAmount = totalAmount;
    }
    
    // Getters...
}

public class PaymentProcessedEvent extends DomainEvent {
    private final String paymentId;
    private final BigDecimal amount;
    private final String paymentMethod;
    
    public PaymentProcessedEvent(String aggregateId, String paymentId, BigDecimal amount, String paymentMethod) {
        super(aggregateId);
        this.paymentId = paymentId;
        this.amount = amount;
        this.paymentMethod = paymentMethod;
    }
    
    // Getters...
}

public class OrderShippedEvent extends DomainEvent {
    private final String trackingNumber;
    private final String carrier;
    
    public OrderShippedEvent(String aggregateId, String trackingNumber, String carrier) {
        super(aggregateId);
        this.trackingNumber = trackingNumber;
        this.carrier = carrier;
    }
    
    // Getters...
}

public class OrderCancelledEvent extends DomainEvent {
    private final String reason;
    
    public OrderCancelledEvent(String aggregateId, String reason) {
        super(aggregateId);
        this.reason = reason;
    }
    
    // Getters...
}

Order Aggregate

package com.example.cqrs.ecommerce.aggregate;

import com.example.cqrs.ecommerce.event.OrderCreatedEvent;
import com.example.cqrs.ecommerce.event.PaymentProcessedEvent;
import com.example.cqrs.ecommerce.event.OrderShippedEvent;
import com.example.cqrs.ecommerce.event.OrderCancelledEvent;
import com.example.cqrs.event.DomainEvent;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class Order {
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private OrderStatus status;
    private List<DomainEvent> changes;
    
    public enum OrderStatus {
        CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
    }
    
    public Order() {
        this.items = new ArrayList<>();
        this.totalAmount = BigDecimal.ZERO;
        this.status = OrderStatus.CREATED;
        this.changes = new ArrayList<>();
    }
    
    // Constructor for reconstruction from events
    public Order(List<DomainEvent> events) {
        this();
        for (DomainEvent event : events) {
            applyEvent(event);
        }
    }
    
    // Command methods
    public void createOrder(String orderId, String customerId, List<OrderItem> items) {
        if (this.orderId != null) {
            throw new IllegalStateException("Order already exists");
        }
        BigDecimal total = items.stream()
            .map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);
        
        OrderCreatedEvent event = new OrderCreatedEvent(orderId, customerId, items, total);
        apply(event);
    }
    
    public void processPayment(String paymentId, String paymentMethod) {
        if (status != OrderStatus.CREATED) {
            throw new IllegalStateException("Order cannot be paid in current state: " + status);
        }
        PaymentProcessedEvent event = new PaymentProcessedEvent(orderId, paymentId, totalAmount, paymentMethod);
        apply(event);
    }
    
    public void shipOrder(String trackingNumber, String carrier) {
        if (status != OrderStatus.PAID) {
            throw new IllegalStateException("Order must be paid before shipping");
        }
        OrderShippedEvent event = new OrderShippedEvent(orderId, trackingNumber, carrier);
        apply(event);
    }
    
    public void cancelOrder(String reason) {
        if (status == OrderStatus.SHIPPED || status == OrderStatus.DELIVERED) {
            throw new IllegalStateException("Cannot cancel shipped order");
        }
        OrderCancelledEvent event = new OrderCancelledEvent(orderId, reason);
        apply(event);
    }
    
    // Apply events to update state
    private void apply(DomainEvent event) {
        applyEvent(event);
        changes.add(event);
    }
    
    private void applyEvent(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) {
            OrderCreatedEvent createdEvent = (OrderCreatedEvent) event;
            this.orderId = createdEvent.getAggregateId();
            this.customerId = createdEvent.getCustomerId();
            this.items = createdEvent.getItems();
            this.totalAmount = createdEvent.getTotalAmount();
            this.status = OrderStatus.CREATED;
        } else if (event instanceof PaymentProcessedEvent) {
            this.status = OrderStatus.PAID;
        } else if (event instanceof OrderShippedEvent) {
            this.status = OrderStatus.SHIPPED;
        } else if (event instanceof OrderCancelledEvent) {
            this.status = OrderStatus.CANCELLED;
        }
    }
    
    // Get uncommitted changes
    public List<DomainEvent> getUncommittedChanges() {
        return new ArrayList<>(changes);
    }
    
    // Mark changes as committed
    public void markChangesAsCommitted() {
        changes.clear();
    }
    
    // Getters
    public String getOrderId() { return orderId; }
    public String getCustomerId() { return customerId; }
    public List<OrderItem> getItems() { return items; }
    public BigDecimal getTotalAmount() { return totalAmount; }
    public OrderStatus getStatus() { return status; }
}
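
The guards in the command methods above implicitly define a state machine. One alternative way to express the same rules is an explicit transition table, sketched below. `OrderLifecycleSketch` is a hypothetical name, not part of the article's code. Two points are assumptions: the SHIPPED to DELIVERED transition (the aggregate defines the status but no event that sets it), and treating CANCELLED as terminal (the aggregate as written does not reject cancelling an order twice).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class OrderLifecycleSketch {
    enum OrderStatus { CREATED, PAID, SHIPPED, DELIVERED, CANCELLED }

    // For each status, the set of statuses an order may move to next.
    private static final Map<OrderStatus, Set<OrderStatus>> ALLOWED = new EnumMap<>(OrderStatus.class);

    static {
        ALLOWED.put(OrderStatus.CREATED, EnumSet.of(OrderStatus.PAID, OrderStatus.CANCELLED));
        ALLOWED.put(OrderStatus.PAID, EnumSet.of(OrderStatus.SHIPPED, OrderStatus.CANCELLED));
        ALLOWED.put(OrderStatus.SHIPPED, EnumSet.of(OrderStatus.DELIVERED)); // assumption, see lead-in
        ALLOWED.put(OrderStatus.DELIVERED, EnumSet.noneOf(OrderStatus.class));
        ALLOWED.put(OrderStatus.CANCELLED, EnumSet.noneOf(OrderStatus.class)); // assumption: terminal
    }

    static boolean canTransition(OrderStatus from, OrderStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    // Throws the same exception type the aggregate uses for illegal commands.
    static void requireTransition(OrderStatus from, OrderStatus to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Cannot move order from " + from + " to " + to);
        }
    }
}
```

A table like this keeps all lifecycle rules in one place, at the cost of less specific error messages than the hand-written guards.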

E-commerce Commands

package com.example.cqrs.ecommerce.command;

import java.math.BigDecimal;
import java.util.List;

public class CreateOrderCommand {
    private final String orderId;
    private final String customerId;
    private final List<OrderItem> items;
    
    public CreateOrderCommand(String orderId, String customerId, List<OrderItem> items) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.items = items;
    }
    
    // Getters...
}

public class ProcessPaymentCommand {
    private final String orderId;
    private final String paymentId;
    private final String paymentMethod;
    
    public ProcessPaymentCommand(String orderId, String paymentId, String paymentMethod) {
        this.orderId = orderId;
        this.paymentId = paymentId;
        this.paymentMethod = paymentMethod;
    }
    
    // Getters...
}

public class ShipOrderCommand {
    private final String orderId;
    private final String trackingNumber;
    private final String carrier;
    
    public ShipOrderCommand(String orderId, String trackingNumber, String carrier) {
        this.orderId = orderId;
        this.trackingNumber = trackingNumber;
        this.carrier = carrier;
    }
    
    // Getters...
}

public class CancelOrderCommand {
    private final String orderId;
    private final String reason;
    
    public CancelOrderCommand(String orderId, String reason) {
        this.orderId = orderId;
        this.reason = reason;
    }
    
    // Getters...
}

E-commerce Read Models

package com.example.cqrs.ecommerce.readmodel;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;

public class OrderDetails {
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private String status;
    private Instant createdAt;
    private Instant updatedAt;
    private String paymentId;
    private String trackingNumber;
    private String carrier;
    
    // Constructors, getters, and setters...
}

public class SalesReport {
    private Instant periodStart;
    private Instant periodEnd;
    private BigDecimal totalRevenue;
    private int totalOrders;
    private List<ProductSales> productSales;
    
    // Constructors, getters, and setters...
}

public class ProductSales {
    private String productId;
    private String productName;
    private int quantitySold;
    private BigDecimal revenue;
    
    // Constructors, getters, and setters...
}
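
The article does not show a projector for these read models. As a rough sketch of how the figures in `SalesReport` and `ProductSales` might be accumulated from order events, consider the following. Everything here is an assumption: `SalesProjectionSketch` and `SoldItem` are hypothetical names, `SoldItem` stands in for the undefined `OrderItem`, and revenue is counted when an order is paid rather than when it is created.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SalesProjectionSketch {
    // Hypothetical line item standing in for the article's OrderItem.
    static final class SoldItem {
        final String productId;
        final int quantity;
        final BigDecimal unitPrice;

        SoldItem(String productId, int quantity, BigDecimal unitPrice) {
            this.productId = productId;
            this.quantity = quantity;
            this.unitPrice = unitPrice;
        }
    }

    private final Map<String, Integer> quantityByProduct = new LinkedHashMap<>();
    private final Map<String, BigDecimal> revenueByProduct = new LinkedHashMap<>();
    private BigDecimal totalRevenue = BigDecimal.ZERO;
    private int totalOrders = 0;

    // Called once per paid order with that order's line items.
    void onOrderPaid(List<SoldItem> items) {
        totalOrders++;
        for (SoldItem item : items) {
            BigDecimal lineRevenue = item.unitPrice.multiply(BigDecimal.valueOf(item.quantity));
            quantityByProduct.merge(item.productId, item.quantity, Integer::sum);
            revenueByProduct.merge(item.productId, lineRevenue, BigDecimal::add);
            totalRevenue = totalRevenue.add(lineRevenue);
        }
    }

    int totalOrders() { return totalOrders; }
    BigDecimal totalRevenue() { return totalRevenue; }
    int quantitySold(String productId) { return quantityByProduct.getOrDefault(productId, 0); }
    BigDecimal revenue(String productId) { return revenueByProduct.getOrDefault(productId, BigDecimal.ZERO); }
}
```

Since the report is derived purely from events, changing the rule for when revenue counts only requires replaying the stored events through a revised projector.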

6. Implementation Details

Event Store Implementation

package com.example.cqrs.repository;

import com.example.cqrs.event.DomainEvent;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

@Repository
public class InMemoryEventStore implements EventStore {
    private final ConcurrentHashMap<String, List<DomainEvent>> eventStreams = new ConcurrentHashMap<>();
    private final Object lock = new Object();
    
    @Override
    public void saveEvents(String aggregateId, List<DomainEvent> events) {
        synchronized (lock) {
            List<DomainEvent> stream = eventStreams.computeIfAbsent(aggregateId, k -> new CopyOnWriteArrayList<>());
            stream.addAll(events);
        }
    }
    
    @Override
    public List<DomainEvent> getEventsForAggregate(String aggregateId) {
        List<DomainEvent> stream = eventStreams.get(aggregateId);
        if (stream == null) {
            throw new RuntimeException("Aggregate not found: " + aggregateId);
        }
        return stream;
    }
    
    @Override
    public List<DomainEvent> getAllEvents() {
        return eventStreams.values().stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }
}
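
In the store above, `saveEvents` appends unconditionally, so two handlers that load the same aggregate at the same time could both append events based on stale state. A common remedy is an expected-version check (optimistic concurrency). The sketch below is an assumption-laden illustration, not the article's `EventStore` contract: `VersionedEventStoreSketch` is a hypothetical name, events are plain strings for brevity, and the version is simply the number of events already in the stream.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VersionedEventStoreSketch {
    private final Map<String, List<String>> streams = new HashMap<>();

    // Appends only if the stream still holds exactly expectedVersion events.
    // Returns the new version (stream length) on success.
    synchronized int append(String aggregateId, int expectedVersion, List<String> newEvents) {
        List<String> stream = streams.computeIfAbsent(aggregateId, k -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new ConcurrentModificationException(
                "Expected version " + expectedVersion + " but stream is at " + stream.size());
        }
        stream.addAll(newEvents);
        return stream.size();
    }

    // Returns a copy so callers cannot modify the stored stream.
    synchronized List<String> read(String aggregateId) {
        List<String> stream = streams.get(aggregateId);
        return stream == null ? new ArrayList<>() : new ArrayList<>(stream);
    }
}
```

A command handler using this approach would remember the number of events it loaded, pass that number as `expectedVersion`, and reload and retry (or fail) when the check is rejected.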

Read Model Repository

package com.example.cqrs.repository;

import com.example.cqrs.bank.readmodel.AccountSummary;
import com.example.cqrs.bank.readmodel.AccountStatementEntry;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

@Repository
public class InMemoryReadModelRepository implements ReadModelRepository {
    private final ConcurrentHashMap<String, AccountSummary> accountSummaries = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, List<AccountStatementEntry>> accountStatements = new ConcurrentHashMap<>();
    
    @Override
    public void saveAccountSummary(AccountSummary summary) {
        accountSummaries.put(summary.getAccountId(), summary);
    }
    
    @Override
    public AccountSummary findAccountSummary(String accountId) {
        return accountSummaries.get(accountId);
    }
    
    @Override
    public List<AccountSummary> findAllAccounts() {
        return new CopyOnWriteArrayList<>(accountSummaries.values());
    }
    
    @Override
    public void saveAccountStatementEntry(String accountId, AccountStatementEntry entry) {
        List<AccountStatementEntry> entries = accountStatements.computeIfAbsent(
            accountId, k -> new CopyOnWriteArrayList<>());
        entries.add(entry);
    }
    
    @Override
    public List<AccountStatementEntry> findAccountStatement(String accountId, Instant fromDate, Instant toDate) {
        List<AccountStatementEntry> entries = accountStatements.get(accountId);
        if (entries == null) {
            return new CopyOnWriteArrayList<>();
        }
        return entries.stream()
            .filter(entry -> !entry.getTimestamp().isBefore(fromDate) && !entry.getTimestamp().isAfter(toDate))
            .collect(Collectors.toList());
    }
}

Event Bus Implementation

package com.example.cqrs.infrastructure;

import com.example.cqrs.event.DomainEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
public class SimpleEventBus {
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    public void publish(DomainEvent event) {
        eventPublisher.publishEvent(event);
    }
    
    public void subscribe(Object listener) {
        // In a real implementation, you would register listeners
        // This is simplified for demonstration
    }
}
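
`subscribe` is left as a stub in `SimpleEventBus`. For readers who want to see what listener registration could look like without Spring, here is a minimal in-memory sketch. `InMemoryEventBusSketch` is a hypothetical name. Delivery is synchronous and matches on the exact event class only, which is a simplification.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class InMemoryEventBusSketch {
    // Listeners keyed by the concrete event class they want to receive.
    private final Map<Class<?>, List<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

    // Registers a listener for one concrete event type.
    <E> void subscribe(Class<E> eventType, Consumer<? super E> listener) {
        listeners.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                 .add(event -> listener.accept(eventType.cast(event)));
    }

    // Delivers the event, in registration order, to listeners of its exact class.
    void publish(Object event) {
        List<Consumer<Object>> targets = listeners.get(event.getClass());
        if (targets != null) {
            for (Consumer<Object> target : targets) {
                target.accept(event);
            }
        }
    }
}
```

With a bus like this, a projector would be wired with a call such as `bus.subscribe(MoneyDepositedEvent.class, projector::on)`, and events with no registered listener are silently dropped.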

Application Bootstrap

package com.example.cqrs;

import com.example.cqrs.bank.command.CreateAccountCommand;
import com.example.cqrs.bank.command.DepositMoneyCommand;
import com.example.cqrs.bank.command.WithdrawMoneyCommand;
import com.example.cqrs.bank.handler.BankAccountCommandHandler;
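import com.example.cqrs.bank.query.BankAccountQueryHandler;
// Assumed locations of the query and read-model types used in run();
// adjust these three imports to the packages where the types are defined.
import com.example.cqrs.bank.query.GetAccountBalanceQuery;
import com.example.cqrs.bank.query.GetAccountQuery;
import com.example.cqrs.bank.readmodel.AccountSummary;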
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import java.math.BigDecimal;

@SpringBootApplication
@ComponentScan(basePackages = "com.example.cqrs")
public class CqrsApplication implements CommandLineRunner {
    
    @Autowired
    private BankAccountCommandHandler commandHandler;
    
    @Autowired
    private BankAccountQueryHandler queryHandler;
    
    public static void main(String[] args) {
        SpringApplication.run(CqrsApplication.class, args);
    }
    
    @Override
    public void run(String... args) throws Exception {
        // Demo the banking system
        System.out.println("=== CQRS & Event Sourcing Demo ===");
        
        // Create account
        String accountId = "ACC-001";
        commandHandler.handle(new CreateAccountCommand(accountId, "John Doe", "123456789"));
        System.out.println("Account created for John Doe");
        
        // Deposit money
        commandHandler.handle(new DepositMoneyCommand(accountId, new BigDecimal("1000.00"), "Initial deposit"));
        System.out.println("Deposited $1000.00");
        
        // Withdraw money
        commandHandler.handle(new WithdrawMoneyCommand(accountId, new BigDecimal("250.00"), "ATM withdrawal"));
        System.out.println("Withdrew $250.00");
        
        // Query account balance
        BigDecimal balance = queryHandler.handle(new GetAccountBalanceQuery(accountId));
        System.out.println("Current balance: $" + balance);
        
        // Query account details
        AccountSummary summary = queryHandler.handle(new GetAccountQuery(accountId));
        System.out.println("Account holder: " + summary.getAccountHolder());
        System.out.println("Account number: " + summary.getAccountNumber());
    }
}

7. Best Practices

CQRS Best Practices

Eventual Consistency

Accept that read models may lag behind the write model, and design clients to tolerate briefly stale data

Immutable Events

Events should never be modified once stored; correct a mistake by appending a compensating event

Event Versioning

Plan for event schema evolution: version event types so that old events remain readable after the schema changes
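
One common way to combine immutable events with schema evolution is to convert old events to the current shape when they are read, leaving the stored data untouched. The sketch below illustrates that idea under stated assumptions: `MoneyDepositedV1`, `MoneyDepositedV2`, `DepositUpcaster`, and the `USD` default are all hypothetical and do not come from the banking example.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Hypothetical version 1 of a deposit event: amount only.
record MoneyDepositedV1(String accountId, BigDecimal amount) {}

// Hypothetical version 2 adds a currency field. Records have no setters,
// so an instance cannot be changed after construction.
record MoneyDepositedV2(String accountId, BigDecimal amount, String currency) {
    MoneyDepositedV2 {
        Objects.requireNonNull(accountId, "accountId");
        Objects.requireNonNull(amount, "amount");
        Objects.requireNonNull(currency, "currency");
    }
}

// Converts old events to the current shape at read time; the stored V1 event is not modified.
final class DepositUpcaster {
    // Assumption for this sketch: every deposit recorded before V2 was in USD.
    static final String DEFAULT_CURRENCY = "USD";

    static MoneyDepositedV2 upcast(MoneyDepositedV1 old) {
        return new MoneyDepositedV2(old.accountId(), old.amount(), DEFAULT_CURRENCY);
    }
}
```

With this approach the aggregate and projections only need to understand the latest version, while the event store keeps every event exactly as it was written.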

Implementation Guidelines

graph TD
    A[Design Phase] --> B[Identify Aggregates]
    B --> C[Define Events]
    C --> D[Create Commands]
    D --> E[Build Read Models]
    E --> F[Implement Handlers]
    F --> G[Add Projections]
    G --> H[Test & Deploy]

Anti-Patterns to Avoid

// DON'T: Mix commands and queries in the same handler
public class BadBankAccountHandler {
    public AccountSummary handle(GetAccountQuery query) {
        // Query logic
        return accountRepository.findById(query.getAccountId());
    }
    
    public void handle(DepositMoneyCommand command) {
        // Command logic mixed with query
        Account account = accountRepository.findById(command.getAccountId());
        account.setBalance(account.getBalance().add(command.getAmount()));
        accountRepository.save(account); // Direct mutation - no events!
    }
}

// DO: Separate commands and queries properly
public class GoodCommandHandler {
    public void handle(DepositMoneyCommand command) {
        // Load aggregate from events
        List<DomainEvent> events = eventStore.getEventsForAggregate(command.getAccountId());
        BankAccount account = new BankAccount(events);
        
        // Apply business logic
        account.deposit(command.getAmount(), command.getDescription());
        
        // Store events
        eventStore.saveEvents(account.getAccountId(), account.getUncommittedChanges());
    }
}

public class GoodQueryHandler {
    public AccountSummary handle(GetAccountQuery query) {
        // Query optimized read model
        return readModelRepository.findAccountSummary(query.getAccountId());
    }
}

Performance Considerations

Optimization Strategies:

  • Snapshotting: Periodically save aggregate state to avoid replaying all events
  • Read Model Caching: Cache frequently accessed read models
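  • Projection Parallelization: Process events for different aggregates in parallel while preserving event order within each aggregate
  • Event Compaction: Archive events that predate a snapshot where retention rules allow; deleting events gives up the audit trail and temporal queries for that period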
  • Indexing: Create indexes on event store for efficient querying
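
To make the snapshotting strategy concrete, here is a minimal sketch of rebuilding a balance from a snapshot plus the events recorded after it. The types `BalanceChanged`, `BalanceSnapshot`, and `SnapshotReplay` are hypothetical simplifications for illustration, not the classes used in the banking example, and the snapshot interval is an arbitrary tuning value.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical, simplified event: a signed balance change at a given version.
record BalanceChanged(long version, BigDecimal delta) {}

// Hypothetical snapshot: the balance as of a given event version.
record BalanceSnapshot(long version, BigDecimal balance) {}

final class SnapshotReplay {
    // Rebuild the balance from a snapshot, applying only events newer than it.
    // A real store would fetch only those newer events instead of filtering here.
    static BigDecimal currentBalance(BalanceSnapshot snapshot, List<BalanceChanged> events) {
        BigDecimal balance = snapshot.balance();
        for (BalanceChanged event : events) {
            if (event.version() > snapshot.version()) {
                balance = balance.add(event.delta());
            }
        }
        return balance;
    }

    // Decide whether to take a new snapshot: once every `interval` events.
    static boolean shouldSnapshot(long version, long interval) {
        return version > 0 && version % interval == 0;
    }
}
```

Replaying from a snapshot must give the same result as replaying the full history, which makes a useful consistency test for any snapshot implementation.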

Monitoring and Debugging

package com.example.cqrs.monitoring;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class CqrsMetrics {
    private static final Logger logger = LoggerFactory.getLogger(CqrsMetrics.class);
    
    public void logCommandEvent(String aggregateId, String eventType, long processingTime) {
        logger.info("Command processed - Aggregate: {}, Event: {}, Time: {}ms", 
                   aggregateId, eventType, processingTime);
    }
    
    public void logQuery(String queryType, long executionTime) {
        logger.info("Query executed - Type: {}, Time: {}ms", queryType, executionTime);
    }
    
    public void logProjection(String eventType, long projectionTime) {
        logger.info("Projection applied - Event: {}, Time: {}ms", eventType, projectionTime);
    }
}
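
The logging methods above expect the caller to supply the elapsed time. One way to measure it without repeating timing code in every handler is a small wrapper like the sketch below. `Timed` is a hypothetical helper, not part of the example application, and it reports through a plain callback so that it does not depend on any logging library.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical timing helper for illustration.
final class Timed {
    // Runs the action and reports the elapsed milliseconds, even if the action throws.
    static <T> T call(Supplier<T> action, LongConsumer reportMillis) {
        long start = System.nanoTime();
        try {
            return action.get();
        } finally {
            reportMillis.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }
}
```

A query handler call could then be wrapped as `Timed.call(() -> queryHandler.handle(query), ms -> metrics.logQuery("GetAccountQuery", ms))`, assuming `metrics` is an injected `CqrsMetrics` instance.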

When to Use CQRS & Event Sourcing:

  • Complex Business Logic: Domains with intricate business rules
  • Audit Requirements: Systems requiring complete change history
  • High Scalability: Applications needing independent scaling of reads and writes
  • Collaborative Domains: Multiple users modifying the same data
  • Temporal Queries: Need to query historical states