Synchronous Kafka: Sử dụng Spring Request-Reply

09 tháng 12, 2021 - 4193 lượt xem

Điều đầu tiên mà chúng ta có thể nghĩ đến khi nói về Kafka là 1 hệ thống nhanh và không đồng bộ - asynchronous system. Khái niệm Request-Reply là 1 điều không thường thấy khi chúng ta nói về Kafka. Để có thể tạo nên mô hình Request-Reply, nhà phát triển đã tạo ra 1 hệ thống các ID tương quan trong các bản ghi producer và kết nối chúng trong các bản ghi consumer.

Hình bên dưới là là 1 service minh họa đơn giản để tính tổng của 2 số yêu cầu đồng bộ (synchronous – sử dụng mô hình Request-Reply).

Mô hình Request-Reply

1. Thiết lập Spring ReplyingKafkaTemplate

Class này kế thừa các tính chất của KafkaTemplate để cung cấp mô hình Request-Reply. Để có thể thiết lập các tính chất này, chúng ta cần 1 producer (xem hàm ProducerFactory trong code bên dưới) và KafkaMessageListenerContainer. Đây là 1 thiết lập trực quan bởi cả producer lẫn consumer đều cần thiết cho Request-Reply.

// ReplyingKafkaTemplate
@Bean
public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf, KafkaMessageListenerContainer<String, Model> container) {
	return new ReplyingKafkaTemplate<>(pf, container);
}

// Listener Container to be set up in ReplyingKafkaTemplate
@Bean
public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) {
	ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
	return new KafkaMessageListenerContainer<>(cf, containerProperties);

}

// Default Producer Factory to be used in ReplyingKafkaTemplate
@Bean

public ProducerFactory<String,Model> producerFactory() {
	return new DefaultKafkaProducerFactory<>(producerConfigs());
}

// Standard KafkaProducer settings - specifying brokerand serializer
@Bean
public Map<String, Object> producerConfigs() {
	Map<String, Object> props = new HashMap<>();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
	return props;
}

 

2. Thiết lập Spring - Kafka Listener

Đây là thiết lập tiêu chuẩn của Kafka Listener. Thay đổi duy nhất là bổ sung ReplyTemplate vào. Điều này thực sự cần thiết bởi vì consumer bây giờ sẽ cần gửi kết quả trên reply topic của bản ghi.

// Default Consumer Factory
@Bean
public ConsumerFactory<String, Model> consumerFactory() {
	return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Model.class));
}

// Concurrent Listner container factory
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	// NOTE - set up of reply template
	factory.setReplyTemplate(kafkaTemplate());
	return factory;
}

// Standard KafkaTemplate
@Bean
public KafkaTemplate<String, Model> kafkaTemplate() {
	return new KafkaTemplate<>(producerFactory());
}

3. Kafka Consumer

Kafka Consumer cũng giống như consumer chúng ta đã tạo trước đó. Thứ duy nhất thay đổi là thêm annotation @SendTo. Annotation này trả về kết quả trên reply topic.

@KafkaListener(topics = "${kafka.topic.request-topic}")
@SendTo
public Model listen(Model request) throws InterruptedException {
	int sum = request.getFirstNumber() + request.getSecondNumber();
	request.setAdditionalProperty("sum", sum);
	return request;
}

4. Sum Service

Bây giờ chúng ta sẽ kết hợp chúng lại với nhau. Ở dòng 15, tôi in tất cả header. Bạn có thể thấy rằng Spring tự động đặt các ID tương quan trong bản ghi của producer. ID này được trả về nguyên trạng bởi annotation @SendTo ở consumer cuối.

@ResponseBody
@PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)
public Model sum(@RequestBody Model request) throws InterruptedException, ExecutionException {
	// create producer record
	ProducerRecord<String, Model> record = new ProducerRecord<String, Model>(requestTopic, request);
	// set reply topic in header
	record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
	// post in kafka topic
	RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);
	// confirm if producer produced successfully
	SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();
	//print all headers
	sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));
	// get consumer record
	ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();
	// return consumer value
	return consumerRecord.value();
}

5. Concurrent Consumers

Các tính chất của Request-Reply là nhất quán ngay cả khi bạn đã tạo, chẳng hạn như 3 phân vùng của request topic và đặt chúng đồng thời trong consumer. Các phản hồi từ 3 consumer vẫn đi đến 1 reply topic duy nhất. Bộ chứa ở đầu nghe – Listening, có thể thực hiện công việc phù hợp với các ID tương quan. Có nghĩa là chúng sẽ được phân biệt với nhau dựa theo các ID. Với request topic có ID là A sẽ nhận được phản hồi tương ứng với ID là A ở reply topic.

6. Các thiết lập khác.

Ngoài ra, nếu bạn đang tìm kiếm 1 thiết lập Kafka nhanh chóng trên thiết bị local của mình, t khuyên bạn nên sử dụng Kafka bằng Docker. Đây là lệnh sẽ khởi chạy Kafka để kiểm thử trên local.

docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev:latest

Nếu bạn thích giao diện người dùng trên Kafka được khởi chạy bằng Docker, thì hãy chạy câu lệnh Docker này và bạn sẽ có giao diện người dùng chạy ở cổng 8000.

docker run --rm -it -p 8000:8000 -e "KAFKA_REST_PROXY_URL=http://localhost:8082" landoop/kafka-topics-ui

Happy coding!

Bình luận

avatar
Trịnh Minh Cường 2021-12-09 04:54:52.615213 +0000 UTC

Bài rất hay nhưng source code ví dụ ở đâu bạn ơi.

Avatar
* Vui lòng trước khi bình luận.
Ảnh đại diện
  +2 Thích
+2