GitLab - Argos ALM by PALO IT

Added support for saving consumed events to MongoDB

parent ab78e654
docker-compose.yml

@@ -15,11 +15,21 @@ services:
       - 9581-9585:9581-9585 # JMX Ports
       - 9092:9092 # Kafka Broker port
-  rabbit:
-    image: rabbitmq:3-management
+  mongo:
+    image: mongo
     ports:
-      - 5672:5672
-      - 15672:15672
+      - 27017:27017
     environment:
-      RABBITMQ_DEFAULT_USER: user
-      RABBITMQ_DEFAULT_PASS: pa55word
+      MONGO_INITDB_ROOT_USERNAME: user
+      MONGO_INITDB_ROOT_PASSWORD: pa55word
+  mongo-express:
+    image: mongo-express
+    depends_on:
+      - mongo
+    ports:
+      - 7081:8081
+    environment:
+      ME_CONFIG_MONGODB_ADMINUSERNAME: user
+      ME_CONFIG_MONGODB_ADMINPASSWORD: pa55word
+      ME_CONFIG_MONGODB_URL: mongodb://user:pa55word@mongo:27017/
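With these containers up (docker compose up -d), the mongo-express UI is reachable on http://localhost:7081 and MongoDB itself on localhost:27017. A minimal connectivity check follows; it is a sketch, not part of the commit, and assumes the sync Java driver (org.mongodb:mongodb-driver-sync) is on the classpath. The class name is hypothetical.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

// Hypothetical smoke test: pings the container started by the compose file above.
public class MongoSmokeTest {
    public static void main(String[] args) {
        // The root user from MONGO_INITDB_ROOT_USERNAME lives in the admin
        // database, hence authSource=admin.
        try (MongoClient client = MongoClients.create(
                "mongodb://user:pa55word@localhost:27017/?authSource=admin")) {
            Document pong = client.getDatabase("admin")
                    .runCommand(new Document("ping", 1));
            System.out.println("MongoDB reachable: " + pong.toJson());
        }
    }
}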
 @host=http://localhost:8080

-### publish Message
-POST /publishMessage
+### policy
+POST /publishPolicy
 Content-Type: application/json
 X-Correlation-Id: {{$guid}}

 {
-  "id": "{{$guid}}",
+  "id": "{{$guid}}",
   "description":"The time is {{$timestamp}}"
 }

-### publish Rule
+### rule
 POST /publishRule
 Content-Type: application/json

 {
......
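The .http requests above can also be driven from Java. A minimal sketch, assuming the application is running locally and spring-test's WebTestClient is on the classpath; the class name and the JSON payload map are illustrative, mirroring the request body above.

import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;

// Hypothetical driver for the requests above; mirrors the /publishPolicy
// call including the correlation header.
public class PublishPolicyProbe {
    public static void main(String[] args) {
        WebTestClient client = WebTestClient.bindToServer()
                .baseUrl("http://localhost:8080")
                .build();

        client.post().uri("/publishPolicy")
                .contentType(MediaType.APPLICATION_JSON)
                .header("X-Correlation-Id", UUID.randomUUID().toString())
                .bodyValue(Map.of(
                        "id", UUID.randomUUID().toString(),
                        "description", "The time is " + Instant.now()))
                .exchange()
                .expectStatus().isAccepted(); // the handlers return 202 ACCEPTED
    }
}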
pom.xml

@@ -42,14 +42,10 @@
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-kafka</artifactId>
   </dependency>
-  <!--dependency>
+  <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
-  </dependency-->
-  <!--<dependency>
-    <groupId>org.springframework.cloud</groupId>
-    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
-  </dependency>-->
+  </dependency>
   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-actuator</artifactId>
@@ -58,7 +54,6 @@
     <groupId>io.micrometer</groupId>
     <artifactId>micrometer-tracing-bridge-brave</artifactId>
   </dependency>
   <dependency>
     <groupId>org.apache.avro</groupId>
     <artifactId>avro</artifactId>
......
Event.java (new file)

package com.example.http.reactive.entity;

import java.time.Instant;
import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;

@Data
public class Event<T> {

    @Id private ObjectId id;
    private T payload;
    private Instant createdAt;

    public Event(T payload) {
        this.id = new ObjectId();
        this.payload = payload;
        this.createdAt = Instant.now();
    }
}
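Note that the ObjectId and the timestamp are assigned client-side in the constructor, so both are readable before the document is ever written to MongoDB. A small illustration (hypothetical class; a String stands in for the Avro payload types):

import java.time.Instant;
import org.bson.types.ObjectId;
import com.example.http.reactive.entity.Event;

// Illustrative only: shows that id and createdAt exist pre-save.
public class EventDemo {
    public static void main(String[] args) {
        Event<String> event = new Event<>("some payload");
        ObjectId id = event.getId();            // generated in the constructor
        Instant createdAt = event.getCreatedAt();
        System.out.printf("id=%s createdAt=%s%n", id, createdAt);
    }
}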
HttpHandlers.java

@@ -18,16 +18,16 @@ public class HttpHandlers {

   private final StreamBridge streamBridge;

-  @PostMapping("/publishMessage")
+  @PostMapping("/publishPolicy")
   @ResponseStatus(HttpStatus.ACCEPTED)
-  public void publishMessage(@RequestBody PolicyAvro pocmessage) {
-    log.info("Request Received message [{}]", pocmessage);
-    streamBridge.send("messageProducer", pocmessage);
+  public void policy(@RequestBody PolicyAvro policy) {
+    log.info("Request Received policy [{}]", policy);
+    streamBridge.send("policyProducer", policy);
   }

   @PostMapping("/publishRule")
   @ResponseStatus(HttpStatus.ACCEPTED)
-  public void publishRule(@RequestBody RuleAvro rule) {
+  public void rule(@RequestBody RuleAvro rule) {
     log.info("Request Received rule [{}]", rule);
     streamBridge.send("ruleProducer", rule);
   }
......
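The producer bindings in application.properties below keep the Kafka messageKeyExpression commented out, which is why these handlers can send bare payloads. If the key expression were re-enabled, the handler would have to supply the 'id' header it reads. A hedged sketch of that variant, not what the commit does, assuming the generated PolicyAvro exposes getId():

import com.example.http.reactive.avro.PolicyAvro;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical variant of the policy handler: wraps the payload in a Message
// and sets the 'id' header that the commented-out messageKeyExpression in
// application.properties would evaluate as the Kafka message key.
public class KeyedPolicyPublisher {

    private final StreamBridge streamBridge;

    public KeyedPolicyPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public void publish(PolicyAvro policy) {
        streamBridge.send("policyProducer",
                MessageBuilder.withPayload(policy)
                        .setHeader("id", policy.getId())
                        .build());
    }
}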
StreamHandlers.java

@@ -2,10 +2,12 @@ package com.example.http.reactive.handler;

 import com.example.http.reactive.avro.PolicyAvro;
 import com.example.http.reactive.avro.RuleAvro;
+import com.example.http.reactive.entity.Event;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
 import reactor.core.publisher.Flux;

@@ -13,16 +15,22 @@ import reactor.core.publisher.Flux;
 public class StreamHandlers {

   @Bean
-  Consumer<Flux<RuleAvro>> ruleConsumer() {
-    return ruleFlux -> ruleFlux.doOnNext(rule -> log.info("Consumer rule: {}", rule)).subscribe();
+  Consumer<Flux<PolicyAvro>> policyConsumer(ReactiveMongoTemplate reactiveMongoTemplate) {
+    return flux ->
+        flux.doOnNext(policy -> log.info("Consume Policy: {}", policy))
+            .map(policy -> new Event<>(policy))
+            .flatMap(event -> reactiveMongoTemplate.save(event, "policy"))
+            .doOnNext(event -> log.info("Persisted with id:[{}]", event.getId()))
+            .subscribe();
   }

   @Bean
-  Consumer<Flux<PolicyAvro>> messageConsumer() {
-    return messageFlux -> messageFlux.doOnNext(
-        message -> {
-          log.info("Consumer message: {}", message);
-        })
-        .subscribe();
+  Consumer<Flux<RuleAvro>> ruleConsumer(ReactiveMongoTemplate reactiveMongoTemplate) {
+    return flux ->
+        flux.doOnNext(rule -> log.info("Consume Rule: {}", rule))
+            .map(rule -> new Event<>(rule))
+            .flatMap(event -> reactiveMongoTemplate.save(event, "rule"))
+            .doOnNext(event -> log.info("Persisted with id:[{}]", event.getId()))
+            .subscribe();
   }
 }
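To verify that the consumers actually wrote something, the same template can read the collections back. A minimal sketch (hypothetical class, not part of the commit) using the collection names above:

import com.example.http.reactive.entity.Event;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;

// Hypothetical read-back check: prints what the consumers stored in the
// 'policy' collection.
public class StoredEventProbe {

    private final ReactiveMongoTemplate template;

    public StoredEventProbe(ReactiveMongoTemplate template) {
        this.template = template;
    }

    public void dumpPolicies() {
        template.findAll(Event.class, "policy")
                .doOnNext(event -> System.out.printf("stored event %s at %s%n",
                        event.getId(), event.getCreatedAt()))
                .blockLast(); // acceptable for a manual check, not for request paths
    }
}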
application.properties

@@ -8,12 +8,15 @@ management.info.java.enabled=true
 management.info.os.enabled=true

 #logging
-logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]
 logging.level.org.apache.kafka=ERROR
+logging.level.io.confluent.kafka=ERROR
+logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]

+# MongoDB
+spring.data.mongodb.uri=mongodb://user:pa55word@localhost:27017/raw?authSource=admin

 # Stream
-spring.cloud.function.definition=ruleConsumer;messageConsumer
+spring.cloud.function.definition=ruleConsumer;policyConsumer
 spring.cloud.stream.default.contentType=application/*+avro
 spring.cloud.stream.default.consumer.useNativeDecoding=true
 spring.cloud.stream.default.producer.useNativeEncoding=true
@@ -22,29 +25,24 @@ spring.cloud.stream.kafka.binder.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.cloud.stream.kafka.binder.configuration.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
 spring.cloud.stream.kafka.binder.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
-spring.cloud.stream.kafka.binder.enableObservation=true

 ## Producer
-spring.cloud.stream.bindings.messageProducer.destination=test.message
-spring.cloud.stream.kafka.bindings.messageProducer.producer.messageKeyExpression=T(java.lang.String).valueOf(headers['id'])
-#spring.cloud.stream.kafka.bindings.messageProducer.producer.configuration.auto.register.schemas=false
-spring.cloud.stream.kafka.bindings.messageProducer.producer.configuration.schema.registry.url=http://localhost:8081
+spring.cloud.stream.bindings.policyProducer.destination=test.policy
+#spring.cloud.stream.kafka.bindings.policyProducer.producer.messageKeyExpression=T(java.lang.String).valueOf(headers['id'])
+spring.cloud.stream.kafka.bindings.policyProducer.producer.configuration.schema.registry.url=http://localhost:8081

 spring.cloud.stream.bindings.ruleProducer.destination=test.rule
-spring.cloud.stream.kafka.bindings.ruleProducer.producer.messageKeyExpression=T(java.lang.String).valueOf(headers['id'])
-#spring.cloud.stream.kafka.bindings.ruleProducer.producer.configuration.auto.register.schemas=false
+#spring.cloud.stream.kafka.bindings.ruleProducer.producer.messageKeyExpression=T(java.lang.String).valueOf(headers['id'])
 spring.cloud.stream.kafka.bindings.ruleProducer.producer.configuration.schema.registry.url=http://localhost:8081

 ## Consumer
 spring.cloud.stream.bindings.ruleConsumer-in-0.destination=test.rule
 spring.cloud.stream.bindings.ruleConsumer-in-0.group=${spring.application.name}
-#spring.cloud.stream.kafka.bindings.ruleConsumer-in-0.consumer.configuration.auto.register.schemas=false
 spring.cloud.stream.kafka.bindings.ruleConsumer-in-0.consumer.configuration.schema.registry.url=http://localhost:8081
 spring.cloud.stream.kafka.bindings.ruleConsumer-in-0.consumer.configuration.specific.avro.reader=true

-spring.cloud.stream.bindings.messageConsumer-in-0.destination=test.message
-spring.cloud.stream.bindings.messageConsumer-in-0.group=${spring.application.name}
-#spring.cloud.stream.kafka.bindings.messageConsumer-in-0.consumer.configuration.auto.register.schemas=false
-spring.cloud.stream.kafka.bindings.messageConsumer-in-0.consumer.configuration.schema.registry.url=http://localhost:8081
-spring.cloud.stream.kafka.bindings.messageConsumer-in-0.consumer.configuration.specific.avro.reader=true
+spring.cloud.stream.bindings.policyConsumer-in-0.destination=test.policy
+spring.cloud.stream.bindings.policyConsumer-in-0.group=${spring.application.name}
+spring.cloud.stream.kafka.bindings.policyConsumer-in-0.consumer.configuration.schema.registry.url=http://localhost:8081
+spring.cloud.stream.kafka.bindings.policyConsumer-in-0.consumer.configuration.specific.avro.reader=true
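The names in spring.cloud.function.definition must match the @Bean methods in StreamHandlers, and each consumer's input binding key follows the <functionName>-in-0 convention. A small sanity check, as a hypothetical test class that is not part of the commit:

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.context.FunctionCatalog;

// Hypothetical test: fails fast if a rename breaks the
// function-definition <-> binding naming contract.
@SpringBootTest
class FunctionWiringTest {

    @Autowired FunctionCatalog catalog;

    @Test
    void consumersAreRegistered() {
        assertNotNull(catalog.lookup("policyConsumer")); // bound to policyConsumer-in-0
        assertNotNull(catalog.lookup("ruleConsumer"));   // bound to ruleConsumer-in-0
    }
}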