Tôi kể bạn nghe, kể từ phiên bản 3.6, MongoDB có một tính năng rất “cool” gọi là “Change stream”. Nó cho phép các ứng dụng truy cập vào luồng thay đổi dữ liệu theo thời gian thực mà không gây tắc nghẽn oplog. Các ứng dụng có thể sử dụng change stream để subscribe các thay đổi trên một collection, thậm chí một database để ngay lập tức thực hiện tương tác khi có “biến”. Ừm, để dễ hiểu thì bạn cứ tưởng tượng Change stream như đứa em hay mách lẻo ở nhà vậy. Nó được bố mẹ giao giám sát nhất cử nhất động của bạn. Bạn vừa lén đập lợn để lấy tiền “quẩy” game hay lỡ tay làm vỡ cái bình quí giá thì ngay lập tức bố mẹ bạn biết được và có biện pháp xử lí. Sẽ thật là một tuổi thơ “đầy tang thương” nếu có những đứa em như vậy. Nhưng ngược lại, với các bậc làm cha làm mẹ thì đứa em đó lại là cánh tay phải đắc lực. Change stream cũng vậy. Với các ứng dụng đòi hỏi thời gian thực thì nó chính là một giải pháp hữu hiệu. Thôi, hãy tạm gác câu chuyện muôn đời về những đứa em quái chiêu lại để cùng tìm hiểu về Change stream nào.

Change stream hoạt động như thế nào?

Change stream hoạt động dựa trên oplog (operations log). Oplog là một tập hợp các bản ghi của hệ thống, lưu trữ mọi hoạt động sửa đổi trong cơ sở dữ liệu. Nó giống như quyển nhật kí hằng ngày của bạn. Oplog được sinh ra để hỗ trợ tính năng sao chép (replication) trong MongoDB, giúp một bản sao (replica) đồng bộ hóa với master. Do oplog được sử dụng trong xây dựng một tập các bản sao (replica set) nên không thể sử dụng Change stream trên một máy chủ MongoDB độc lập. Change stream chỉ khả dụng trong một tập các bản sao (replica set) hoặc các cụm được phân đoạn (sharded clusters).

Tạo replica set

Bây giờ chúng ta sẽ tạo một replica set để khám phá tính năng Change stream. Như đã nói ở trên, replica set là một tập hợp mongod. Tuy nhiên, bạn có thể tạo replica set với chỉ một mongod. Trong bài viết này, tôi sẽ làm như vậy. Hẹn các bạn ở các bài tiếp theo, tôi sẽ giải thích chi tiết hơn về replica set và cách cài đặt chứa nhiều mongod.

Tôi đã đề cập đến cách cài đặt MongoDB thông qua docker compose ở bài “Thao tác với MongoDB trong Go”. Các bạn có thể xem lại để được giải thích rõ hơn. Trong bài này, tôi sẽ tiếp tục sử dụng file docker-compose.yml ở bài trước với một số chỉnh sửa để tạo replica set.

version: '3'
services:
  mongodb:
    image: mongo:latest
    restart: always
    container_name: mongodb
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: 1234
      MONGO_INITDB_DATABASE: test_db
    ports: 
      - "27017:27017"
    volumes: 
      - ./initMongo/:/docker-entrypoint-initdb.d/
    hostname: "127.0.0.1"
    command: mongod --smallfiles --replSet mongo-rs

Như các bạn thấy, tôi đã bổ sung thêm khai báo hostname (ở đây là địa chỉ localhost máy tôi) và câu lệnh sẽ được thực thi khi khởi tạo mongod. Mongod này sẽ nằm trong một replica set có tên là “mongo-rs”. Thực hiện build file docker-compose cho ta thấy kết qủa “Sessions collection is not set up; waiting untilnext sessions reap interval: Replication has not yet been configured”.

docker-compose up
Kết quả
Kết quả 

Đây không phải là lỗi đâu nhé. Trong lần đầu khởi tạo replica set, bạn phải có thêm một bước cho phép các replica có thể “giao tiếp” với nhau.

Đầu tiên, các bạn truy cập vào container mongodb chúng ta đã tạo

docker exec -it mongodb bash

Đăng nhập vào mongodb với tài khoản root đã khai báo trong file docker-compose.yml

mongo -u “root” -p “1234” --authenticationDatabase “admin”

Truy cập vào database admin

use admin

Với replica set chỉ có một mongod chúng ta chỉ cần sử dụng lệnh rs.initiate(). Trong trường hợp có nhiều hơn mongod, cần khai báo cụ thể danh sách mongod. Cách này tôi sẽ đề cập sau.

rs.initiate()

Như vậy là chúng ta đã tạo thành công replica set chứa một mongod. Để kiểm tra các bạn gõ lệnh.

