Bài viết được dịch từ bài: Building an Event Storage

1. Cấu trúc

Event Storage trong hệ quản trị cơ sở dữ liệu quan hệ (RDBMS) gồm 2 bảng: Event Log và Aggregate

Bảng Event Log
Tên cột Kiểu dữ liệu
AggregateID GUID
Data Blob
Version Int

 

Bảng Aggregate
Tên cột Kiểu dữ liệu
AggregateID GUID
Type Varchar
Version Int

Bảng Event Log:

  • Mỗi sự kiện sẽ tương ứng với một bản ghi, trong đó dữ liệu của sự kiện sẽ được lưu ở cột Data. Các bản ghi này sẽ được INSERT theo trình tự thời gian diễn ra sự kiện.
  • Các cột AggregateID, Data và Version là những cột bắt buộc phải có trong bảng Event Log. Ngoài ra, tùy vào yêu cầu nghiệp vụ mà bảng Event Log có thể thêm các cột khác, ví dụ: Thời gian xảy ra sự kiện, ID của user tạo ra sự kiện, địa chỉ IP của nơi xảy ra sự kiện, ...
  • Mỗi event trong bảng Event Log được đánh số Version. Trong phạm vi của từng AggregateID, số Version là độc nhất và được đánh tăng dần.
  • Cột AggregateID là foreign key trỏ đến cột AggregateID trong bảng Aggregate

Bảng Aggregate:

  • Lưu trữ danh sách các aggregate (đối tượng gây ra sự kiện) trong hệ thống
  • Mỗi aggregate đi kèm với Version hiện tại của aggregate đó. Version này được sử dụng để check concurrency conflict.
  • Cột Type: lưu kiểu của aggregate

2. Hoạt động

Các bảng Event Log, Aggregate hỗ trợ 2 hoạt động chính:

  • SELECT ra tất cả các sự kiện diễn ra đối với một AggregateID. Khi tiến hành SELECT cần đảm bảo các bản ghi được lấy ra theo đúng trình tự thời gian các sự kiện diễn ra trong hệ thống
SELECT * FROM EVENT_LOG WHERE AGGREGATEID = '@ID' ORDER BY VERSION
  • INSERT các sự kiện diễn ra đối với một aggregate vào bảng EVENT_LOG. Quá trình INSERT có thể được hiện trong application code hoặc trong Stored Procedure. Nếu dùng Stored Procedure các bạn có thể tham khảo đoạn pseudo-code sau:
BEGIN
  version = SELECT VERSION FROM AGGREGATES WHERE AGGREGATEID = "@ID"
  IF version is null
    INSERT INTO AGGREGATES
    VERSION = 0
  END
  IF your_version != version
    raise concurrency_problem
  foreach event
    insert event with incremented version number
  update aggregate with last version number
END TRANSACTION
  • Với đoạn lệnh Stored Procedure trên, trước tiên hệ thống sẽ check xem trong bảng AGGREGATES có tồn tại aggregate nào với ID bạn muốn tìm không. Nếu như không có, hệ thống sẽ INSERT một bản ghi mới vào bảng AGGREGATE với version = 0. Sau đó, hệ thống sẽ tiến hành kiểm tra concurrency conflict: Nếu như version của aggregate mà bạn đang update khác với version hiện tại của aggregate trong bảng AGGREGATES, khi đó hệ thống sẽ thông báo lỗi concurrency conflict. Nếu 2 version khớp nhau, hệ thống sẽ chạy vòng lặp qua các events và INSERT từng event vào bảng EVENT_LOG, mỗi event sẽ có một version độc nhất và tăng dần. Cuối cùng, hệ thống sẽ cập nhật version của aggregate trong bảng AGGREGATES

3. Snapshots

Với Snapshot, cứ sau một chuỗi các events xảy ra với một aggregate, chúng ta sẽ replay các events xảy ra, qua đó dựng lại được trạng thái của aggregate tại một thời điểm trong quá khứ, sau đó lưu trạng thái này vào một bảng Snapshot:

Bảng Snapshot
Tên cột Kiểu dữ liệu
AggregateID GUID
SerializedData Blob
Version Int

Snapshot được sử dụng để tránh trường hợp phải liên tục replay lại từ đầu tất cả các event xảy ra đối với một aggregate

Để tạo Snapshot, ta có thể sử dụng một background process; background process này sẽ tiến hành check định kỳ các aggregate: các aggregate trải qua một số lượng events vượt quá một số lượng nhất định (tùy vào yêu cầu của từng application) sẽ được tạo Snapshot. Quy trình có thể diễn ra như sau:

  • JOIN 2 bảng Aggregate và Snapshot theo AggregateID
  • Sử dụng điều kiện WHERE:  (Version ở bảng Aggregate - Version ở bảng Snapshot) > một số nhất định tùy theo yêu cầu của application
  • Sau khi lấy ra được các aggregate thỏa mãn điều kiện WHERE ở trên, ta sử dụng một background process để tạo Snapshot cho các aggregates này. Các Snapshot mới tạo sẽ được lưu vào bảng Snapshot

P/S: Trong bài viết gốc còn có một phần nữa là "Event Storage as a Queue". Phần này mình đọc chưa hiểu rõ nên chưa dịch được. Mình sẽ cố gắng đọc hiểu và dịch lại cho các bạn trong thời gian sớm nhất.

Cảm ơn các bạn đã đọc blog của Techmaster :D