Điều đầu tiên mà chúng ta có thể nghĩ đến khi nói về Kafka là 1 hệ thống nhanh và không đồng bộ - asynchronous system. Khái niệm Request-Reply là 1 điều không thường thấy khi chúng ta nói về Kafka. Để có thể tạo nên mô hình Request-Reply, nhà phát triển đã tạo ra 1 hệ thống các ID tương quan trong các bản ghi producer và kết nối chúng trong các bản ghi consumer.
Hình bên dưới là là 1 service minh họa đơn giản để tính tổng của 2 số yêu cầu đồng bộ (synchronous – sử dụng mô hình Request-Reply).
1. Thiết lập Spring ReplyingKafkaTemplate
Class này kế thừa các tính chất của KafkaTemplate để cung cấp mô hình Request-Reply. Để có thể thiết lập các tính chất này, chúng ta cần 1 producer (xem hàm ProducerFactory trong code bên dưới) và KafkaMessageListenerContainer. Đây là 1 thiết lập trực quan bởi cả producer lẫn consumer đều cần thiết cho Request-Reply.
// ReplyingKafkaTemplate
@Bean
public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf, KafkaMessageListenerContainer<String, Model> container) {
return new ReplyingKafkaTemplate<>(pf, container);
}
// Listener Container to be set up in ReplyingKafkaTemplate
@Bean
public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
// Default Producer Factory to be used in ReplyingKafkaTemplate
@Bean
public ProducerFactory<String,Model> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
// Standard KafkaProducer settings - specifying brokerand serializer
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
2. Thiết lập Spring - Kafka Listener
Đây là thiết lập tiêu chuẩn của Kafka Listener. Thay đổi duy nhất là bổ sung ReplyTemplate vào. Điều này thực sự cần thiết bởi vì consumer bây giờ sẽ cần gửi kết quả trên reply topic của bản ghi.
// Default Consumer Factory
@Bean
public ConsumerFactory<String, Model> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Model.class));
}
// Concurrent Listner container factory
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// NOTE - set up of reply template
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
// Standard KafkaTemplate
@Bean
public KafkaTemplate<String, Model> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
3. Kafka Consumer
Kafka Consumer cũng giống như consumer chúng ta đã tạo trước đó. Thứ duy nhất thay đổi là thêm annotation @SendTo. Annotation này trả về kết quả trên reply topic.
@KafkaListener(topics = "${kafka.topic.request-topic}")
@SendTo
public Model listen(Model request) throws InterruptedException {
int sum = request.getFirstNumber() + request.getSecondNumber();
request.setAdditionalProperty("sum", sum);
return request;
}
4. Sum Service
Bây giờ chúng ta sẽ kết hợp chúng lại với nhau. Ở dòng 15, tôi in tất cả header. Bạn có thể thấy rằng Spring tự động đặt các ID tương quan trong bản ghi của producer. ID này được trả về nguyên trạng bởi annotation @SendTo ở consumer cuối.
@ResponseBody
@PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)
public Model sum(@RequestBody Model request) throws InterruptedException, ExecutionException {
// create producer record
ProducerRecord<String, Model> record = new ProducerRecord<String, Model>(requestTopic, request);
// set reply topic in header
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
// post in kafka topic
RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);
// confirm if producer produced successfully
SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();
//print all headers
sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));
// get consumer record
ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();
// return consumer value
return consumerRecord.value();
}
5. Concurrent Consumers
Các tính chất của Request-Reply là nhất quán ngay cả khi bạn đã tạo, chẳng hạn như 3 phân vùng của request topic và đặt chúng đồng thời trong consumer. Các phản hồi từ 3 consumer vẫn đi đến 1 reply topic duy nhất. Bộ chứa ở đầu nghe – Listening, có thể thực hiện công việc phù hợp với các ID tương quan. Có nghĩa là chúng sẽ được phân biệt với nhau dựa theo các ID. Với request topic có ID là A sẽ nhận được phản hồi tương ứng với ID là A ở reply topic.
6. Các thiết lập khác.
Ngoài ra, nếu bạn đang tìm kiếm 1 thiết lập Kafka nhanh chóng trên thiết bị local của mình, t khuyên bạn nên sử dụng Kafka bằng Docker. Đây là lệnh sẽ khởi chạy Kafka để kiểm thử trên local.
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev:latest
Nếu bạn thích giao diện người dùng trên Kafka được khởi chạy bằng Docker, thì hãy chạy câu lệnh Docker này và bạn sẽ có giao diện người dùng chạy ở cổng 8000.
docker run --rm -it -p 8000:8000 -e "KAFKA_REST_PROXY_URL=http://localhost:8082" landoop/kafka-topics-ui
Happy coding!
Bình luận
Bài rất hay nhưng source code ví dụ ở đâu bạn ơi.