1. Introduction to Kafka Streams
Kafka Streams is a client library for building applications and microservices that process and analyze data stored in Apache Kafka. It combines the simplicity of writing and deploying standard Java and Scala applications with the benefits of Kafka's server-side cluster technology.
What is Stream Processing?
Stream processing continuously captures and processes data in real time as it flows from various sources. Unlike traditional batch processing, stream processing handles data as a continuous, unbounded sequence of events.
Common Use Cases:
- Real-time analytics and monitoring
- Fraud detection and security
- Recommendation engines
- IoT data processing
- Financial trading systems
- Log processing and analysis
Why Kafka Streams?
Advantages:
- No separate processing cluster needed
- Exactly-once processing semantics
- Stateful processing with local stores
- Integration with Kafka ecosystem
- Scalable and fault-tolerant
Key Features:
- Event-time processing
- Windowing operations
- Interactive queries
- Join operations
- Rich DSL and Processor API
2. Core Concepts
Kafka Streams Architecture
Key Abstractions
Explanation:
KStream: Represents an unbounded, continuous stream of records. Each record is independent.
Example Data:
Key: 101, Value: {"product":"Laptop", "price":1200}
Key: 102, Value: {"product":"Phone", "price":800}
KTable: Represents a changelog stream where records with the same key are interpreted as updates.
Example Data:
Key: 101, Value: {"product":"Laptop", "price":1200} -> Initial insert
Key: 102, Value: {"product":"Phone", "price":800} -> Initial insert
Key: 101, Value: {"product":"Laptop", "price":1150} -> Update for key 101
GlobalKTable: A KTable that is fully replicated on each Kafka Streams instance.
Example Data:
Key: "US", Value: {"currency":"USD", "tax":0.07}
Key: "EU", Value: {"currency":"EUR", "tax":0.20}
Key: "JP", Value: {"currency":"JPY", "tax":0.10}
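To make these abstractions concrete, here is a minimal sketch (not taken from the examples later in this guide; topic and class names are illustrative) of how each one is declared with the Streams DSL:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
public class AbstractionsSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // KStream: every record is an independent event (e.g., one record per product view)
        KStream<String, String> productEvents =
                builder.stream("product-events", Consumed.with(Serdes.String(), Serdes.String()));
        // KTable: records with the same key are updates; only the latest value per key is retained
        KTable<String, String> productPrices =
                builder.table("product-prices", Consumed.with(Serdes.String(), Serdes.String()));
        // GlobalKTable: fully replicated to every application instance (suited to small reference data)
        GlobalKTable<String, String> countryInfo =
                builder.globalTable("country-info", Consumed.with(Serdes.String(), Serdes.String()));
    }
}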
3. Setup & Configuration
Create Kafka Topics
Download Kafka: https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
Example (sketched below):
- Consume messages from an input Kafka topic.
- Transform the messages (e.g., convert them to uppercase).
- Publish the transformed messages to an output Kafka topic.
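A minimal sketch of this example (the class name and the "input-topic"/"output-topic" names are assumptions, not part of the later examples):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
public class UppercaseStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())   // transform each message
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}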
# Start Zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# Start Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
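With the broker running, create the topics the examples read from and write to. A minimal sketch for the word-count topics (partition and replication settings are illustrative; the later examples need analogous topics such as click-events, transactions, orders, and customers):
# Create the input and output topics for the word count example
.\bin\windows\kafka-topics.bat --create --topic text-lines --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
.\bin\windows\kafka-topics.bat --create --topic word-counts --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1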
4. Word Count Example
The classic "Hello World" of stream processing: counting words in real time.
Architecture Diagram
Producer Code
package com.example.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class WordCountProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Scanner scanner = new Scanner(System.in);
System.out.println("Enter sentences (type 'exit' to quit):");
while (true) {
System.out.print("> ");
String line = scanner.nextLine();
if ("exit".equalsIgnoreCase(line)) {
break;
}
ProducerRecord<String, String> record = new ProducerRecord<>("text-lines", line);
producer.send(record);
System.out.println("Sent: " + line);
}
producer.close();
scanner.close();
}
}
Kafka Streams Application
package com.example.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
public class WordCountStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Read input topic
KStream<String, String> textLines = builder.stream("text-lines");
// Process: split, filter, count
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();
// Write to output topic
wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Consumer Code
package com.example.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class WordCountConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("word-counts"));
System.out.println("Waiting for word count results...");
while (true) {
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Long> record : records) {
System.out.printf("Word: %-20s Count: %d%n", record.key(), record.value());
}
}
}
}
How it works:
- Producer: Sends text lines to "text-lines" topic
- Streams App: Processes text, splits into words, and counts occurrences
- Consumer: Receives word counts from "word-counts" topic
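The topology can also be exercised without a running broker. Below is a minimal sketch (it assumes the org.apache.kafka:kafka-streams-test-utils dependency is on the classpath) that rebuilds the same word-count topology and drives it with TopologyTestDriver:
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
public class WordCountTopologyTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Rebuild the same topology as WordCountStreamsApp (could be refactored into a shared method)
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("text-lines")
               .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
               .groupBy((key, word) -> word)
               .count()
               .toStream()
               .to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));
        // The test driver runs the topology in-process; no Kafka cluster is needed
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "text-lines", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                    "word-counts", new StringDeserializer(), new LongDeserializer());
            input.pipeInput(null, "hello kafka hello");
            // Prints the latest count per word, e.g. {hello=2, kafka=1}
            System.out.println(output.readKeyValuesToMap());
        }
    }
}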
5. Real-Time Analytics
Building a real-time dashboard for website click analytics.
Use Case Description
Scenario:
An e-commerce website wants to track real-time metrics like page views per user, popular products, and session durations.
Input: Click events with user ID, page URL, timestamp
Output: Aggregated metrics updated in real time
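Example Data (illustrative values; the formats follow the producer and streams code below):
Input (click-events): Key: user1, Value: user1,/home,2025-01-15T10:00:00Z
Output (user-click-counts): Key: user1, Value: user1:5
Output (page-click-counts): Key: /home, Value: 12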
Architecture Diagram
Click Event Producer
package com.example.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
public class ClickEventProducer {
private static final String[] USERS = {"user1", "user2", "user3", "user4", "user5"};
private static final String[] PAGES = {"/home", "/products", "/cart", "/checkout", "/profile"};
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Random random = new Random();
System.out.println("Sending click events... Press Ctrl+C to stop.");
while (true) {
String userId = USERS[random.nextInt(USERS.length)];
String pageUrl = PAGES[random.nextInt(PAGES.length)];
String timestamp = Instant.now().toString();
String clickEvent = String.format("%s,%s,%s", userId, pageUrl, timestamp);
ProducerRecord<String, String> record = new ProducerRecord<>("click-events", userId, clickEvent);
producer.send(record);
System.out.println("Sent: " + clickEvent);
Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds delay
}
}
}
Analytics Streams Application
package com.example.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
public class ClickAnalyticsStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Read click events
KStream<String, String> clickEvents = builder.stream("click-events");
// Count clicks per user (tumbling window of 1 minute)
KTable<Windowed<String>, Long> userClickCounts = clickEvents
.selectKey((key, value) -> {
String[] parts = value.split(",");
return parts[0]; // userId
})
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count();
// Count clicks per page
KTable<String, Long> pageClickCounts = clickEvents
.selectKey((key, value) -> {
String[] parts = value.split(",");
return parts[1]; // pageUrl
})
.groupByKey()
.count();
// Output results
userClickCounts.toStream()
.map((windowedKey, value) ->
new KeyValue<>(windowedKey.key(), windowedKey.key() + ":" + value))
.to("user-click-counts", Produced.with(Serdes.String(), Serdes.String()));
pageClickCounts.toStream()
.to("page-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
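Note that the windowed count above emits an updated result for every incoming click. If the dashboard should only receive one final count per user per window, the counting step can be followed by suppress(); a sketch of that variation (same topology and assumptions as above):
// Variation: emit only the final count per 1-minute window instead of every intermediate update
KTable<Windowed<String>, Long> finalUserClickCounts = clickEvents
        .selectKey((key, value) -> value.split(",")[0])   // key by userId
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));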
Analytics Consumers
package com.example.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class AnalyticsConsumers {
public static void consumeUserClickCounts() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-analytics-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-click-counts"));
System.out.println("=== User Click Counts ===");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String[] parts = record.value().split(":");
System.out.printf("User: %-10s Clicks: %s%n", parts[0], parts[1]);
}
}
}
public static void consumePageClickCounts() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "page-analytics-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("page-click-counts"));
System.out.println("=== Page Click Counts ===");
while (true) {
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Long> record : records) {
System.out.printf("Page: %-15s Views: %d%n", record.key(), record.value());
}
}
}
public static void main(String[] args) {
// Run both consumers in separate threads
Thread userConsumerThread = new Thread(() -> consumeUserClickCounts());
Thread pageConsumerThread = new Thread(() -> consumePageClickCounts());
userConsumerThread.start();
pageConsumerThread.start();
}
}
6. Fraud Detection
Detecting potentially fraudulent transactions in real time.
Use Case Description
Scenario:
A payment system needs to detect suspicious transactions based on patterns like rapid successive transactions from the same account or unusually high amounts.
Input: Transaction events with account ID, amount, timestamp, location
Logic: Flag transactions if more than 3 occur within 5 minutes or amount exceeds $10,000
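Example Data (illustrative values; the format follows the producer code below):
Key: ACC001, Value: ACC001,12500,NYC,2025-01-15T10:00:00Z -> flagged (amount exceeds $10,000)
Key: ACC002, Value: ACC002,400,LON,2025-01-15T10:00:05Z -> flagged only if it is the fourth transaction for ACC002 within 5 minutes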
Architecture Diagram
Transaction Producer
package com.example.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
public class FraudDetectionProducer {
private static final String[] ACCOUNTS = {"ACC001", "ACC002", "ACC003", "ACC004", "ACC005"};
private static final String[] LOCATIONS = {"NYC", "LON", "PAR", "TOK", "SYD"};
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Random random = new Random();
System.out.println("Sending transaction events... Press Ctrl+C to stop.");
while (true) {
String accountId = ACCOUNTS[random.nextInt(ACCOUNTS.length)];
int amount = 100 + random.nextInt(15000); // $100-$15,000
String location = LOCATIONS[random.nextInt(LOCATIONS.length)];
String timestamp = Instant.now().toString();
String transaction = String.format("%s,%d,%s,%s", accountId, amount, location, timestamp);
ProducerRecord<String, String> record = new ProducerRecord<>("transactions", accountId, transaction);
producer.send(record);
System.out.println("Sent: " + transaction);
Thread.sleep(500 + random.nextInt(1500)); // 0.5-2 seconds delay
}
}
}
Fraud Detection Streams Application
package com.example.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
public class FraudDetectionStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-detection-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Read transaction events
KStream<String, String> transactions = builder.stream("transactions");
// Count transactions per account in a 5-minute sliding window
KTable<Windowed<String>, Long> transactionCounts = transactions
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5), Duration.ofMinutes(1)))
.count();
// Convert the windowed counts back to a plain keyed stream (accountId -> count)
// so they can be joined with the original transaction stream
KStream<String, Long> countStream = transactionCounts
.toStream()
.map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count));
// Stream-stream join: pair each transaction with the current count for its account
// Note: a transaction may be emitted more than once if its count is updated within the join window
KStream<String, String> flaggedTransactions = transactions
.join(
countStream,
(transaction, count) -> transaction + ",count:" + count,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long())
)
.filter((key, value) -> {
// Flag if more than 3 transactions occurred in the window or the amount exceeds $10,000
String[] parts = value.split(",");
int amount = Integer.parseInt(parts[1]);
String countStr = parts[parts.length - 1].split(":")[1];
long count = Long.parseLong(countStr);
return count > 3 || amount > 10000;
});
// Send flagged transactions to alert topic
flaggedTransactions.to("fraud-alerts", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Fraud Alert Consumer
package com.example.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class FraudAlertConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-alert-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("fraud-alerts"));
System.out.println("=== FRAUD ALERTS ===");
System.out.println("Monitoring for suspicious transactions...");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String[] parts = record.value().split(",");
String accountId = parts[0];
int amount = Integer.parseInt(parts[1]);
String countInfo = parts[parts.length - 1];
System.out.printf("🚨 FRAUD ALERT! Account: %s, Amount: $%d, %s%n",
accountId, amount, countInfo);
}
}
}
}
7. Stream-Table Joins
Enriching stream data with reference data from tables.
Use Case Description
Scenario:
Enrich order events with customer information stored in a customer table.
Order Stream: Order events with customer ID
Customer Table: Customer details (name, address, tier)
Output: Enriched orders with customer details
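Example Data (illustrative values; the field order follows the code below):
Order: Key: ORDER1736935200000, Value: ORDER1736935200000,CUST001,ProductA,2,199.99,2025-01-15T10:00:00Z
Customer: Key: CUST001, Value: CUST001,John Doe,123 Main St,Premium
Enriched: Key: CUST001, Value: ORDER1736935200000,CUST001,ProductA,2,199.99,2025-01-15T10:00:00Z,CUST001,John Doe,123 Main St,Premium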
Architecture Diagram
Order Producer
package com.example.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
public class OrderProducer {
private static final String[] CUSTOMERS = {"CUST001", "CUST002", "CUST003", "CUST004", "CUST005"};
private static final String[] PRODUCTS = {"ProductA", "ProductB", "ProductC", "ProductD"};
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Random random = new Random();
System.out.println("Sending order events... Press Ctrl+C to stop.");
while (true) {
String orderId = "ORDER" + System.currentTimeMillis();
String customerId = CUSTOMERS[random.nextInt(CUSTOMERS.length)];
String product = PRODUCTS[random.nextInt(PRODUCTS.length)];
int quantity = 1 + random.nextInt(10);
double price = 10.0 + (random.nextDouble() * 1000.0);
String timestamp = Instant.now().toString();
String order = String.format("%s,%s,%s,%d,%.2f,%s",
orderId, customerId, product, quantity, price, timestamp);
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, order);
producer.send(record);
System.out.println("Sent Order: " + order);
Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds delay
}
}
}
Customer Data Producer
package com.example.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomerProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Send customer data (this would normally come from a database)
String[] customers = {
"CUST001,John Doe,123 Main St,Premium",
"CUST002,Jane Smith,456 Oak Ave,Standard",
"CUST003,Bob Johnson,789 Pine Rd,Premium",
"CUST004,Alice Brown,321 Elm St,Standard",
"CUST005,Charlie Wilson,654 Maple Dr,VIP"
};
for (String customer : customers) {
String[] parts = customer.split(",");
String customerId = parts[0];
ProducerRecord<String, String> record = new ProducerRecord<>("customers", customerId, customer);
producer.send(record);
System.out.println("Sent Customer: " + customer);
}
producer.close();
System.out.println("Customer data sent. Application will exit.");
}
}
Stream-Table Join Streams Application
package com.example.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class StreamTableJoinStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Read order stream
KStream<String, String> orders = builder.stream("orders");
// Read customer table (changelog topic)
KTable<String, String> customers = builder.table("customers");
// Join stream with table
KStream<String, String> enrichedOrders = orders
.selectKey((key, value) -> {
// Extract customer ID from order (format: "orderId,customerId,...")
return value.split(",")[1];
})
.join(
customers,
(order, customer) -> order + "," + customer, // Combine order and customer data
Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
// Output enriched orders
enrichedOrders.to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
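When the customer data set is small and changes infrequently, an alternative is to load it as a GlobalKTable, which avoids re-keying (and therefore repartitioning) the order stream because the lookup key is derived per record. A sketch of that variation inside the same application:
// Variation: enrich orders against a GlobalKTable instead of a co-partitioned KTable
GlobalKTable<String, String> customersGlobal = builder.globalTable("customers");
KStream<String, String> enrichedOrdersGlobal = builder.<String, String>stream("orders")
        .join(
            customersGlobal,
            (orderKey, orderValue) -> orderValue.split(",")[1],        // derive customerId as the lookup key
            (orderValue, customerValue) -> orderValue + "," + customerValue
        );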
Enriched Orders Consumer
package com.example.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class EnrichedOrdersConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "enriched-orders-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("enriched-orders"));
System.out.println("=== ENRICHED ORDERS ===");
System.out.println("Waiting for enriched order data...");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String[] parts = record.value().split(",");
String orderId = parts[0];
String customerId = parts[1];
String productName = parts[2];
String customerName = parts[7];
String customerTier = parts[9];
System.out.printf("Order: %-12s Customer: %-15s Product: %-12s Tier: %s%n",
orderId, customerName, productName, customerTier);
}
}
}
}
8. Best Practices
Performance Optimization
- Cache Configuration: configure record cache sizes for state stores
- Partitioning: ensure records are distributed evenly across partitions
- State Cleanup: clean up old state data regularly
Deployment Best Practices
Configuration Recommendations
# Performance tuning
cache.max.bytes.buffering=10485760
commit.interval.ms=1000
replication.factor=3
state.dir=/var/lib/kafka-streams
# Resilience
processing.guarantee=exactly_once_v2
num.standby.replicas=1
acceptable.recovery.lag=10000
# Resource management
num.stream.threads=8
max.task.idle.ms=0
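The same recommendations can be set programmatically through StreamsConfig constants; a sketch (values mirror the properties above):
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);   // 10 MB record cache
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);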
Essential Best Practices:
- Exactly-Once Semantics: Use exactly_once_v2 for strong consistency
- Proper Keying: Design keys for optimal partition distribution
- State Store Management: Monitor and tune state store configurations
- Graceful Shutdown: Always implement proper shutdown hooks (see the sketch after this list)
- Testing: Unit test processors and integration test topologies
- Monitoring: Implement comprehensive metrics and alerting
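One common way to implement the graceful-shutdown point above is to pair the shutdown hook with a CountDownLatch so the main thread stays alive until close() has finished; a sketch (variable names and the 30-second close timeout are assumptions):
// Graceful shutdown: block main on a latch and release it from the shutdown hook
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close(Duration.ofSeconds(30));   // bounded close so shutdown cannot hang indefinitely
    latch.countDown();
}));
try {
    streams.start();
    latch.await();                           // keep the main thread alive until shutdown is requested
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}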