Hôm nay là bài cuối cùng trong series tổng quan về Kafka trên Group, trước khi xuống nội dung, mình gửi tặng mọi người cuốn Kafka Definitive guide. Đây không phải sách lậu đâu nhé mà là tài liệu free từ confluence mình share lại thôi. Các bạn tải sách tại đây
Sau 5 bài tràn ngập lý thuyết và các mô hình, kiến trúc Kafka, bài hôm nay mình sẽ hướng dẫn các bạn setup 1 cluster Kafka đơn giản và thực hành thử việc tạo topic, đọc ghi dữ liệu để các bạn hình dung cụ thể về Producer, Consumer… và cách kafka hoạt động.
Mình đã setup 1 máy ảo Virtual Box có sẵn Kafka và Zookeeper, mọi người có thể tải file ova tại đây, sau đó sử dụng Virtual Box để load lên chạy thử luôn không cần setup gì thêm.
Các thông tin quan trọng:
User/ pass OS: root/1
Thư mục ứng dụng: /home/app
Trong đây đã có sẵn Kafka cũng như code demo.
Sau khi khởi động, các bạn chạy lệnh sau để bật Kafka:
cd /home/app/kafka_2.11–2.4.1
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup ./bin/kafka-server-start.sh config/server.properties &
Các bạn nhớ phải bật zookeeper trước sau đó mới bật được kafka nhé, các bài trước mình đã trình bày về vai trò của Zk trong hệ thống Kafka rồi các bạn có thể đọc lại.
Chạy các lệnh xong, kiểm tra lại tiến trình bằng lệnh jps -m
Mặc định, ZK sẽ chạy trên port 2181 và kafka là 9092 nhé. Bắt đầu thử 1 vài lệnh cơ bản thôi.
Các bạn có thể tham khảo thêm các lệnh kafka thường dùng tại link này:
Your first Kafka topic
Tạo topic:
./bin/kafka-topics.sh — create — zookeeper localhost:2181 — replication-factor 1 — partitions 2 — topic test
Ở đây mình tạo topic mới, có tên là test, với 2 partition và replication = 1, vì cụm kafka test chỉ có 1 broker nên không thể replicate nhiều hơn được, trong thực tế các bạn nên để là 3 nhé.
Xem thông tin topic
./bin/kafka-topics.sh — describe — zookeeper localhost:2181 — topic test
Kiểm tra offset topic:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell — broker-list localhost:9092 — partitions 0,1 — time -1 — topic test
Reading data
Kafka cung cấp sẵn cả producer-console và consumer-console, ta chỉ cần chạy là có thể thử nghiệm đọc ghi dữ liệu được ngay.
Start consumer
./bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — topic test
Nếu muốn đọc dữ liệu từ đầu thì thêm option — from-beginning, lệnh bên trên chỉ đọc dữ liệu mới từ lúc start consumer, như sau:
./bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — topic test — from-beginning
Start producer:
./bin/kafka-console-producer.sh — broker-list localhost:9092 — topic test
Để thể hiện tính liên tục dữ liệu, các bạn có thể mở 2 tab producer và console song song.
Khi gõ ở tab producer, dữ liệu gần như sẽ hiển thị ngay lập tức bên tab consumer
Bạn cũng có thể gửi nội dung 1 file đến 1 topic như sau:
./bin/kafka-console-producer.sh — broker-list localhost:9092 — topic test < config/log4j.properties
Mình thử gửi cả file config log4j mặc định lên topic test và mỗi dòng trong file sẽ là 1 bản ghi trên Kafka.
Zookeeper
Muốn nhìn sâu hơn vào kafka, hãy thử đọc 1 số thông tin trên zookeeper nhé.
Đầu tiên, list các node trên root path của zookeeper xem chúng ta có gì nào:
Ngoại trừ node zookeeper (mình bôi vàng), tất cả các node khác đều là kafka sinh ra.
Node /brokers/topics chứa tên tất cả các topic được quản lý bởi Kafka, thử list sâu vào trong, tới node /brokers/topics/test/partitions, zookeeper trả ra 2 node con là 0 và 1, chính là cấu hình 2 partition của topic ta tạo lúc đầu.
Nếu get thông tin của partition 0 ra, các bạn sẽ thấy các thông tin như leader, isr… rất giống với câu lệnh describe topic đúng không nào. Đó chính là cách Kafka sử dụng Zookeeper để quản lý cấu hình trong cụm nhiều Brokers
Kafka API for Java client
Đây là 1 project sample producer và consumer mình viết bằng Java, các bạn có thể xem trên Github tại đây:
https://github.com/karcuta/kafka-sample
Code mình đã build gói jar sẵn ở thư mục /home/app/demo-kafka-vnbdc.jar
Giả sử ta cần đọc ghi dữ liệu các giao dịch mua điện thoại của khách hàng, tạo 1 topic mới là cellphones chứa thông tin các đơn hàng điện thoại.
Chạy lần lượt console-consumer, simple consoler và simple producer để check kết quả, các câu lệnh như sau|:
- Kafka consumer console: ./bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — topic cellphones
- Simple console: java -cp /home/app/demo-kafka-vnbdc.jar com.vnbdc.demo.kafka.SimpleConsumer celhones
- Simple producer: java -cp /home/app/demo-kafka-vnbdc.jar com.vnbdc.demo.kafka.SimpleProducer cellphones
Ở simple consumer mình có in thêm ra thông tin partition, offset và key của bản ghi trong khi consumer console mặc định chỉ in ra value. Có thể thấy rằng kafka sẽ chia dữ liệu mặc định theo kiểu round-robin, 5 bản ghi chia đều 2 partitions.
Trong dữ liệu nguồn có 5 bản ghi thì 1 bản ghi là khách hàng order từ App và 4 đơn hàng từ web. Nhưng do chia điều nên partition 0 đang chứa cả bản ghi web và app.
Vậy giờ mình muốn các bản ghi giao dịch web sẽ đẩy vào 1 partition và giao dịch app đẩy vào 1 partition thì phải làm thế nào?
Các bạn thử clone code trên git hub về và custom lại producer xem có được không nhé, đây cũng như 1 bài thực hành nhỏ để kết thúc series về kafka của mình.
Nếu chạy được các bạn có thể gửi Pull request lên project của mình để các bạn khác tham khảo :)
Cám ơn mọi người đã theo dõi 5 bài vừa rồi trong series của mình
Link bài viết gốc tại đây
Bài viết đăng tải lại dưới sự cho phép của tác giả : thầy Nguyễn Chí Thanh là giảng viên khoá Big Data tại Techmaster
Bình luận