Gitlab - Argos ALM by PALO IT

Commit 581f7da5 authored by Julian Pulido's avatar Julian Pulido

Change package

parent e6681eec
......@@ -14,3 +14,12 @@ services:
- 8081-8083:8081-8083 # REST Proxy, Schema Registry, Kafka Connect ports
- 9581-9585:9581-9585 # JMX Ports
- 9092:9092 # Kafka Broker port
rabbit:
image: rabbitmq:3-management
ports:
- 5672:5672
- 15672:15672
environment:
RABBITMQ_DEFAULT_USER: user
RABBITMQ_DEFAULT_PASS: pa55word
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
......@@ -11,7 +12,6 @@
<groupId>com.example</groupId>
<artifactId>http-reactive-ms</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>HttpReactive</name>
<description>Demo project for Spring Boot</description>
<properties>
......@@ -25,18 +25,24 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!--<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
......@@ -44,8 +50,8 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
......
package com.example.http.reactive;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.example.http.reactive.dto.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
@Slf4j
@SpringBootApplication
@RequiredArgsConstructor
public class HttpReactiveApplication {
public static void main(String[] args) {
SpringApplication.run(HttpReactiveApplication.class, args);
Hooks.enableAutomaticContextPropagation();
}
/*@Bean
Consumer<Message<Rule>> ruleConsumer(){
return rule -> log.info("Consumer rule: {}", rule);
}
@Bean
Consumer<Message<com.example.http.reactive.dto.Message>> messageConsumer(){
return message -> log.info("Consumer message: {}", message);
}*/
/*@Bean
Consumer<Flux<Rule>> ruleConsumer() {
return ruleFlux -> ruleFlux.
doOnNext(rule -> log.info("Consumer rule: {}", rule))
.subscribe();
}*/
@Bean
Consumer<Flux<Message>> messageConsumer() {
return messageFlux -> messageFlux
.doOnNext(message -> log.info("Consumer message: {}", message))
.subscribe();
}
}
......@@ -9,27 +9,33 @@ import com.example.http.reactive.dto.Rule;
import com.example.http.reactive.stream.MessageProducerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Component
@RequiredArgsConstructor
public class HttpReactiveHandler {
private final MessageProducerAdapter messageProducerAdapter;
record Response(Boolean status, Object payload){}
public Mono<ServerResponse> publishMessage(ServerRequest request) {
return request.bodyToMono(Message.class)
.flatMap(requestObject -> {
messageProducerAdapter.publishMessage(requestObject);
return ServerResponse.ok().bodyValue(requestObject);
log.info("Before publish {}", requestObject);
boolean result = messageProducerAdapter.publishMessage(requestObject);
return ServerResponse.ok().bodyValue(new Response(result, requestObject));
});
}
public Mono<ServerResponse> publishRule(ServerRequest request) {
return request.bodyToMono(Rule.class)
.flatMap(requestObject -> {
messageProducerAdapter.publishMessage(requestObject);
return ServerResponse.ok().bodyValue(requestObject);
log.info("Before publish {}", requestObject);
boolean result = messageProducerAdapter.publishMessage(requestObject);
return ServerResponse.ok().bodyValue(new Response(result, requestObject));
});
}
......
package com.example.http.reactive.config;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -10,9 +11,6 @@ import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
@Configuration(proxyBeanMethods = false)
public class HttpReactiveRouter {
......
package com.example.http.reactive.consumer;
import java.util.function.Consumer;
import org.springframework.stereotype.Component;
import com.example.http.reactive.dto.Rule;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
@Component
public class RuleConsumer implements Consumer<Flux<Rule>> {
@Override
public void accept(Flux<Rule> ruleFlux) {
ruleFlux
.doOnNext(rule -> log.info("Consumer rule: {}", rule))
.subscribe();
}
}
package com.example.http.reactive.controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.http.reactive.dto.Message;
import com.example.http.reactive.dto.Rule;
import com.example.http.reactive.stream.MessageProducerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RestController
@RequestMapping("/block")
@RequiredArgsConstructor
public class HttpReactiveController {
private final MessageProducerAdapter messageProducerAdapter;
record Response(Boolean status, Object payload){}
@PostMapping("/publishMessage")
public Response publishMessage(@RequestBody Message message){
log.info("Before publish message {}", message);
Boolean result = messageProducerAdapter.publishMessage(message);
return new Response(result,message);
}
@PostMapping("/publishRule")
public Response publishRule(@RequestBody Rule rule){
log.info("Before publish rule {}", rule);
Boolean result = messageProducerAdapter.publishMessage(rule);
return new Response(result,rule);
}
}
package com.example.http.reactive.dto;
import lombok.Getter;
import lombok.Setter;
import lombok.Data;
@Getter
@Setter
@Data
public class Message {
private String message;
}
package com.example.http.reactive.dto;
import lombok.Getter;
import lombok.Setter;
import lombok.Data;
@Getter
@Setter
@Data
public class Rule {
private String ruleId;
......
......@@ -14,12 +14,12 @@ public class MessageProducerAdapter {
private final StreamBridge streamBridge;
public void publishMessage(Message Message){
streamBridge.send("messageProducer", Message);
public boolean publishMessage(Message message){
return streamBridge.send("messageProducer", message);
}
public void publishMessage(Rule rule){
streamBridge.send("ruleProducer", rule);
public boolean publishMessage(Rule rule){
return streamBridge.send("ruleProducer", rule);
}
}
spring.application.name=http-reactive-ms
#logging
logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]
# Stream
spring.cloud.stream.kafka.binder.brokers=localhost:9092
#spring.rabbitmq.host=localhost
#spring.rabbitmq.username=user
#spring.rabbitmq.password=pa55word
#Producer
spring.cloud.stream.bindings.messageProducer.destination=testMessage
spring.cloud.stream.bindings.ruleProducer.destination=testRule
spring.cloud.stream.bindings.messageProducer.destination=test.message-3
spring.cloud.stream.bindings.ruleProducer.destination=test.rule-3
#Consumer
spring.cloud.function.definition=ruleConsumer;messageConsumer
#Consumer testRule
spring.cloud.stream.bindings.ruleConsumer-in-0.destination=test.rule-3
spring.cloud.stream.bindings.ruleConsumer-in-0.group=${spring.application.name}
#Consumer testMessage
spring.cloud.stream.bindings.messageConsumer-in-0.destination=test.message-3
spring.cloud.stream.bindings.messageConsumer-in-0.group=${spring.application.name}
management.endpoints.jmx.exposure.exclude=*
management.endpoints.web.exposure.include=health,info,metrics,prometheus
management.info.git.enabled=true
management.info.java.enabled=true
management.info.os.enabled=true
logging.level.org.apache.kafka=ERROR
spring.cloud.stream.kafka.binder.enableObservation=true
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment