Bài viết áp dụng cho hệ điều hành Ubuntu

1. Cài đặt Docker

Đầu tiên chạy 4 lệnh sau:

sudo apt-get update

sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg-agent \
    software-properties-common

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

sudo add-apt-repository \
   "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
   $(lsb_release -cs) \
   stable"

Sau đó chạy tiếp:

sudo apt-get update

sudo apt-get install docker-ce docker-ce-cli containerd.io

Sau đó, kiểm tra xem đã cài được Docker chưa bằng lệnh:

sudo docker run hello-world

Cuối cùng, chạy các lệnh sau để có thể chạy các lệnh docker mà không cần sudo:

sudo groupadd docker

sudo usermod -aG docker $USER

Nhớ tắt terminal đi rồi bật lại, sau đó chạy lại lệnh sau để kiểm tra:

docker run hello-world

 

2. Cài đặt docker-compose

Lần lượt chạy từng lệnh sau:

sudo curl -L "https://github.com/docker/compose/releases/download/1.26.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

sudo chmod +x /usr/local/bin/docker-compose

Kiểm tra đã cài thành công chưa bằng lệnh: docker-compose --version

 

3. Tạo file docker-compose-sqlserver.yaml

Tạo file docker-compose-sqlserver.yaml bằng lệnh: touch docker-compose-sqlserver.yaml

Sau đó mở trình soạn thảo nano (hoặc vim), copy paste nội dung sau:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  sqlserver:
    image: microsoft/mssql-server-linux:2017-CU9-GDR2
    ports:
     - 1433:1433
    environment:
     - ACCEPT_EULA=Y
     - MSSQL_PID=Standard
     - SA_PASSWORD=Password!
     - MSSQL_AGENT_ENABLED=true
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - sqlserver
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

 

4. Khởi chạy docker-compose

Lần lượt chạy các lệnh sau:

export DEBEZIUM_VERSION=1.1

docker-compose -f docker-compose-sqlserver.yaml up -d

Kiểm tra xem các docker container đã chạy hay chưa bằng lệnh: docker ps

Các container đã chạy
Các container đã chạy

5. Mockup dữ liệu

Đầu tiên, tạo file inventory.sql với nội dung như sau:

-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;

-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
INSERT INTO products(name,description,weight)
  VALUES ('scooter','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
  VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
  VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products(name,description,weight)
  VALUES ('hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products(name,description,weight)
  VALUES ('hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products(name,description,weight)
  VALUES ('hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products(name,description,weight)
  VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
  VALUES ('jacket','water resistent black wind breaker',0.1);
INSERT INTO products(name,description,weight)
  VALUES ('spare tire','24 inch spare tire',22.2);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
-- Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products_on_hand', @role_name = NULL, @supports_net_changes = 0;
-- Create some customers ...
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
-- Create some very simple orders
CREATE TABLE orders (
  id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY,
  order_date DATE NOT NULL,
  purchaser INTEGER NOT NULL,
  quantity INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  FOREIGN KEY (purchaser) REFERENCES customers(id),
  FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
  VALUES ('16-JAN-2016', 1001, 1, 102);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
  VALUES ('17-JAN-2016', 1002, 2, 105);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
  VALUES ('19-FEB-2016', 1002, 2, 106);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
  VALUES ('21-FEB-2016', 1003, 1, 107);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO

Sau đó, sử dụng lệnh docker ps để lấy ra được ID của container chạy SQL Server:

ID của container chạy SQL Server
ID của container chạy SQL Server

Sau đó, chạy lệnh sau để mockup dữ liệu test:

cat inventory.sql | docker exec -i 63bfd9e2cfe5 bash -c '/opt/mssql-tools/bin/sqlcmd -U
 sa -P $SA_PASSWORD'

Lưu ý: 63bfd9e2cfe5 là ID của container chạy SQL Server trên máy mình, còn trên máy các bạn sẽ là một giá trị khác, các bạn nhớ thay ID cho đúng nhé

 

6. Kết nối Debezium với SQL Server

Tạo file register-sqlserver.json với nội dung sau:

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "database.server.name" : "server1",
        "database.hostname" : "sqlserver",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.dbname" : "testDB",
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

Sau đó chạy lệnh:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
Tạo kết nối thành công
Tạo kết nối thành công

 7. Lắng nghe message từ tất cả các topic

Chạy lệnh sau:

docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --whitelist '.*'

 

8. INSERT - UPDATE - DELETE dữ liệu trong SQL Server và kiểm tra xem có nhận được message không

Mở một terminal khác, sau đó chạy lệnh sau:

docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'

Thử INSERT bản ghi mới vào bảng products bằng cách lần lượt chạy 2 lệnh sau (nhớ là phải chạy lệnh GO nhé):

INSERT INTO products(name,description,weight) VALUES ('Marcus Rashford','Striker', 70);
GO
INSERT thành công vào bảng products
INSERT thành công vào bảng products

Sau khi INSERT xong, mở terminal còn lại ( đang chạy Debezium ) lên để kiểm tra xem có nhận được message thông báo thay đổi dữ liệu trong bảng không:

Debezium nhận được message thông báo có bản ghi mới
Debezium nhận được message thông báo có bản ghi mới

Thử INSERT bản ghi khác vào bảng customers:

INSERT INTO customers(first_name,last_name,email) VALUES ('Alex','Ferguson','ferguson@manutd.com');
GO

 

INSERT thành công bản ghi vào bảng customers
INSERT thành công bản ghi vào bảng customers

Kiểm tra xem có nhận được message không:

Nhận được message thông báo INSERT thành công
Nhận được message thông báo INSERT thành công

 Thử DELETE bản ghi vừa thêm mới vào bảng products xem sao:

DELETE from products WHERE name = 'Marcus Rashford';
GO
DELETE bản ghi khỏi bảng products
DELETE bản ghi khỏi bảng products

Kiểm tra xem có nhận được message không:

Nhận được message thông báo xóa bản ghi khỏi bảng products
Nhận được message thông báo xóa bản ghi khỏi bảng products

Như vậy là chúng ta đã cấu hình xong Debezium kết nối tới SQL Server sử dụng Docker. Chúc các bạn thành công

Đừng quên tham khảo các khóa học hay tại Techmaster nhé ✌️✌️✌️✌️✌️✌️