Reactive programming ra đời dựa trên bối cảnh thực tế rằng ngay nay sự phát triển không ngừng của các dịch vụ khiến cho dữ liệu bị phân tán ở nhiều nơi và cần phải có một cách thức tận dụng đa luồng, bất đồng bộ để tối ưu thời gian tổng hợp dữ liệu. Trong bài này Dũng sẽ cùng các bạn tìm hiểu spring webflux, một thư viện hỗ trợ lập trình Reactive của spring.
Khởi tạo module
Chúng ta sẽ khởi tạo module có tên webflux.
Cấu hình dự án
Chúng ta sẽ cần cập nhật tập tin webflux/pom.xml như sau:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>vn.techmaster</groupId>
<artifactId>mastering-spring-boot</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>webflux</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>${reactor.version}</version>
</dependency>
</dependencies>
</project>
Ở đây chúng ta bổ sung thêm 2 phụ thuộc mới là:
- Spring-webflux: Chứa các thành phần cho reactive của spring.
- Reactor-netty: Vì spring không thực sự cung cấp cơ chế xử lý reactive nên chúng ta sẽ cần bổ sung thư viện này.
Khởi tạo controller
Sau khi cấu hình dự án chúng ta có thể tạo ra một lớp HelloController với nội dung như sau:
package vn.techmaster.webflux.controller;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
public String index() {
return "Welcome to TechMaster!";
}
}
Controller này chỉ đơn giản là xử lý yêu cầu người dùng truy cập vào trang chủ và trả lại dòng chữ “Welcome to TechMaster!” mà thôi.
Khởi tạo DispatcherHandler
DispatcherHandler hoạt động giống như một cầu nối giữa tầng ứng dụng và các tầng bên dưới của spring để sau đó kết nối với thư viện Reactive thực tế như reactor. Chúng ta sẽ khai báo lớp này như sau:
package vn.techmaster.webflux.handler;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.DispatcherHandler;
@Component("webHandler")
public class WebFluxWebHandler extends DispatcherHandler {}
Tuy nhiên chỉ lớp này là chưa đủ, chúng ta sẽ cần cài đặt thêm các lớp thành phần như sau:
package vn.techmaster.webflux.handler;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import vn.techmaster.webflux.controller.HelloController;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@Component
public class WebFluxHandlerMapping implements HandlerMapping {
private final Map<String, Function<ServerWebExchange, Object>> handlerByUri;
public WebFluxHandlerMapping(
HelloController helloController
) {
handlerByUri = new HashMap<>();
handlerByUri.put("/", (exchange) ->
helloController.index()
);
handlerByUri.put("/favicon.ico", (exchange) ->
Mono.just("")
);
}
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
String uri = exchange.getRequest().getURI().getPath();
return Mono.just(handlerByUri.get(uri));
}
}
Lớp WebFluxHandlerMapping này dùng để ánh xạ một uri đến một đối tượng xử lý yêu cầu cụ thể, ở đây chúng ta đang sử dụng lambda Function để cài đặt hàm xử lý.
package vn.techmaster.webflux.handler;
import org.springframework.core.MethodParameter;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.HandlerAdapter;
import org.springframework.web.reactive.HandlerResult;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@Component
public class WebFluxHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return true;
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
try {
Function<ServerWebExchange, Object> func = (Function) handler;
return Mono.just(
new HandlerResult(
handler,
func.apply(exchange),
new MethodParameter(
Function.class.getDeclaredMethod(
"apply",
Object.class
),
-1
)
)
);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
}
Lớp WebFluxHandlerAdapter này dùng để chuyển đổi kết quả xử lý lấy được từ tầng ứng dụng, cụ thể là kết quả từ controller sẽ được chuyển sang dạng HandlerResult của spring.
package vn.techmaster.webflux.handler;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.HandlerResult;
import org.springframework.web.reactive.HandlerResultHandler;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
@Component
public class WebFluxHandlerResultHandler implements HandlerResultHandler {
private final DefaultDataBufferFactory dataBufferFactory =
new DefaultDataBufferFactory();
@Override
public boolean supports(HandlerResult result) {
return true;
}
@Override
public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
try {
DataBuffer dataBuffer = dataBufferFactory.wrap(
result
.getReturnValue()
.toString()
.getBytes(StandardCharsets.UTF_8)
);
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.OK);
response
.getHeaders()
.add("Content-Type", "text/html");
return response.writeWith(Mono.just(dataBuffer));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
Lớp WebFluxHandlerResultHandler này dùng để xử lý kết quả, cụ thể ở đây chúng ta sẽ ghi dữ liệu kết quả vào ServerHttpResponse để sau đó netty sẽ giúp chúng ta phản hồi lại cho client.
Chạy chương trình
Để có thể chạy chương trình, trước hết chúng ta cần cài đặt lớp WebFluxStartUp như sau:
package vn.techmaster.webflux;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import reactor.netty.http.server.HttpServer;
public class WebFluxStartUp {
public static void main(String[] args) throws Exception {
ApplicationContext applicationContext =
new AnnotationConfigApplicationContext(
"vn.techmaster.webflux"
);
HttpHandler httpHandler = WebHttpHandlerBuilder
.applicationContext(applicationContext)
.build();
HttpServer.create()
.port(8080)
.handle(new ReactorHttpHandlerAdapter(httpHandler))
.bindNow()
.onDispose()
.block();
}
}
Lớp này sẽ khởi tạo một http server thông qua thư viện reactor của Netty và lắng nghe cổng 8080. Ở bên trong WebHttpHandlerBuilder sẽ lấy ra singleton webHandler mà chúng ta đã khai báo thông qua lớp WebFluxWebHandler. Nếu không có singleton này thì WebHttpHandlerBuilder sẽ báo lỗi thiếu bean và chương trình của chúng ta sẽ không khởi động được.
Bây giờ hãy chạy chương trình qua lớp WebFluxStartUp và truy cập vào http://localhost:8080/ chúng ta sẽ nhận được kết quả là một trang web với dòng chữ Welcome to TechMaster!.
Mono là gì?
Để có thể hiểu ý nghĩa của lớp Mono trước hết chúng ta cần tìm hiểu về vấn đề đói thread thông qua mã nguồn dưới đây:
package vn.techmaster.webflux.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadStarvation {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1);
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
executorService.execute(() -> {
System.out.println("Run first");
countDownLatch.countDown();
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Done");
});
}
}
Trong mã nguồn này chúng ta chỉ tạo ra một thread duy nhất, tuy nhiên chúng ta lại làm hai việc lồng nhau:
- Việc đầu tiên chúng ta sẽ làm đó là đăng ký hành động in ra “Run first” với executorService, sau đó mới thực thi việc hiển thị “Done”.
- Việc thứ 2 chúng ta in ra dòng chữ “Run first” và thông báo cho việc đầu tiên biết là đã xong.
Tuy nhiên khi chúng ta chạy lớp ThreadStarvation thì sẽ chẳng có dòng nào được in ra cả, nguyên nhân là chúng ta đang thực thi hai việc chờ nhau trong khi chỉ có duy nhất 1 thread thực thi, đây được gọi là tình trạng đói thread.
Để giải quyết việc đói thread này chúng ta có thể cho số thread tăng lên 2 như sau:
package vn.techmaster.webflux.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadStarvation {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// tăng số lượng thread lên 2
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
executorService.execute(() -> {
System.out.println("Run first");
countDownLatch.countDown();
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Done");
});
}
}
Thì khi thực thi chương trình chúng ta sẽ nhận được kết quả là:
Run first
Done
Tuy nhiên cách giải quyết này không triệt để vì trong thực tế có rất nhiều yêu cầu gửi đến và chúng ta sẽ không thể biết chính xác cần phải tăng lên bao nhiêu thread cả, để nhiều thread sẽ gây ra các vấn đề về bộ nhớ và hiệu năng.
Một cách đơn giản khác là chúng ta chấp nhận hy sinh việc gọi tuần tự và mã nguồn sẽ như sau:
package vn.techmaster.webflux.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadStarvation {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
executorService.execute(() -> {
System.out.println("Run first");
});
System.out.println("Done");
});
}
}
Chương trình của chúng ta sẽ có thể in ra theo thứ tự ngẫu nhiên:
Done
Run first
Tại sao 2 việc lại hoàn thành được dù chỉ có một thread? Hãy nhìn hàm execute một chút.
void execute(Runnable command);
Hàm này có tham số Runnable, về bản chất nó là một lớp proxy, ở bên trong ExecutorService sẽ không thưc thi ngay khi gọi execute mà nó sẽ đưa vào hàng đợi và sau đó mới thực thi tuần tự, dẫn đến việc chương trình của chúng ta vẫn thực thi được dù chỉ có một thread.
Lớp Mono cũng tương tự như Runnable, nó cũng chỉ là một lớp proxy để bao bọc lại việc thi thực tế để sau đó được đưa vào hàng đợi và các thư viện reactive sẽ xử lý sau.
Tổng kết
Vậy là chúng ta đã cùng nhau:
- Khơi tạo và chạy một chương trình sử dụng spring-webflux và reactive với netty.
- Tìm hiểu về tình trạng đói thread và ý nghĩa của lớp Mono.
Sách tham khảo
Bạn có thể tìm hiểu thêm về proxy design pattern thông qua cuốn sách làm chủ các mẫu thiết kế kinh điển trong lập trình và đừng quên nhập mã giảm giá Tech10 để được giảm giá 10% nhé.
Cám ơn bạn đã quan tâm đến bài viết|video này. Để nhận được thêm các kiến thức bổ ích bạn có thể:
- Đọc các bài viết của TechMaster trên facebook: https://www.facebook.com/techmastervn
- Xem các video của TechMaster qua Youtube: https://www.youtube.com/@TechMasterVietnam nếu bạn thấy video/bài viết hay bạn có thể theo dõi kênh của TechMaster để nhận được thông báo về các video mới nhất nhé.
- Chat với techmaster qua Discord: https://discord.gg/yQjRTFXb7a
Bình luận