-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathConsumerService.java
42 lines (37 loc) · 1.55 KB
/
ConsumerService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.example.integrationtestspringkafka.service;
import com.example.integrationtestspringkafka.dto.ExampleDTO;
import com.example.integrationtestspringkafka.entity.ExampleEntity;
import com.example.integrationtestspringkafka.repository.ExampleRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka consumer service: listens on topic {@code TOPIC_EXAMPLE} and persists
 * every received {@link ExampleDTO} through the {@link ExampleRepository}.
 */
@Service
public class ConsumerService {

    /** Single logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    /** Repository used to store converted entities; assigned once in the constructor. */
    private final ExampleRepository exampleRepository;

    /**
     * Creates the service.
     *
     * @param exampleRepository repository that receives the converted entities
     */
    ConsumerService(ExampleRepository exampleRepository) {
        this.exampleRepository = exampleRepository;
    }

    /**
     * Consumes an {@link ExampleDTO} from topic {@code TOPIC_EXAMPLE}
     * (consumer group {@code consumer_example_dto}), converts it to an
     * {@link ExampleEntity} and saves it through the repository.
     * The message is logged on reception and again once the save call has returned.
     *
     * @param exampleDTO {@link ExampleDTO} received from the topic
     */
    @KafkaListener(topics = "TOPIC_EXAMPLE", groupId = "consumer_example_dto")
    public void consumeExampleDTO(ExampleDTO exampleDTO) {
        log.info("Received from topic=TOPIC_EXAMPLE ExampleDTO={}", exampleDTO);
        exampleRepository.save(convertToExampleEntity(exampleDTO));
        log.info("saved in database {}", exampleDTO);
    }

    /**
     * Builds a new {@link ExampleEntity} carrying the description and name of the given DTO.
     *
     * <p>In a real Java application this mapping would normally be delegated to a
     * mapper or to a dedicated service rather than being written inline here.
     *
     * @param exampleDTO source DTO; must not be {@code null}
     * @return a freshly created entity with {@code description} and {@code name} copied over
     * @throws NullPointerException if {@code exampleDTO} is {@code null}
     */
    public ExampleEntity convertToExampleEntity(ExampleDTO exampleDTO) {
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setDescription(exampleDTO.getDescription());
        exampleEntity.setName(exampleDTO.getName());
        return exampleEntity;
    }
}