rs.conf()

Bắt sự kiện Change stream trong Go

Giờ chúng ta sẽ viết một chương trình Go để bắt sự kiện Change stream.

Trong bài trước tôi có hướng dẫn các bạn sử dụng package mgo. Tuy nhiên package này hiện chưa hỗ trợ tính năng Change stream của MongoDB. Do vậy ở bài này chúng ta sẽ sử dụng một package khác là “mongo-go-driver”. Các bạn nhớ sử dụng go get để lấy package này về nhé.

go get "github.com/mongodb/mongo-go-driver/mongo"

Mã nguồn chương trình

package main

import (
	"context"
	"log"

	"github.com/mongodb/mongo-go-driver/bson/objectid"
	"github.com/mongodb/mongo-go-driver/mongo"
)

const (
	database       = "test_db"
	collection     = "post"
	replicaSetName = "mongo-rs"
)

type Post struct {
	Tile    string `json:"tile" bson:"tile"`
	Content string `json:"content" bson:"content"`
}
type IDELem struct {
	Data string `json:"data" bson:"_data"`
}
type NSELem struct {
	DB   string `json:"db" bson:"db"`
	Coll string `json:"coll" bson:"coll"`
}
type DocumentKeyElem struct {
	ID objectid.ObjectID `json:"id" bson:"_id"`
}
type CSElem struct {
	ID            IDELem          `json:"id" bson:"_id"`
	OperationType string          `json:"operationType" bson:"operationType"`
	FullDocument  Post            `json:"fullDocument" bson:"fullDocument"`
	NS            NSELem          `json:"ns" bson:"ns"`
	DocumentKey   DocumentKeyElem `json:"documentKey" bson:"documentKey"`
}

func main() {
	// Kết nối với MongoDB
	client, err := mongo.NewClient("mongodb://user1:example@localhost:27017/test_db")
	if err != nil {
		log.Fatal(err)
	}
	err = client.Connect(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	collection := client.Database(database).Collection(collection)
	ctx := context.Background()

	var pipeline interface{}

	// Theo dõi sự kiện Change stream
	cur, err := collection.Watch(ctx, pipeline)
	if err != nil {
		return
	}
	defer cur.Close(ctx)

	for cur.Next(ctx) {
		elem := CSElem{}
		// Parse json
		if err := cur.Decode(&elem); err != nil {
			log.Fatal(err)
		}

		// In ra màn hình sự kiện change stream
		log.Println(elem)
	}
	if err := cur.Err(); err != nil {
		log.Fatal(err)
	}
}

Đầu tiên, chúng ta vẫn tạo kết nối đến MongoDB. Trong “mongo-go-driver”, các bạn sử dụng cú pháp mongo.NewClient(url) để kết nối. Trong đó, cụ thể:

url = “mongodb://[your_username]:[your_password]@[hostname]/[database]”

Ở đây địa chỉ kết nối của tôi sẽ là "mongodb://user1:example@localhost:27017/test_db"

Để bắt sự kiện Change stream các bạn sử dụng hàm Watch(). Các bạn có thể lập trình chi tiết, bắt tất cả các sự kiên xảy ra hay chỉ bắt một sự kiện ví dụ như “insert” thôi. Ở đây tôi sẽ bắt tất cả sự kiện, sau đó in ra màn hình. Để tạo sự kiện, tôi sẽ tạo một file go khác thực hiện việc insert dữ liệu vào MongoDB.

package main

import (
	"context"
	"log"

	"github.com/mongodb/mongo-go-driver/mongo"
)

const (
	database       = "test_db"
	collection     = "post"
)
type Post struct {
	Tile    string
	Content string
}

func main() {
	client, err := mongo.NewClient("mongodb://user1:example@localhost:27017/test_db")
	if err != nil {
		log.Fatal(err)
	}
	err = client.Connect(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	collection := client.Database(database).Collection(collection)
	res, err := collection.InsertOne(context.Background(), Post{"Greeter", "Hello Change stream. You are awesome!"})
	if err != nil { log.Fatal(err) }
	id := res.InsertedID
	log.Println(id)
}

Run file bắt sự kiện Change stream trước, sau đó chạy file test insert dữ liệu. Cùng xem kết quả nhé.

Cấu trúc của một sự kiện Change Stream như cấu trúc CSElem tôi đã khai báo. Bài viết này tôi dừng lại tại đây. Bài sau tôi sẽ chia sẻ cách làm một ứng dụng realtime sử dụng Go, Change stream của MongoDB và Websocket. Hẹn gặp lại các ban. ^.^

Mã nguồn trong bài viết này các bạn có thể tham khảo tại đây.