Gitlab - Argos ALM by PALO IT

Added Avro configuration to produce and consume

parent db20524c
...@@ -4,13 +4,14 @@ ...@@ -4,13 +4,14 @@
POST /publishMessage POST /publishMessage
Content-Type: application/json Content-Type: application/json
{ {
"message":"Message test" "id": "{{$guid}}",
"description":"The time is {{$timestamp}}"
} }
### publish Rule ### publish Rule
POST /publishRule POST /publishRule
Content-Type: application/json Content-Type: application/json
{ {
"ruleId": "1", "id": "{{$guid}}",
"description":"Rule number 1" "description":"The time is {{$localDatetime iso8601}}"
} }
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId> <parent>
<artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId>
<version>3.1.3</version> <artifactId>spring-boot-starter-parent</artifactId>
<relativePath/> <!-- lookup parent from repository --> <version>3.1.3</version>
</parent> <relativePath/> <!-- lookup parent from repository -->
<groupId>com.example</groupId> </parent>
<artifactId>http-reactive-ms</artifactId>
<version>0.0.1-SNAPSHOT</version> <groupId>com.example</groupId>
<name>HttpReactive</name> <artifactId>http-reactive-ms</artifactId>
<description>Demo project for Spring Boot</description> <version>0.0.1-SNAPSHOT</version>
<properties> <name>HttpReactive</name>
<java.version>17</java.version> <description>Demo project for Spring Boot</description>
<spring-cloud.version>2022.0.3</spring-cloud.version>
</properties> <properties>
<dependencies> <java.version>17</java.version>
<dependency> <spring-cloud.version>2022.0.3</spring-cloud.version>
<groupId>org.springframework.boot</groupId> <org.apache.avro.version>1.11.2</org.apache.avro.version>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency> <spotless-maven-plugin.version>2.38.0</spotless-maven-plugin.version>
<dependency> </properties>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId> <repositories>
</dependency> <repository>
<!--<dependency> <id>confluent</id>
<groupId>org.springframework.cloud</groupId> <name>confluent</name>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <url>https://packages.confluent.io/maven/</url>
</dependency>--> </repository>
<dependency> </repositories>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-cloud-dependencies</artifactId> <artifactId>spring-boot-starter-webflux</artifactId>
<version>${spring-cloud.version}</version> </dependency>
<type>pom</type> <dependency>
<scope>import</scope> <groupId>org.springframework.cloud</groupId>
</dependency> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!--dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency-->
<!--<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${org.apache.avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</dependencyManagement>
<build> <dependencyManagement>
<plugins> <dependencies>
<plugin> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-cloud-dependencies</artifactId>
</plugin> <version>${spring-cloud.version}</version>
</plugins> <type>pom</type>
</build> <scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${org.apache.avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotless-maven-plugin.version}</version>
<configuration>
<java>
<importOrder/>
<toggleOffOn/>
<trimTrailingWhitespace/>
<endWithNewline/>
<removeUnusedImports>
<engine>google-java-format</engine>
</removeUnusedImports>
<googleJavaFormat>
<style>GOOGLE</style>
<reflowLongStrings>true</reflowLongStrings>
</googleJavaFormat>
</java>
</configuration>
<executions>
<execution>
<id>apply</id>
<phase>compile</phase>
<goals>
<goal>apply</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>
{
"type": "record",
"name": "PolicyAvro",
"namespace":"com.example.http.reactive.avro",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "description",
"type": "string"
}
]
}
\ No newline at end of file
{
"type": "record",
"name": "RuleAvro",
"namespace":"com.example.http.reactive.avro",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "description",
"type": "string"
}
]
}
\ No newline at end of file
package com.example.http.reactive; package com.example.http.reactive;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Hooks; import reactor.core.publisher.Hooks;
@Slf4j @Slf4j
...@@ -12,9 +11,8 @@ import reactor.core.publisher.Hooks; ...@@ -12,9 +11,8 @@ import reactor.core.publisher.Hooks;
@RequiredArgsConstructor @RequiredArgsConstructor
public class HttpReactiveApplication { public class HttpReactiveApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(HttpReactiveApplication.class, args); SpringApplication.run(HttpReactiveApplication.class, args);
Hooks.enableAutomaticContextPropagation(); Hooks.enableAutomaticContextPropagation();
} }
} }
package com.example.http.reactive.config;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.example.http.reactive.dto.Message;
import com.example.http.reactive.dto.Rule;
import com.example.http.reactive.stream.MessageProducerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Component
@RequiredArgsConstructor
public class HttpReactiveHandler {
private final MessageProducerAdapter messageProducerAdapter;
record Response(Boolean status, Object payload){}
public Mono<ServerResponse> publishMessage(ServerRequest request) {
return request.bodyToMono(Message.class)
.flatMap(requestObject -> {
log.info("Before publish {}", requestObject);
boolean result = messageProducerAdapter.publishMessage(requestObject);
return ServerResponse.ok().bodyValue(new Response(result, requestObject));
});
}
public Mono<ServerResponse> publishRule(ServerRequest request) {
return request.bodyToMono(Rule.class)
.flatMap(requestObject -> {
log.info("Before publish {}", requestObject);
boolean result = messageProducerAdapter.publishMessage(requestObject);
return ServerResponse.ok().bodyValue(new Response(result, requestObject));
});
}
}
package com.example.http.reactive.config;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicate;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
@Configuration(proxyBeanMethods = false)
public class HttpReactiveRouter {
private static final RequestPredicate ACCEPT_JSON = accept(MediaType.APPLICATION_JSON);
@Bean
public RouterFunction<ServerResponse> route(HttpReactiveHandler httpReactiveHandler) {
return RouterFunctions.route()
.POST("/publishMessage",ACCEPT_JSON, httpReactiveHandler::publishMessage)
.POST("/publishRule",ACCEPT_JSON, httpReactiveHandler::publishRule)
.build();
}
}
package com.example.http.reactive.dto;
import lombok.Data;
@Data
public class Message {
private String message;
}
package com.example.http.reactive.dto;
import lombok.Data;
@Data
public class Rule {
private String ruleId;
private String description;
}
package com.example.http.reactive.handler; package com.example.http.reactive.handler;
import com.example.http.reactive.avro.PolicyAvro;
import com.example.http.reactive.avro.RuleAvro;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import com.example.http.reactive.dto.Message;
import com.example.http.reactive.dto.Rule;
import com.example.http.reactive.stream.MessageProducerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@RestController @RestController
@RequestMapping("/block")
@RequiredArgsConstructor @RequiredArgsConstructor
public class HttpHandlers { public class HttpHandlers {
private final MessageProducerAdapter messageProducerAdapter; private final StreamBridge streamBridge;
record Response(Boolean status, Object payload){}
@PostMapping("/publishMessage") @PostMapping("/publishMessage")
public Response publishMessage(@RequestBody Message message){ @ResponseStatus(HttpStatus.ACCEPTED)
log.info("Before publish message {}", message); public void publishMessage(@RequestBody PolicyAvro pocmessage) {
Boolean result = messageProducerAdapter.publishMessage(message); log.info("Request Received message [{}]", pocmessage);
return new Response(result,message); streamBridge.send("messageProducer", pocmessage);
} }
@PostMapping("/publishRule") @PostMapping("/publishRule")
public Response publishRule(@RequestBody Rule rule){ @ResponseStatus(HttpStatus.ACCEPTED)
log.info("Before publish rule {}", rule); public void publishRule(@RequestBody RuleAvro rule) {
Boolean result = messageProducerAdapter.publishMessage(rule); log.info("Request Received rule [{}]", rule);
return new Response(result,rule); streamBridge.send("ruleProducer", rule);
} }
} }
package com.example.http.reactive.handler; package com.example.http.reactive.handler;
import com.example.http.reactive.avro.PolicyAvro;
import com.example.http.reactive.avro.RuleAvro;
import java.util.function.Consumer; import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import com.example.http.reactive.dto.Message;
import com.example.http.reactive.dto.Rule;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@Slf4j @Slf4j
...@@ -16,17 +13,16 @@ import reactor.core.publisher.Flux; ...@@ -16,17 +13,16 @@ import reactor.core.publisher.Flux;
public class StreamHandlers { public class StreamHandlers {
@Bean @Bean
Consumer<Flux<Rule>> ruleConsumer() { Consumer<Flux<RuleAvro>> ruleConsumer() {
return ruleFlux -> ruleFlux. return ruleFlux -> ruleFlux.doOnNext(rule -> log.info("Consumer rule: {}", rule)).subscribe();
doOnNext(rule -> log.info("Consumer rule: {}", rule)) }
.subscribe();
}
@Bean @Bean
Consumer<Flux<Message>> messageConsumer() { Consumer<Flux<PolicyAvro>> messageConsumer() {
return messageFlux -> messageFlux return messageFlux -> messageFlux.doOnNext(
.doOnNext(message -> log.info("Consumer message: {}", message)) message -> {
log.info("Consumer message: {}", message);
})
.subscribe(); .subscribe();
} }
} }
package com.example.http.reactive.stream;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import com.example.http.reactive.dto.Message;
import com.example.http.reactive.dto.Rule;
import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
public class MessageProducerAdapter {
private final StreamBridge streamBridge;
public boolean publishMessage(Message message){
return streamBridge.send("messageProducer", message);
}
public boolean publishMessage(Rule rule){
return streamBridge.send("ruleProducer", rule);
}
}
spring.application.name=http-reactive-ms spring.application.name=http-reactive-ms
#logging
logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]
# Stream
spring.cloud.stream.kafka.binder.brokers=localhost:9092
#spring.rabbitmq.host=localhost
#spring.rabbitmq.username=user
#spring.rabbitmq.password=pa55word
#Producer
spring.cloud.stream.bindings.messageProducer.destination=test.message-3
spring.cloud.stream.bindings.ruleProducer.destination=test.rule-3
#Consumer
spring.cloud.function.definition=ruleConsumer;messageConsumer
#Consumer testRule
spring.cloud.stream.bindings.ruleConsumer-in-0.destination=test.rule-3
spring.cloud.stream.bindings.ruleConsumer-in-0.group=${spring.application.name}
#Consumer testMessage
spring.cloud.stream.bindings.messageConsumer-in-0.destination=test.message-3
spring.cloud.stream.bindings.messageConsumer-in-0.group=${spring.application.name}
# Actuator
management.endpoints.jmx.exposure.exclude=* management.endpoints.jmx.exposure.exclude=*
management.endpoints.web.exposure.include=health,info,metrics,prometheus management.endpoints.web.exposure.include=health,info,metrics,prometheus
management.info.git.enabled=true management.info.git.enabled=true
management.info.java.enabled=true management.info.java.enabled=true
management.info.os.enabled=true management.info.os.enabled=true
#logging
logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]
logging.level.org.apache.kafka=ERROR logging.level.org.apache.kafka=ERROR
spring.cloud.stream.kafka.binder.enableObservation=true logging.level.io.confluent.kafka=ERROR
# Stream
spring.cloud.function.definition=ruleConsumer;messageConsumer
spring.cloud.stream.default.contentType=application/*+avro
spring.cloud.stream.default.consumer.useNativeDecoding=true
spring.cloud.stream.default.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
## Producer
spring.cloud.stream.bindings.messageProducer.destination=test.message
spring.cloud.stream.kafka.bindings.messageProducer.producer.messageKeyExpression=T(java.lang.String).valueOf(headers['id'])
#spring.cloud.stream.kafka.bindings.messageProducer.producer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.messageProducer.producer.configuration.schema.registry.url=http://localhost:8081
spring.cloud.stream.bindings.ruleProducer.destination=test.rule
spring.cloud.stream.kafka.bindings.ruleProducer.producer.messageKeyExpression=T(java.lang.String).valueOf(headers['id'])
#spring.cloud.stream.kafka.bindings.ruleProducer.producer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.ruleProducer.producer.configuration.schema.registry.url=http://localhost:8081
## Consumer
spring.cloud.stream.bindings.ruleConsumer-in-0.destination=test.rule
spring.cloud.stream.bindings.ruleConsumer-in-0.group=${spring.application.name}
#spring.cloud.stream.kafka.bindings.ruleConsumer-in-0.consumer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.ruleConsumer-in-0.consumer.configuration.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.bindings.ruleConsumer-in-0.consumer.configuration.specific.avro.reader=true
spring.cloud.stream.bindings.messageConsumer-in-0.destination=test.message
spring.cloud.stream.bindings.messageConsumer-in-0.group=${spring.application.name}
#spring.cloud.stream.kafka.bindings.messageConsumer-in-0.consumer.configuration.auto.register.schemas=false
spring.cloud.stream.kafka.bindings.messageConsumer-in-0.consumer.configuration.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.bindings.messageConsumer-in-0.consumer.configuration.specific.avro.reader=true
...@@ -6,8 +6,6 @@ import org.springframework.boot.test.context.SpringBootTest; ...@@ -6,8 +6,6 @@ import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest @SpringBootTest
class HttpReactiveApplicationTests { class HttpReactiveApplicationTests {
@Test @Test
void contextLoads() { void contextLoads() {}
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment