Reactor
Reactive Programming
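Reactive programming is a programming paradigm built around data streams and the propagation of change. Static or dynamic data streams can be expressed directly in the programming language, and the underlying execution model automatically propagates changed values through those streams.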
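# Traditional programming: the value of a is fixed once the statement has executed.
a = b + c

# Reactive programming:
# the value of a changes whenever the value of b or c changes.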
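The difference between the two models can be sketched in plain Java without any library. The `Cell` class and `derive` helper below are hypothetical names made up for this illustration and are not part of Reactor. They only show the idea that a derived value is recomputed whenever one of its inputs changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntBinaryOperator;

public class ReactiveSumSketch {

    /** A mutable int cell that notifies its listeners whenever its value is set. */
    static final class Cell {
        private int value;
        private final List<Runnable> listeners = new ArrayList<>();

        Cell(int initial) {
            this.value = initial;
        }

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
            // Propagate the change to everything derived from this cell.
            for (Runnable listener : listeners) {
                listener.run();
            }
        }

        void onChange(Runnable listener) {
            listeners.add(listener);
        }
    }

    /** Builds a cell whose value is recomputed whenever either input changes. */
    static Cell derive(Cell left, Cell right, IntBinaryOperator fn) {
        Cell result = new Cell(fn.applyAsInt(left.get(), right.get()));
        Runnable recompute = () -> result.set(fn.applyAsInt(left.get(), right.get()));
        left.onChange(recompute);
        right.onChange(recompute);
        return result;
    }

    public static void main(String[] args) {
        Cell b = new Cell(1);
        Cell c = new Cell(2);
        Cell a = derive(b, c, Integer::sum); // a = b + c, kept up to date

        System.out.println(a.get()); // 3
        b.set(10);
        System.out.println(a.get()); // 12
        c.set(5);
        System.out.println(a.get()); // 15
    }
}
```

In the traditional model, `a` would still be 3 after `b.set(10)`. Here the assignment behaves like a standing relationship rather than a one-off computation.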
Project Reactor
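Reactor is a fourth-generation reactive library developed by the Pivotal team. It is used to build non-blocking applications on the JVM, based on the Reactive Streams specification.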
Core Concepts
Operators - Publisher / Subscriber
- Nothing happens until you subscribe()
- Flux [ 0..N ] - onNext(), onComplete(), onError()
- Mono [ 0..1 ] - onNext(), onComplete(), onError()
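The "nothing happens until you subscribe()" rule can be observed with a small sketch. It assumes `reactor-core` is on the classpath. The class name `LazySubscribeSketch` and the `events` list are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LazySubscribeSketch {

    /** Records the order in which things happen around subscribe(). */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Assembling the pipeline only describes the work; nothing is emitted yet.
        Flux<Integer> numbers = Flux.range(1, 3)
                .doOnNext(i -> events.add("onNext " + i))
                .doOnComplete(() -> events.add("onComplete"));
        events.add("assembled");

        // Subscribing triggers the emission: 0..N onNext signals, then onComplete.
        numbers.subscribe();

        // A Mono emits at most one value.
        Mono.just("hello").subscribe(value -> events.add("mono " + value));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because no scheduler is involved, everything runs on the calling thread, and "assembled" is recorded before any `onNext` event.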
Backpressure
- Subscription
- onRequest(), onCancel(), onDispose()
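One way to see backpressure at work is to control demand by hand through the `Subscription`. The sketch below assumes `reactor-core` is on the classpath and uses Reactor's `BaseSubscriber`. The batch size of 2 and the cut-off after four elements are arbitrary choices for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureSketch {

    final List<Long> requests = new ArrayList<>();
    final List<Integer> received = new ArrayList<>();

    void run() {
        Flux.range(1, 10)
                // Record every demand signal that travels upstream.
                .doOnRequest(n -> requests.add(n))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        // Ask for two elements instead of an unbounded amount.
                        request(2);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        received.add(value);
                        if (received.size() == 4) {
                            // Stop the stream; the remaining elements are never produced.
                            cancel();
                        } else if (received.size() % 2 == 0) {
                            // The current batch is consumed, so ask for the next one.
                            request(2);
                        }
                    }
                });
    }

    public static void main(String[] args) {
        BackpressureSketch sketch = new BackpressureSketch();
        sketch.run();
        System.out.println("requests = " + sketch.requests); // [2, 2]
        System.out.println("received = " + sketch.received); // [1, 2, 3, 4]
    }
}
```

The publisher could produce ten elements, but it only emits as many as the subscriber has requested.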
Thread scheduling: Schedulers
- immediate() / single() / newSingle()
- elastic() / parallel() / newParallel(): backed by thread pools.
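The effect of a scheduler depends on which operator applies it. A minimal sketch, assuming `reactor-core` is on the classpath: `subscribeOn` decides where the source starts emitting, while `publishOn` switches threads for the operators that follow it. The class name is hypothetical, and the exact thread numbers may vary between runs.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerSketch {

    /** Returns "<thread of the first map> -> <thread of the second map>". */
    static String threadHops() {
        return Flux.range(1, 1)
                // Upstream of publishOn: runs on the thread chosen by subscribeOn.
                .map(i -> Thread.currentThread().getName())
                .publishOn(Schedulers.parallel())
                // Downstream of publishOn: runs on a parallel scheduler thread.
                .map(upstream -> upstream + " -> " + Thread.currentThread().getName())
                .subscribeOn(Schedulers.single())
                .blockFirst();
    }

    public static void main(String[] args) {
        // Typically prints something like "single-1 -> parallel-1".
        System.out.println(threadHops());
    }
}
```

`blockFirst()` is used here only so the calling thread waits for the result. Blocking calls like this do not belong inside a reactive pipeline.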
Error handling
- onError / onErrorReturn / onErrorResume
- doOnError / doFinally
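The error operators listed above differ in what they do with the failure. The following sketch assumes `reactor-core` is on the classpath and reuses a division by zero as the failure. The method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class ErrorHandlingSketch {

    /** Computes 10 / (i - 3) for i in 1..6; i == 3 raises an ArithmeticException. */
    static Flux<Integer> failing() {
        return Flux.range(1, 6).map(i -> 10 / (i - 3));
    }

    /** onErrorReturn replaces the error with a single fallback value, then completes. */
    static List<Integer> withReturn() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    /** onErrorResume switches to a fallback publisher, which may emit several values. */
    static List<Integer> withResume() {
        return failing().onErrorResume(e -> Flux.just(-1, -2)).collectList().block();
    }

    /** doOnError and doFinally only observe the signal; they do not recover from it. */
    static List<String> sideEffects() {
        List<String> events = new ArrayList<>();
        failing()
                .doOnError(e -> events.add("doOnError " + e.getMessage()))
                .doFinally(signal -> events.add("doFinally " + signal))
                .onErrorReturn(-1) // recovery still has to happen somewhere downstream
                .blockLast();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withReturn()); // [-5, -10, -1]
        System.out.println(withResume()); // [-5, -10, -1, -2]
        sideEffects().forEach(System.out::println);
    }
}
```

In both recovery variants the original sequence ends at the failing element. The values for i = 4, 5, 6 are never computed.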
Example
Add the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>geektime.spring.reactor</groupId>
    <artifactId>simple-reactor-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>simple-reactor-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Application class
package geektime.spring.reactor.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@SpringBootApplication
@Slf4j
public class SimpleReactorDemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(SimpleReactorDemoApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Create the sequence 1..6.
        Flux.range(1, 6)
                // Deliver downstream signals on an elastic scheduler thread.
                .publishOn(Schedulers.elastic())
                // Log how many elements each request asks for. Note that operator order matters here.
                .doOnRequest(n -> log.info("Request {} number", n))
                // Logs "Publisher COMPLETE 1" once the sequence 1..6 has completed.
                .doOnComplete(() -> log.info("Publisher COMPLETE 1"))
                .map(i -> {
                    // Log the thread that map runs on.
                    log.info("Publish {}, {}", Thread.currentThread(), i);
                    return 10 / (i - 3); // throws ArithmeticException when i == 3
                    // return i;
                })
                // Logs "Publisher COMPLETE 2" once the mapped sequence has completed.
                .doOnComplete(() -> log.info("Publisher COMPLETE 2"))
                // Subscribe (and send requests upstream) on the single scheduler thread.
                .subscribeOn(Schedulers.single())
                // On error, log it and continue with the fallback value -1.
                .onErrorResume(e -> {
                    log.error("Exception {}", e.toString());
                    return Mono.just(-1);
                })
                // .onErrorReturn(-1)
                .subscribe(i -> log.info("Subscribe {}: {}", Thread.currentThread(), i),
                        e -> log.error("error {}", e.toString()),
                        () -> log.info("Subscriber COMPLETE"),
                        s -> s.request(4) // request only four elements
                );
        // Keep the main thread alive while the pipeline runs on other threads.
        Thread.sleep(2000);
    }
}
Console output
2020-01-15 16:26:04.248 INFO 17748 --- [ single-1] g.s.r.s.SimpleReactorDemoApplication : Request 4 number
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 1
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -5
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 2
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -10
2020-01-15 16:26:04.250 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Publish Thread[elastic-2,5,main], 3
2020-01-15 16:26:04.252 ERROR 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Exception java.lang.ArithmeticException: / by zero
2020-01-15 16:26:04.253 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscribe Thread[elastic-2,5,main]: -1
2020-01-15 16:26:04.253 INFO 17748 --- [ elastic-2] g.s.r.s.SimpleReactorDemoApplication : Subscriber COMPLETE