目录

Spring Boot - 响应式远程调用

# 前言

在 Spring 5 之前,如果我们想要调用其他系统提供的 HTTP 服务,通常可以使用 Spring 提供的 RestTemplate 来访问,不过由于 RestTemplate 是 Spring 3 中引入的同步阻塞式 HTTP 客户端,因此存在一定性能瓶颈。根据 Spring 官方文档介绍,在将来的版本中它可能会被弃用。

作为替代,Spring 官方已在 Spring 5 中引入了 WebClient 作为非阻塞式 Reactive HTTP 客户端。下面通过样例演示如何使用 WebClient。

需要 Spring Boot 3 以上才能使用 WebClient。

# 基本介绍

# 什么是 WebClient

从 Spring 5 开始,Spring 中全面引入了 Reactive 响应式编程。而 WebClient 则是 Spring WebFlux 模块提供的一个非阻塞的基于响应式编程的进行 Http 请求的客户端工具。

由于 WebClient 的请求模式属于异步非阻塞,能够以少量固定的线程处理高并发的 HTTP 请求。因此,从 Spring 5 开始,HTTP 服务之间的通信我们就可以考虑使用 WebClient 来取代之前的 RestTemplate。

# WebClient 的优势

与 RestTemplate 相比,WebClient 有如下优势:

  • 非阻塞,Reactive 的,并支持更高的并发性和更少的硬件资源
  • 提供利用 Java 8 lambdas 的函数 API
  • 支持同步和异步方案
  • 支持从服务器向上或向下流式传输

RestTemplate 不适合在非阻塞应用程序中使用,因此 Spring WebFlux 应用程序应始终使用 WebClient。在大多数高并发场景中,WebClient 也应该是 Spring MVC 中的首选,并且用于编写一系列远程,相互依赖的调用。

# 安装配置

编辑 pom.xml 文件,添加 Spring WebFlux 依赖,从而可以使用 WebClient。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
1
2
3
4

# 创建 WebClient 实例

从 WebClient 的源码中可以看出,WebClient 接口提供了三个不同的静态方法来创建 WebClient 实例:

# 利用 create() 创建

下面利用 create() 方法创建一个 WebClient 对象,并利用该对象请求一个网络接口,最后将结果以字符串的形式打印出来。

注意:由于利用 create() 创建的 WebClient 对象没有设定 baseURL,所以这里的 uri() 方法相当于重写 baseURL。

public void test() {
  WebClient webClient = WebClient.create();

  Mono<String> mono = webClient
    .get() // GET 请求
    .uri("http://jsonplaceholder.typicode.com/posts/1")  // 请求路径
    .retrieve() // 获取响应体
    .bodyToMono(String.class); // 响应数据类型转换

  System.out.println(mono.block());

}
1
2
3
4
5
6
7
8
9
10
11
12

# 利用 create(String baseUrl) 创建

下面利用 create(String baseUrl) 方法创建一个 WebClient 对象,并利用该对象请求一个网络接口,最后将结果以字符串的形式打印出来。

注意:由于利用 create(String baseUrl) 创建的 WebClient 对象时已经设定了 baseURL,所以 uri() 方法会将返回的结果和 baseUrl 进行拼接组成最终需要远程请求的资源 URL。

public void test() {
  WebClient webClient = WebClient.create("http://jsonplaceholder.typicode.com");

  Mono<String> mono = webClient
    .get() // GET 请求
    .uri("/posts/1")  // 请求路径
    .retrieve() // 获取响应体
    .bodyToMono(String.class); // 响应数据类型转换

  System.out.println(mono.block());
}
1
2
3
4
5
6
7
8
9
10
11

# 利用 builder 创建(推荐)

下面使用 builder() 返回一个 WebClient.Builder,然后再调用 build 就可以返回 WebClient 对象。并利用该对象请求一个网络接口,最后将结果以字符串的形式打印出来。

注意:由于返回的不是 WebClient 类型而是 WebClient.Builder,我们可以通过返回的 WebClient.Builder 设置一些配置参数(例如:baseUrl、header、cookie 等),然后再调用 build 就可以返回 WebClient 对象了。

使用 WebClient.builder() 指定额外的配置:

  • uriBuilderFactory: 用作定制 baseURL
  • defaultUriVariables: 扩展URI模板时使用的默认值
  • defaultHeader: 设置每个请求的默认 Header
  • defaultCookie: 设置每个请求的默认 Cookie
  • defaultRequest: 设置每个消费者自定义请求
  • filter: 请求过滤器
  • exchangeStrategies: HTTP 消息读取器/写入器自定义
  • clientConnector: HTTP客户端库设置
public void test() {
  WebClient webClient = WebClient.builder()
    .baseUrl("http://jsonplaceholder.typicode.com")
    .defaultHeader(HttpHeaders.USER_AGENT,"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko)")
    .defaultCookie("ACCESS_TOKEN", "test_token")
    .build();

  Mono<String> mono = webClient
    .get() // GET 请求
    .uri("/posts/1")  // 请求路径
    .retrieve() // 获取响应体
    .bodyToMono(String.class); //响应数据类型转换

  System.out.println(mono.block());

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

不变性

一旦构建完成,WebClient 就是不可变的。但是,可以克隆它并构建一个修改后的副本,如下所示:

WebClient client1 = WebClient.builder()
        .filter(filterA).filter(filterB).build();

WebClient client2 = client1.mutate()
        .filter(filterC).filter(filterD).build();
1
2
3
4
5

# GET 请求

# 使用 retrieve()

# 获取 String 结果数据

下面代码将响应结果映射为一个 String 字符串,并打印出来。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/posts/1")  // 请求路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class); // 响应数据类型转换
        System.out.println(mono.block());
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 将结果转换为对象

当响应的结果是 JSON 时,也可以直接指定为一个 Object,WebClient 将接收到响应后把 JSON 字符串转换为对应的对象。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        Mono<PostBean> mono = webClient
                .get() // GET 请求
                .uri("/posts/1")  // 请求路径
                .retrieve() // 获取响应体
                .bodyToMono(PostBean.class); // 响应数据类型转换
        System.out.println(mono.block());
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

其中定义的实体 Bean 代码如下:

@Getter
@Setter
@ToString
public class PostBean {
    private int userId;
    private int id;
    private String title;
    private String body;
}
1
2
3
4
5
6
7
8
9

# 将结果转成集合

假设接口返回的是一个 json 数组。

我们可以将其转成对应的 Bean 集合:

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        Flux<PostBean> flux = webClient
                .get() // GET 请求
                .uri("/posts")  // 请求路径
                .retrieve() // 获取响应体
                .bodyToFlux(PostBean.class); // 响应数据类型转换
        List<PostBean> posts = flux.collectList().block();
        System.out.println("结果数:" + posts.size());
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 参数传递的几种方式

下面 3 种方式的结果都是一样的。

使用占位符的形式传递参数:

Mono<String> mono = webClient
        .get() // GET 请求
        .uri("/{1}/{2}", "posts", "1")  // 请求路径
        .retrieve() // 获取响应体
        .bodyToMono(String.class); // 响应数据类型转换

1
2
3
4
5
6

另一种使用占位符的形式:

String type = "posts";
int id = 1;
 
Mono<String> mono = webClient
        .get() // GET 请求
        .uri("/{type}/{id}", type, id)  // 请求路径
        .retrieve() // 获取响应体
        .bodyToMono(String.class); // 响应数据类型转换
        System.out.println(mono.block());
1
2
3
4
5
6
7
8
9

我们也可以使用 map 装载参数:

Map<String,Object> map = new HashMap<>();
map.put("type", "posts");
map.put("id", 1);
 
Mono<String> mono = webClient
        .get() // GET 请求
        .uri("/{type}/{id}", map)  // 请求路径
        .retrieve() // 获取响应体
        .bodyToMono(String.class); //响应数据类型转换
1
2
3
4
5
6
7
8
9

# subscribe 订阅(非阻塞式调用)

前面的样例我们都是人为地使用 block 方法来阻塞当前程序。其实 WebClient 是异步的,也就是说等待响应的同时不会阻塞正在执行的线程。只有在响应结果准备就绪时,才会发起通知。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        System.out.println("--- begin ---");
 
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/posts/1")  // 请求路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class); // 响应数据类型转换
 
        // 订阅(异步处理结果)
        mono.subscribe(result -> {
            System.out.println(result);
        });
 
        System.out.println("--- end ---");
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 使用 exchange()

方法介绍

  • 前面我们都是使用 retrieve() 方法直接获取到了响应的内容,如果我们想获取到响应的头信息、Cookie 等,可以在通过 WebClient 请求时把调用 retrieve() 改为调用 exchange()
  • 通过 exchange() 方法可以访问到代表响应结果的对象,通过该对象我们可以获取响应码、contentType、contentLength、响应消息体等。

使用样例

下面代码请求一个网络接口,并将响应体、响应头、响应码打印出来。其中响应体的类型设置为 String。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        Mono<ClientResponse> mono = webClient
                .get() // GET 请求
                .uri("/posts/1")  // 请求路径
                .exchange();
 
        // 获取完整的响应对象
        ClientResponse response = mono.block();
 
        HttpStatus statusCode = response.statusCode(); // 获取响应码
        int statusCodeValue = response.rawStatusCode(); // 获取响应码值
        Headers headers = response.headers(); // 获取响应头
 
        // 获取响应体
        Mono<String> resultMono = response.bodyToMono(String.class);
        String body = resultMono.block();
 
        // 输出结果
        System.out.println("statusCode:" + statusCode);
        System.out.println("statusCodeValue:" + statusCodeValue);
        System.out.println("headers:" + headers.asHttpHeaders());
        System.out.println("body:" + body);
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# POST 请求

# 使用 retrieve()

# 发送一个 JSON 格式数据(使用 json 字符串)

下面代码使用 post 方式发送一个 json 格式的字符串,并将结果打印出来(以字符串的形式)。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        // 需要提交的 json 字符串
        String jsonStr = "{\"userId\": 222,\"title\": \"abc\",\"body\": \"航歌\"}";
 
        // 发送请求
        Mono<String> mono = webClient
                .post() // POST 请求
                .uri("/posts")  // 请求路径
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(jsonStr))
                .retrieve() // 获取响应体
                .bodyToMono(String.class); //响应数据类型转换
 
        // 输出结果
        System.out.println(mono.block());
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

发送一个 JSON 格式数据(使用 Java Bean)

下面代码使用 post 方式发送一个 Bean 对象,并将结果打印出来(以字符串的形式)。结果同上面是一样的:

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        // 要发送的数据对象
        PostBean postBean = new PostBean();
        postBean.setUserId(222);
        postBean.setTitle("abc");
        postBean.setBody("航歌");
 
        // 发送请求
        Mono<String> mono = webClient
                .post() // POST 请求
                .uri("/posts")  // 请求路径
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .syncBody(postBean)
                .retrieve() // 获取响应体
                .bodyToMono(String.class); // 响应数据类型转换
 
        // 输出结果
        System.out.println(mono.block());
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

上面发送的 Bean 对象实际上会转成如下格式的 JSON 数据提交

# 使用 Form 表单的形式提交数据

下面样例使用 POST 方式发送 multipart/form-data 格式的数据:

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        //提交参数设置
        MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
        map.add("title", "abc");
        map.add("body", "航歌");
 
        // 发送请求
        Mono<String> mono = webClient
                .post() // POST 请求
                .uri("/posts")  // 请求路径
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .body(BodyInserters.fromFormData(map))
                .retrieve() // 获取响应体
                .bodyToMono(String.class); // 响应数据类型转换
 
        // 输出结果
        System.out.println(mono.block());
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

上面代码最终会通过如下这种 form 表单方式提交数据。

将结果转成自定义对象

上面样例我们都是将响应结果以 String 形式接收,其实 WebClient 还可以自动将响应结果转成自定的对象或则数组。

# 设置 url 参数

如果 url 地址上面需要传递一些参数,可以使用占位符的方式:

String url = "http://jsonplaceholder.typicode.com/{1}/{2}";
String url = "http://jsonplaceholder.typicode.com/{type}/{id}";
1
2

# subscribe 订阅(非阻塞式调用)

前面的样例我们都是人为地使用 block 方法来阻塞当前程序。其实 WebClient 是异步的,也就是说等待响应的同时不会阻塞正在执行的线程。只有在响应结果准备就绪时,才会发起通知。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        System.out.println("--- begin ---");
 
        // 需要提交的 json 字符串
        String jsonStr = "{\"userId\": 222,\"title\": \"abc\",\"body\": \"航歌\"}";
 
        Mono<String> mono = webClient
                .post() // POST 请求
                .uri("/posts")  // 请求路径
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(jsonStr))
                .retrieve() // 获取响应体
                .bodyToMono(String.class); // 响应数据类型转换
 
        // 订阅(异步处理结果)
        mono.subscribe(result -> {
            System.out.println(result);
        });
 
        System.out.println("--- end ---");
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 使用 exchange()

方法介绍

  • 前面我们都是使用 retrieve() 方法直接获取到了响应的内容,如果我们想获取到响应的头信息、Cookie 等,可以在通过 WebClient 请求时把调用 retrieve() 改为调用 exchange()。
  • 通过 exchange() 方法可以访问到代表响应结果的对象,通过该对象我们可以获取响应码、contentType、contentLength、响应消息体等。

使用样例

下面代码请求一个网络接口,并将响应体、响应头、响应码打印出来。其中响应体的类型设置为 String。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://jsonplaceholder.typicode.com")
            .build();
 
    @GetMapping("/test")
    public void test() {
        // 需要提交的 json 字符串
        String jsonStr = "{\"userId\": 222,\"title\": \"abc\",\"body\": \"航歌\"}";
 
        // 发送请求
        Mono<ClientResponse> mono = webClient
                .post() // POST 请求
                .uri("/posts")  // 请求路径
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(jsonStr))
                .exchange();
 
        // 获取完整的响应对象
        ClientResponse response = mono.block();
 
        HttpStatus statusCode = response.statusCode(); // 获取响应码
        int statusCodeValue = response.rawStatusCode(); // 获取响应码值
        Headers headers = response.headers(); // 获取响应头
 
        // 获取响应体
        Mono<String> resultMono = response.bodyToMono(String.class);
        String body = resultMono.block();
 
        // 输出结果
        System.out.println("statusCode:" + statusCode);
        System.out.println("statusCodeValue:" + statusCodeValue);
        System.out.println("headers:" + headers.asHttpHeaders());
        System.out.println("body:" + body);
        return;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# Attributes

您可以向请求添加属性。如果您想传递信息,这很方便通过过滤器链并影响给定请求的过滤器行为。 例如:

public void test() {
  WebClient client = WebClient.builder()
    .filter((request, next) -> {
      Optional<Object> myAttribute = request.attribute("myAttribute");
      System.out.println(myAttribute.get());
    })
    .build();

  client.get().uri("https://example.org/")
    .attribute("myAttribute", "...")
    .retrieve()
    .bodyToMono(Void.class);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 过滤器 Filters

通过 WebClient 注册客户端过滤器(ExchangeFilterFunction)。生成器来拦截和修改请求,下例所示的是请求拦截器:

public void test() {
  WebClient client = WebClient.builder()
    .filter((request, next) -> {
      ClientRequest filtered = ClientRequest.from(request)
        .header("foo", "bar")
        .build();
      return next.exchange(filtered);
    })
    .build();
}
1
2
3
4
5
6
7
8
9
10

当然这样不明确,不知道这是拦截请求的还是拦截响应的,那么我们可以手动使用请求的拦截器或者响应的拦截器:

/**
 * WebClient 使用 filter 拦截器
 */
@Slf4j
public class WebClientFilterDemo {
  
    private static ExchangeFilterFunction filterRequest() {
      // ExchangeFilterFunction.ofRequestProcessor 是拦截请求 Request
        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
            // 获取 attribute
            Optional<Object> myAttribute = clientRequest.attribute("myAttribute");
            System.out.println(myAttribute.get());
            return Mono.just(clientRequest);
        });
    }
  
    private static ExchangeFilterFunction logResponseStatus() {
        // ExchangeFilterFunction.ofResponseProcessor 是拦截响应 Respinse
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("Response Status {}", clientResponse.statusCode());
            return Mono.just(clientResponse);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        WebClient webClient = WebClient.builder().filter(logResponseStatus()).build();
        webClient.get().uri("http://127.0.0.1:8020/order/findOrderByUserId?userId={userId}", 1)
          			.attribute("myAttribute", "myAttribute")
                .exchange()
                .subscribe(r -> {
                    System.out.println(r.headers());
                    r.bodyToFlux(Order.class).subscribe(System.out::println);
                });

        // 休眠一会,否则 WebClient 中的线程池还没执行,看不到效果
        TimeUnit.SECONDS.sleep(5);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

也可以用于跨领域的问题,比如身份验证。以下示例通过静态工厂方法使用筛选器进行基本身份验证:

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;

WebClient client = WebClient.builder()
        .filter(basicAuthentication("user", "password"))
        .build();
1
2
3
4
5

可以通过改变现有的 WebClient 实例来添加或删除过滤器,从而产生不影响原始实例的新 WebClient 实例。例如:

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;

WebClient client = webClient.mutate()
        .filters(filterList -> {
            filterList.add(0, basicAuthentication("user", "password"));
        })
        .build();
1
2
3
4
5
6
7

# 超时时间设定

要配置连接超时

HttpClient httpClient = HttpClient.create()
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
 
WebClient webClient = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient))
        .build();
1
2
3
4
5
6

为所有请求配置响应超时

HttpClient httpClient = HttpClient.create()
        .responseTimeout(Duration.ofSeconds(2));

// Create WebClient...
1
2
3
4

为特定请求配置响应超时

WebClient.create().get()
        .uri("https://example.org/path")
        .httpRequest(httpRequest -> {
            HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
            reactorRequest.responseTimeout(Duration.ofSeconds(2));
        })
        .retrieve()
        .bodyToMono(String.class);
1
2
3
4
5
6
7
8

分别配置读取或写入超时

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(10))
                .addHandlerLast(new WriteTimeoutHandler(10)));

// Create WebClient...
1
2
3
4
5
6
7
8
9

# 异常处理

# 默认异常

当我们使用 WebClient 发送请求时, 如果接口返回的不是 200 状态(而是 4xx、5xx 这样的异常状态),则会抛出 WebClientResponseException 异常。

因此我们可以捕获 WebClientResponseException 异常。比如自定义 Spring Boot 的全局异常捕获类来捕获。

# 适配异常

我们可以通过 doOnError 方法适配所有异常,比如下面代码在发生异常时将其转为一个自定义的异常抛出(这里假设使用 RuntimeException):
















 
 
 
 




@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public void test() {
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/xxxxx")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class) // 响应数据类型转换
                .doOnError(WebClientResponseException.class, err -> {
                    System.out.println("发生错误:" +err.getRawStatusCode() + " " + err.getResponseBodyAsString());
                    throw new RuntimeException(err.getResponseBodyAsString());
                });
        System.out.println(mono.block());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

当然也可以处理特定的异常,比如 ConnectTimeoutException 异常:
















 
 
 
 




@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public void test() {
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/xxxxx")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class) // 响应数据类型转换
                .doOnError(ConnectTimeoutException.class, err -> {
                    System.out.println("发生错误:" +err.getRawStatusCode() + " " + err.getResponseBodyAsString());
                    throw new RuntimeException(err.getResponseBodyAsString());
                });
        System.out.println(mono.block());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 分类异常处理

上面的异常处理方法,只能处理指定的某种异常:ConnectTimeoutException。如果说我们想让异常处理相对通用一些该怎么办?有的小伙伴可能会想到拦截异常的父类 Exception,当然这也是一种办法。

.doOnError(Exception.class, err -> {
   System.out.println("发生错误:" +err.getMessage() );
});
1
2
3

我们下面为大家介绍一种,针对 HTTP 响应异常处理更友好的一种方式。通常来说,异常可以分为两种:

一种是客户端输入或访问异常,比如:访问的资源不存在 404,没有权限访问资源 403,输入的数据不符合格式等等。这种异常通常是用户访问了不该访问的资源,或者输入了不该输入的数据导致的。通常用HTTP状态码表示在 400-499 范围内。

另一种是服务端内部错误,比如:500 服务内部错误、502 网关错误等等。这种异常通常和用户没什么关系,是IT基础设施或者编程导致的异常。 所以我们只需要针对上面的两类异常进行处理即可。如下文代码所示:

  • e.is4xxClientError() 表示的是 400-499 状态码段的异常
  • e.is5xxClientError() 表示的是 500-599 状态码段的异常










 
 
 
 
 




public void test() throws Exception {
   Mono<String> mono = getWebClient()
               .get()    // 发送 GET 请求
               .uri("/postss/1")  // 服务请求路径,基于 baseurl
               .retrieve() // 获取响应体
               .onStatus(e -> e.is4xxClientError(), resp -> {
                  System.out.println("发生客户端输入错误:" + resp.statusCode().value() + " "
                              + resp.statusCode().getReasonPhrase());
                  return Mono.error(new RuntimeException("请求失败"));
               })
               .onStatus(e -> e.is5xxServerError(), resp -> {
                  System.out.println("发生服务端错误:" + resp.statusCode().value() + " "
                              + resp.statusCode().getReasonPhrase());
                  return Mono.error(new RuntimeException("服务器异常"));
               })
               .bodyToMono(String.class); // 响应数据类型转换
   System.out.println(mono.block());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 在发生异常时返回默认值

使用 onErrorReturn 返回默认值。
















 





@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public String test() {
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/xxxxx")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class)
                .onErrorReturn("请求失败!!!"); // 失败时的默认值
 
        return mono.block();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 重试机制

# 设置重试次数

使用 retry() 方法可以设置当请求异常时的最大重试次数,如果不带参数则表示无限重试,直至成功。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public String test() {
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/data")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(3)) // 3 秒超时
                .retry(3); // 重试 3 次
 
        return mono.block();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 设置重试时间间隔

使用 retry 方法默认情况下请求失败后会立刻重新发起请求,如果希望在每次重试前加个时间间隔(等待),可以使用 retryBackoff 方法。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public String test() {
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/data")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(3)) // 3秒超时
                .retryBackoff(3, Duration.ofSeconds(10)); // 重试 3 次,间隔 10 秒
 
        return mono.block();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 指定需要重试的异常

不管是前面的 retry 方法还是 retryBackoff 方法,设置后无论发生何种异常都会进行重试。如果需要更加精细的控制,比如指定的异常才需要重试,则可以使用 retryWhen 方法。

在使用 retryWhen 方法之前,我们项目中还需要先引入 reactor-extra 包,因为需要用到里面的 Retry 类。

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
</dependency>
1
2
3
4

注意:如果还需要设置对应的重试次数和间隔时间,需要分别通过 Retry 的 retryMax 和 backoff 方法进行设置。

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public String test() {
 
        // 定义重试条件
        Retry<?> retry = Retry.anyOf(RuntimeException.class) // 只有 RuntimeException 异常才重试
                .retryMax(3) // 重试 3 次
                .backoff(Backoff.fixed(Duration.ofSeconds(10))); // 每次重试间隔 10 秒
 
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/data")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(3)) // 3 秒超时
                .retryWhen(retry);
         
        return mono.block();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

下面样例除了 RuntimeException 异常外,发生其它一切异常都会进行重试:

@RestController
public class HelloController {
 
    // 创建 WebClient 对象
    private WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .build();
 
    @GetMapping("/test")
    public String test() {
 
        // 定义重试条件
        Retry<?> retry = Retry.allBut(RuntimeException.class) // 除了 RuntimeException 异常都重试
                .retryMax(3) // 重试 3 次
                .backoff(Backoff.fixed(Duration.ofSeconds(10))); // 每次重试间隔 10 秒
 
        Mono<String> mono = webClient
                .get() // GET 请求
                .uri("/data")  // 请求一个不存在的路径
                .retrieve() // 获取响应体
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(3)) // 3 秒超时
                .retryWhen(retry);
 
        return mono.block();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 阻塞用法

通过在结果的末尾阻塞,可以在同步模式下使用 WebClient

Person person = client.get().uri("/person/{id}", i).retrieve()
    .bodyToMono(Person.class)
    .block();

List<Person> persons = client.get().uri("/persons").retrieve()
    .bodyToFlux(Person.class)
    .collectList()
    .block();

1
2
3
4
5
6
7
8
9

如果需要进行多次调用,更有效的方法是避免单独阻塞每个响应,而是等待综合结果

Mono<Person> personMono = client.get().uri("/person/{id}", personId)
        .retrieve().bodyToMono(Person.class);

Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
        .retrieve().bodyToFlux(Hobby.class).collectList();

Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
            Map<String, String> map = new LinkedHashMap<>();
            map.put("person", person);
            map.put("hobbies", hobbies);
            return map;
        })
        .block();
1
2
3
4
5
6
7
8
9
10
11
12
13

以上仅仅是一个例子。有许多其他的模式和操作符可以组合成一个反应式的管道,进行许多远程调用,可能是嵌套的、相互依赖的,直到最后都不会阻塞。

有了 Flux 或 Mono,你就永远不必在 Spring MVC 或 Spring WebFlux 控制器中阻塞。只需从控制器方法返回结果反应类型。同样原理也适用于 Kotlin 协同程序和 Spring WebFlux,只需在控制器方法中使用暂停函数或返回流。

# HTTP Interface

上面的 WebClient 是编程式远程调用,此外 Spring 允许我们通过定义接口的方式,给任意位置发送 http 请求,实现远程调用,可以用来简化 HTTP 远程访问。需要 WebFlux 场景才可,这种方式和 OpenFeign 类似。

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
1
2
3
4

定义接口

public interface BingService {

    @GetExchange(url = "/search")
    String search(@RequestParam("keyword") String keyword, @RequsetHeader("Authorization") String auth);
}
1
2
3
4
5

创建代理 & 测试

@SpringBootTest
class Boot05TaskApplicationTests {

    @Test
    void contextLoads() throws InterruptedException {
        // 1、创建客户端
        WebClient client = WebClient.builder()
                .baseUrl("https://www.youngkbt.cn")
                .codecs(clientCodecConfigurer -> {
                    clientCodecConfigurer
                            .defaultCodecs()
                            .maxInMemorySize(256*1024*1024);
                            // 响应数据量太大有可能会超出 BufferSize,所以这里设置的大一点
                })
                .build();
        // 2、创建工厂
        HttpServiceProxyFactory factory = HttpServiceProxyFactory.builder(WebClientAdapter.forClient(client)).build();
        // 3、获取代理对象
        BingService bingService = factory.createClient(BingService.class);


        // 4、测试调用
        Mono<String> search = bingService.search("youngkbt");
        System.out.println("==========");
        search.subscribe(str -> System.out.println(str));

        Thread.sleep(100000);

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

当然上面从头到尾写一个请求流程,其实最终的目的主要是调用接口的方法,从而发送请求。

但是当我们有很多类似于 BingService 的接口类来请求时候,每次都要从头到尾创建客户端、工厂、代理对象来调用接口类发送请求,这样是非常繁琐的,所以我们可以将创建客户端、工厂封装成一个配置类。

@Configuration
public class FactoryConfig {
  @Bean
  public HttpServiceProxyFactory getHttpServiceProxyFactory(){
    // 1、创建客户端
    WebClient client = WebClient.builder()
      .baseUrl("https://www.youngkbt.cn")
      .codecs(clientCodecConfigurer -> {
        clientCodecConfigurer
          .defaultCodecs()
          .maxInMemorySize(256*1024*1024);
        // 响应数据量太大有可能会超出 BufferSize,所以这里设置的大一点
      })
      .build();
    // 2、创建工厂
    return HttpServiceProxyFactory.builder(WebClientAdapter.forClient(client)).build();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

然后

@SpringBootTest
class Boot05TaskApplicationTests {
  
    @Autowired
    private FactoryConfig factoryConfig;

    @Test
    void contextLoads() throws InterruptedException {
        HttpServiceProxyFactory factory = factoryConfig.getHttpServiceProxyFactory();
        // 获取代理对象
        BingService bingService = factoryConfig.getHttpServiceProxyFactory(BingService.class);


        // 测试调用
        Mono<String> search = bingService.search("youngkbt");
        System.out.println("==========");
        search.subscribe(str -> System.out.println(str));

        Thread.sleep(100000);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
更新时间: 2024/01/17, 05:48:13
最近更新
01
JVM调优
12-10
02
jenkins
12-10
03
Arthas
12-10
更多文章>