Cloud computing


Stream Processing

Stream processing involves processing continuous flows of data (streams) in real time or near-real time. It contrasts with traditional batch processing, where data is collected, stored, and processed in discrete chunks.
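To make the contrast concrete, here is a minimal plain-Java sketch (no Kafka involved). The class and method names are hypothetical and chosen only for illustration: the batch method needs the complete data set before it produces anything, while the stream-style object keeps a small running state and yields an updated result after every event.

```java
import java.util.List;

// Hypothetical illustration; these names do not come from the tutorial.
class BatchVsStream {

    /** Batch style: wait until the whole data set is stored, then process it once. */
    static int batchSum(List<Integer> storedValues) {
        int total = 0;
        for (int v : storedValues) {
            total += v;
        }
        return total;
    }

    /** Stream style: keep a running state and update it as each event arrives. */
    static final class RunningSum {
        private int total;

        int onEvent(int value) {
            total += value;
            return total; // an up-to-date result is available after every event
        }
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(3, 5, 2);

        System.out.println("Batch result: " + batchSum(events));

        RunningSum running = new RunningSum();
        for (int e : events) {
            System.out.println("After event " + e + ": " + running.onEvent(e));
        }
    }
}
```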

Create Kafka Topics

Download Kafka: https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

Example

  • Consume messages from an input Kafka topic.
  • Transform the messages (e.g., convert them to uppercase).
  • Publish the transformed messages to an output Kafka topic.
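Before wiring up Kafka, the three steps above can be sketched with in-memory collections standing in for the topics. This is only a simulation under that assumption: `UppercasePipeline`, `transform`, and `run` are hypothetical names, and a real application would read from and write to Kafka topics instead of a queue and a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;

// In-memory stand-in for the pipeline; no Kafka broker is involved.
class UppercasePipeline {

    /** The transformation step: convert one message to uppercase. */
    static String transform(String message) {
        return message.toUpperCase(Locale.ROOT);
    }

    /** Drains the input queue, transforms each message, and appends it to the output list. */
    static List<String> run(Queue<String> inputTopic) {
        List<String> outputTopic = new ArrayList<>();
        String message;
        while ((message = inputTopic.poll()) != null) { // 1. consume
            String transformed = transform(message);     // 2. transform
            outputTopic.add(transformed);                 // 3. publish
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        Queue<String> inputTopic = new ArrayDeque<>(List.of("hello", "kafka streams"));
        System.out.println(run(inputTopic)); // prints [HELLO, KAFKA STREAMS]
    }
}
```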

# Start Zookeeper
.\bin\windows\zookeeper-server-start.bat  .\config\zookeeper.properties

# Start Kafka
.\bin\windows\kafka-server-start.bat  .\config\server.properties

# Create the input topic (named inputtopic)
.\bin\windows\kafka-topics.bat --create --topic inputtopic --bootstrap-server localhost:9092

# Create the output topic (named outputtopic)
.\bin\windows\kafka-topics.bat --create --topic outputtopic --bootstrap-server localhost:9092

# List all topics
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

Zookeeper in Kafka

  1. Broker Metadata: Keeps track of brokers (Kafka servers) in the cluster.
  2. Leader Election: Supports election of the controller broker, which in turn assigns leaders for Kafka partitions.
  3. Topic Management: Maintains metadata about topics and partitions.
  4. Configuration Management: Stores configuration for Kafka brokers and topics.

graph TD
    A[Kafka Producers] -->|Send Data| B[Kafka Brokers]
    B -->|Coordinate| C[Zookeeper Cluster]
    C -->|Metadata| D[Kafka Consumers]

Create a Spring Boot Project: Producer

Dependencies

Dependencies: Spring Web, Spring for Apache Kafka, Spring Boot DevTools.

application.properties

# Kafka Producer Configuration
spring.application.name=ProducerApplication
server.port=8085
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

KafkaProducerConfig.java


import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaProducerService.java


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

MessageController.java


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public String sendMessage(@RequestBody String message) {
        kafkaProducerService.sendMessage("inputtopic", message);
        return "Message sent to inputtopic";
    }
}

ProcessorApplication

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

application.properties

spring.application.name=ProcessorApplication
spring.kafka.streams.application-id=kafka-streams-demo
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.apache.kafka.common.serialization.Serdes$StringSerde

server.port=8082

KafkaStreamsConfig.java


package com.example.ProcessorApplication;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KStream<String, String> kStream(StreamsBuilder builder) {
        // Words to strip from every message value
        List<String> unwantedWords = Arrays.asList("hello", "unwanted", "test");

        // Read records from the input topic
        KStream<String, String> source = builder.stream("inputtopic");

        // Remove each unwanted word (whole-word, case-sensitive match)
        KStream<String, String> processed = source.mapValues(value -> {
            for (String word : unwantedWords) {
                value = value.replaceAll("\\b" + word + "\\b", "").trim();
            }
            return value;
        });

        // Write the cleaned messages to the output topic
        processed.to("outputtopic");
        return source;
    }
}
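To see what the mapValues step above does to a message, the same replacement logic can be run in isolation, outside Kafka. The sketch below is a hypothetical helper class, not part of the tutorial project. `removeAsInTutorial` mirrors the lambda as written; `removeAndNormalize` is an assumed variant that also ignores case and collapses the double space left behind when a word is removed from the middle of a sentence.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for experimenting with the word-removal logic.
class UnwantedWordFilter {

    static final List<String> UNWANTED = List.of("hello", "unwanted", "test");

    /** Same logic as the mapValues lambda: remove each word, trimming after every pass. */
    static String removeAsInTutorial(String value) {
        for (String word : UNWANTED) {
            value = value.replaceAll("\\b" + word + "\\b", "").trim();
        }
        return value;
    }

    /** Assumed variant: quote the word, ignore case, then collapse repeated whitespace. */
    static String removeAndNormalize(String value) {
        for (String word : UNWANTED) {
            Pattern p = Pattern.compile("\\b" + Pattern.quote(word) + "\\b", Pattern.CASE_INSENSITIVE);
            value = p.matcher(value).replaceAll("");
        }
        return value.trim().replaceAll("\\s+", " ");
    }

    public static void main(String[] args) {
        System.out.println(removeAsInTutorial("hello big world"));         // prints: big world
        System.out.println(removeAsInTutorial("Hello world"));             // unchanged: the match is case-sensitive
        System.out.println(removeAndNormalize("this is a test message"));  // prints: this is a message
    }
}
```

Because of the `\b` word boundaries, a word such as "testing" is left untouched by both methods.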

ProcessorApplication.java


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@SpringBootApplication
@EnableKafkaStreams
public class ProcessorApplication {

	public static void main(String[] args) {
		SpringApplication.run(ProcessorApplication.class, args);
	}

}

ConsumerApplication

Dependencies

Dependencies: Spring Web, Spring for Apache Kafka, Spring Boot DevTools.

application.properties

spring.application.name=ConsumerApplication
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-streams-demo-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
server.port=8083

KafkaConsumerListener.java

package com.example.ConsumerApplication;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "outputtopic", groupId = "kafka-streams-demo-group")
    public void listen(String message) {
        System.out.println("Received message from output-topic: " + message);
    }
}

Run all three services, then send a message with a POST request to http://localhost:8085/api/messages/send
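One way to send that request from Java is the JDK's built-in HTTP client (Java 11 or later). This is a minimal sketch: the class name `SendMessageClient`, the sample message, and the `text/plain` content type are assumptions. Running `main` only succeeds if the producer application is listening on port 8085.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client; assumes the producer application is running on localhost:8085.
class SendMessageClient {

    /** Builds a POST request whose body is the raw message text. */
    static HttpRequest buildRequest(String message) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/api/messages/send"))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(message))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest("hello kafka test"), HttpResponse.BodyHandlers.ofString());
        // Print the HTTP status code and the controller's reply
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If everything is wired correctly, the consumer application's console should then log the message with the unwanted words removed.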

Process Customer Reviews

Suppose we have an e-commerce platform where we want to process customer reviews. Specifically, we want to:
  • Filter out reviews with inappropriate language.
  • Count the number of positive and negative reviews.
  • Store the processed reviews in separate topics.

Step 1: Set Up Your Spring Boot Project

Follow the same steps as before to set up your Spring Boot project.

Dependencies:

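Spring Web
Spring Boot DevTools
Spring for Apache Kafka
Kafka Streams (org.apache.kafka:kafka-streams, required by the stream processor in this example)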

application.properties


# Kafka Producer Configuration
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Kafka Consumer Configuration
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=ecommerce-review-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

# Kafka Streams Configuration
spring.kafka.streams.application-id=ecommerce-review-processing
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties.spring.json.trusted.packages=*

Define the Review Model


public class Review {
    private String id;
    private String productId;
    private String customerId;
    private String text;
    private String sentiment; // Positive, Negative, Neutral

    // Getters and Setters
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getCustomerId() {
        return customerId;
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getSentiment() {
        return sentiment;
    }

    public void setSentiment(String sentiment) {
        this.sentiment = sentiment;
    }

    @Override
    public String toString() {
        return "Review{" +
                "id='" + id + '\'' +
                ", productId='" + productId + '\'' +
                ", customerId='" + customerId + '\'' +
                ", text='" + text + '\'' +
                ", sentiment='" + sentiment + '\'' +
                '}';
    }
}

Create a Kafka Producer

KafkaProducerConfig.java


import com.example.ecommercereviewprocessing.model.Review;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Review> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Review> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaProducerService.java


import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Review> kafkaTemplate;

    public void sendReview(String topic, Review review) {
        kafkaTemplate.send(topic, review.getId(), review);
    }
}

ReviewController.java


import com.example.ecommercereviewprocessing.model.Review;
import com.example.ecommercereviewprocessing.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("/api/reviews")
public class ReviewController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public String sendReview(@RequestBody Review review) {
        review.setId(UUID.randomUUID().toString());
        kafkaProducerService.sendReview("input-topic", review);
        return "Review sent to input-topic";
    }
}

Kafka Stream Processor

Create a Kafka stream processor that listens to the input topic, filters out inappropriate language, determines sentiment, and writes to the appropriate output topics.

Define a List of Inappropriate Words


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

@Configuration
public class AppConfig {

    @Bean
    public List<String> inappropriateWords() {
        return Arrays.asList("bad", "horrible", "terrible", "awful", "poor", "stupid");
    }
}

Create a Sentiment Analysis Utility


import com.example.ecommercereviewprocessing.model.Review;

import java.util.List;

public class SentimentAnalyzer {

    // Classifies text by simple keyword lookup; positive keywords take precedence.
    public static String analyzeSentiment(String text) {
        String lower = text.toLowerCase();
        if (lower.contains("good") || lower.contains("great") || lower.contains("excellent")) {
            return "Positive";
        } else if (lower.contains("bad") || lower.contains("horrible") || lower.contains("terrible")
                || lower.contains("awful") || lower.contains("poor") || lower.contains("stupid")) {
            return "Negative";
        } else {
            return "Neutral";
        }
    }

    // Removes inappropriate words (whole-word, case-sensitive match) and sets the sentiment.
    public static Review processReview(Review review, List<String> inappropriateWords) {
        String cleanedText = review.getText();
        for (String word : inappropriateWords) {
            cleanedText = cleanedText.replaceAll("\\b" + word + "\\b", "").trim();
        }
        review.setText(cleanedText);
        review.setSentiment(analyzeSentiment(cleanedText));
        return review;
    }
}
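The keyword lookup above matches substrings, so a word such as "badge" counts as containing "bad". The following standalone sketch contrasts that behavior with whole-word matching. The class `KeywordSentiment` and its methods are hypothetical, and the whole-word variant is an assumed alternative, not the tutorial's method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical comparison of substring matching and whole-word matching.
class KeywordSentiment {

    static final Set<String> POSITIVE = Set.of("good", "great", "excellent");
    static final Set<String> NEGATIVE = Set.of("bad", "horrible", "terrible", "awful", "poor", "stupid");

    /** Substring matching, as in SentimentAnalyzer: "badge" counts as containing "bad". */
    static String bySubstring(String text) {
        String lower = text.toLowerCase(Locale.ROOT);
        if (POSITIVE.stream().anyMatch(lower::contains)) return "Positive";
        if (NEGATIVE.stream().anyMatch(lower::contains)) return "Negative";
        return "Neutral";
    }

    /** Whole-word matching (assumed alternative): split on non-letters, then look up each word. */
    static String byWholeWord(String text) {
        List<String> words = Arrays.asList(text.toLowerCase(Locale.ROOT).split("[^a-z]+"));
        if (words.stream().anyMatch(POSITIVE::contains)) return "Positive";
        if (words.stream().anyMatch(NEGATIVE::contains)) return "Negative";
        return "Neutral";
    }

    public static void main(String[] args) {
        System.out.println(bySubstring("I lost my badge"));                       // prints Negative
        System.out.println(byWholeWord("I lost my badge"));                       // prints Neutral
        System.out.println(byWholeWord("This product is great and excellent!"));  // prints Positive
    }
}
```

Neither approach understands negation ("not good" is still classified as Positive), which is acceptable for a demo but worth keeping in mind.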

Create a Kafka Streams Configuration Class


import com.example.ecommercereviewprocessing.model.Review;
import com.example.ecommercereviewprocessing.util.SentimentAnalyzer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.serializer.JsonSerde;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Autowired
    private List<String> inappropriateWords;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-review-processing");
        props.put(org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
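        // The trusted-packages key is defined on JsonDeserializer, not on JsonSerde
        props.put(org.springframework.kafka.support.serializer.JsonDeserializer.TRUSTED_PACKAGES, "*");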
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KStream<String, Review> kStream(StreamsBuilder builder) {
        KStream<String, Review> source = builder.stream("input-topic");

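        // Filter out reviews with inappropriate language.
        // Note: the word list in AppConfig overlaps the negative keywords in SentimentAnalyzer,
        // so reviews containing those words are dropped here before a sentiment is assigned.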
        KStream<String, Review> filteredReviews = source.filter((key, review) -> {
            for (String word : inappropriateWords) {
                if (review.getText().toLowerCase().contains(word)) {
                    return false;
                }
            }
            return true;
        });

        // Process reviews to determine sentiment
        KStream<String, Review> processedReviews = filteredReviews.mapValues(review -> SentimentAnalyzer.processReview(review, inappropriateWords));

        // Split processed reviews into positive and negative streams
        Predicate<String, Review> isPositive = (key, review) -> "Positive".equals(review.getSentiment());
        Predicate<String, Review> isNegative = (key, review) -> "Negative".equals(review.getSentiment());

        // Branch once and reuse the result: index 0 holds records matching isPositive,
        // index 1 holds records matching isNegative; records matching neither are dropped.
        // (branch(...) is deprecated since Kafka 2.8 in favor of split().)
        KStream<String, Review>[] branches = processedReviews.branch(isPositive, isNegative);
        branches[0].to("positive-reviews-topic");
        branches[1].to("negative-reviews-topic");

        return source;
    }
}
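The topology above routes reviews but does not implement the counting goal stated at the start of this example. As a rough, Kafka-free sketch of both ideas, the hypothetical class below routes each item to the first predicate it matches and keeps a running count per sentiment label. In a real Kafka Streams application the count would normally be kept by grouping and aggregating the stream; that is not shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory illustration of first-match branching and per-label counting.
class BranchAndCount {

    /** Routes each item to the first predicate it matches; unmatched items are dropped. */
    static <T> List<List<T>> branch(List<T> items, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T item : items) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    branches.get(i).add(item);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    /** Running count per sentiment label, updated once per review. */
    static Map<String, Long> countBySentiment(List<String> sentiments) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String s : sentiments) {
            counts.merge(s, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sentiments = List.of("Positive", "Negative", "Neutral", "Positive");
        Predicate<String> isPositive = "Positive"::equals;
        Predicate<String> isNegative = "Negative"::equals;

        List<List<String>> branches = branch(sentiments, List.of(isPositive, isNegative));
        System.out.println(branches.get(0));              // prints [Positive, Positive]
        System.out.println(branches.get(1));              // prints [Negative]
        System.out.println(countBySentiment(sentiments)); // prints {Positive=2, Negative=1, Neutral=1}
    }
}
```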

Create Kafka Consumers

Create Kafka consumers to listen to the positive-reviews-topic and negative-reviews-topic.

Kafka Consumer Listener for Positive Reviews


import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PositiveReviewConsumerListener {

    @KafkaListener(topics = "positive-reviews-topic", groupId = "ecommerce-review-group")
    public void listen(Review review) {
        System.out.println("Received positive review: " + review);
    }
}

Kafka Consumer Listener for Negative Reviews


import com.example.ecommercereviewprocessing.model.Review;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class NegativeReviewConsumerListener {

    @KafkaListener(topics = "negative-reviews-topic", groupId = "ecommerce-review-group")
    public void listen(Review review) {
        System.out.println("Received negative review: " + review);
    }
}

Run the Application


curl -X POST http://localhost:8080/api/reviews/send -H "Content-Type: application/json" -d '{
    "productId": "123",
    "customerId": "456",
    "text": "This product is great and excellent!"
}'