软件体系结构 响应式架构

2024/03/06 转载 架构 共 3944 字,约 12 分钟

代码地址:https://github.com/sa-spring/spring-webflux https://www.bilibili.com/video/BV1dt4y1y7bC

软件体系结构-12. 响应式架构

先来进行压测一下:

SpringMVC:

package com.example.webmvc;

import java.util.concurrent.TimeUnit;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
    @GetMapping("/hello/{latency}")
    public String hello(@PathVariable long latency) {
        try {
            TimeUnit.MILLISECONDS.sleep(latency); // 1
        } catch (InterruptedException e) {
            return "Error during thread sleep";
        }
        return "Welcome to reactive world ~";
    }
}

SpringWebFlux:

package com.example.webflux;

import java.time.Duration;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class HelloController {
    @GetMapping("/hello/{latency}")
    public Mono<String> hello(@PathVariable int latency) {
        return Mono.just("Welcome to reactive world ~").delayElement(Duration.ofMillis(latency)); // 1
    }
}

左边是 SpringMVC 的测试结果,右边是 SpringWebFlux。使用加特林进行压力测试。

这里其实应该是由于

  • 评论
    • 我的觉得比较方法不合理。 Flux 项目也要用 Thread.sleep(1000)试试。 耗时的业务可能在返回 Mono 之前。 正解, Thread.sleep 是阻塞了线程, Reactive 的 sleep 是让 Failure 进入等待, 让线程切换到其他 Failure 继续执行, 不会阻塞线程, 所以这里确实是不严谨, 甚至是错误的性能对比
    • 说的不太对,这个案例中 Web Mvc 应用响应时间长并不是因为 线程数量多 CPU 进行频繁的线程程上下文切换而导致响应时间变慢,而是因为 Tomcat 默认的线程池数量只有 200 个。然而这 200 个线程全部都要进行 2 秒 的阻塞。而导致后面进来的请求也进入阻塞队列中等待前面的请求处理完成

传统的 Tomcat 服务器,会搞一个线程池,对每个请求都分配一个线程来处理。

是的,传统的 Tomcat 服务器通常会使用线程池来处理客户端的请求。在 Tomcat 中,每个请求通常会分配一个线程来处理,而线程池则负责管理这些线程的创建、销毁和复用。 当客户端发起请求时,Tomcat 会从线程池中获取一个空闲的线程,并将请求分配给该线程进行处理。处理完成后,线程将被释放回线程池,以便处理下一个请求。这种机制可以确保 Tomcat 能够有效地处理大量的并发请求,而不会因为频繁地创建和销毁线程而导致资源浪费和性能下降。 通过使用线程池,Tomcat 可以根据实际负载情况动态调整线程的数量,从而更好地适应不同的工作负载。这种方式可以提高服务器的性能和稳定性,同时也减少了对系统资源的占用。

但是线程切换的成本很大。而且内存开销很大。

现在我们采用异步的方式,通过回调函数,告诉,如果成功了你干什么事,如果不成功你干什么事。

要是有 await async 就没这么烦了,本质上这俩人都是在控制数据,干一些 IO 密集型的工作。对数据进行加工,很难理解。

看成一个序列。

看下面,订阅者可以通过 request 对发布者进行控制(通过 Subscription 接口,进行 request 和 cancel),这个是区别于事件驱动架构的,那个里面不能这样。

此外,还可以在中间加上 Processor,将一个事件转换成另一个事件。

下面通过 request 用到了背压

什么是背压控制? 背压控制(Backpressure Control)是指在处理异步数据流时,当生产者产生的数据流速度大于消费者处理的速度时,消费者能够向生产者发送信号,告知其降低生产速率或者暂停生产,以防止数据积压和资源耗尽的问题。 背压控制通常用于处理大量的异步数据流,比如在流式处理、消息队列、网络通信等场景下。在这些场景中,生产者和消费者之间的数据流速度可能不同步,如果没有合适的背压控制机制,就可能导致数据积压、内存溢出、系统崩溃等问题。 背压控制的主要目标是在生产者和消费者之间建立一种协调机制,以确保生产者的生产速率与消费者的处理速率保持平衡,从而有效地管理系统资源并防止数据丢失或积压。 在实际应用中,背压控制可以通过各种方式来实现,包括但不限于以下几种方法:

  • 缓冲区管理:使用固定大小的缓冲区来控制数据流量,当缓冲区达到一定阈值时,停止接收新数据或者丢弃部分数据。
  • 流量控制:通过控制生产者的生产速率或消费者的处理速率来平衡数据流量,比如使用令牌桶算法或限流器。
  • 反压机制:消费者向生产者发送信号,告知其降低生产速率或者暂停生产,直到消费者处理完数据后再继续生产。 总的来说,背压控制是一种重要的异步编程技术,用于处理异步数据流中生产者和消费者之间速率不匹配的情况,以确保系统的稳定性、可靠性和性能。

上面的就用到了反压机制。

比如后端一直给前端发送数据,前端来不及处理,那岂不是欧耶了。

所以请求方和处理方实现调节是很重要的。

下面是实现了这个东西的框架

在 Project Reactor 中,有许多操作符(Operator)可以用于对流进行转换、过滤、组合等操作。下面举例说明几个常用的操作符:

  1. map 操作符:map 操作符用于将流中的每个元素进行映射转换,生成一个新的元素。例如,将流中的每个整数加 1:

javaCopy code Flux numbers = Flux.just(1, 2, 3, 4, 5); Flux mappedNumbers = numbers.map(num -> num + 1); mappedNumbers.subscribe(System.out::println); // 输出:2, 3, 4, 5, 6

> 1. filter 操作符:filter 操作符用于过滤流中的元素,只保留满足条件的元素。例如,过滤出流中的偶数:
> ```java
javaCopy code
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> evenNumbers = numbers.filter(num -> num % 2 == 0);
evenNumbers.subscribe(System.out::println); // 输出:2, 4
  1. flatMap 操作符:flatMap 操作符用于将流中的每个元素转换成一个新的流,并将这些新的流合并成一个单一的流。例如,将每个字符串拆分成字符并输出:

javaCopy code Flux words = Flux.just("hello", "world"); Flux characters = words.flatMap(word -> Flux.fromArray(word.toCharArray())); characters.subscribe(System.out::println); // 输出:h, e, l, l, o, w, o, r, l, d

> 1. zip 操作符:zip 操作符用于将多个流中的元素逐个配对,并将配对后的元素合并成一个新的元组流。例如,将两个流中的元素相加:
> ```java
javaCopy code
Flux<Integer> numbers1 = Flux.just(1, 2, 3);
Flux<Integer> numbers2 = Flux.just(4, 5, 6);
Flux<Integer> sum = Flux.zip(numbers1, numbers2, (num1, num2) -> num1 + num2);
sum.subscribe(System.out::println); // 输出:5, 7, 9

以上只是一小部分 Project Reactor 中的操作符,还有许多其他操作符可供使用,可以根据具体的需求选择合适的操作符来处理流。这些操作符可以使得异步编程变得更加灵活和高效。

使用 Future 进行编程的模式也可使用 Reactor 进行。

左边是命令式,右边是声明式。

右边按甚至可以在分布式环境下执行。

服务端:

客户端:

Spring 也在开发响应式的数据库连接。

文档信息

Search

    Table of Contents