Giới thiệu về CompletableFuture trong Spring Boot

Trong thế giới phát triển ứng dụng web hiện đại, hiệu năng và khả năng mở rộng là hai yếu tố quan trọng. Một trong những cách để cải thiện hiệu năng là sử dụng lập trình bất đồng bộ (asynchronous programming) để tối ưu hóa việc sử dụng tài nguyên và tránh các tác vụ chặn (blocking tasks). CompleteFuture trong Spring Boot là một phần của thư viện Java CompletableFuture, được sử dụng để xử lý các tác vụ bất đồng bộ (asynchronous) một cách hiệu quả và đơn giản hơn. Nó cho phép thực thi các tác vụ song song mà không cần tạo ra các Thread một cách thủ công.

1. Cơ bản về CompletableFuture:

CompletableFuture cung cấp các phương thức để chạy tác vụ không đồng bộ và kết hợp kết quả của các tác vụ đó khi chúng hoàn thành. Nó rất hữu ích khi muốn thực hiện các tác vụ phức tạp mà không làm chậm ứng dụng.

Một số tính năng chính:

  • runAsync(): Chạy một tác vụ không trả về kết quả trong một thread khác.
  • supplyAsync(): Chạy một tác vụ trả về kết quả trong một thread khác.
  • thenApply(): Xử lý kết quả khi một tác vụ kết thúc.
  • thenCombine(): Kết hợp kết quả của nhiều tác vụ không đồng bộ.

1.1. Tạo một CompletableFuture

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // Thực hiện một tác vụ tốn thời gian
    return "Kết quả";
});
future.thenApply(result -> {
    // Xử lý kết quả
    return "Xử lý " + result;
});

1.2. Kết hợp nhiều CompletableFuture

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Kết quả 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Kết quả 2");

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
    return result1 + " & " + result2;
});

2. Sử dụng CompletableFuture trong Spring Boot

2.1. Kích hoạt hỗ trợ bất đồng bộ

Đầu tiên, cần kích hoạt hỗ trợ bất đồng bộ bằng cách thêm annotation @EnableAsync vào lớp cấu hình:

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.2. Sử dụng @Async

Sử dụng annotation @Async để đánh dấu một phương thức sẽ được thực thi bất đồng bộ:

@Service
public class MyService {
    @Async
    public CompletableFuture<String> asyncMethod() {
        // Thực hiện tác vụ tốn thời gian
        return CompletableFuture.completedFuture("Kết quả");
    }
}

3. Ví dụ thực tế: Gọi song song nhiều API và tổng hợp kết quả

3.1. Bài toán

Giả sử chúng ta xây dựng một ứng dụng tổng hợp tin tức từ nhiều nguồn khác nhau. Chúng ta cần gọi đến các API của các trang tin tức, lấy dữ liệu và tổng hợp lại để hiển thị cho người dùng. Việc gọi tuần tự từng API sẽ tốn nhiều thời gian, do đó chúng ta sẽ sử dụng CompletableFuture để gọi các API này song song.

3.2. Triển khai

@Service
public class NewsService {
    @Async
    public CompletableFuture<List<Article>> getArticlesFromSourceA() {
        // Gọi API của nguồn A
        List<Article> articles = callApi("https://api.sourcea.com/news");
        return CompletableFuture.completedFuture(articles);
    }

    @Async
    public CompletableFuture<List<Article>> getArticlesFromSourceB() {
        // Gọi API của nguồn B
        List<Article> articles = callApi("https://api.sourceb.com/news");
        return CompletableFuture.completedFuture(articles);
    }

    private List<Article> callApi(String url) {
        // Thực hiện gọi API và parse kết quả
        // Giả sử sử dụng RestTemplate hoặc WebClient
    }
}

@RestController
public class NewsController {
    @Autowired
    private NewsService newsService;

    @GetMapping("/news")
    public CompletableFuture<ResponseEntity<?>> getNews() {
        CompletableFuture<List<Article>> sourceAFuture = newsService.getArticlesFromSourceA();
        CompletableFuture<List<Article>> sourceBFuture = newsService.getArticlesFromSourceB();

        return sourceAFuture.thenCombine(sourceBFuture, (articlesA, articlesB) -> {
            // Tổng hợp và sắp xếp các bài viết
            List<Article> combinedArticles = new ArrayList<>();
            combinedArticles.addAll(articlesA);
            combinedArticles.addAll(articlesB);
            combinedArticles.sort(Comparator.comparing(Article::getPublishedDate).reversed());
            return ResponseEntity.ok(combinedArticles);
        });
    }
}

3.3. Giải thích

  • @Async: Đánh dấu các phương thức sẽ được thực thi bất đồng bộ.
  • CompletableFuture: Cho phép chúng ta nhận kết quả khi tác vụ hoàn thành.
  • thenCombine: Kết hợp kết quả từ hai CompletableFuture khi cả hai đều hoàn thành.
  • Tổng hợp kết quả: Sau khi nhận được dữ liệu từ cả hai nguồn, chúng ta tổng hợp và sắp xếp để trả về cho người dùng.

4. Xử lý ngoại lệ

Có nhiều cách để xử lý ngoại lệ trong CompletableFuture, bao gồm:

  • Sử dụng exceptionally(): Xử lý ngoại lệ và cung cấp giá trị thay thế.
  • Sử dụng handle(): Xử lý cả kết quả và ngoại lệ.
  • Kết hợp các phương thức trên với thenCombine(): Để xử lý ngoại lệ trong việc kết hợp nhiều CompletableFuture.

Sử dụng exceptionally() để xử lý ngoại lệ từng tác vụ

@RestController
public class NewsController {
    @Autowired
    private NewsService newsService;

    @GetMapping("/news")
    public CompletableFuture<ResponseEntity<?>> getNews() {
        CompletableFuture<List<Article>> sourceAFuture = newsService.getArticlesFromSourceA()
                .exceptionally(ex -> {
                    // Xử lý ngoại lệ từ nguồn A
                    System.err.println("Error fetching articles from Source A: " + ex.getMessage());
                    return Collections.emptyList(); // Trả về danh sách rỗng hoặc giá trị mặc định
                });

        CompletableFuture<List<Article>> sourceBFuture = newsService.getArticlesFromSourceB()
                .exceptionally(ex -> {
                    // Xử lý ngoại lệ từ nguồn B
                    System.err.println("Error fetching articles from Source B: " + ex.getMessage());
                    return Collections.emptyList();
                });

        return sourceAFuture.thenCombine(sourceBFuture, (articlesA, articlesB) -> {
            // Tổng hợp và sắp xếp các bài viết
            List<Article> combinedArticles = new ArrayList<>();
            combinedArticles.addAll(articlesA);
            combinedArticles.addAll(articlesB);
            combinedArticles.sort(Comparator.comparing(Article::getPublishedDate).reversed());
            return ResponseEntity.ok(combinedArticles);
        });
    }
}

exceptionally(): Phương thức này cho phép chúng ta xử lý ngoại lệ và cung cấp một giá trị thay thế. Ở đây, nếu có lỗi xảy ra khi gọi getArticlesFromSourceA() hoặc getArticlesFromSourceB(), chúng ta trả về một danh sách rỗng.

Sử dụng handle() để xử lý kết quả và ngoại lệ
Nếu muốn xử lý cả kết quả và ngoại lệ trong cùng một phương thức, có thể sử dụng handle()

CompletableFuture<List<Article>> sourceAFuture = newsService.getArticlesFromSourceA()
        .handle((result, ex) -> {
            if (ex != null) {
                // Xử lý ngoại lệ
                System.err.println("Error fetching articles from Source A: " + ex.getMessage());
                return Collections.emptyList();
            } else {
                return result;
            }
        });

Xử lý ngoại lệ trong thenCombine()

@RestController
public class NewsController {
    @Autowired
    private NewsService newsService;

    @GetMapping("/news")
    public CompletableFuture<ResponseEntity<?>> getNews() {
        CompletableFuture<List<Article>> sourceAFuture = newsService.getArticlesFromSourceA()
                .exceptionally(ex -> {
                    // Xử lý ngoại lệ từ nguồn A
                    System.err.println("Error fetching articles from Source A: " + ex.getMessage());
                    return Collections.emptyList(); // Trả về danh sách rỗng hoặc giá trị mặc định
                });

        CompletableFuture<List<Article>> sourceBFuture = newsService.getArticlesFromSourceB()
                .handle((articles, ex) -> {
                    if (ex != null) {
                        // Xử lý ngoại lệ từ nguồn B
                        System.err.println("Error fetching articles from Source B: " + ex.getMessage());
                        return Collections.emptyList();
                    }
                    return articles;
                });

        return sourceAFuture.thenCombine(sourceBFuture, (articlesA, articlesB) -> {
            if ((articlesA == null || articlesA.isEmpty()) && (articlesB == null || articlesB.isEmpty())) {
                // Nếu cả hai nguồn đều thất bại
                return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
                        .body("Không thể lấy dữ liệu từ các nguồn tin tức");
            }
            // Tổng hợp và sắp xếp các bài viết
            List<Article> combinedArticles = new ArrayList<>();
            combinedArticles.addAll(articlesA);
            combinedArticles.addAll(articlesB);
            combinedArticles.sort(Comparator.comparing(Article::getPublishedDate).reversed());
            return ResponseEntity.ok(combinedArticles);
        }).exceptionally(ex -> {
            // Xử lý ngoại lệ trong quá trình kết hợp
            System.err.println("Error combining articles: " + ex.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Lỗi khi kết hợp dữ liệu");
        });
    }
}

5. Ưu điểm và lưu ý

5.1. Ưu điểm

  • Hiệu năng cao hơn: Giảm thời gian chờ đợi bằng cách thực hiện các tác vụ song song.
  • Tối ưu tài nguyên: Sử dụng ít luồng hơn so với việc tạo mới luồng cho mỗi tác vụ.
  • Mã nguồn rõ ràng: Sử dụng API của CompletableFuture giúp mã nguồn dễ đọc và bảo trì.

5.2. Lưu ý

  • Quản lý ngoại lệ: Cần xử lý ngoại lệ trong các tác vụ bất đồng bộ để tránh việc bỏ sót lỗi.
  • Thread Pool: Mặc định, @Async sử dụng SimpleAsyncTaskExecutor, không giới hạn số lượng luồng. Nên cấu hình ThreadPoolTaskExecutor để quản lý tài nguyên tốt hơn.
  • Đồng bộ dữ liệu: Khi thao tác với dữ liệu chia sẻ, cần chú ý đến vấn đề đồng bộ hóa để tránh xung đột.