Gitlab - Argos ALM by PALO IT

Implements rabbit mq with stream

parent 2ef6d35f
...@@ -31,13 +31,9 @@ ...@@ -31,13 +31,9 @@
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId> <artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency> </dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.cloud</groupId>-->
<!-- <artifactId>spring-cloud-starter-stream-kafka</artifactId>-->
<!-- </dependency>-->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-boot-starter-amqp</artifactId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
......
package com.cardiff.pocmeasurekafka.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.queue}")
private String queue;
@Bean
public Queue queue() {
return new Queue(queue, false);
}
@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
return objectMapper;
}
@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory,
MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter);
return template;
}
}
...@@ -2,7 +2,6 @@ package com.cardiff.pocmeasurekafka.in.stream; ...@@ -2,7 +2,6 @@ package com.cardiff.pocmeasurekafka.in.stream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.cardiff.pocmeasurekafka.model.Measure; import com.cardiff.pocmeasurekafka.model.Measure;
...@@ -10,23 +9,19 @@ import com.cardiff.pocmeasurekafka.service.MeasureService; ...@@ -10,23 +9,19 @@ import com.cardiff.pocmeasurekafka.service.MeasureService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.util.function.Consumer;
@Component("measureConsumer") @Component("measureConsumer")
@RequiredArgsConstructor @RequiredArgsConstructor
public class MeasureConsumer { public class MeasureConsumer implements Consumer<Measure> {
private static final Logger log = LoggerFactory.getLogger(MeasureConsumer.class); private static final Logger log = LoggerFactory.getLogger(MeasureConsumer.class);
private final MeasureService measureService; private final MeasureService measureService;
// @Override @Override
// public void accept(Measure measure) { public void accept(Measure measure) {
// measureService.saveMeasure(measure); measureService.saveMeasure(measure);
// log.info("Catch event {}", measure); log.info("Catch event {}", measure);
// }
@RabbitListener(queues = "${spring.rabbitmq.queue}")
public void receiveMessage(Measure message) {
measureService.saveMeasure(message);
log.info("Received message: [{}]", message);
} }
} }
...@@ -18,6 +18,7 @@ public class MeasureService { ...@@ -18,6 +18,7 @@ public class MeasureService {
private final RedisAdapter redisAdapter; private final RedisAdapter redisAdapter;
private final RestAdapter restAdapter; private final RestAdapter restAdapter;
public void saveMeasure(Measure measure){ public void saveMeasure(Measure measure){
log.info("catch measure to save: {}", measure);
String deviceIdRedis = redisAdapter.saveMeasureRedis(measure).deviceId(); String deviceIdRedis = redisAdapter.saveMeasureRedis(measure).deviceId();
log.info("catch deviceID save in redis {}", deviceIdRedis); log.info("catch deviceID save in redis {}", deviceIdRedis);
String deviceIdFromRedis = redisAdapter.getByDeviceId(measure.deviceId()).deviceId(); String deviceIdFromRedis = redisAdapter.getByDeviceId(measure.deviceId()).deviceId();
......
spring.application.name=@project.artifactId@
server.shutdown=graceful
management.server.port=8080
spring.jmx.enabled=false
management.endpoint.health.show-components=always
management.endpoint.health.show-details=always
#server configurations
server.port=8080
#logs
logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]
# Actuator
management.endpoints.jmx.exposure.exclude=*
management.endpoints.web.exposure.include=health,info,metrics,prometheus
management.info.git.enabled=true
management.info.java.enabled=true
management.info.os.enabled=true
# Web
spring.jackson.default-property-inclusion=non_empty
spring.jackson.visibility.field=any
spring.jackson.visibility.getter=none
spring.jackson.visibility.setter=none
spring.jackson.visibility.is-getter=none
# Stream # Stream
#spring.cloud.stream.kafka.binder.brokers=localhost:9092 #spring.cloud.stream.kafka.binder.brokers=localhost:9092
#spring.cloud.function.definition=measureConsumer
#spring.cloud.stream.kafka.binder.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer #spring.cloud.stream.kafka.binder.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
# RabbitMQ # RabbitMQ
...@@ -38,15 +9,6 @@ spring.rabbitmq.port=5672 ...@@ -38,15 +9,6 @@ spring.rabbitmq.port=5672
spring.rabbitmq.username=user spring.rabbitmq.username=user
spring.rabbitmq.password=p@ssword spring.rabbitmq.password=p@ssword
#Producer
#spring.cloud.stream.bindings.measureProducer.destination=measure
## Consumer
#spring.cloud.stream.bindings.measureConsumer-in-0.destination=measure
#spring.cloud.stream.bindings.measureConsumer-in-0.group=measureGroup
# Redis # Redis
spring.data.redis.host=localhost spring.data.redis.host=localhost
spring.data.redis.port=6379 spring.data.redis.port=6379
......
...@@ -19,27 +19,15 @@ logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:- ...@@ -19,27 +19,15 @@ logging.pattern.level=%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-
# Web # Web
spring.jackson.default-property-inclusion=non_empty spring.jackson.default-property-inclusion=non_empty
spring.jackson.visibility.field=any
spring.jackson.visibility.getter=none
spring.jackson.visibility.setter=none
spring.jackson.visibility.is-getter=none
#Producer
spring.cloud.stream.bindings.measureProducer.destination=measure
# Stream # Stream
#spring.cloud.stream.kafka.binder.brokers=${SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS} spring.cloud.function.definition=measureConsumer
#spring.cloud.function.definition=measureConsumer
#spring.cloud.stream.kafka.binder.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
## Consumer ## Consumer
#spring.cloud.stream.bindings.measureConsumer-in-0.destination=measure spring.cloud.stream.bindings.measureConsumer-in-0.destination=measure
#spring.cloud.stream.bindings.measureConsumer-in-0.group=measureGroup spring.cloud.stream.bindings.measureConsumer-in-0.group=${spring.application.name}
# Redis # Redis
#spring.data.redis.host=localhost
#spring.data.redis.port=6379
#spring.data.redis.password=pa55word
spring.data.redis.jedis.pool.max-active=15 spring.data.redis.jedis.pool.max-active=15
spring.data.redis.jedis.pool.min-idle=3 spring.data.redis.jedis.pool.min-idle=3
spring.cache.redis.time-to-live=1m spring.cache.redis.time-to-live=1m
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment