Stream Processing
Stream processing involves processing continuous flows of data (streams) in real time or near-real time. It contrasts with traditional batch processing, where data is collected, stored, and processed in discrete chunks.
Create Kafka Topics
Download Kafka: https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
Example
- Consume messages from an input Kafka topic.
- Transform the messages (e.g., convert them to uppercase).
- Publish the transformed messages to an output Kafka topic.
# Start Zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# Start Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
# Create input-topic
.\bin\windows\kafka-topics.bat --create --topic inputtopic --bootstrap-server localhost:9092
# Create output-topic
.\bin\windows\kafka-topics.bat --create --topic outputtopic --bootstrap-server localhost:9092
# List all topics
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Zookeeper in Kafka
- Broker Metadata: Keeps track of brokers (Kafka servers) in the cluster.
- Leader Election: Elects leaders for Kafka partitions.
- Topic Management: Maintains metadata about topics and partitions.
- Configuration Management: Stores configuration for Kafka brokers and topics.
graph TD
A[Kafka Producers] -->|Send Data| B[Kafka Brokers]
B -->|Coordinate| C[Zookeeper Cluster]
C -->|Metadata| D[Kafka Consumers]
Create a Spring Boot Project: Producer
Dependencies
Dependencies: Spring Web, Spring for Apache Kafka, Spring Boot DevTools.
application.properties
# Kafka Producer Configuration
spring.application.name=ProducerApplication
server.port=8085
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
KafkaProducerConfig.java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that wires the Kafka producer beans used to publish
 * messages with String keys and String values.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Creates the producer factory.
     *
     * @return a factory configured for the broker at localhost:9092 with
     *         {@link StringSerializer} for both keys and values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template that application code uses to send messages.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
KafkaProducerService.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Service that publishes plain-text messages through the injected
 * {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    /** Template injected by Spring; performs the actual send. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Hands a message to the Kafka template for the given topic.
     *
     * @param topic   name of the destination topic
     * @param message payload to publish
     */
    public void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }
}
MessageController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that accepts a raw message body and forwards it to Kafka.
 * Mapped under {@code /api/messages}.
 */
@RestController
@RequestMapping("/api/messages")
public class MessageController {

    /** Topic every incoming message is published to. */
    private static final String TARGET_TOPIC = "inputtopic";

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Handles {@code POST /api/messages/send}.
     *
     * @param message request body, forwarded unchanged to the producer service
     * @return a fixed confirmation string
     */
    @PostMapping("/send")
    public String sendMessage(@RequestBody String message) {
        final String confirmation = "Message sent to input-topic";
        kafkaProducerService.sendMessage(TARGET_TOPIC, message);
        return confirmation;
    }
}
ProcessorApplication
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
application.properties
spring.application.name=ProcessorApplication
spring.kafka.streams.application-id=kafka-streams-demo
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
server.port=8082
KafkaStreamsConfig.java
package com.example.ProcessorApplication;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Kafka Streams setup for the processor application: reads "inputtopic", strips a
 * fixed set of unwanted words from every message value and writes the result to
 * "outputtopic".
 */
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    /** Words removed from each message value (whole-word, case-sensitive match). */
    private static final List<String> UNWANTED_WORDS = Arrays.asList("hello", "unwanted", "test");

    /**
     * Single whole-word pattern covering every unwanted word. Compiled once here
     * instead of once per word for every record flowing through the stream.
     */
    private static final java.util.regex.Pattern UNWANTED_WORDS_PATTERN =
            buildWholeWordPattern(UNWANTED_WORDS);

    /**
     * Supplies the Kafka Streams configuration under the default bean name that
     * {@code @EnableKafkaStreams} looks up.
     *
     * @return configuration with application id "kafka-streams-demo", the broker at
     *         localhost:9092 and String serdes for keys and values
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }

    /**
     * Defines the topology: "inputtopic" -> remove unwanted words -> "outputtopic".
     *
     * @param builder streams builder the topology is registered on
     * @return the source stream reading from "inputtopic"
     */
    @Bean
    public KStream<String, String> kStream(StreamsBuilder builder) {
        KStream<String, String> source = builder.stream("inputtopic");
        KStream<String, String> processed = source.mapValues(KafkaStreamsConfig::stripUnwantedWords);
        processed.to("outputtopic");
        return source;
    }

    /**
     * Removes every unwanted word from the value and trims surrounding whitespace.
     * A null value (for example a tombstone record) is passed through unchanged
     * instead of failing with a NullPointerException.
     */
    private static String stripUnwantedWords(String value) {
        if (value == null) {
            return null;
        }
        return UNWANTED_WORDS_PATTERN.matcher(value).replaceAll("").trim();
    }

    /**
     * Builds {@code \b(?:w1|w2|...)\b} with each word quoted, so a word containing
     * regex metacharacters is matched literally.
     */
    private static java.util.regex.Pattern buildWholeWordPattern(List<String> words) {
        StringBuilder alternation = new StringBuilder();
        for (String word : words) {
            if (alternation.length() > 0) {
                alternation.append('|');
            }
            alternation.append(java.util.regex.Pattern.quote(word));
        }
        return java.util.regex.Pattern.compile("\\b(?:" + alternation + ")\\b");
    }
}
ProcessorApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;
/**
 * Entry point of the processor Spring Boot application, with Kafka Streams
 * support enabled.
 */
@SpringBootApplication
@EnableKafkaStreams
public class ProcessorApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, forwarded to Spring unchanged
     */
    public static void main(String[] args) {
        final Class<?> bootClass = ProcessorApplication.class;
        SpringApplication.run(bootClass, args);
    }
}
ConsumerApplication
Dependencies
Dependencies: Spring Web, Spring for Apache Kafka, Spring Boot DevTools.
application.properties
spring.application.name=ConsumerApplication
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-streams-demo-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
server.port=8083
KafkaConsumerListener.java
package com.example.ConsumerApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Listener that prints every message arriving on "outputtopic" to standard output.
 */
@Component
public class KafkaConsumerListener {

    /**
     * Invoked for each record received on "outputtopic" by the
     * "kafka-streams-demo-group" consumer group.
     *
     * @param message the record value
     */
    @KafkaListener(topics = "outputtopic", groupId = "kafka-streams-demo-group")
    public void listen(String message) {
        final String line = "Received message from output-topic: " + message;
        System.out.println(line);
    }
}
Run all services, then send a message with an HTTP POST to: http://localhost:8085/api/messages/send
process customer reviews
Suppose we have an e-commerce platform where we want to process customer reviews. Specifically, we want to:
- Filter out reviews with inappropriate language.
- Count the number of positive and negative reviews.
- Store the processed reviews in separate topics.
Dependencies:
Spring Web
Spring Boot DevTools
Spring Kafka
Application.properties
# Kafka Producer Configuration
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Kafka Consumer Configuration
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=ecommerce-review-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
# Kafka Streams Configuration
spring.kafka.streams.application-id=ecommerce-review-processing
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties.spring.json.trusted.packages=*
Define the Review Model
/**
 * Mutable data holder for a customer review exchanged over Kafka.
 */
public class Review {

    /** Identifier of the review. */
    private String id;
    /** Identifier of the reviewed product. */
    private String productId;
    /** Identifier of the customer who wrote the review. */
    private String customerId;
    /** Free-form review text. */
    private String text;
    /** Sentiment label: Positive, Negative or Neutral. */
    private String sentiment;

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getProductId() {
        return this.productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getCustomerId() {
        return this.customerId;
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getText() {
        return this.text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getSentiment() {
        return this.sentiment;
    }

    public void setSentiment(String sentiment) {
        this.sentiment = sentiment;
    }

    /**
     * Renders all five fields in the form
     * {@code Review{id='..', productId='..', customerId='..', text='..', sentiment='..'}};
     * unset fields appear as {@code null}.
     */
    @Override
    public String toString() {
        return String.format(
                "Review{id='%s', productId='%s', customerId='%s', text='%s', sentiment='%s'}",
                id, productId, customerId, text, sentiment);
    }
}
Create a Kafka Producer
KafkaProducerConfig.java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that wires the Kafka producer beans used to publish
 * {@code Review} objects as JSON, keyed by String.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Creates the producer factory.
     *
     * @return a factory configured for the broker at localhost:9092 with
     *         {@link StringSerializer} keys and {@link JsonSerializer} values
     */
    @Bean
    public ProducerFactory<String, Review> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template that application code uses to send reviews.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Review> kafkaTemplate() {
        final ProducerFactory<String, Review> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
KafkaProducerService.java
import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Service that publishes {@code Review} objects through the injected
 * {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    /** Template injected by Spring; performs the actual send. */
    @Autowired
    private KafkaTemplate<String, Review> kafkaTemplate;

    /**
     * Sends a review to the given topic, using the review id as the record key.
     *
     * @param topic  name of the destination topic
     * @param review review to publish; its id becomes the record key
     */
    public void sendReview(String topic, Review review) {
        final String recordKey = review.getId();
        this.kafkaTemplate.send(topic, recordKey, review);
    }
}
ReviewController.java
import com.example.ecommercereviewprocessing.model.Review;
import com.example.ecommercereviewprocessing.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
 * REST endpoint that accepts a review as JSON, assigns it an id and forwards it
 * to Kafka. Mapped under {@code /api/reviews}.
 */
@RestController
@RequestMapping("/api/reviews")
public class ReviewController {

    /** Topic every incoming review is published to. */
    private static final String REVIEW_TOPIC = "input-topic";

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Handles {@code POST /api/reviews/send}. Any id supplied by the client is
     * overwritten with a freshly generated random UUID.
     *
     * @param review review deserialized from the request body
     * @return a fixed confirmation string
     */
    @PostMapping("/send")
    public String sendReview(@RequestBody Review review) {
        final String generatedId = UUID.randomUUID().toString();
        review.setId(generatedId);
        kafkaProducerService.sendReview(REVIEW_TOPIC, review);
        return "Review sent to input-topic";
    }
}
Kafka Stream Processor
Create a Kafka stream processor that listens to the input topic, filters out inappropriate language, determines sentiment, and writes to the appropriate output topics.
Define a List of Inappropriate Words
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Application-level beans for the review-processing example.
 */
@Configuration
public class AppConfig {

    /**
     * Exposes the words treated as inappropriate language.
     *
     * @return a fixed-size list of lowercase words
     */
    @Bean
    public List<String> inappropriateWords() {
        final String[] blocked = {"bad", "horrible", "terrible", "awful", "poor", "stupid"};
        return Arrays.asList(blocked);
    }
}
Create a Sentiment Analysis Utility
import com.example.ecommercereviewprocessing.model.Review;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
/**
 * Keyword-based sentiment helper for review text.
 */
public class SentimentAnalyzer {

    /** Lowercase keywords that mark a text as positive. */
    private static final String[] POSITIVE_KEYWORDS = {"good", "great", "excellent"};

    /** Lowercase keywords that mark a text as negative. */
    private static final String[] NEGATIVE_KEYWORDS =
            {"bad", "horrible", "terrible", "awful", "poor", "stupid"};

    /**
     * Classifies a text by case-insensitive substring match against the keyword lists.
     * Positive keywords take precedence over negative ones.
     *
     * @param text text to classify; may be null
     * @return "Positive", "Negative", or "Neutral" (also returned for null text)
     */
    public static String analyzeSentiment(String text) {
        if (text == null) {
            return "Neutral";
        }
        // Lowercase once, with a fixed locale so the result does not depend on the
        // JVM default (e.g. the Turkish dotless-i mapping).
        String normalized = text.toLowerCase(Locale.ROOT);
        if (containsAny(normalized, POSITIVE_KEYWORDS)) {
            return "Positive";
        }
        if (containsAny(normalized, NEGATIVE_KEYWORDS)) {
            return "Negative";
        }
        return "Neutral";
    }

    /**
     * Removes each inappropriate word (whole-word, case-sensitive) from the review
     * text, then stores the cleaned text and its sentiment on the same review object.
     *
     * @param review             review to update in place; its text may be null
     * @param inappropriateWords words to strip from the text
     * @return the same review instance, with text and sentiment updated
     */
    public static Review processReview(Review review, List<String> inappropriateWords) {
        String cleanedText = review.getText();
        if (cleanedText != null) {
            for (String word : inappropriateWords) {
                // Quote the word so regex metacharacters in it are matched literally.
                cleanedText = cleanedText.replaceAll("\\b" + Pattern.quote(word) + "\\b", "").trim();
            }
        }
        review.setText(cleanedText);
        // Sentiment is derived from the cleaned text, as before.
        review.setSentiment(analyzeSentiment(cleanedText));
        return review;
    }

    /** Returns true when {@code haystack} contains at least one of the keywords. */
    private static boolean containsAny(String haystack, String[] keywords) {
        for (String keyword : keywords) {
            if (haystack.contains(keyword)) {
                return true;
            }
        }
        return false;
    }
}
Create a Kafka Streams Configuration Class
import com.example.ecommercereviewprocessing.model.Review;
import com.example.ecommercereviewprocessing.util.SentimentAnalyzer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Autowired
private List<String> inappropriateWords;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-review-processing");
props.put(org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(JsonSerde.TRUSTED_PACKAGES, "*");
return new KafkaStreamsConfiguration(props);
}
@Bean
public KStream<String, Review> kStream(StreamsBuilder builder) {
KStream<String, Review> source = builder.stream("input-topic");
// Filter out reviews with inappropriate language
KStream<String, Review> filteredReviews = source.filter((key, review) -> {
for (String word : inappropriateWords) {
if (review.getText().toLowerCase().contains(word)) {
return false;
}
}
return true;
});
// Process reviews to determine sentiment
KStream<String, Review> processedReviews = filteredReviews.mapValues(review -> SentimentAnalyzer.processReview(review, inappropriateWords));
// Split processed reviews into positive and negative topics
Predicate<String, Review> isPositive = (key, review) -> "Positive".equals(review.getSentiment());
Predicate<String, Review> isNegative = (key, review) -> "Negative".equals(review.getSentiment());
// [0]:Accesses the first branch of the split streams.
processedReviews.branch(
(key, review) -> isPositive.test(key, review),
(key, review) -> isNegative.test(key, review)
)[0].to("positive-reviews-topic");
processedReviews.branch(
(key, review) -> isPositive.test(key, review),
(key, review) -> isNegative.test(key, review)
)[1].to("negative-reviews-topic");
return source;
}
}
Create Kafka Consumers
Create Kafka consumers to listen to the positive-reviews-topic and negative-reviews-topic.
Kafka Consumer Listener for Positive Reviews
import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Listener that prints every review arriving on "positive-reviews-topic" to
 * standard output.
 */
@Component
public class PositiveReviewConsumerListener {

    /**
     * Invoked for each record received on "positive-reviews-topic" by the
     * "ecommerce-review-group" consumer group.
     *
     * @param review the deserialized record value
     */
    @KafkaListener(topics = "positive-reviews-topic", groupId = "ecommerce-review-group")
    public void listen(Review review) {
        final String line = "Received positive review: " + review;
        System.out.println(line);
    }
}
Kafka Consumer Listener for Negative Reviews
import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Listener that prints every review arriving on "negative-reviews-topic" to
 * standard output.
 */
@Component
public class NegativeReviewConsumerListener {

    /**
     * Invoked for each record received on "negative-reviews-topic" by the
     * "ecommerce-review-group" consumer group.
     *
     * @param review the deserialized record value
     */
    @KafkaListener(topics = "negative-reviews-topic", groupId = "ecommerce-review-group")
    public void listen(Review review) {
        final String line = "Received negative review: " + review;
        System.out.println(line);
    }
}
Run the Application
curl -X POST http://localhost:8080/api/reviews/send -H "Content-Type: application/json" -d '{
"productId": "123",
"customerId": "456",
"text": "This product is great and excellent!"
}'