1. Introduction to Kafka Streams

Kafka Streams is a client library for building applications and microservices that process and analyze data stored in Apache Kafka. It combines the simplicity of writing and deploying standard Java and Scala applications with the benefits of Kafka's server-side cluster technology.

What is Stream Processing?

Stream processing continuously captures and processes data in real-time as it flows from various sources. Unlike traditional batch processing, stream processing handles data as a continuous, unbounded sequence of events.

Common Use Cases:

  • Real-time analytics and monitoring
  • Fraud detection and security
  • Recommendation engines
  • IoT data processing
  • Financial trading systems
  • Log processing and analysis

Why Kafka Streams?

Advantages:

  • No separate processing cluster needed
  • Exactly-once processing semantics
  • Stateful processing with local stores
  • Integration with Kafka ecosystem
  • Scalable and fault-tolerant

Key Features:

  • Event-time processing
  • Windowing operations
  • Interactive queries
  • Join operations
  • Rich DSL and Processor API

2. Core Concepts

Kafka Streams Architecture

graph TD
    A[Kafka Cluster] --> B[Producer]
    C[Consumer] --> A
    D[Kafka Streams App] --> A
    A --> E[Kafka Streams App]
    E --> F[Output Topic]
    D --> G[Input Topic]

Key Abstractions

graph LR
    A[KStream] --> B[Continuous Record Stream]
    C[KTable] --> D[Changelog Stream]
    E[GlobalKTable] --> F[Replicated Changelog]

Explanation:

KStream: Represents an unbounded, continuous stream of records. Each record is independent.

Example Data:


Key: 101, Value: {"product":"Laptop", "price":1200}
Key: 102, Value: {"product":"Phone", "price":800}
    

KTable: Represents a changelog stream where records with the same key are interpreted as updates.

Example Data:


Key: 101, Value: {"product":"Laptop", "price":1200}  -> Initial insert
Key: 102, Value: {"product":"Phone", "price":800}   -> Initial insert
Key: 101, Value: {"product":"Laptop", "price":1150} -> Update for key 101
      

GlobalKTable: A KTable that is fully replicated on each Kafka Streams instance.

Example Data:


Key: "US", Value: {"currency":"USD", "tax":0.07}
Key: "EU", Value: {"currency":"EUR", "tax":0.20}
Key: "JP", Value: {"currency":"JPY", "tax":0.10}
      

3. Setup & Configuration

Create Kafka Topics

Download Kafka: https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

Example: a minimal Kafka Streams application will:

  • Consume messages from an input Kafka topic.
  • Transform the messages (e.g., convert them to uppercase).
  • Publish the transformed messages to an output Kafka topic.
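
A sketch of this three-step pipeline, assuming placeholder topic names input-topic and output-topic and an uppercase transform as the example:

package com.example.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;

public class UppercaseStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Consume from the input topic
        KStream<String, String> input = builder.stream("input-topic");

        // Transform each value to uppercase and publish to the output topic
        input.mapValues(value -> value.toUpperCase())
             .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The Word Count example that follows uses this same structure, with a stateful aggregation in place of the stateless transform.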

# Start Zookeeper
.\bin\windows\zookeeper-server-start.bat  .\config\zookeeper.properties

# Start Kafka
.\bin\windows\kafka-server-start.bat  .\config\server.properties
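
The examples in this guide assume their input and output topics already exist. As a sketch, the Word Count topics can be created with the bundled kafka-topics tool (single-partition, single-replica settings for a local broker):

# Create the input and output topics for the Word Count example
.\bin\windows\kafka-topics.bat --create --topic text-lines --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
.\bin\windows\kafka-topics.bat --create --topic word-counts --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1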
  

4. Word Count Example

The classic "Hello World" of stream processing: counting words in real time.

Architecture Diagram

graph LR
    A[Producer] --> B[text-lines Topic]
    B --> C[Kafka Streams App]
    C --> D[word-counts Topic]
    D --> E[Consumer]

Producer Code

package com.example.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;

public class WordCountProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        Scanner scanner = new Scanner(System.in);
        System.out.println("Enter sentences (type 'exit' to quit):");
        
        while (true) {
            System.out.print("> ");
            String line = scanner.nextLine();
            
            if ("exit".equalsIgnoreCase(line)) {
                break;
            }
            
            ProducerRecord<String, String> record = new ProducerRecord<>("text-lines", line);
            producer.send(record);
            System.out.println("Sent: " + line);
        }
        
        producer.close();
        scanner.close();
    }
}

Kafka Streams Application

package com.example.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;

public class WordCountStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read input topic
        KStream<String, String> textLines = builder.stream("text-lines");
        
        // Process: split, filter, count
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();
        
        // Write to output topic
        wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));
        
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Consumer Code

package com.example.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class WordCountConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("word-counts"));
        
        System.out.println("Waiting for word count results...");
        
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.printf("Word: %-20s Count: %d%n", record.key(), record.value());
            }
        }
    }
}

How it works:

  1. Producer: Sends text lines to "text-lines" topic
  2. Streams App: Processes text, splits into words, and counts occurrences
  3. Consumer: Receives word counts from "word-counts" topic (or inspect them with the console consumer shown below)
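
To inspect the results without running the Java consumer, the console consumer bundled with Kafka can read the output topic; the Long value deserializer is needed because the counts are serialized as longs:

# Read the word-counts topic with the console consumer
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 ^
    --topic word-counts --from-beginning ^
    --property print.key=true ^
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer ^
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer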

5. Real-Time Analytics

Building a real-time dashboard for website click analytics.

Use Case Description

Scenario:

An e-commerce website wants to track real-time metrics like page views per user, popular products, and session durations.

Input: Click events with user ID, page URL, timestamp

Output: Aggregated metrics updated in real-time

Architecture Diagram

graph LR
    A[Click Producer] --> B[click-events Topic]
    B --> C[Kafka Streams App]
    C --> D[user-click-counts Topic]
    C --> E[page-click-counts Topic]
    D --> F[User Analytics Consumer]
    E --> G[Page Analytics Consumer]

Click Event Producer

package com.example.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

public class ClickEventProducer {
    private static final String[] USERS = {"user1", "user2", "user3", "user4", "user5"};
    private static final String[] PAGES = {"/home", "/products", "/cart", "/checkout", "/profile"};
    
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Random random = new Random();
        
        System.out.println("Sending click events... Press Ctrl+C to stop.");
        
        while (true) {
            String userId = USERS[random.nextInt(USERS.length)];
            String pageUrl = PAGES[random.nextInt(PAGES.length)];
            String timestamp = Instant.now().toString();
            
            String clickEvent = String.format("%s,%s,%s", userId, pageUrl, timestamp);
            ProducerRecord<String, String> record = new ProducerRecord<>("click-events", userId, clickEvent);
            
            producer.send(record);
            System.out.println("Sent: " + clickEvent);
            
            Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds delay
        }
    }
}

Analytics Streams Application

package com.example.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;

public class ClickAnalyticsStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-analytics-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read click events
        KStream<String, String> clickEvents = builder.stream("click-events");
        
        // Count clicks per user (tumbling window of 1 minute)
        KTable<Windowed<String>, Long> userClickCounts = clickEvents
            .selectKey((key, value) -> {
                String[] parts = value.split(",");
                return parts[0]; // userId
            })
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count();
        
        // Count clicks per page
        KTable<String, Long> pageClickCounts = clickEvents
            .selectKey((key, value) -> {
                String[] parts = value.split(",");
                return parts[1]; // pageUrl
            })
            .groupByKey()
            .count();
        
        // Output results
        userClickCounts.toStream()
            .map((windowedKey, value) -> 
                new KeyValue<>(windowedKey.key(), windowedKey.key() + ":" + value))
            .to("user-click-counts", Produced.with(Serdes.String(), Serdes.String()));
            
        pageClickCounts.toStream()
            .to("page-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Analytics Consumers

package com.example.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AnalyticsConsumers {
    
    public static void consumeUserClickCounts() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-analytics-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("user-click-counts"));
        
        System.out.println("=== User Click Counts ===");
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String[] parts = record.value().split(":");
                System.out.printf("User: %-10s Clicks: %s%n", parts[0], parts[1]);
            }
        }
    }
    
    public static void consumePageClickCounts() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "page-analytics-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("page-click-counts"));
        
        System.out.println("=== Page Click Counts ===");
        
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.printf("Page: %-15s Views: %d%n", record.key(), record.value());
            }
        }
    }
    
    public static void main(String[] args) {
        // Run both consumers in separate threads
        Thread userConsumerThread = new Thread(() -> consumeUserClickCounts());
        Thread pageConsumerThread = new Thread(() -> consumePageClickCounts());
        
        userConsumerThread.start();
        pageConsumerThread.start();
    }
}

6. Fraud Detection

Detecting potentially fraudulent transactions in real-time.

Use Case Description

Scenario:

A payment system needs to detect suspicious transactions based on patterns like rapid successive transactions from the same account or unusually high amounts.

Input: Transaction events with account ID, amount, timestamp, location

Logic: Flag transactions if more than 3 occur within 5 minutes or amount exceeds $10,000

Architecture Diagram

graph LR
    A[Transaction Producer] --> B[transactions Topic]
    B --> C[Kafka Streams App]
    C --> D[fraud-alerts Topic]
    D --> E[Fraud Alert Consumer]

Transaction Producer

package com.example.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;

public class FraudDetectionProducer {
    private static final String[] ACCOUNTS = {"ACC001", "ACC002", "ACC003", "ACC004", "ACC005"};
    private static final String[] LOCATIONS = {"NYC", "LON", "PAR", "TOK", "SYD"};
    
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Random random = new Random();
        
        System.out.println("Sending transaction events... Press Ctrl+C to stop.");
        
        while (true) {
            String accountId = ACCOUNTS[random.nextInt(ACCOUNTS.length)];
            int amount = 100 + random.nextInt(15000); // $100-$15,000
            String location = LOCATIONS[random.nextInt(LOCATIONS.length)];
            String timestamp = Instant.now().toString();
            
            String transaction = String.format("%s,%d,%s,%s", accountId, amount, location, timestamp);
            ProducerRecord<String, String> record = new ProducerRecord<>("transactions", accountId, transaction);
            
            producer.send(record);
            System.out.println("Sent: " + transaction);
            
            Thread.sleep(500 + random.nextInt(1500)); // 0.5-2 seconds delay
        }
    }
}

Fraud Detection Streams Application

package com.example.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;

public class FraudDetectionStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-detection-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read transaction events
        KStream<String, String> transactions = builder.stream("transactions");
        
        // Count transactions per account in a 5-minute sliding window
        KTable<Windowed<String>, Long> transactionCounts = transactions
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .count();
        
        // Re-key the windowed counts back to plain account IDs so they can be
        // joined with the original transaction stream (a KStream cannot be
        // joined directly with a windowed KTable)
        KStream<String, Long> countsByAccount = transactionCounts
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count));
        
        // Join original transactions with their recent counts (stream-stream join)
        KStream<String, String> flaggedTransactions = transactions
            .join(
                countsByAccount,
                (transaction, count) -> transaction + ",count:" + count,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long())
            )
            .filter((key, value) -> {
                // Flag if more than 3 transactions in the window or amount > $10,000
                String[] parts = value.split(",");
                int amount = Integer.parseInt(parts[1]);
                int count = Integer.parseInt(parts[parts.length - 1].split(":")[1]);
                return count > 3 || amount > 10000;
            });
        
        // Send flagged transactions to alert topic
        flaggedTransactions.to("fraud-alerts", Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Fraud Alert Consumer

package com.example.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class FraudAlertConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-alert-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("fraud-alerts"));
        
        System.out.println("=== FRAUD ALERTS ===");
        System.out.println("Monitoring for suspicious transactions...");
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String[] parts = record.value().split(",");
                String accountId = parts[0];
                int amount = Integer.parseInt(parts[1]);
                String countInfo = parts[parts.length - 1];
                
                System.out.printf("🚨 FRAUD ALERT! Account: %s, Amount: $%d, %s%n", 
                    accountId, amount, countInfo);
            }
        }
    }
}

7. Stream-Table Joins

Enriching stream data with reference data from tables.

Use Case Description

Scenario:

Enrich order events with customer information stored in a customer table.

Order Stream: Order events with customer ID

Customer Table: Customer details (name, address, tier)

Output: Enriched orders with customer details

Architecture Diagram

graph LR
    A[Order Producer] --> B[orders Topic]
    C[Customer Producer] --> D[customers Topic]
    B --> E[Kafka Streams App]
    D --> E
    E --> F[enriched-orders Topic]
    F --> G[Enriched Orders Consumer]

Order Producer

package com.example.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;

public class OrderProducer {
    private static final String[] CUSTOMERS = {"CUST001", "CUST002", "CUST003", "CUST004", "CUST005"};
    private static final String[] PRODUCTS = {"ProductA", "ProductB", "ProductC", "ProductD"};
    
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Random random = new Random();
        
        System.out.println("Sending order events... Press Ctrl+C to stop.");
        
        while (true) {
            String orderId = "ORDER" + System.currentTimeMillis();
            String customerId = CUSTOMERS[random.nextInt(CUSTOMERS.length)];
            String product = PRODUCTS[random.nextInt(PRODUCTS.length)];
            int quantity = 1 + random.nextInt(10);
            double price = 10.0 + (random.nextDouble() * 1000.0);
            String timestamp = Instant.now().toString();
            
            // Locale.US keeps the price's decimal separator a '.', so the
            // comma-delimited format stays parseable downstream
            String order = String.format(java.util.Locale.US, "%s,%s,%s,%d,%.2f,%s",
                orderId, customerId, product, quantity, price, timestamp);
            ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, order);
            
            producer.send(record);
            System.out.println("Sent Order: " + order);
            
            Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds delay
        }
    }
}

Customer Data Producer

package com.example.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class CustomerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Send customer data (this would normally come from a database)
        String[] customers = {
            "CUST001,John Doe,123 Main St,Premium",
            "CUST002,Jane Smith,456 Oak Ave,Standard",
            "CUST003,Bob Johnson,789 Pine Rd,Premium",
            "CUST004,Alice Brown,321 Elm St,Standard",
            "CUST005,Charlie Wilson,654 Maple Dr,VIP"
        };
        
        for (String customer : customers) {
            String[] parts = customer.split(",");
            String customerId = parts[0];
            ProducerRecord<String, String> record = new ProducerRecord<>("customers", customerId, customer);
            producer.send(record);
            System.out.println("Sent Customer: " + customer);
        }
        
        producer.close();
        System.out.println("Customer data sent. Application will exit.");
    }
}

Stream-Table Join Streams Application

package com.example.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class StreamTableJoinStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read order stream
        KStream<String, String> orders = builder.stream("orders");
        
        // Read customer table (changelog topic)
        KTable<String, String> customers = builder.table("customers");
        
        // Join stream with table
        KStream<String, String> enrichedOrders = orders
            .selectKey((key, value) -> {
                // Extract customer ID from order (format: "orderId,customerId,...")
                return value.split(",")[1];
            })
            .join(
                customers,
                (order, customer) -> order + "," + customer, // Combine order and customer data
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
            );
        
        // Output enriched orders
        enrichedOrders.to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Enriched Orders Consumer

package com.example.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class EnrichedOrdersConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "enriched-orders-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("enriched-orders"));
        
        System.out.println("=== ENRICHED ORDERS ===");
        System.out.println("Waiting for enriched order data...");
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String[] parts = record.value().split(",");
                String orderId = parts[0];
                String customerId = parts[1];
                String productName = parts[2];
                String customerName = parts[7];
                String customerTier = parts[9];
                
                System.out.printf("Order: %-12s Customer: %-15s Product: %-12s Tier: %s%n", 
                    orderId, customerName, productName, customerTier);
            }
        }
    }
}

8. Best Practices

Performance Optimization

  • Cache Configuration: Configure cache sizes for state stores
  • Partitioning: Ensure even distribution across partitions
  • State Cleanup: Regular cleanup of old state data

Deployment Best Practices

graph TD
    A[Development] --> B[Unit Testing]
    B --> C[Integration Testing]
    C --> D[Performance Testing]
    D --> E[Production Deployment]
    E --> F[Monitoring]
    F --> G[Scaling]
    G --> H[Fault Tolerance]

Configuration Recommendations

# Performance tuning
cache.max.bytes.buffering=10485760
commit.interval.ms=1000
replication.factor=3
state.dir=/var/lib/kafka-streams

# Resilience
processing.guarantee=exactly_once_v2
num.standby.replicas=1
acceptable.recovery.lag=10000

# Resource management
num.stream.threads=8
max.task.idle.ms=0
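
In application code, the same settings are supplied through StreamsConfig properties; a sketch with illustrative values (the application ID and bootstrap servers are placeholders):

import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class StreamsConfigExample {
    public static Properties tunedProperties() {
        Properties props = new Properties();
        // Required basics (placeholder values)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Performance tuning
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

        // Resilience
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);

        // Resource management
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);

        return props;
    }
}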

Essential Best Practices:

  • Exactly-Once Semantics: Use exactly_once_v2 for strong consistency
  • Proper Keying: Design keys for optimal partition distribution
  • State Store Management: Monitor and tune state store configurations
  • Graceful Shutdown: Always implement proper shutdown hooks (see the sketch after this list)
  • Testing: Unit test processors and integration test topologies
  • Monitoring: Implement comprehensive metrics and alerting
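
For the graceful-shutdown point above, a common pattern is to block the main thread on a latch that the shutdown hook releases after closing the streams instance; a sketch, assuming the topology and properties are built elsewhere:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class GracefulShutdownExample {
    public static void run(Topology topology, Properties props) {
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams instance (flushing state stores) before the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await(); // block until Ctrl+C / SIGTERM triggers the hook
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}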