Apache Kafka là một dự án mã nguồn mở tương đối thú vị dành cho streaming dữ liệu và đang được sử dụng tương đối rộng rãi. Khác với các framework message queue thì Kafka không lưu tất cả dữ liệu trên bộ nhớ mà nó lưu trong ổ cứng, chính vì vậy nó có khả năng đáp ứng một lượng dữ liệu rất lớn. Trong bài này Dũng sẽ cùng các bạn đi tìm hiểu về spring boot và Kafka nhé.

Mô hình hoạt động

Đầu tiên chúng ta hãy tìm hiểu một chút về mô hình hoạt động của kafka

  1. Producer: Là các kết nối từ phía các ứng dụng đẩy dữ liệu vào cho Kafka cluster.
  2. Kafka cluster: Là một cụp các kafka broker được quản lý trạng thái bởi zookeeper.
  3. Broker: Là một phần máy chủ nhận và quản lý dữ liệu được đẩy vào từ producer và cho phép các consumer lấy ra dữ liệu. Khi producer và consumer khởi tạo kết nối nó sẽ kết nối đến một broker gọi là bootstrap và sau đó lấy toàn bộ thông tin của các broker còn lại.
  4. Zookeeper: Là một phần mềm máy chủ để quản lý trạng thái của các broker.
  5. Consumer: Là các kết nối từ các ứng dụng để lấy và tiêu thụ dữ liệu.

Khởi tạo module

Chúng ta sẽ khởi tạo module có tên spring-boot-kafka.

Chuẩn bị

Để cho đơn giản chúng ta sẽ sử dụng docker để cài đặt kafka và khởi động nó thông qua các câu lệnh sau:

docker pull apache/kafka
docker run --name=kafka -p 9092:9092 -d apache/kafka:latest

Một kafka server sẽ được khởi tạo và expose cổng 9092.
Lưu ý rằng chúng ta đang sử dụng cho mục đích học tập nên sử dụng docker với Kafka, trong môi trường dự án thực tế có thể kafka sẽ được cài theo một cách khác sử dụng vps hoặc máy chủ riêng biệt.

Cấu hình dự án

Chúng ta sẽ cần thay đổi tập tin spring-boot-kafka/pom.xml với mã nguồn như sau:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>vn.techmaster</groupId>
        <artifactId>mastering-spring-boot</artifactId>
        <version>1.0.0</version>
    </parent>

    <artifactId>spring-boot-kafka</artifactId>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

Ở đây chúng bổ sung thư viện spring-kafka để giao tiếp với máy chủ apache kafka.

Cấu hình producer

Để có thể tạo ra đối tượng producer để kết nối và gửi được dữ liệu đến kafka broker chúng ta sẽ cần tạo ra lớp KafkaProducerConfig với mã nguồn như sau:

package vn.techmaster.kafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Như đã nói ở trên, chúng ta cần chỉ định kafka broker boostrap để producer kết nối. Bởi vì dữ liệu gửi nhận qua mạng là byte bit, nên chúng ta cũng cần quy định đối tượng chuyển đổi dữ liệu từ đối tượng java sang byte array và ngược lại, để cho đơn giản chúng ta sẽ sử dụng StringSerializer.
Kết quả mà chúng ta cần đó là đối tượng singleton kafkaTemplate để gửi dữ diệu đến kafka broker.
Tiếp theo chúng ta có thể tạo ra một lớp chuyên để gửi dữ liệu thế này:

package vn.techmaster.kafka.producer;

import lombok.AllArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

Cấu hình consumer

Để có thể tiêu thụ được dữ liệu từ kafka broker chúng ta sẽ cần chỉ định kafka broker bootrstrap để kết nối, quy định lớp để chuyển đổi dữ liệu giữ java object và byte array, ở đây chúng ta vẫn sẽ dùng StringDeserializer cho đơn giản.

package vn.techmaster.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "techmaster");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Tiếp theo chúng ta có thể tạo ra một lớp để tiêu thụ dữ liệu như sau:

package vn.techmaster.kafka.producer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    @KafkaListener(topics = "techmaster", groupId = "techmaster")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

Spring sẽ tự khởi tạo đối tượng singleton messageConsumer cho chúng ta và tự động để đăng ký làm lớp tiêu thụ dữ liệu trên topic techmaster và groupId techmaster.

Khởi chạy chương trình

Để khởi chạy chương trình chúng ta sẽ cần tạo ra lớp SpringBootKafkaStartUp với mã nguồn như sau:

package vn.techmaster.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import vn.techmaster.kafka.producer.MessageProducer;

@SpringBootApplication
public class SpringBootKafkaStartUp {

    public static void main(String[] args) {
        ApplicationContext applicationContext = SpringApplication
            .run(SpringBootKafkaStartUp.class);
        MessageProducer messageProducer = applicationContext
            .getBean(MessageProducer.class);
        messageProducer.sendMessage("techmaster", "Hello Techmaster");
    }
}

Ở đây chúng ta lấy ra đối tượng messageProducer để gửi đến kafka broker dữ liệu “Hello Techmaster” theo topic là “techmaster”.
Consumer khi gọi đến kafka sẽ lấy được dữ liệu trong topic “techmaster” và xử lý. kết quả chúng ta nhận được là:

// bỏ qua các log trước đó
2024-07-05T14:14:16.116+07:00  INFO 3962 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : techmaster: partitions assigned: [techmaster-0]
Received message: Hello Techmaster

Tổng kết

Như vậy chúng ta đã cùng nhau tìm hiểu spring boot kafka, tìm hiểu cách để tạo ra producer, consumer và chạy thử thành công chương trình.


Cám ơn bạn đã quan tâm đến bài viết|video này. Để nhận được thêm các kiến thức bổ ích bạn có thể:

  1. Đọc các bài viết của TechMaster trên facebook: https://www.facebook.com/techmastervn
  2. Xem các video của TechMaster qua Youtube: https://www.youtube.com/@TechMasterVietnam nếu bạn thấy video/bài viết hay bạn có thể theo dõi kênh của TechMaster để nhận được thông báo về các video mới nhất nhé.
  3. Chat với techmaster qua Discord: https://discord.gg/yQjRTFXb7a