Stream Processing

Stream processing involves processing continuous flows of data (streams) in real time or near-real time. It contrasts with traditional batch processing, where data is collected, stored, and processed in discrete chunks.

Create Kafka Topics

Download Kafka: https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

Example

  • Consume messages from an input Kafka topic.
  • Transform the messages (in this example, remove a fixed list of unwanted words).
  • Publish the transformed messages to an output Kafka topic.

# Start Zookeeper
.\bin\windows\zookeeper-server-start.bat  .\config\zookeeper.properties

# Start Kafka
.\bin\windows\kafka-server-start.bat  .\config\server.properties

# Create input-topic
.\bin\windows\kafka-topics.bat --create --topic inputtopic --bootstrap-server localhost:9092

# Create output-topic
.\bin\windows\kafka-topics.bat --create --topic outputtopic --bootstrap-server localhost:9092

# List topics
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

Zookeeper in Kafka

  1. Broker Metadata: Keeps track of brokers (Kafka servers) in the cluster.
  2. Leader Election: Elects leaders for Kafka partitions.
  3. Topic Management: Maintains metadata about topics and partitions.
  4. Configuration Management: Stores configuration for Kafka brokers and topics.

graph TD
    A[Kafka Producers] -->|Send Data| B[Kafka Brokers]
    B -->|Coordinate| C[Zookeeper Cluster]
    C -->|Metadata| D[Kafka Consumers]

Create a Spring Boot Project: Producer

Dependencies

Dependencies: Spring Web, Spring for Apache Kafka, Spring Boot DevTools.

application.properties

# Kafka Producer Configuration
spring.application.name=ProducerApplication
server.port=8085
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

KafkaProducerConfig.java


import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side Kafka wiring for the producer service.
 *
 * <p>Records are sent with String keys and String values to the broker at
 * {@code localhost:9092}.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Builds the factory that creates String/String Kafka producers.
     *
     * @return a producer factory carrying the broker address and both serializers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Exposes the template used by the application to publish messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

KafkaProducerService.java


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Thin service that publishes plain-text messages through the injected
 * {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    /** Template injected by Spring; used for every send. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code message} to {@code topic} without a record key.
     *
     * @param topic   name of the destination topic
     * @param message payload to publish
     */
    public void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }
}

MessageController.java


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point of the producer service, mounted under {@code /api/messages}.
 */
@RestController
@RequestMapping("/api/messages")
public class MessageController {

    /** Service that forwards request bodies to Kafka. */
    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Handles {@code POST /api/messages/send}: publishes the raw request body
     * to the {@code inputtopic} topic.
     *
     * @param message request body, forwarded unchanged
     * @return a fixed confirmation string
     */
    @PostMapping("/send")
    public String sendMessage(@RequestBody String message) {
        final String destination = "inputtopic";
        this.kafkaProducerService.sendMessage(destination, message);
        final String confirmation = "Message sent to input-topic";
        return confirmation;
    }
}

ProcessorApplication

	<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

application.properties

spring.application.name=ProcessorApplication
spring.kafka.streams.application-id=kafka-streams-demo
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.apache.kafka.common.serialization.Serdes$StringSerde

server.port=8082

KafkaStreamsConfig.java


package com.example.ProcessorApplication;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Kafka Streams topology for the processor service.
 *
 * <p>Reads String records from {@code inputtopic}, removes a fixed list of
 * unwanted words from each value and writes the result to {@code outputtopic}.
 */
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    /** Words removed from every message value (whole-word, case-sensitive match). */
    private static final List<String> UNWANTED_WORDS = Arrays.asList("hello", "unwanted", "test");

    /**
     * Supplies the Kafka Streams configuration under the default bean name.
     *
     * @return configuration with application id, broker address and String serdes
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }

    /**
     * Defines the topology: {@code inputtopic} -> strip unwanted words -> {@code outputtopic}.
     *
     * @param builder streams builder the topology is registered on
     * @return the source stream reading {@code inputtopic}
     */
    @Bean
    public KStream<String, String> kStream(StreamsBuilder builder) {
        // Compile the regex once when the topology is built instead of once per word per record.
        final java.util.regex.Pattern unwanted = unwantedWordsPattern();

        KStream<String, String> source = builder.stream("inputtopic");
        KStream<String, String> processed = source.mapValues(value -> {
            if (value == null) {
                // Records without a value are passed through untouched rather than failing the mapper.
                return null;
            }
            return unwanted.matcher(value).replaceAll("").trim();
        });
        processed.to("outputtopic");
        return source;
    }

    /**
     * Builds one pattern of the form {@code \b(?:w1|w2|...)\b} from {@link #UNWANTED_WORDS}.
     * Each word is quoted so regex metacharacters in a word are matched literally.
     */
    private static java.util.regex.Pattern unwantedWordsPattern() {
        StringBuilder alternation = new StringBuilder();
        for (String word : UNWANTED_WORDS) {
            if (alternation.length() > 0) {
                alternation.append('|');
            }
            alternation.append(java.util.regex.Pattern.quote(word));
        }
        return java.util.regex.Pattern.compile("\\b(?:" + alternation + ")\\b");
    }
}

