Gitlab - Argos ALM by PALO IT

Implements rabbitMQ rather than Kafka

parent c3b25504
...@@ -31,21 +31,15 @@ services: ...@@ -31,21 +31,15 @@ services:
networks: networks:
- 'mongo' - 'mongo'
kafka: rabbitmq:
image: landoop/fast-data-dev:latest image: rabbitmq:3.12.3-management
environment:
ADV_HOST: '127.0.0.1'
RUNTESTS: 0
FORWARDLOGS: 0
SAMPLEDATA: 0
ports: ports:
- 2181:2181 # Zookeeper port - 5672:5672
- 3030:3030 # Landoop UI port - 15672:15672 #management
- 8081-8083:8081-8083 # REST Proxy, Schema Registry, Kafka Connect ports environment:
- 9581-9585:9581-9585 # JMX Ports RABBITMQ_DEFAULT_USER: "user"
- 9092:9092 # Kafka Broker port RABBITMQ_DEFAULT_PASS: "p@ssword"
networks:
- 'kafka'
networks: networks:
kafka: kafka:
......
...@@ -30,11 +30,11 @@ ...@@ -30,11 +30,11 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId> <artifactId>spring-boot-configuration-processor</artifactId>
......
...@@ -2,6 +2,7 @@ package com.cardif.pocmeasurerest.infra.controller; ...@@ -2,6 +2,7 @@ package com.cardif.pocmeasurerest.infra.controller;
import com.cardif.pocmeasurerest.domain.model.Measure; import com.cardif.pocmeasurerest.domain.model.Measure;
import com.cardif.pocmeasurerest.domain.service.MeasureService; import com.cardif.pocmeasurerest.domain.service.MeasureService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -10,6 +11,7 @@ import java.util.Optional; ...@@ -10,6 +11,7 @@ import java.util.Optional;
@RestController @RestController
@CrossOrigin("*") @CrossOrigin("*")
@RequestMapping(value = "/measure") @RequestMapping(value = "/measure")
@Slf4j
public class MeasureController { public class MeasureController {
private final MeasureService measureService; private final MeasureService measureService;
...@@ -20,6 +22,7 @@ public class MeasureController { ...@@ -20,6 +22,7 @@ public class MeasureController {
@PostMapping @PostMapping
public Measure save(@RequestBody Measure measure) { public Measure save(@RequestBody Measure measure) {
log.info("Create Measure by Rest, request: {}", measure);
return measureService.save(measure); return measureService.save(measure);
} }
......
package com.cardif.pocmeasurerest.infra.publisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@EnableRabbit
@Slf4j
@RequiredArgsConstructor
public class MeasureRabbitMQPublisher {
private final RabbitTemplate rabbitTemplate;
private final Queue queue;
public void sendMessage(Object message) {
log.info("Sending message {} to RabbitMQ...", message);
rabbitTemplate.convertAndSend(queue.getName(), message);
log.info("Message sent successfully");
}
}
package com.cardif.pocmeasurerest.infra.publisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MeasureRabbitMQPublisherConfig {
@Value("${spring.rabbitmq.queue}")
private String queueName;
@Bean
public Queue queue() {
return new Queue(queueName, false);
}
@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
return objectMapper;
}
@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory,
MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter);
return template;
}
}
\ No newline at end of file
...@@ -2,22 +2,24 @@ package com.cardif.pocmeasurerest.infra.repository; ...@@ -2,22 +2,24 @@ package com.cardif.pocmeasurerest.infra.repository;
import com.cardif.pocmeasurerest.domain.model.Measure; import com.cardif.pocmeasurerest.domain.model.Measure;
import com.cardif.pocmeasurerest.domain.repository.MeasureRepository; import com.cardif.pocmeasurerest.domain.repository.MeasureRepository;
import com.cardif.pocmeasurerest.infra.publisher.MeasureRabbitMQPublisher;
import com.cardif.pocmeasurerest.infra.repository.entity.MeasureEntity; import com.cardif.pocmeasurerest.infra.repository.entity.MeasureEntity;
import org.springframework.cloud.stream.function.StreamBridge; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Optional; import java.util.Optional;
@Repository @Repository
@Slf4j
public class MeasureRepositoryImpl implements MeasureRepository { public class MeasureRepositoryImpl implements MeasureRepository {
private final JpaMeasureRepository jpaMeasureRepository; private final JpaMeasureRepository jpaMeasureRepository;
private final StreamBridge streamBridge; private final MeasureRabbitMQPublisher rabbitMQPublisher;
public MeasureRepositoryImpl(JpaMeasureRepository jpaMeasureRepository, StreamBridge streamBridge) { public MeasureRepositoryImpl(JpaMeasureRepository jpaMeasureRepository, MeasureRabbitMQPublisher rabbitMQPublisher) {
this.jpaMeasureRepository = jpaMeasureRepository; this.jpaMeasureRepository = jpaMeasureRepository;
this.streamBridge = streamBridge; this.rabbitMQPublisher = rabbitMQPublisher;
} }
@Override @Override
...@@ -27,9 +29,11 @@ public class MeasureRepositoryImpl implements MeasureRepository { ...@@ -27,9 +29,11 @@ public class MeasureRepositoryImpl implements MeasureRepository {
entity.setDeviceId(measure.deviceId()); entity.setDeviceId(measure.deviceId());
entity.setValue(measure.value()); entity.setValue(measure.value());
MeasureEntity entitySaved = jpaMeasureRepository.save(entity); MeasureEntity entitySaved = jpaMeasureRepository.save(entity);
streamBridge.send("measureProducer", entitySaved); var measureSaved = new Measure(entitySaved.getDeviceId(), entitySaved.getValue(),
return new Measure(entitySaved.getDeviceId(), entitySaved.getValue(),
entitySaved.getDate()); entitySaved.getDate());
log.info("Publish Message: {}", measureSaved);
rabbitMQPublisher.sendMessage(measureSaved);
return measureSaved;
} }
@Override @Override
......
package com.cardif.pocmeasurerest.infra.repository.entity; package com.cardif.pocmeasurerest.infra.repository.entity;
import lombok.ToString;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Document;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@Document(collection = "measure") @Document(collection = "measure")
@ToString
public class MeasureEntity { public class MeasureEntity {
@Id @Id
......
...@@ -8,5 +8,9 @@ spring.data.mongodb.username=root ...@@ -8,5 +8,9 @@ spring.data.mongodb.username=root
spring.data.mongodb.password=myPasswordMongo21c spring.data.mongodb.password=myPasswordMongo21c
spring.data.mongodb.authentication-database=admin spring.data.mongodb.authentication-database=admin
#Kafka configurations stream # RabbitMQ Configuration
spring.cloud.stream.kafka.binder.brokers=localhost:9092 spring.rabbitmq.host=localhost
\ No newline at end of file spring.rabbitmq.queue=measureConsumer
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=p@ssword
\ No newline at end of file
...@@ -23,8 +23,3 @@ spring.jackson.visibility.field=any ...@@ -23,8 +23,3 @@ spring.jackson.visibility.field=any
spring.jackson.visibility.getter=none spring.jackson.visibility.getter=none
spring.jackson.visibility.setter=none spring.jackson.visibility.setter=none
spring.jackson.visibility.is-getter=none spring.jackson.visibility.is-getter=none
spring.cloud.function.definition=measureConsumer
#Producer
spring.cloud.stream.bindings.measureProducer.destination=measure
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment