Chào mọi người, chủ đề Apache Spark hôm nay mình sẽ nói về 1 vấn đề rất hay trong Spark mà mọi người nếu đã làm việc vơi Spark SQL thì sẽ va chạm hằng ngày, đó là Join.
Nói ngắn gọn 1 chút, SQL Join là việc kết hợp giữa 1 hay nhiều bảng (hoặc tập dữ liệu) để đưa ra 1 tập kết quả khác dựa vào 1 tiêu chí nào đó. Tiêu chí để join thông thường là 1 biểu thức logic để match dữ liệu giữa các bảng hoặc tập dữ liệu (join expression). Phép join được sử dụng cực kì nhiều, tuy nhiên nó cũng chính là tính toán rất nặng và phức tạp.
Spark SQL hỗ trợ hầu hết các phép join cho nhu cầu xử lý dữ liệu, bao gồm:
- Inner join (default):Trả về kết quả 2 cột nếu biểu thức join expression true
- Left outer join: Trả về kết quả bên trái kể cả biểu thức join expression false
- Right outer join: Ngược với Left
- Outer join: Trả về kết quả 2 bên
- Left anti join: Trả về kết quả bên trái khi join expression false
- Left semi join: Trả về kết quả bên trái khi join expression true
- Cross join (aka Cartesian ): Join kiểu tích đề-các
Join Strategies in Spark
Nhưng bài hôm nay mình không hướng dẫn các bạn cách để viết code join dữ liệu trong Spark với nhau, nó là về 1 chủ đề khác, Spark Join các tập dữ liệu với nhau như nào. Nếu câu trả lời trong đầu bạn bây giờ kiểu như: thì cứ thực thi biểu thức join experssion với các key trong các bảng rồi ghép các cột với nhau, thì…
Distributed and… distributed
Nói như trên đúng mà, nhưng dữ liệu của chúng ta, với Spark là các DataFrame, chúng không ở cùng 1 máy, mỗi phần dữ liệu rải rác 1 máy khác nhau và chúng vẫn phải tìm cách “match” với nhau, vì vậy sẽ có nhiều điều hơn để nói với các bạn ở phần dưới đây.
Broadcast Join
Đây là phép join cho tốc độ nhanh hơn cả trong Spark. Có 2 lý do, 1 là nó thường dùng trong phép join có ít nhất 1 bảng nhỏ, thứ 2 là nó yêu cầu shuffle dữ liệu rất ít và hiệu quả. Với Broadcast join, bảng dữ liệu bé (màu cam) được clone thành nhiều phần gửi đến các node chứa dữ liệu bảng to (màu xanh), tại đây chúng sẽ thực hiện join và trả ra kết quả. Lúc này dữ liệu được đóng gói và đẩy đi 1 lần duy nhất.
Nếu đọc phần trên hơi lú, hãy dành 1 phút đọc phần Q&A này.
- Shuffle dữ liệu?
Cho bạn nào chưa biết thì trong Spark, các executor khi muốn trao đổi dữ liệu, chúng phải gửi cho nhau qua mạng, quá trình đó gọi là Shuffle.
- Nhưng khi nào thì chúng cần gửi dữ liệu cho nhau?
À khi chúng cần thì chúng gửi thôi
Trong VD về join, các bản ghi mà cùng 1 key sẽ được đẩy về 1 executor để xử lý, nếu các bản ghi cùng key ở nhiều máy thì chúng sẽ phải trải qua quá trình gọi là Shuffle để “về với nhau”. Tại đây, 2 tập bản ghi của 2 DataFrame sẽ được nối với nhau theo logic phép join của bạn (như là inner hay left…), còn với Broadcast, toàn bộ dữ liệu của 1 bảng được gửi tới các phần dữ liệu của bảng kia nên nó không cần hỏi ai nữa.
- Vậy thế nào là bảng nhỏ.
Tùy mỗi hệ thống mà cái sự nhỏ là khác nhau, nếu 1 executor có 4Gb ram thì 200MB là nhỏ nhưng nếu executor chỉ có 512MB thôi thì nhỏ lại là 1 con số khác. Spark cho phép cầu hình tham số: spark.sql.autoBroadcastJoinThreshold để control việc này. Giá trị mặc định là 10MB, tức là nếu 1 trong các bảng bạn join có dung lượng <=10 MB thì bảng này sẽ được clone gửi sang các executor chứa bảng to để join.
- Nếu 2 bảng cùng lớn thì sao?
Thì nó sẽ k còn hiệu quả nữa do việc clone 1 bảng lớn thành nhiều bản sẽ tốt kém hơn nhiều.
VD bạn có 1 bảng chứa thông tin chi tiết 1 triệu khách hàng và 1 bảng chứa danh mục địa chỉ các chi nhánh của bạn. Join 2 bảng này sử dụng Broadcast join sẽ rất hiệu quả.
Shuffle Hash Join
Trong trường hợp 2 bảng tương đối lớn, Spark trước hết sẽ thực hiện việc shuffle các bản ghi cùng 1 key (key chính là giá trị trong điều kiện join) về 1 máy và xử lý local ở đó rồi đưa ra kết quả.
VD ban đầu partition0, 1 chứa cả key A và key B. Khi join, Spark sẽ tạo thành 1 DF trung gian mà ở đó, partition0 mới chỉ chứa key A, partition1 chỉ chứa key B, quá trình di chuyển toàn bộ key A từ partition0 cũ sang partition0 mới chính là shuffle. Tất nhiên việc move key này sẽ là key của cả 2 DataFrame cần join.
Sau khi các key được đưa về đúng chỗ, nó lại xử lý join như bình thường, nhìn lại Broadcast Join, bản chất nó cũng yêu cầu move dữ liệu nhưng khác là move toàn bộ phần dữ liệu bé.
Với VD trên có 2 partitions thì cũng khá đơn giản đúng không, tuy nhiên sự việc sẽ bớt đơn giản khi DF có khoảng 100–200 partition và key lúc này lại phân bố đều trên các partition, lúc này hàng trăm executor trao đổi dữ liệu cho nhau có cost rất lớn do nó liên quan đến IO.
Nhưng để dữ liệu chuyển qua lại giữa các executor, nó phải trải qua kha khá bước.
- Để xác định key = nhau, 1 phép hash được thực thi, trong máy tính, để so sánh A với B, người ta sẽ so sanh mã hash của A và B thay vì xem xét giá trị của nó.
- Để bản ghi được chuyển đi, dữ liệu sẽ cần serialize ở đầu gửi và deserialize ở đầu nhận.
- Hàng chục, hàng trăm executor phải mở các connection đến nhau để truyền dữ liệu.
- Băng thông mạng ảnh hưởng trực tiếp đến tốc độ shuffle.
Sort Merge Join
Sort Merge Join để giảm bớt gánh nặng shuffle cũng như tối ưu việc load dữ liệu, có thể hiểu nó là cách làm tối ưu hơn cho Shuffle Hash Join. Ở chế độ này, DF ban đầu sẽ được sort theo key cần join trước khi shuffle. Đây là bước trung gian, giúp làm giảm thời gian shuffle dữ liệu.
Do dữ liệu được sort sẵn they key cần join, VD hết key A đến key B đến key C, sẽ không còn có chuyện partition1 và partition100 đều chứa key A nữa phải không. Phần dữ liệu cần shuffle lúc này sẽ là nhiều phần nhỏ dữ liệu nhưng của chung 1 key (tương tự như broadcast join).
Link bài viết gốc tại đây
Bài viết đăng tải lại dưới sự cho phép của tác giả : thầy Nguyễn Chí Thanh là giảng viên khoá Big Data tại Techmaster
Bình luận