ProcessorApplication.java


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

/**
 * Spring Boot entry point of the processor service.
 * Kafka Streams support is switched on via {@code @EnableKafkaStreams}.
 */
@SpringBootApplication
@EnableKafkaStreams
public class ProcessorApplication {

	/**
	 * Starts the Spring application context.
	 *
	 * @param args command-line arguments, forwarded to {@code SpringApplication.run}
	 */
	public static void main(String[] args) {
		SpringApplication.run(ProcessorApplication.class, args);
	}

}

ConsumerApplication

Dependencies

Dependencies: Spring Web, Spring for Apache Kafka, Spring Boot DevTools.

Application.properties

spring.application.name=ConsumerApplication
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-streams-demo-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
server.port=8083

KafkaConsumerListener.java

package com.example.ConsumerApplication;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Consumer-side listener that prints every message arriving on
 * {@code outputtopic}.
 */
@Component
public class KafkaConsumerListener {

    /**
     * Receives one message from {@code outputtopic} (consumer group
     * {@code kafka-streams-demo-group}) and writes it to standard output.
     *
     * @param message the record value as delivered by the listener container
     */
    @KafkaListener(topics = "outputtopic", groupId = "kafka-streams-demo-group")
    public void listen(String message) {
        final String line = "Received message from output-topic: " + message;
        System.out.println(line);
    }
}

Run all three services (producer, processor, consumer), then send a message with a POST request to: http://localhost:8085/api/messages/send

process customer reviews

Suppose we have an e-commerce platform where we want to process customer reviews. Specifically, we want to:
  • Filter out reviews with inappropriate language.
  • Count the number of positive and negative reviews.
  • Store the processed reviews in separate topics.
Step 1: Set Up Your Spring Boot Project

Follow the same steps as before to set up your Spring Boot project.

Dependencies:

Spring Web
Spring Boot DevTools
Spring Kafka

Application.properties


# Kafka Producer Configuration
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Kafka Consumer Configuration
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=ecommerce-review-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

# Kafka Streams Configuration
spring.kafka.streams.application-id=ecommerce-review-processing
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties.spring.json.trusted.packages=*

Define the Review Model


/**
 * Mutable bean describing one customer review.
 *
 * <p>All properties are plain strings exposed through getters and setters.
 */
public class Review {

    /** Identifier of the review. */
    private String id;
    /** Identifier of the reviewed product. */
    private String productId;
    /** Identifier of the customer who wrote the review. */
    private String customerId;
    /** Free-form review text. */
    private String text;
    /** Sentiment label: Positive, Negative or Neutral. */
    private String sentiment;

    /** @return the review id */
    public String getId() {
        return this.id;
    }

    /** @param id the review id to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the product id */
    public String getProductId() {
        return this.productId;
    }

    /** @param productId the product id to store */
    public void setProductId(String productId) {
        this.productId = productId;
    }

    /** @return the customer id */
    public String getCustomerId() {
        return this.customerId;
    }

    /** @param customerId the customer id to store */
    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    /** @return the review text */
    public String getText() {
        return this.text;
    }

    /** @param text the review text to store */
    public void setText(String text) {
        this.text = text;
    }

    /** @return the sentiment label */
    public String getSentiment() {
        return this.sentiment;
    }

    /** @param sentiment the sentiment label to store */
    public void setSentiment(String sentiment) {
        this.sentiment = sentiment;
    }

    /**
     * Renders every field as {@code name='value'} inside {@code Review{...}}.
     *
     * @return the textual form of this review
     */
    @Override
    public String toString() {
        final StringBuilder out = new StringBuilder("Review{");
        out.append("id='").append(this.id).append('\'');
        out.append(", productId='").append(this.productId).append('\'');
        out.append(", customerId='").append(this.customerId).append('\'');
        out.append(", text='").append(this.text).append('\'');
        out.append(", sentiment='").append(this.sentiment).append('\'');
        out.append('}');
        return out.toString();
    }
}

Create a Kafka Producer

KafkaProducerConfig.java


import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side Kafka wiring for the review service.
 *
 * <p>Keys are Strings; {@link Review} values are written with
 * {@link JsonSerializer}. The broker address is {@code localhost:9092}.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Builds the factory that creates String/Review Kafka producers.
     *
     * @return a producer factory carrying the broker address and both serializers
     */
    @Bean
    public ProducerFactory<String, Review> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Exposes the template used by the application to publish reviews.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Review> kafkaTemplate() {
        final ProducerFactory<String, Review> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

KafkaProducerService.java


import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Thin service that publishes {@link Review} objects through the injected
 * {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    /** Template injected by Spring; used for every send. */
    @Autowired
    private KafkaTemplate<String, Review> kafkaTemplate;

    /**
     * Sends {@code review} to {@code topic}, using the review id as record key.
     *
     * @param topic  name of the destination topic
     * @param review review to publish
     */
    public void sendReview(String topic, Review review) {
        final String recordKey = review.getId();
        this.kafkaTemplate.send(topic, recordKey, review);
    }
}

ReviewController.java


import com.example.ecommercereviewprocessing.model.Review;
import com.example.ecommercereviewprocessing.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * REST entry point for submitting reviews, mounted under {@code /api/reviews}.
 */
@RestController
@RequestMapping("/api/reviews")
public class ReviewController {

    /** Service that forwards reviews to Kafka. */
    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Handles {@code POST /api/reviews/send}: assigns a fresh random UUID as
     * the review id (overwriting any id in the request) and publishes the
     * review to {@code input-topic}.
     *
     * @param review review parsed from the request body
     * @return a fixed confirmation string
     */
    @PostMapping("/send")
    public String sendReview(@RequestBody Review review) {
        final String generatedId = UUID.randomUUID().toString();
        review.setId(generatedId);
        this.kafkaProducerService.sendReview("input-topic", review);
        final String confirmation = "Review sent to input-topic";
        return confirmation;
    }
}

Kafka Stream Processor

Create a Kafka stream processor that listens to the input topic, filters out inappropriate language, determines sentiment, and writes to the appropriate output topics.

Define a List of Inappropriate Words


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * Application-level beans shared by the review processing code.
 */
@Configuration
public class AppConfig {

    /**
     * Provides the list of words treated as inappropriate language.
     *
     * @return a fixed-size list of lowercase words
     */
    @Bean
    public List<String> inappropriateWords() {
        final List<String> words = Arrays.asList(
                "bad",
                "horrible",
                "terrible",
                "awful",
                "poor",
                "stupid");
        return words;
    }
}

Create a Sentiment Analysis Utility


import com.example.ecommercereviewprocessing.model.Review;

/**
 * Keyword-based sentiment helper for {@link Review} objects.
 *
 * <p>Sentiment is decided by case-insensitive substring matching against two
 * fixed keyword sets; positive keywords take precedence over negative ones.
 */
public class SentimentAnalyzer {

    /**
     * Classifies a text as {@code "Positive"}, {@code "Negative"} or {@code "Neutral"}.
     *
     * @param text text to classify; {@code null} is classified as {@code "Neutral"}
     * @return the sentiment label
     */
    public static String analyzeSentiment(String text) {
        if (text == null) {
            return "Neutral";
        }
        // Lowercase once, with a fixed locale so the result does not depend on the JVM default.
        final String lowered = text.toLowerCase(java.util.Locale.ROOT);
        if (containsAny(lowered, "good", "great", "excellent")) {
            return "Positive";
        }
        if (containsAny(lowered, "bad", "horrible", "terrible", "awful", "poor", "stupid")) {
            return "Negative";
        }
        return "Neutral";
    }

    /**
     * Sets the review's sentiment and removes inappropriate words from its text.
     *
     * <p>The sentiment is derived from the ORIGINAL text. Deriving it from the
     * cleaned text would hide negative keywords that are also in
     * {@code inappropriateWords}, so such reviews would be labelled Neutral.
     * Words are removed as whole words, ignoring case, and the result is trimmed.
     *
     * @param review             review to update in place; its text may be {@code null}
     * @param inappropriateWords words to strip from the review text
     * @return the same {@code review} instance, updated
     */
    public static Review processReview(Review review, java.util.List<String> inappropriateWords) {
        final String originalText = review.getText();
        review.setSentiment(analyzeSentiment(originalText));

        if (originalText != null) {
            String cleanedText = originalText;
            for (String word : inappropriateWords) {
                cleanedText = java.util.regex.Pattern
                        .compile("\\b" + java.util.regex.Pattern.quote(word) + "\\b",
                                java.util.regex.Pattern.CASE_INSENSITIVE)
                        .matcher(cleanedText)
                        .replaceAll("")
                        .trim();
            }
            review.setText(cleanedText);
        }
        return review;
    }

    /** Returns true when {@code haystack} contains at least one of {@code needles}. */
    private static boolean containsAny(String haystack, String... needles) {
        for (String needle : needles) {
            if (haystack.contains(needle)) {
                return true;
            }
        }
        return false;
    }
}

Create a Kafka Streams Configuration Class


import com.example.ecommercereviewprocessing.model.Review;
import com.example.ecommercereviewprocessing.util.SentimentAnalyzer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.serializer.JsonSerde;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Autowired
    private List<String> inappropriateWords;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-review-processing");
        props.put(org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
        props.put(JsonSerde.TRUSTED_PACKAGES, "*");
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KStream<String, Review> kStream(StreamsBuilder builder) {
        KStream<String, Review> source = builder.stream("input-topic");

        // Filter out reviews with inappropriate language
        KStream<String, Review> filteredReviews = source.filter((key, review) -> {
            for (String word : inappropriateWords) {
                if (review.getText().toLowerCase().contains(word)) {
                    return false;
                }
            }
            return true;
        });

        // Process reviews to determine sentiment
        KStream<String, Review> processedReviews = filteredReviews.mapValues(review -> SentimentAnalyzer.processReview(review, inappropriateWords));

        // Split processed reviews into positive and negative topics
        Predicate<String, Review> isPositive = (key, review) -> "Positive".equals(review.getSentiment());
        Predicate<String, Review> isNegative = (key, review) -> "Negative".equals(review.getSentiment());

        // [0]:Accesses the first branch of the split streams.
        processedReviews.branch(
            (key, review) -> isPositive.test(key, review),
            (key, review) -> isNegative.test(key, review)
        )[0].to("positive-reviews-topic");
        processedReviews.branch(
            (key, review) -> isPositive.test(key, review),
            (key, review) -> isNegative.test(key, review)
        )[1].to("negative-reviews-topic");

        return source;
    }
}

Create Kafka Consumers

Create Kafka consumers to listen to the positive-reviews-topic and negative-reviews-topic.

Kafka Consumer Listener for Positive Reviews


import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listener that prints every review arriving on {@code positive-reviews-topic}.
 */
@Component
public class PositiveReviewConsumerListener {

    /**
     * Receives one review from {@code positive-reviews-topic} (consumer group
     * {@code ecommerce-review-group}) and writes it to standard output.
     *
     * @param review the deserialized review
     */
    @KafkaListener(topics = "positive-reviews-topic", groupId = "ecommerce-review-group")
    public void listen(Review review) {
        final String line = "Received positive review: " + review;
        System.out.println(line);
    }
}

Kafka Consumer Listener for Negative Reviews


import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listener that prints every review arriving on {@code negative-reviews-topic}.
 */
@Component
public class NegativeReviewConsumerListener {

    /**
     * Receives one review from {@code negative-reviews-topic} (consumer group
     * {@code ecommerce-review-group}) and writes it to standard output.
     *
     * @param review the deserialized review
     */
    @KafkaListener(topics = "negative-reviews-topic", groupId = "ecommerce-review-group")
    public void listen(Review review) {
        final String line = "Received negative review: " + review;
        System.out.println(line);
    }
}

Run the Application


curl -X POST http://localhost:8080/api/reviews/send -H "Content-Type: application/json" -d '{
    "productId": "123",
    "customerId": "456",
    "text": "This product is great and excellent!"
}'