CQRS and Event Sourcing Patterns
Command Query Responsibility Segregation and Event Sourcing
1. Introduction to CQRS & Event Sourcing
CQRS (Command Query Responsibility Segregation) and Event Sourcing are architectural patterns that help build scalable, maintainable, and auditable systems by separating concerns and leveraging event-driven architecture.
What is CQRS?
CQRS separates reading and writing operations into different models. This segregation allows each side to be scaled, optimized, and evolved independently.
Key Principles:
- Separation of Concerns: Commands (writes) and Queries (reads) use different models
- Independent Scaling: Read and write sides can scale separately
- Optimized Models: Each model can be optimized for its specific purpose
- Evolution: Models can evolve independently over time
What is Event Sourcing?
Event Sourcing persists the state of a business entity as a sequence of state-changing events. Instead of storing just the current state, we store the series of events that led to that state.
Key Benefits:
- Audit Trail: Complete history of all changes
- Temporal Queries: Ability to query state at any point in time
- Reproducibility: Recreate state by replaying events
- Event-Driven: Natural fit for event-driven architectures
2. CQRS Pattern
Traditional vs CQRS Architecture
CQRS Components
CQRS Flow Explanation
Write Side (Command):
- Client sends command to change state
- Command handler validates and processes command
- Write model generates events
- Events are persisted to event store
Read Side (Query):
- Client sends query to read data
- Query handler reads from optimized read model
- Read model is updated via event projections
- Query results are returned to client
3. Event Sourcing
Event Sourcing Architecture
Event Structure
package com.example.cqrs.event;
import java.time.Instant;
import java.util.UUID;
public abstract class DomainEvent {
private final UUID eventId;
private final String aggregateId;
private final Instant timestamp;
private final int version;
protected DomainEvent(String aggregateId) {
this.eventId = UUID.randomUUID();
this.aggregateId = aggregateId;
this.timestamp = Instant.now();
this.version = 0;
}
protected DomainEvent(UUID eventId, String aggregateId, Instant timestamp, int version) {
this.eventId = eventId;
this.aggregateId = aggregateId;
this.timestamp = timestamp;
this.version = version;
}
// Getters
public UUID getEventId() { return eventId; }
public String getAggregateId() { return aggregateId; }
public Instant getTimestamp() { return timestamp; }
public int getVersion() { return version; }
}
Sample Events
package com.example.cqrs.event;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;
public class AccountCreatedEvent extends DomainEvent {
private final String accountHolder;
private final String accountNumber;
public AccountCreatedEvent(String aggregateId, String accountHolder, String accountNumber) {
super(aggregateId);
this.accountHolder = accountHolder;
this.accountNumber = accountNumber;
}
// Constructor for event replay
public AccountCreatedEvent(UUID eventId, String aggregateId, Instant timestamp,
int version, String accountHolder, String accountNumber) {
super(eventId, aggregateId, timestamp, version);
this.accountHolder = accountHolder;
this.accountNumber = accountNumber;
}
// Getters
public String getAccountHolder() { return accountHolder; }
public String getAccountNumber() { return accountNumber; }
}
public class MoneyDepositedEvent extends DomainEvent {
private final BigDecimal amount;
private final String description;
public MoneyDepositedEvent(String aggregateId, BigDecimal amount, String description) {
super(aggregateId);
this.amount = amount;
this.description = description;
}
// Constructor for event replay
public MoneyDepositedEvent(UUID eventId, String aggregateId, Instant timestamp,
int version, BigDecimal amount, String description) {
super(eventId, aggregateId, timestamp, version);
this.amount = amount;
this.description = description;
}
// Getters
public BigDecimal getAmount() { return amount; }
public String getDescription() { return description; }
}
public class MoneyWithdrawnEvent extends DomainEvent {
private final BigDecimal amount;
private final String description;
public MoneyWithdrawnEvent(String aggregateId, BigDecimal amount, String description) {
super(aggregateId);
this.amount = amount;
this.description = description;
}
// Constructor for event replay
public MoneyWithdrawnEvent(UUID eventId, String aggregateId, Instant timestamp,
int version, BigDecimal amount, String description) {
super(eventId, aggregateId, timestamp, version);
this.amount = amount;
this.description = description;
}
// Getters
public BigDecimal getAmount() { return amount; }
public String getDescription() { return description; }
}
4. Banking System Example
Implementing a banking system using CQRS and Event Sourcing patterns.
Use Case Description
Scenario:
A bank needs to manage customer accounts with operations like creating accounts, depositing money, withdrawing money, and generating statements. The system must provide audit trails and support complex queries.
Commands: CreateAccount, DepositMoney, WithdrawMoney
Queries: GetAccountBalance, GetAccountStatement, GetAllAccounts
Aggregate Root
package com.example.cqrs.bank.aggregate;
import com.example.cqrs.event.AccountCreatedEvent;
import com.example.cqrs.event.MoneyDepositedEvent;
import com.example.cqrs.event.MoneyWithdrawnEvent;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
public class BankAccount {
private String accountId;
private String accountHolder;
private String accountNumber;
private BigDecimal balance;
private List<DomainEvent> changes;
public BankAccount() {
this.balance = BigDecimal.ZERO;
this.changes = new ArrayList<>();
}
// Constructor for reconstruction from events
public BankAccount(List<DomainEvent> events) {
this();
for (DomainEvent event : events) {
applyEvent(event);
}
}
// Command methods
public void createAccount(String accountId, String accountHolder, String accountNumber) {
if (this.accountId != null) {
throw new IllegalStateException("Account already exists");
}
AccountCreatedEvent event = new AccountCreatedEvent(accountId, accountHolder, accountNumber);
apply(event);
}
public void deposit(BigDecimal amount, String description) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("Deposit amount must be positive");
}
MoneyDepositedEvent event = new MoneyDepositedEvent(accountId, amount, description);
apply(event);
}
public void withdraw(BigDecimal amount, String description) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("Withdrawal amount must be positive");
}
if (balance.compareTo(amount) < 0) {
throw new IllegalStateException("Insufficient funds");
}
MoneyWithdrawnEvent event = new MoneyWithdrawnEvent(accountId, amount, description);
apply(event);
}
// Apply events to update state
private void apply(DomainEvent event) {
applyEvent(event);
changes.add(event);
}
private void applyEvent(DomainEvent event) {
if (event instanceof AccountCreatedEvent) {
AccountCreatedEvent createdEvent = (AccountCreatedEvent) event;
this.accountId = createdEvent.getAggregateId();
this.accountHolder = createdEvent.getAccountHolder();
this.accountNumber = createdEvent.getAccountNumber();
} else if (event instanceof MoneyDepositedEvent) {
MoneyDepositedEvent depositedEvent = (MoneyDepositedEvent) event;
this.balance = this.balance.add(depositedEvent.getAmount());
} else if (event instanceof MoneyWithdrawnEvent) {
MoneyWithdrawnEvent withdrawnEvent = (MoneyWithdrawnEvent) event;
this.balance = this.balance.subtract(withdrawnEvent.getAmount());
}
}
// Get uncommitted changes
public List<DomainEvent> getUncommittedChanges() {
return new ArrayList<>(changes);
}
// Mark changes as committed
public void markChangesAsCommitted() {
changes.clear();
}
// Getters
public String getAccountId() { return accountId; }
public String getAccountHolder() { return accountHolder; }
public String getAccountNumber() { return accountNumber; }
public BigDecimal getBalance() { return balance; }
}
Commands
package com.example.cqrs.bank.command;
import java.math.BigDecimal;
public class CreateAccountCommand {
private final String accountId;
private final String accountHolder;
private final String accountNumber;
public CreateAccountCommand(String accountId, String accountHolder, String accountNumber) {
this.accountId = accountId;
this.accountHolder = accountHolder;
this.accountNumber = accountNumber;
}
// Getters
public String getAccountId() { return accountId; }
public String getAccountHolder() { return accountHolder; }
public String getAccountNumber() { return accountNumber; }
}
public class DepositMoneyCommand {
private final String accountId;
private final BigDecimal amount;
private final String description;
public DepositMoneyCommand(String accountId, BigDecimal amount, String description) {
this.accountId = accountId;
this.amount = amount;
this.description = description;
}
// Getters
public String getAccountId() { return accountId; }
public BigDecimal getAmount() { return amount; }
public String getDescription() { return description; }
}
public class WithdrawMoneyCommand {
private final String accountId;
private final BigDecimal amount;
private final String description;
public WithdrawMoneyCommand(String accountId, BigDecimal amount, String description) {
this.accountId = accountId;
this.amount = amount;
this.description = description;
}
// Getters
public String getAccountId() { return accountId; }
public BigDecimal getAmount() { return amount; }
public String getDescription() { return description; }
}
Command Handler
package com.example.cqrs.bank.handler;
import com.example.cqrs.bank.aggregate.BankAccount;
import com.example.cqrs.bank.command.CreateAccountCommand;
import com.example.cqrs.bank.command.DepositMoneyCommand;
import com.example.cqrs.bank.command.WithdrawMoneyCommand;
import com.example.cqrs.event.DomainEvent;
import com.example.cqrs.repository.EventStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class BankAccountCommandHandler {
@Autowired
private EventStore eventStore;
public void handle(CreateAccountCommand command) {
BankAccount account = new BankAccount();
account.createAccount(command.getAccountId(), command.getAccountHolder(), command.getAccountNumber());
saveEvents(account);
}
public void handle(DepositMoneyCommand command) {
List<DomainEvent> events = eventStore.getEventsForAggregate(command.getAccountId());
BankAccount account = new BankAccount(events);
account.deposit(command.getAmount(), command.getDescription());
saveEvents(account);
}
public void handle(WithdrawMoneyCommand command) {
List<DomainEvent> events = eventStore.getEventsForAggregate(command.getAccountId());
BankAccount account = new BankAccount(events);
account.withdraw(command.getAmount(), command.getDescription());
saveEvents(account);
}
private void saveEvents(BankAccount account) {
eventStore.saveEvents(account.getAccountId(), account.getUncommittedChanges());
account.markChangesAsCommitted();
}
}
Read Models
package com.example.cqrs.bank.readmodel;
import java.math.BigDecimal;
import java.time.Instant;
public class AccountSummary {
private String accountId;
private String accountHolder;
private String accountNumber;
private BigDecimal balance;
private Instant lastUpdated;
// Constructors
public AccountSummary() {}
public AccountSummary(String accountId, String accountHolder, String accountNumber,
BigDecimal balance, Instant lastUpdated) {
this.accountId = accountId;
this.accountHolder = accountHolder;
this.accountNumber = accountNumber;
this.balance = balance;
this.lastUpdated = lastUpdated;
}
// Getters and setters
public String getAccountId() { return accountId; }
public void setAccountId(String accountId) { this.accountId = accountId; }
public String getAccountHolder() { return accountHolder; }
public void setAccountHolder(String accountHolder) { this.accountHolder = accountHolder; }
public String getAccountNumber() { return accountNumber; }
public void setAccountNumber(String accountNumber) { this.accountNumber = accountNumber; }
public BigDecimal getBalance() { return balance; }
public void setBalance(BigDecimal balance) { this.balance = balance; }
public Instant getLastUpdated() { return lastUpdated; }
public void setLastUpdated(Instant lastUpdated) { this.lastUpdated = lastUpdated; }
}
public class AccountStatementEntry {
private String accountId;
private BigDecimal amount;
private String description;
private Instant timestamp;
private String type; // DEPOSIT or WITHDRAWAL
// Constructors and getters/setters...
}
Event Handlers for Read Model Projections
package com.example.cqrs.bank.projector;
import com.example.cqrs.event.AccountCreatedEvent;
import com.example.cqrs.event.MoneyDepositedEvent;
import com.example.cqrs.event.MoneyWithdrawnEvent;
import com.example.cqrs.bank.readmodel.AccountSummary;
import com.example.cqrs.repository.ReadModelRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
@Component
public class AccountSummaryProjector {
@Autowired
private ReadModelRepository readModelRepository;
public void on(AccountCreatedEvent event) {
AccountSummary summary = new AccountSummary(
event.getAggregateId(),
event.getAccountHolder(),
event.getAccountNumber(),
BigDecimal.ZERO,
event.getTimestamp()
);
readModelRepository.saveAccountSummary(summary);
}
public void on(MoneyDepositedEvent event) {
AccountSummary summary = readModelRepository.findAccountSummary(event.getAggregateId());
if (summary != null) {
BigDecimal newBalance = summary.getBalance().add(event.getAmount());
summary.setBalance(newBalance);
summary.setLastUpdated(event.getTimestamp());
readModelRepository.saveAccountSummary(summary);
}
}
public void on(MoneyWithdrawnEvent event) {
AccountSummary summary = readModelRepository.findAccountSummary(event.getAggregateId());
if (summary != null) {
BigDecimal newBalance = summary.getBalance().subtract(event.getAmount());
summary.setBalance(newBalance);
summary.setLastUpdated(event.getTimestamp());
readModelRepository.saveAccountSummary(summary);
}
}
}
Queries
package com.example.cqrs.bank.query;
import com.example.cqrs.bank.readmodel.AccountSummary;
import com.example.cqrs.bank.readmodel.AccountStatementEntry;
import com.example.cqrs.repository.ReadModelRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.List;
@Service
public class BankAccountQueryHandler {
@Autowired
private ReadModelRepository readModelRepository;
public AccountSummary handle(GetAccountQuery query) {
return readModelRepository.findAccountSummary(query.getAccountId());
}
public List<AccountSummary> handle(GetAllAccountsQuery query) {
return readModelRepository.findAllAccounts();
}
public BigDecimal handle(GetAccountBalanceQuery query) {
AccountSummary summary = readModelRepository.findAccountSummary(query.getAccountId());
return summary != null ? summary.getBalance() : BigDecimal.ZERO;
}
public List<AccountStatementEntry> handle(GetAccountStatementQuery query) {
return readModelRepository.findAccountStatement(query.getAccountId(),
query.getFromDate(),
query.getToDate());
}
}
// Query classes
class GetAccountQuery {
private final String accountId;
public GetAccountQuery(String accountId) { this.accountId = accountId; }
public String getAccountId() { return accountId; }
}
class GetAllAccountsQuery {
public GetAllAccountsQuery() {}
}
class GetAccountBalanceQuery {
private final String accountId;
public GetAccountBalanceQuery(String accountId) { this.accountId = accountId; }
public String getAccountId() { return accountId; }
}
class GetAccountStatementQuery {
private final String accountId;
private final java.time.Instant fromDate;
private final java.time.Instant toDate;
public GetAccountStatementQuery(String accountId, java.time.Instant fromDate, java.time.Instant toDate) {
this.accountId = accountId;
this.fromDate = fromDate;
this.toDate = toDate;
}
// Getters...
}
5. E-commerce System Example
Implementing an e-commerce order management system using CQRS and Event Sourcing.
Use Case Description
Scenario:
An e-commerce platform needs to manage orders through their lifecycle: creation, payment processing, shipping, and fulfillment. The system must provide real-time inventory updates and complex reporting.
Commands: CreateOrder, ProcessPayment, ShipOrder, CancelOrder
Queries: GetOrderDetails, GetOrderHistory, GetSalesReport
E-commerce Events
package com.example.cqrs.ecommerce.event;
public class OrderCreatedEvent extends DomainEvent {
private final String customerId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
public OrderCreatedEvent(String aggregateId, String customerId, List<OrderItem> items, BigDecimal totalAmount) {
super(aggregateId);
this.customerId = customerId;
this.items = items;
this.totalAmount = totalAmount;
}
// Getters...
}
public class PaymentProcessedEvent extends DomainEvent {
private final String paymentId;
private final BigDecimal amount;
private final String paymentMethod;
public PaymentProcessedEvent(String aggregateId, String paymentId, BigDecimal amount, String paymentMethod) {
super(aggregateId);
this.paymentId = paymentId;
this.amount = amount;
this.paymentMethod = paymentMethod;
}
// Getters...
}
public class OrderShippedEvent extends DomainEvent {
private final String trackingNumber;
private final String carrier;
public OrderShippedEvent(String aggregateId, String trackingNumber, String carrier) {
super(aggregateId);
this.trackingNumber = trackingNumber;
this.carrier = carrier;
}
// Getters...
}
public class OrderCancelledEvent extends DomainEvent {
private final String reason;
public OrderCancelledEvent(String aggregateId, String reason) {
super(aggregateId);
this.reason = reason;
}
// Getters...
}
Order Aggregate
package com.example.cqrs.ecommerce.aggregate;
import com.example.cqrs.ecommerce.event.OrderCreatedEvent;
import com.example.cqrs.ecommerce.event.PaymentProcessedEvent;
import com.example.cqrs.ecommerce.event.OrderShippedEvent;
import com.example.cqrs.ecommerce.event.OrderCancelledEvent;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
public class Order {
private String orderId;
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private OrderStatus status;
private List<DomainEvent> changes;
public enum OrderStatus {
CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}
public Order() {
this.items = new ArrayList<>();
this.totalAmount = BigDecimal.ZERO;
this.status = OrderStatus.CREATED;
this.changes = new ArrayList<>();
}
// Constructor for reconstruction from events
public Order(List<DomainEvent> events) {
this();
for (DomainEvent event : events) {
applyEvent(event);
}
}
// Command methods
public void createOrder(String orderId, String customerId, List<OrderItem> items) {
if (this.orderId != null) {
throw new IllegalStateException("Order already exists");
}
BigDecimal total = items.stream()
.map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
OrderCreatedEvent event = new OrderCreatedEvent(orderId, customerId, items, total);
apply(event);
}
public void processPayment(String paymentId, String paymentMethod) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Order cannot be paid in current state: " + status);
}
PaymentProcessedEvent event = new PaymentProcessedEvent(orderId, paymentId, totalAmount, paymentMethod);
apply(event);
}
public void shipOrder(String trackingNumber, String carrier) {
if (status != OrderStatus.PAID) {
throw new IllegalStateException("Order must be paid before shipping");
}
OrderShippedEvent event = new OrderShippedEvent(orderId, trackingNumber, carrier);
apply(event);
}
public void cancelOrder(String reason) {
if (status == OrderStatus.SHIPPED || status == OrderStatus.DELIVERED) {
throw new IllegalStateException("Cannot cancel shipped order");
}
OrderCancelledEvent event = new OrderCancelledEvent(orderId, reason);
apply(event);
}
// Apply events to update state
private void apply(DomainEvent event) {
applyEvent(event);
changes.add(event);
}
private void applyEvent(DomainEvent event) {
if (event instanceof OrderCreatedEvent) {
OrderCreatedEvent createdEvent = (OrderCreatedEvent) event;
this.orderId = createdEvent.getAggregateId();
this.customerId = createdEvent.getCustomerId();
this.items = createdEvent.getItems();
this.totalAmount = createdEvent.getTotalAmount();
this.status = OrderStatus.CREATED;
} else if (event instanceof PaymentProcessedEvent) {
this.status = OrderStatus.PAID;
} else if (event instanceof OrderShippedEvent) {
this.status = OrderStatus.SHIPPED;
} else if (event instanceof OrderCancelledEvent) {
this.status = OrderStatus.CANCELLED;
}
}
// Get uncommitted changes
public List<DomainEvent> getUncommittedChanges() {
return new ArrayList<>(changes);
}
// Mark changes as committed
public void markChangesAsCommitted() {
changes.clear();
}
// Getters
public String getOrderId() { return orderId; }
public String getCustomerId() { return customerId; }
public List<OrderItem> getItems() { return items; }
public BigDecimal getTotalAmount() { return totalAmount; }
public OrderStatus getStatus() { return status; }
}
E-commerce Commands
package com.example.cqrs.ecommerce.command;
import java.math.BigDecimal;
import java.util.List;
public class CreateOrderCommand {
private final String orderId;
private final String customerId;
private final List<OrderItem> items;
public CreateOrderCommand(String orderId, String customerId, List<OrderItem> items) {
this.orderId = orderId;
this.customerId = customerId;
this.items = items;
}
// Getters...
}
public class ProcessPaymentCommand {
private final String orderId;
private final String paymentId;
private final String paymentMethod;
public ProcessPaymentCommand(String orderId, String paymentId, String paymentMethod) {
this.orderId = orderId;
this.paymentId = paymentId;
this.paymentMethod = paymentMethod;
}
// Getters...
}
public class ShipOrderCommand {
private final String orderId;
private final String trackingNumber;
private final String carrier;
public ShipOrderCommand(String orderId, String trackingNumber, String carrier) {
this.orderId = orderId;
this.trackingNumber = trackingNumber;
this.carrier = carrier;
}
// Getters...
}
public class CancelOrderCommand {
private final String orderId;
private final String reason;
public CancelOrderCommand(String orderId, String reason) {
this.orderId = orderId;
this.reason = reason;
}
// Getters...
}
E-commerce Read Models
package com.example.cqrs.ecommerce.readmodel;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
public class OrderDetails {
private String orderId;
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private String status;
private Instant createdAt;
private Instant updatedAt;
private String paymentId;
private String trackingNumber;
private String carrier;
// Constructors, getters, and setters...
}
public class SalesReport {
private Instant periodStart;
private Instant periodEnd;
private BigDecimal totalRevenue;
private int totalOrders;
private List<ProductSales> productSales;
// Constructors, getters, and setters...
}
public class ProductSales {
private String productId;
private String productName;
private int quantitySold;
private BigDecimal revenue;
// Constructors, getters, and setters...
}
6. Implementation Details
Event Store Implementation
package com.example.cqrs.repository;
import com.example.cqrs.event.DomainEvent;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Repository
public class InMemoryEventStore implements EventStore {
private final ConcurrentHashMap<String, List<DomainEvent>> eventStreams = new ConcurrentHashMap<>();
private final Object lock = new Object();
@Override
public void saveEvents(String aggregateId, List<DomainEvent> events) {
synchronized (lock) {
List<DomainEvent> stream = eventStreams.computeIfAbsent(aggregateId, k -> new CopyOnWriteArrayList<>());
stream.addAll(events);
}
}
@Override
public List<DomainEvent> getEventsForAggregate(String aggregateId) {
List<DomainEvent> stream = eventStreams.get(aggregateId);
if (stream == null) {
throw new RuntimeException("Aggregate not found: " + aggregateId);
}
return stream;
}
@Override
public List<DomainEvent> getAllEvents() {
return eventStreams.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
}
}
Read Model Repository
package com.example.cqrs.repository;
import com.example.cqrs.bank.readmodel.AccountSummary;
import com.example.cqrs.bank.readmodel.AccountStatementEntry;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@Repository
public class InMemoryReadModelRepository implements ReadModelRepository {
private final ConcurrentHashMap<String, AccountSummary> accountSummaries = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<AccountStatementEntry>> accountStatements = new ConcurrentHashMap<>();
@Override
public void saveAccountSummary(AccountSummary summary) {
accountSummaries.put(summary.getAccountId(), summary);
}
@Override
public AccountSummary findAccountSummary(String accountId) {
return accountSummaries.get(accountId);
}
@Override
public List<AccountSummary> findAllAccounts() {
return new CopyOnWriteArrayList<>(accountSummaries.values());
}
@Override
public void saveAccountStatementEntry(String accountId, AccountStatementEntry entry) {
List<AccountStatementEntry> entries = accountStatements.computeIfAbsent(
accountId, k -> new CopyOnWriteArrayList<>());
entries.add(entry);
}
@Override
public List<AccountStatementEntry> findAccountStatement(String accountId, Instant fromDate, Instant toDate) {
List<AccountStatementEntry> entries = accountStatements.get(accountId);
if (entries == null) {
return new CopyOnWriteArrayList<>();
}
return entries.stream()
.filter(entry -> !entry.getTimestamp().isBefore(fromDate) && !entry.getTimestamp().isAfter(toDate))
.collect(Collectors.toList());
}
}
Event Bus Implementation
package com.example.cqrs.infrastructure;
import com.example.cqrs.event.DomainEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@Component
public class SimpleEventBus {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void publish(DomainEvent event) {
eventPublisher.publishEvent(event);
}
public void subscribe(Object listener) {
// In a real implementation, you would register listeners
// This is simplified for demonstration
}
}
Application Bootstrap
package com.example.cqrs;
import com.example.cqrs.bank.command.CreateAccountCommand;
import com.example.cqrs.bank.command.DepositMoneyCommand;
import com.example.cqrs.bank.command.WithdrawMoneyCommand;
import com.example.cqrs.bank.handler.BankAccountCommandHandler;
import com.example.cqrs.bank.query.BankAccountQueryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import java.math.BigDecimal;
@SpringBootApplication
@ComponentScan(basePackages = "com.example.cqrs")
public class CqrsApplication implements CommandLineRunner {
@Autowired
private BankAccountCommandHandler commandHandler;
@Autowired
private BankAccountQueryHandler queryHandler;
public static void main(String[] args) {
SpringApplication.run(CqrsApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// Demo the banking system
System.out.println("=== CQRS & Event Sourcing Demo ===");
// Create account
String accountId = "ACC-001";
commandHandler.handle(new CreateAccountCommand(accountId, "John Doe", "123456789"));
System.out.println("Account created for John Doe");
// Deposit money
commandHandler.handle(new DepositMoneyCommand(accountId, new BigDecimal("1000.00"), "Initial deposit"));
System.out.println("Deposited $1000.00");
// Withdraw money
commandHandler.handle(new WithdrawMoneyCommand(accountId, new BigDecimal("250.00"), "ATM withdrawal"));
System.out.println("Withdrew $250.00");
// Query account balance
BigDecimal balance = queryHandler.handle(new GetAccountBalanceQuery(accountId));
System.out.println("Current balance: $" + balance);
// Query account details
AccountSummary summary = queryHandler.handle(new GetAccountQuery(accountId));
System.out.println("Account holder: " + summary.getAccountHolder());
System.out.println("Account number: " + summary.getAccountNumber());
}
}
7. Best Practices
CQRS Best Practices
Eventual Consistency
Accept that read models may lag behind write models
Immutable Events
Events should never be modified once stored
Event Versioning
Plan for event schema evolution
Implementation Guidelines
Anti-Patterns to Avoid
// DON'T: Mix commands and queries in the same handler
public class BadBankAccountHandler {
public AccountSummary handle(GetAccountQuery query) {
// Query logic
return accountRepository.findById(query.getAccountId());
}
public void handle(DepositMoneyCommand command) {
// Command logic mixed with query
Account account = accountRepository.findById(command.getAccountId());
account.setBalance(account.getBalance().add(command.getAmount()));
accountRepository.save(account); // Direct mutation - no events!
}
}
// DO: Separate commands and queries properly
public class GoodCommandHandler {
public void handle(DepositMoneyCommand command) {
// Load aggregate from events
List<DomainEvent> events = eventStore.getEventsForAggregate(command.getAccountId());
BankAccount account = new BankAccount(events);
// Apply business logic
account.deposit(command.getAmount(), command.getDescription());
// Store events
eventStore.saveEvents(account.getAccountId(), account.getUncommittedChanges());
}
}
public class GoodQueryHandler {
public AccountSummary handle(GetAccountQuery query) {
// Query optimized read model
return readModelRepository.findAccountSummary(query.getAccountId());
}
}
Performance Considerations
Optimization Strategies:
- Snapshotting: Periodically save aggregate state to avoid replaying all events
- Read Model Caching: Cache frequently accessed read models
- Projection Parallelization: Process events in parallel for different aggregates
- Event Compaction: Remove obsolete events that no longer affect current state
- Indexing: Create indexes on event store for efficient querying
Monitoring and Debugging
package com.example.cqrs.monitoring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class CqrsMetrics {
private static final Logger logger = LoggerFactory.getLogger(CqrsMetrics.class);
public void logCommandEvent(String aggregateId, String eventType, long processingTime) {
logger.info("Command processed - Aggregate: {}, Event: {}, Time: {}ms",
aggregateId, eventType, processingTime);
}
public void logQuery(String queryType, long executionTime) {
logger.info("Query executed - Type: {}, Time: {}ms", queryType, executionTime);
}
public void logProjection(String eventType, long projectionTime) {
logger.info("Projection applied - Event: {}, Time: {}ms", eventType, projectionTime);
}
}
When to Use CQRS & Event Sourcing:
- Complex Business Logic: Domains with intricate business rules
- Audit Requirements: Systems requiring complete change history
- High Scalability: Applications needing independent scaling of reads and writes
- Collaborative Domains: Multiple users modifying the same data
- Temporal Queries: Need to query historical states