Spring WebFlux: Reactive Programming

As Internet applications grow in scale and complexity, traditional blocking programming models show their limits, especially under high concurrency and when processing large data streams. Reactive programming emerged in response to these challenges: it offers a more efficient and flexible paradigm for systems whose load and requirements keep changing.

1. Introduction to Spring WebFlux

WebFlux provides a non-blocking, asynchronous web framework that allows developers to build high-performance, scalable web applications, especially suitable for handling large numbers of concurrent connections, such as in microservice architectures and cloud environments.

WebFlux was introduced in Spring Framework 5 and is Spring's web stack for reactive programming.

1.1. Asynchronous and non-blocking

Asynchronous non-blocking is a programming pattern that allows a program to continue performing other tasks while waiting for an operation to complete. This mode is based on the event loop and can handle multiple I/O operations on a single thread, greatly improving the throughput and scalability of the system.

  • Server-side processing: WebFlux runs on non-blocking servers such as Netty or Undertow, which can serve a large number of connections without dedicating a thread to each one. A small, fixed number of event-loop threads interleaves many in-flight requests, which improves resource utilization and throughput.
  • Database Access: By using R2DBC (Reactive Relational Database Connectivity) or other database clients that support reactive programming, asynchronous operations can be implemented in database queries to avoid blocking caused by threads waiting for database responses.
  • Asynchronous API Call: When processing external service calls, you can use WebClient to make asynchronous HTTP requests. WebClient is completely non-blocking and can handle other tasks while waiting for a response.
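
The idea of "keep working while an operation is pending" can be sketched with the JDK alone. The example below is a minimal illustration, not WebFlux code: fetchGreeting is a hypothetical stand-in for a remote call, and the 100 ms delay is an assumed latency produced by CompletableFuture.delayedExecutor so that no thread sleeps while waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class NonBlockingSketch {

    // Hypothetical slow operation: the result becomes available after about 100 ms.
    // delayedExecutor schedules the supplier instead of parking a thread in sleep().
    static CompletableFuture<String> fetchGreeting(String name) {
        return CompletableFuture.supplyAsync(
                () -> "Hello, " + name + "!",
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        // Register what should happen with the result; this call returns immediately.
        CompletableFuture<Integer> length =
                fetchGreeting("WebFlux").thenApply(String::length);

        // The calling thread is free to do other work in the meantime.
        System.out.println("Request issued, caller continues");

        // join() blocks and is used here only so the demo can print the result before exiting.
        System.out.println("Greeting length: " + length.join());
    }
}
```

In a WebFlux application the same shape appears with Mono instead of CompletableFuture, and the framework subscribes to the result, so application code does not need a blocking call such as join().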

1.2. Reactive Streams

Reactive Streams is a specification that defines the interfaces and behavior for asynchronous stream processing with non-blocking backpressure.

1.2.1. Reactive programming

Reactive programming is an asynchronous, event-driven programming paradigm that is particularly suitable for building applications that can handle real-time data streams. In this model, data and events are processed as streams, allowing developers to build complex asynchronous logic in a declarative manner.

Spring WebFlux follows this specification, using Publisher as the source of the reactive stream and Subscriber as the consumer of the stream. This model allows developers to handle asynchronous data flows in a declarative manner while maintaining flow control and backpressure management.

  • Data flow processing: Spring WebFlux integrates the Reactor library and uses Flux and Mono types to process data flows. Flux represents an asynchronous sequence of 0 to N elements, and Mono represents an asynchronous result of 0 or 1 elements. Both support backpressure strategies and can intelligently adjust the data production speed to match the consumer's processing capabilities.
  • Event-driven programming: Applications can easily handle various events, such as messages from a message queue or events on a WebSocket connection. With the Reactive Streams model, these events are processed efficiently without blocking the thread that delivers them.

1.2.2. Backpressure management

Backpressure is a central concept in Reactive Streams: it lets a consumer control the rate at which a producer sends data. Project Reactor provides several backpressure strategies to help developers deal with overloaded data flows.
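
The mechanism behind backpressure is the request(n) call that a subscriber makes on its subscription. The sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces, rather than Reactor itself. RangePublisher and BatchingSubscriber are hypothetical names invented for this illustration, and the publisher is deliberately simplified (single-threaded, and it ignores non-positive requests instead of signalling an error as the specification requires).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Emits 1..count, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;   // simplified handling of invalid requests
                    demand += n;
                    if (emitting) return;         // re-entrant call from onNext: the loop below continues
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Pulls items in batches of batchSize and records what it received. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        int requestCount;
        boolean completed;
        private final int batchSize;
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) requestBatch(); // ask for more only when ready
        }

        @Override
        public void onError(Throwable throwable) { throw new IllegalStateException(throwable); }

        @Override
        public void onComplete() { completed = true; }

        private void requestBatch() {
            remainingInBatch = batchSize;
            requestCount++;
            subscription.request(batchSize);
        }
    }

    static BatchingSubscriber consume(int count, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = consume(5, 2);
        System.out.println(s.received + " in " + s.requestCount + " requests");
    }
}
```

Reactor's Flux and Mono follow the same contract. Operators such as onBackpressureBuffer or onBackpressureDrop decide what happens when a producer cannot be slowed down to the requested rate.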

1.2.3. Project Reactor

Project Reactor is a reactive programming library built on Java 8. It was developed by the Pivotal team, is designed to integrate closely with the Spring framework, and offers a rich set of APIs for creating and manipulating reactive streams.

Reactive types: Mono and Flux

  • Mono: Reactive type representing 0 or 1 element, suitable for asynchronous operations that produce a single result or no result.
  • Flux: Reactive type representing 0 to N elements, used for an asynchronous sequence of multiple results.

Operators and reactive data flow

Project Reactor provides a large number of operators for processing elements in reactive streams. These operators include:

  • map: Applies a function to each element in the stream and publishes the result.
  • filter: Keeps only the elements that satisfy a condition.
  • flatMap: Converts each element into another stream and merges the resulting streams into one stream.
  • switchIfEmpty: Switches to an alternative Mono or Flux if the source stream completes without emitting any element.
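
As a rough illustration of these four operators, the following sketch chains them on small in-memory streams. It assumes reactor-core is on the classpath; findName, shoutLongNames, and nameOrDefault are hypothetical helpers made up for the example, and block() is used only so a plain main method can print the results.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class OperatorSketch {

    // Hypothetical lookup: emits a name for id 1 and completes empty for any other id.
    static Mono<String> findName(int id) {
        return id == 1 ? Mono.just("alice") : Mono.empty();
    }

    static Flux<String> shoutLongNames(Flux<String> names) {
        return names
                .filter(name -> name.length() > 3)              // keep names longer than 3 characters
                .map(String::toUpperCase)                       // transform each remaining element
                .flatMap(name -> Flux.just(name, name + "!"));  // expand each element into an inner stream
    }

    static Mono<String> nameOrDefault(int id) {
        // Fall back to an alternative Mono when the lookup completes without a value.
        return findName(id).switchIfEmpty(Mono.just("anonymous"));
    }

    public static void main(String[] args) {
        List<String> result =
                shoutLongNames(Flux.just("bob", "alice", "carol")).collectList().block();
        System.out.println(result);
        System.out.println(nameOrDefault(1).block());
        System.out.println(nameOrDefault(2).block());
    }
}
```

Note that flatMap does not guarantee the order of merged elements when the inner streams are asynchronous; concatMap is the order-preserving alternative.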

1.2.4. Comparison with traditional Spring MVC

Spring MVC is a web framework based on Servlet API, which uses a blocking I/O model, and each request/response pair is bound to a thread. This performs well when the amount of concurrency is low, but in high concurrency scenarios, the consumption of thread resources will increase dramatically.

In contrast, Spring WebFlux is based on Reactive Streams. It does not depend on the Servlet API and can run on non-Servlet servers such as Netty and Undertow. This model lets WebFlux handle concurrent requests in a non-blocking manner, using resources effectively and improving performance under load.

1.3. Functional programming

Functional programming is a programming paradigm that emphasizes viewing tasks as a series of composable function calls. Defining the processing flow in a declarative manner makes the code more concise and readable, and is more suitable for processing complex asynchronous logic. WebFlux adopts the functional programming paradigm and uses Lambda expressions to simplify the programming model. Routing and request processing are defined in functional programming, which is completely different from the traditional annotation-based controller method.

  • Routing and processing: WebFlux provides a functional programming model, allowing developers to use Java 8 Lambda expressions to define routing rules and processing functions, making the code more concise and readable. For example, you can use the RouterFunctions.route() method to define routes and ServerResponse to build the response.
  • Chained operations and composition: With the rich operators of the reactive types Flux and Mono, such as map(), filter(), and flatMap(), you can build complex asynchronous data-processing flows without explicitly managing callbacks or threads.

1.3.1. Request routing

Use RouterFunction to define request routing

RouterFunction is a functional interface used to define request routing in Spring WebFlux. By implementing RouterFunction, you can precisely control the matching and processing of requests.

Routing predicates and handlers
  • Routing predicate: matches specific attributes of an HTTP request, such as the path, method, or headers.
  • Handler: once the routing predicate matches, the handler processes the request and returns the response.

1.3.2. Functional endpoint

Spring WebFlux also introduces the concept of functional endpoints, allowing developers to handle requests and generate responses in the form of simple functions, which usually return ServerResponse.

1.3.3. Functional programming and reactive streams

Functional programming is an important part of the reactive programming model. It advocates the use of side-effect-free functions, immutable data structures, and declarative programming. These principles dovetail with the concept of reactive streaming, which emphasizes declarative processing of data flows and the application of various operators in data flows to transform, filter, and combine data.

2. Spring WebFlux application construction

2.1 Environment preparation

Project dependency configuration

For a Maven-based Spring Boot project, the dependency configuration in the pom.xml file may look like this:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2.2 Define routes and processors

2.2.1. Create RouterFunction Bean

In Spring WebFlux, use RouterFunction to define the route of the request.

First, create a configuration class and define routes in it:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;

@Configuration
public class WebFluxConfig {

    // Registers the route GET /hello and delegates it to MyHandler.hello
    @Bean
    public RouterFunction<ServerResponse> route(MyHandler handler) {
        return RouterFunctions.route(GET("/hello"), handler::hello);
    }
}

  • RouterFunctions.route() is the starting point for creating routing rules.
  • GET("/hello") is a static method from RequestPredicates. It defines a predicate that matches requests with the HTTP GET method and the path “/hello”.
  • handler::hello is a method reference to the hello method of the MyHandler class. When a request matches the predicate, handler.hello() is called to handle it and returns a Mono<ServerResponse> as the response.

2.2.2. Use HandlerFunction to process requests

Create a handler class that contains the methods for handling requests. Each method takes a ServerRequest and returns a Mono<ServerResponse>; even when the body is a stream of elements (a Flux), it is wrapped in a single ServerResponse:

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class MyHandler {

    public Mono<ServerResponse> hello(ServerRequest request) {
        String name = request.queryParam("name").orElse("World");
        String message = "Hello, " + name + "!";
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just(message), String.class);
    }
} 

The hello method is used to handle HTTP requests and return a response.

  • Mono<ServerResponse>: The return type is a Mono object, a class in the Reactor library used to represent an asynchronous sequence of 0 to 1 elements. Here, it will eventually contain a ServerResponse object, which is the HTTP response. Mono is used to support non-blocking and reactive programming.

  • ServerRequest request: Input parameters indicating the received HTTP request information.

Extract query parameters:

String name = request.queryParam("name").orElse("World"); 

This line of code attempts to obtain the query parameter named “name” from the ServerRequest object. If the request includes this parameter, as in http://example.com/hello?name=John, the name variable is assigned the value “John”; otherwise the default value “World” is used.

Constructing the HTTP response:

  • ServerResponse.ok(): Creates a basic response indicating success (HTTP status code 200 OK).
  • .contentType(MediaType.TEXT_PLAIN): Set the content type of the response to plain text (PLAIN TEXT).
  • .body(Mono.just(message), String.class): Specifies the content of the response body. Mono.just(message) wraps the greeting string in a publisher that emits a single String element, so the body is written through the same reactive pipeline as the rest of the response. For a value that is already in memory, .bodyValue(message) is an equivalent shorthand.

2.3 Global exception handling

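Global exception handling is an important part of any web application. One way to implement it in WebFlux is a component that implements the WebExceptionHandler interface:

import java.nio.charset.StandardCharsets;

import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;

@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GlobalExceptionHandler implements WebExceptionHandler {

    // Called by WebFlux when request processing fails with an exception.
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        return handleException(exchange, ex);
    }

    public Mono<Void> handleException(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        byte[] body = "{\"error\": \"Internal Server Error\"}".getBytes(StandardCharsets.UTF_8);
        DataBuffer buffer = response.bufferFactory().wrap(body);
        return response.writeWith(Mono.just(buffer));
    }
}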

This code defines a global exception handler to catch unhandled exceptions in Spring WebFlux applications and return a unified error response to the client. Below is a detailed description of each part:

@Order(Ordered.HIGHEST_PRECEDENCE)

  • The @Order annotation is used to specify the execution order of components, especially when there are multiple components of the same type (such as multiple exception handlers) that need to be executed in a specific order. Ordered.HIGHEST_PRECEDENCE is a constant with a value of Integer.MIN_VALUE, which means that this exception handler will have the highest priority and will be executed before all other components of the same type. In other words, if there are other exception handlers that can handle the same exception type but do not specify such a high priority, then this GlobalExceptionHandler will handle the exception first.

handleException method

  • Parameters:
    • ServerWebExchange exchange: encapsulates the HTTP request and response and is the core object when WebFlux processes a request.
    • Throwable ex: the exception that was thrown and needs to be handled.
  • Function:
    1. Set the response status code: Obtain the response object through exchange.getResponse() and set its status code to HttpStatus.INTERNAL_SERVER_ERROR (HTTP 500), indicating that the server has encountered an unexpected error.
    2. Construct the response body: Use response.bufferFactory().wrap() to create a DataBuffer containing the error information. The message body here is {"error": "Internal Server Error"}.
    3. Write the response body and complete the response: Write the buffer into the response through response.writeWith(Mono.just(buffer)). The call returns a Mono<Void>, which indicates an asynchronous operation with no return value that completes once the response has been sent.

3. Application details

3.1. RouterFunction

RouterFunction is a functional interface provided by Spring WebFlux for defining HTTP routes. It is one of the basic building blocks of reactive web applications. Compared with traditional annotation-based controllers (@GetMapping, @PostMapping, and so on), RouterFunction defines routing logic in code, which offers more flexibility and suits a functional programming style. Its usage is described in detail below:

3.1.1. Basic concepts

  • RouterFunction: Represents a routing processing logic that associates HTTP requests with corresponding processing logic. A RouterFunction can match a request and return the next RouterFunction or a processing result (HandlerFunction).
  • HandlerFunction: Represents a processing logic, which accepts a ServerRequest and returns a Mono<ServerResponse>. That is, it is responsible for processing requests and producing responses.
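
The relationship between the two interfaces can be modelled in a few lines of plain Java. This is a conceptual sketch only, not Spring's implementation: Request, Handler, Router, get, and dispatch are simplified, hypothetical stand-ins, and everything is synchronous. In Spring, RouterFunction.route(ServerRequest) returns a Mono<HandlerFunction<T>> and handlers return Mono<ServerResponse>.

```java
import java.util.Optional;
import java.util.function.Function;

final class RoutingModelSketch {

    /** Simplified request with only a method and a path (not Spring's ServerRequest). */
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    /** Counterpart of HandlerFunction: turns a request into a response (a plain String here). */
    interface Handler extends Function<Request, String> {}

    /** Counterpart of RouterFunction: yields a handler if the request matches, otherwise nothing. */
    interface Router {
        Optional<Handler> route(Request request);

        /** Composition: try this router first, then fall back to the other one. */
        default Router and(Router other) {
            return request -> route(request).or(() -> other.route(request));
        }
    }

    /** Builds a router from a predicate (GET plus exact path) and a handler. */
    static Router get(String path, Handler handler) {
        return request -> "GET".equals(request.method) && path.equals(request.path)
                ? Optional.of(handler)
                : Optional.empty();
    }

    /** Finds the matching handler and invokes it, or reports that nothing matched. */
    static String dispatch(Router router, Request request) {
        return router.route(request)
                .map(handler -> handler.apply(request))
                .orElse("404 Not Found");
    }

    public static void main(String[] args) {
        Router router = get("/hello", request -> "Hello")
                .and(get("/bye", request -> "Bye"));
        System.out.println(dispatch(router, new Request("GET", "/bye")));
        System.out.println(dispatch(router, new Request("POST", "/hello")));
    }
}
```

Keeping matching (the router) separate from processing (the handler) is what makes routes composable: two routers combine into a new router without either handler knowing about the other.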

3.1.2. Create RouterFunction

Creating a RouterFunction usually involves defining routing rules and processing logic. Here is a simple example showing how to define a route to handle GET requests:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Configuration
public class GreetingRouter {
    @Bean
    public  RouterFunction<ServerResponse> routingFunction() {
        return route(GET("/hello2"), request -> ok().bodyValue("Hello, Spring WebFlux!"));
    }
} 

This code defines a simple RouterFunction<ServerResponse> instance that routes HTTP GET requests for the /hello2 path.

return route(GET("/hello2"), request -> ok().bodyValue("Hello, Spring WebFlux!"));

  • route is a static method of the RouterFunctions class that creates a basic route definition. It accepts two parameters:
    • Predicate: GET("/hello2") is a predicate that matches the method and path of the HTTP request. It matches all requests with the GET method and the path /hello2.
    • HandlerFunction: request -> ok().bodyValue("Hello, Spring WebFlux!") is a lambda expression that contains the logic for processing the request. It accepts a ServerRequest object as input and returns a Mono<ServerResponse> that represents the response.
  • request -> ...: The lambda expression defines how to generate a response from the request.
  • ok(): A static factory method of ServerResponse that creates a response indicating success (HTTP status code 200 OK).
  • .bodyValue("Hello, Spring WebFlux!"): Sets the response body to the given string. If no content type is specified explicitly, it is inferred from the content.

3.1.3. Register RouterFunction

To register a RouterFunction in a Spring Boot application, it is usually declared as a @Bean in the configuration class so that Spring can automatically discover and configure it:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;

@Configuration
public class WebConfig {

    @Bean
    public RouterFunction<ServerResponse> routes(GreetingHandler handler) {
        return RouterFunctions.route(GET("/hello"), handler::sayHello);
    }
}

Here, GreetingHandler is a handler class containing the business logic; its sayHello method has the HandlerFunction signature (it takes a ServerRequest and returns a Mono<ServerResponse>).

3.1.4. Combining RouterFunction

RouterFunction can be combined to form complex routing structures. This makes routing configuration more modular and maintainable.

  • Predicate combination: Predicates can be combined with logical operators such as and and or to implement more complex route matching.

    // Assumes static imports of RequestPredicates.GET and RequestPredicates.accept
    @Bean
    public RouterFunction<ServerResponse> route(MyHandler handler) {
        return RouterFunctions.route(GET("/haha")
                        .and(accept(MediaType.TEXT_PLAIN)), handler::haha)
                .andRoute(GET("/reactor")
                        .and(accept(MediaType.APPLICATION_JSON)), handler::reactorExample);
    }

    // Handle simple text requests
    public Mono<ServerResponse> haha(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .bodyValue("haha!");
    }

    // Handle requests that expect a JSON response
    public Mono<ServerResponse> reactorExample(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue("{\"message\": \"This is a reactive response!\"}");
    }

3.1.5. Processing request parameters

RouterFunction can conveniently handle request parameters, whether they are path parameters, query parameters or request bodies. For example, to handle requests with path parameters:

public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
    return route(GET("/users/{id}"), handler::getUserById);
} 

3.1.6. Error handling

Spring WebFlux supports global or local error handling, which can be implemented by providing a RouterFunction that handles specific exception types:

3.1.6.1. Built-in exception handling

    @Bean
    public RouterFunction<ServerResponse> route(MyHandler handler) {
        return RouterFunctions.route()
                .GET("/greeting", handler::reactorExample)
                // Error handling for the routes defined on this builder
                .onError(Exception.class, (exception, request) -> {
                    System.out.println("An exception occurred: " + exception.getMessage());
                    return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                            .contentType(MediaType.TEXT_PLAIN)
                            .bodyValue("Oops! Something went wrong.");
                })
                .build();
    }

3.1.6.2. Global exception handling

    // Fallback route for requests that no other route matched
    private RouterFunction<ServerResponse> errorRoute() {
        return RouterFunctions.route(RequestPredicates.all(),
                request -> ServerResponse.status(HttpStatus.BAD_REQUEST)
                        .contentType(MediaType.TEXT_PLAIN)
                        .bodyValue("This is a fallback for unmatched requests."));
    }

RequestPredicates.all() matches every HTTP request, regardless of method (GET, POST, etc.) or path. A route built on it therefore has to be combined last, after all specific routes, so that it only catches requests that nothing else matched.

3.1.7. Advanced usage

  • Filters and Interceptors: Custom filters or interceptors can be inserted to handle the pre-processing or post-processing logic of the request.
  • Conditional Routing: Dynamically select routes based on environment, configuration, or other conditions.

3.2. Request parameters

Spring WebFlux fully supports receiving RESTful style parameters. RESTful interfaces usually pass parameters through URL paths, query parameters, request headers, and request bodies. In Spring WebFlux, you can use both functional and annotation methods to define endpoints to receive these parameters. Here are several common ways to receive parameters:

3.2.1. Path Variables

In the route definition, use {variableName} to mark a path variable, then read it in the handler method with request.pathVariable("variableName"). (The @PathVariable annotation serves the same purpose in annotation-based controllers.)

@Bean
public RouterFunction<ServerResponse> userRoute(UserHandler handler) {
    return RouterFunctions.route(GET("/users/{id}"), handler::getUserById);
}

@Component
public class UserHandler {

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        // processing logic...
        return ServerResponse.ok().bodyValue("User id: " + id); // placeholder response
    }
} 

3.2.2. Query Parameters

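Query parameters can be read directly from ServerRequest in a handler function, or bound with @RequestParam in an annotation-based controller.

public Mono<ServerResponse> searchUsers(ServerRequest request) {
    String keyword = request.queryParam("keyword").orElse("");
    // processing logic...
    return ServerResponse.ok().bodyValue("Searching for: " + keyword); // placeholder response
}

Or use @RequestParam in a method of an annotated controller (for example a @RestController class). Such methods return the body or a ResponseEntity rather than a ServerResponse:

public Mono<ResponseEntity<String>> searchUsers(@RequestParam(name = "keyword", defaultValue = "") String keyword) {
    // processing logic...
    return Mono.just(ResponseEntity.ok("Searching for: " + keyword)); // placeholder response
}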

3.2.3. Request Body

For methods such as POST and PUT, you may need to read data from the request body. In a handler function, call request.bodyToMono(UserDTO.class). In an annotation-based controller, use the @RequestBody annotation with the target object type to bind JSON (or another supported format) automatically:

public Mono<UserDTO> createUser(@RequestBody UserDTO user) {
    // processing logic...
    return Mono.just(user); // placeholder: echo the bound object
}

3.2.4. Request Headers

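Request headers can be read through ServerRequest.headers() in a handler function, or bound with the @RequestHeader annotation in an annotation-based controller.

public Mono<String> handleRequestWithHeader(@RequestHeader("Authorization") String authHeader) {
    // processing logic...
    return Mono.just("Authorization header received"); // placeholder response
}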

3.3. Response content

3.3.1. ServerResponse object

In Spring WebFlux, the return content of the response is usually constructed and managed through the ServerResponse object, which represents the HTTP response to be sent to the client.

3.3.1.1. Status Code (Status Code)

Each HTTP response will have a status code used to indicate the processing result of the request. In Spring WebFlux, you can set the status code as follows:

  • Use static methods, such as ServerResponse.ok() for 200 OK or ServerResponse.created(location) for 201 Created, where location is the URI of the new resource.
  • Or specify the status code directly, such as ServerResponse.status(HttpStatus.NOT_FOUND) for 404 Not Found.

3.3.1.2. Response body (Body)

The response body is the main content of the response, which can be data in various formats such as text, JSON, or XML. There are several ways to construct a response body:

  • Direct value: Use .bodyValue("response content"), which is suitable for turning a simple string or object directly into the response body.
  • Stream: Use .body(publisher, ElementType.class), or equivalently .body(BodyInserters.fromPublisher(publisher, ElementType.class)), when the response content comes from a publisher (such as Flux or Mono). This is well suited to asynchronous data streams.
  • Object conversion: Pass a Java object to .bodyValue(object); with a library such as Jackson on the classpath, it is serialized automatically into a format such as JSON.

3.3.1.3. Content-Type

Specify the content type of the response through the .contentType(MediaType) method, such as MediaType.APPLICATION_JSON for JSON or MediaType.TEXT_PLAIN for plain text.

3.3.1.4. Headers

Custom header information can be added to the response, such as .header("X-Custom-Header", "value").

3.3.1.5. Constructing the response

Once the status code, content type, and headers are set, a terminal method completes the ServerResponse. .body(...) and .bodyValue(...) finish a response that has a body, while .build() finishes a response without one (for example ServerResponse.notFound().build()). Each of these returns a Mono<ServerResponse> that combines all previous settings into the response to be sent.

3.3.1.6. Example

Let's say we want to build a response that returns data in JSON format:

import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

public Mono<ServerResponse> getUserInfo(ServerRequest request) {
    // Assumes getUserDetails() returns a Mono<User>, where User is a custom Java object
    Mono<User> userDetails = userService.getUserDetails(request.pathVariable("id"));

    return userDetails
            // Build the 200 response only when a user was emitted; User is serialized to JSON
            .flatMap(user -> ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(user))
            // If the user cannot be found, return 404
            .switchIfEmpty(ServerResponse.notFound().build());
}

In this example, when the query emits a user, flatMap builds a response with status 200 OK and content type JSON, and the User object is converted into the JSON response body. If the query result is empty (that is, the user does not exist), .switchIfEmpty() switches to a 404 Not Found response. The 200 response has to be built inside flatMap: a ServerResponse built directly from an empty Mono body would still be emitted, so switchIfEmpty would never take effect.

3.3.2. Reactive types

In the field of reactive programming, especially when using Spring WebFlux and the Reactor framework, Mono and Flux are the two core reactive types. They are both implementations of the Reactive Streams specification provided by the Reactor library. Both types implement the Publisher interface, which means that they can serve as producers of asynchronous data streams and play a vital role in reactive systems.

3.3.2.1. Mono

  • Concept: Mono represents an asynchronous sequence of 0 or 1 elements. In other words, it either emits an element, emits a completion signal (indicating that there is no element), or emits an error signal. This makes it ideal for representing single or empty results, such as when a database query returns a single row of records, or when performing an operation that may fail.
  • Typical uses: A single result of a database query, a response to a network request, an operation to calculate a single value, etc.
  • Operators: Mono provides a rich set of operators, such as map, flatMap, zip, then, etc., for processing the transformation, combination and error handling of a single data stream.

1. map
  • Function: Apply a function to the elements in Mono for transformation.

    ```java
    Mono<String> monoStr = Mono.just("Hello");
    Mono<Integer> monoLength = monoStr.map(String::length); 
    ```
    </code></pre></li>
    </ul>
    
    <h6>2. flatMap</h6>
    
    <ul>
    <li><strong>Function</strong>: Convert elements in a <code>Mono</code> to another <code>Mono</code> or <code>Flux</code>, then flatten this resulting stream into a single stream.
    
    <pre><code>```java
    Mono<User> monoUser = ...; // Assume that user information is obtained
    Mono<Address> monoAddress = monoUser.flatMap(user -> getAddressForUser(user.getId())); 
    ```
    </code></pre></li>
    </ul>
    
    <h6>3. then</h6>
    
    <ul>
    <li><strong>Function</strong>: Ignore the elements in <code>Mono</code>, and only execute the next <code>Mono</code> when the current <code>Mono</code> is completed.
    
    <pre><code>```java
    Mono<Void> saveUser = userRepository.save(newUser);
    Mono<User> findUser = saveUser.then(userRepository.findById(newUser.getId())); 
    ```
    </code></pre></li>
    </ul>
    
    <h6>4. zipWith</h6>
    
    <ul>
    <li><strong>Function</strong>: Combine the output of two <code>Mono</code> in some way (usually a tuple), and it will only be triggered when both <code>Mono</code> are successful.
    
    <pre><code>```java
    Mono<User> monoUser = ...;
    Mono<Order> monoOrder = ...;
    Mono<Tuple2<User, Order>> combined = monoUser.zipWith(monoOrder); 
    ```
    </code></pre></li>
    </ul>
    
    <h5>3.3.2.2. Flux</h5>
    
    <ul>
    <li><strong>Concept</strong>: <code>Flux</code> represents an asynchronous sequence of 0 to N elements. It can emit any number of elements, including an unlimited number, until it completes or encounters an error. This makes Flux ideal for working with collections, event streams, or any scenario where multiple data pushes are possible.</p></li>
    <li><p><strong>Typical uses</strong>: Processing list or collection data, real-time data streams (such as WebSocket messages), multi-row results of database queries, etc.</p></li>
    <li><p><strong>Operators</strong>: <code>Flux</code> also provides a wealth of operators, such as <code>map</code>, <code>filter</code>, <code>concatMap</code>, <code>buffer</code>, <code>window</code>, etc., which are used to process the transformation, filtering, and Merge, windowing and other operations.</p></li>
    </ul>
    
    <h6>1. map</h6>
    
    <ul>
    <li><p><strong>Function</strong>: Apply a function to each element in the stream for transformation.
    
    <pre><code>```java
    Flux<String> names = Flux.fromIterable(Arrays.asList("Alice", "Bob", "Charlie"));
    Flux<Integer> lengths = names.map(String::length); 
    ```
    </code></pre></li>
    </ul>
    
    <h6>2. filter</h6>
    
    <ul>
    <li><strong>Function</strong>: Filter elements from the stream based on conditions.
    
    <pre><code>```java
    Flux<String> names = ...;
    Flux<String> longNames = names.filter(name -> name.length() > 5); 
    ```
    </code></pre></li>
    </ul>
    
    <h6>3. flatMap</h6>
    
    <ul>
    <li><strong>Function</strong>: Convert each element in the stream to a new stream, then merge the streams into a single stream.
    
    <pre><code>```java
    Flux<User> users = ...;
    Flux<Order> orders = users.flatMap(user -> getOrderListForUser(user.getId())); 
    ```
    </code></pre></li>
    </ul>
    
    <h6>4. buffer</h6>
    
    <ul>
    <li><strong>Function</strong>: Collect elements in the stream into a buffer and emit them as a list or array after reaching certain conditions (such as quantity or time).
    
    <pre><code>```java
    Flux<Long> numbers = Flux.interval(Duration.ofMillis(100)).take(10); // interval emits Long ticks
    Flux<List<Long>> buffered = numbers.buffer(3); // emit one List for every 3 elements
    ```
    </code></pre></li>
    </ul>
    
    <h6>5. concatMap</h6>
    
    <ul>
    <li><strong>Function</strong>: Similar to <code>flatMap</code>, but it subscribes to the inner streams one at a time, so the output preserves the order of the source stream.
    
    <pre><code>```java
    Flux<User> users = ...;
    Flux<Order> ordersSequential = users.concatMap(user -> getOrderListForUser(user.getId())); 
    ```
    </code></pre></li>
    </ul>
    
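The ordering difference between flatMap and concatMap can be made visible with a small experiment. The following is only a sketch: the class name and the slowLookup helper are hypothetical, and the delays are chosen so that later elements finish first.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapVsConcatMap {

    // Hypothetical inner stream: one label per source element, delayed so that
    // larger inputs complete sooner (1 -> 300 ms, 2 -> 200 ms, 3 -> 100 ms).
    static Flux<String> slowLookup(int n) {
        return Flux.just("item-" + n)
                .delayElements(Duration.ofMillis(400L - 100L * n));
    }

    // flatMap subscribes to all inner streams at once and merges results as they arrive.
    static List<String> withFlatMap() {
        return Flux.just(1, 2, 3)
                .flatMap(FlatMapVsConcatMap::slowLookup)
                .collectList()
                .block(); // blocking is acceptable here only because this is a demo
    }

    // concatMap waits for each inner stream to complete before subscribing to the next.
    static List<String> withConcatMap() {
        return Flux.just(1, 2, 3)
                .concatMap(FlatMapVsConcatMap::slowLookup)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + withFlatMap());   // arrival order, typically reversed here
        System.out.println("concatMap: " + withConcatMap()); // [item-1, item-2, item-3]
    }
}
```

With these delays flatMap usually finishes sooner, because the lookups overlap, while concatMap takes roughly the sum of the delays in exchange for a stable order.
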
3.3.3.3. Common characteristics
  • Reactive: Both Mono and Flux are non-blocking and support backpressure, which lets the consumer regulate how fast the producer emits data so that the producer cannot overwhelm it with more elements than it can handle.

  • Chained Operations: Both support chained call operators, allowing complex asynchronous data processing flows to be built declaratively without the need to explicitly manage threads or callbacks.

  • Asynchronous and event-driven: Their design philosophy encourages writing asynchronous and event-driven code, which improves the scalability and resource utilization of the system.
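
Backpressure can be observed directly by controlling demand from the subscriber side. The sketch below assumes Reactor's BaseSubscriber and a synchronous Flux.range source; the class name and the batch size of 2 are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureSketch {

    // Consumes the range 1..5 two elements at a time and records every step.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 5).subscribe(new BaseSubscriber<Integer>() {
            private int receivedInBatch = 0;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                log.add("request 2");
                request(2); // initial demand, instead of the default unbounded request
            }

            @Override
            protected void hookOnNext(Integer value) {
                log.add("got " + value);
                if (++receivedInBatch == 2) {
                    receivedInBatch = 0;
                    log.add("request 2");
                    request(2); // ask for the next batch only after finishing this one
                }
            }

            @Override
            protected void hookOnComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The producer never emits more than the subscriber has requested, which is the mechanism that operators and WebFlux itself rely on internally.
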

3.3.3.4. Conversion relationship

Mono and Flux can be converted to each other through operators. For example, a Mono can be converted to a Flux through flatMapMany, and multiple Mono or Flux instances can be combined into a single Flux through operators such as concat and merge.
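
A minimal sketch of these conversions follows. The class and method names are illustrative, and collectList is included as one common way to go from Flux back to Mono, although the text above does not mention it.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxConversions {

    // Mono -> Flux: expand a single value into several elements.
    static Flux<String> monoToFlux() {
        return Mono.just("a,b,c")
                .flatMapMany(csv -> Flux.fromArray(csv.split(",")));
    }

    // Flux -> Mono: gather all elements into one list (an addition to the text above).
    static Mono<List<String>> fluxToMono() {
        return Flux.just("a", "b", "c").collectList();
    }

    // concat subscribes to its sources one after another, so source order is preserved.
    static Flux<String> concatenated() {
        return Flux.concat(Mono.just("first"), Flux.just("second", "third"));
    }

    // merge subscribes to all sources eagerly; with asynchronous sources the
    // elements may interleave.
    static Flux<Integer> merged() {
        return Flux.merge(Flux.just(1, 2), Flux.just(3, 4));
    }
}
```
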

4. WebClient

WebClient is a non-blocking, reactive HTTP client introduced in Spring Framework 5 as part of the Spring WebFlux module. It is designed for building high-performance, asynchronous web service clients, and is especially suitable for handling large numbers of concurrent requests and for interacting with reactive services. The following is a detailed explanation of WebClient:

Core Features

  1. Non-blocking: WebClient is based on non-blocking IO, which means that it does not occupy threads when waiting for data, so it can efficiently handle a large number of concurrent connections and reduce resource consumption.

  2. Reactive: Follows the reactive programming model of Project Reactor and uses Mono and Flux as return types, which makes it straightforward to process asynchronous data flows and to apply backpressure.

  3. Chain call: Provides a set of fluent APIs that allow HTTP requests to be constructed through chain calls, making it easy to understand and maintain.

  4. Flexible configuration: Supports a variety of configuration options, including base URL, default headers, timeout settings, SSL configuration, etc.

  5. Content Negotiation: Automatically handles content type and encoding, and supports serialization and deserialization of multiple data formats such as JSON and XML.

  6. Filter: Allows adding custom filters to modify requests or responses, and implement logging, authentication, retry logic, etc.
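
As an illustration of the filter feature, the sketch below registers a filter that adds a header to every request. The header name X-Request-Source, its value, and the class name are made up for this example, and the transport is replaced by a stub ExchangeFunction so that the code runs without a network; a real client would omit the exchangeFunction call.

```java
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class FilterSketch {

    // Filter that copies the request and adds a (hypothetical) header to it.
    static ExchangeFilterFunction addSourceHeader() {
        return ExchangeFilterFunction.ofRequestProcessor(request ->
                Mono.just(ClientRequest.from(request)
                        .header("X-Request-Source", "demo")
                        .build()));
    }

    // Client whose transport is a stub that echoes the header back in the body.
    static WebClient stubbedClient() {
        return WebClient.builder()
                .baseUrl("http://example.com")
                .filter(addSourceHeader())
                .exchangeFunction(request -> Mono.just(
                        ClientResponse.create(HttpStatus.OK)
                                .header("Content-Type", "text/plain")
                                .body("source=" + request.headers().getFirst("X-Request-Source"))
                                .build()))
                .build();
    }

    static String call() {
        return stubbedClient().get()
                .uri("/api/data")
                .retrieve()
                .bodyToMono(String.class)
                .block(); // blocking only to keep the demo short
    }
}
```

The same hook is where logging or authentication headers would typically be added, since the filter sees every request before it reaches the transport.
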

Basic usage

Create WebClient instance
// The simplest way to create a client (no base URL or default headers)
WebClient defaultClient = WebClient.create();

// Create a configured client through the builder
WebClient client = WebClient.builder()
        .baseUrl("http://example.com")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .build();
Send GET request
Mono<String> response = client.get()
        .uri("/api/data")
        .retrieve() // Perform the request and extract the response
        .bodyToMono(String.class); // Convert the response body to a String
Send POST request
Mono<String> response = client.post()
        .uri("/api/data")
        .bodyValue(someObject) // Send the object as the request body (BodyInserters.fromObject is deprecated since Spring 5.2)
        .retrieve()
        .bodyToMono(String.class); 
Processing response
  • Using the .block() method will block the current thread until a response arrives. This is usually only used in testing or non-reactive environments.
  • In a reactive environment, responses should be processed by subscribing to Mono or Flux, or combining them with other reactive streams.
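To illustrate the second point, the sketch below combines two responses without blocking. The two fetch methods are hypothetical stand-ins for WebClient calls such as client.get()...bodyToMono(...); they return fixed values so that the example runs without a server.

```java
import reactor.core.publisher.Mono;

public class ComposeResponses {

    // Stand-in for a WebClient call that returns a user name (hypothetical).
    static Mono<String> fetchUserName() {
        return Mono.just("alice");
    }

    // Stand-in for a WebClient call that returns an order count (hypothetical).
    static Mono<Integer> fetchOrderCount() {
        return Mono.just(3);
    }

    // Mono.zip waits for both results and emits them together as a tuple.
    static Mono<String> summary() {
        return Mono.zip(fetchUserName(), fetchOrderCount())
                .map(t -> t.getT1() + " has " + t.getT2() + " orders");
    }

    public static void main(String[] args) {
        // Nothing runs until subscribe() is called; results arrive in the callbacks.
        summary().subscribe(
                text -> System.out.println(text),
                error -> System.err.println("failed: " + error));
    }
}
```

In a WebFlux controller the Mono returned by summary() would usually be returned as is, leaving the subscription to the framework.
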
Error handling

Operators such as .onErrorResume can be used to handle error conditions gracefully, for example by switching to a fallback publisher or by returning a default value.

Mono<String> withErrorHandling = client.get()
        .uri("/api/data")
        .retrieve()
        .bodyToMono(String.class)
        // onErrorResume is a Mono operator, so it is applied after bodyToMono
        .onErrorResume(WebClientResponseException.class, e -> {
            // Handle the error, for example by returning a default value
            return Mono.just("Default Value");
        });
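
Retrying can be combined with a fallback value. The sketch below assumes Reactor 3.3.4 or later (for reactor.util.retry.Retry) and uses a hypothetical flakyCall helper in place of a real WebClient request; the retry count and delay are arbitrary.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryThenFallback {

    // Hypothetical flaky call: fails the first `failures` times, then succeeds.
    // Mono.defer re-evaluates the lambda on every (re)subscription.
    static Mono<String> flakyCall(AtomicInteger attempts, int failures) {
        return Mono.defer(() -> attempts.incrementAndGet() <= failures
                ? Mono.<String>error(new IllegalStateException("temporary failure"))
                : Mono.just("payload"));
    }

    // Retry up to 2 times with a short fixed delay, then fall back to a default.
    static Mono<String> resilient(AtomicInteger attempts, int failures) {
        return flakyCall(attempts, failures)
                .retryWhen(Retry.fixedDelay(2, Duration.ofMillis(10)))
                .onErrorResume(e -> Mono.just("Default Value"));
    }
}
```

With two initial failures the third attempt succeeds; with more failures than retries the fallback value is emitted instead.
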

Summary

WebClient is a powerful and flexible tool that plays an important role in modern web applications and inter-service communication, especially when it is necessary to build high-performance, scalable systems. By taking full advantage of reactive programming, developers can build more efficient and easier-to-maintain client logic.