Life in Flow

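To know that you do not know is best; to not know yet believe you know is a sickness.
Not knowing that you do not know is perilous.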

Reactor

Reactive Programming

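 Reactive programming is a programming paradigm built around data streams and the propagation of change. Static or dynamic data streams can be expressed directly in the programming language, and the underlying execution model automatically propagates changed values through the data flow.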

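# Traditional (imperative) programming: the value of a is fixed once the statement has executed.
a = b + c

# Reactive programming:
# the value of a changes whenever the value of b or c changes.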
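
The contrast can be sketched in plain Java without any reactive library. `Cell` and `sum` below are hypothetical helpers invented for this illustration; they are not part of Reactor or the JDK, and they only mimic change propagation in the simplest possible way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class ReactiveSumSketch {

    /** Hypothetical observable int holder: listeners run on every set(). */
    static final class Cell {
        private int value;
        private final List<IntConsumer> listeners = new ArrayList<>();

        Cell(int initial) {
            this.value = initial;
        }

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
            for (IntConsumer listener : listeners) {
                listener.accept(newValue);
            }
        }

        void onChange(IntConsumer listener) {
            listeners.add(listener);
        }
    }

    /** Builds a cell that is recomputed whenever either input changes. */
    static Cell sum(Cell left, Cell right) {
        Cell result = new Cell(left.get() + right.get());
        left.onChange(v -> result.set(v + right.get()));
        right.onChange(v -> result.set(left.get() + v));
        return result;
    }

    public static void main(String[] args) {
        // Imperative: a0 is computed once and never updated.
        int b0 = 1;
        int c0 = 2;
        int a0 = b0 + c0;
        b0 = 10;
        System.out.println(a0);      // still 3

        // Reactive style: a follows every change of b and c.
        Cell b = new Cell(1);
        Cell c = new Cell(2);
        Cell a = sum(b, c);
        System.out.println(a.get()); // 3
        b.set(10);
        System.out.println(a.get()); // 12
        c.set(5);
        System.out.println(a.get()); // 15
    }
}
```

A real reactive library adds what this sketch leaves out: completion and error signals, demand control, and thread scheduling.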

Project Reactor

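 Reactor is a fourth-generation reactive library developed by the Pivotal team. It is used to build non-blocking applications on the JVM in accordance with the Reactive Streams specification.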

Core concepts

Operators - Publisher / Subscriber

  • Nothing happens until you subscribe()
  • Flux [ 0..N ] - onNext(), onComplete(), onError()
  • Mono [ 0..1 ] - onNext(), onComplete(), onError()
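
The three bullets above can be observed with a short sketch, assuming `reactor-core` is on the classpath. The class name `LazySubscribeSketch` and the event strings are made up for the illustration; everything runs synchronously on the calling thread, so the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LazySubscribeSketch {

    /** Records every signal so the order of events can be inspected. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Assembling the pipeline only describes the work; nothing runs yet.
        Flux<Integer> flux = Flux.range(1, 3)
                .doOnSubscribe(s -> events.add("flux subscribed"));
        events.add("assembled");

        // subscribe() starts the flow: one onNext per element, then onComplete.
        flux.subscribe(
                i -> events.add("onNext " + i),
                e -> events.add("onError " + e),
                () -> events.add("onComplete"));

        // A Mono emits at most one element before completing.
        Mono.just("hello").subscribe(
                v -> events.add("mono onNext " + v),
                e -> events.add("mono onError " + e),
                () -> events.add("mono onComplete"));

        // An empty Mono emits nothing and only completes.
        Mono.<String>empty().subscribe(
                v -> events.add("empty onNext " + v),
                e -> events.add("empty onError " + e),
                () -> events.add("empty onComplete"));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that "assembled" is recorded before "flux subscribed": building the `Flux` did not trigger any work.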

Backpressure

  • Subscription
  • onRequest(), onCancel(), onDispose()
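
One way to see backpressure at work is to let the subscriber control demand through its `Subscription`. The sketch below assumes `reactor-core` is on the classpath and uses Reactor's `BaseSubscriber`; the batch size of 2 and the cut-off after 4 elements are arbitrary choices for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureSketch {

    /** Pulls elements two at a time and cancels after the fourth one. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.range(1, 10)
                .doOnRequest(n -> events.add("request " + n))
                .doOnCancel(() -> events.add("cancel"))
                .subscribe(new BaseSubscriber<Integer>() {
                    private int received;

                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        // Initial demand: ask for two elements instead of all of them.
                        request(2);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        events.add("onNext " + value);
                        received++;
                        if (received == 4) {
                            // Stop the stream early; the remaining elements are never emitted.
                            cancel();
                        } else if (received % 2 == 0) {
                            // The current batch is consumed, so ask for the next one.
                            request(2);
                        }
                    }
                });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The publisher never emits more than the subscriber has requested, which is the core of the backpressure contract in Reactive Streams.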

Thread scheduling: Schedulers

  • immediate() / single() / newSingle()
  • elastic() / parallel() / newParallel(): backed by thread pools.
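
How a scheduler affects a pipeline depends on which operator applies it. The following sketch, assuming `reactor-core` is on the classpath, records the thread name on both sides of `publishOn`. It sticks to `single()` and `parallel()` because, as far as I am aware, `elastic()` is deprecated in later Reactor releases in favour of `boundedElastic()`.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerSketch {

    /** Returns, per element, the threads seen upstream and downstream of publishOn. */
    static List<String> run() {
        return Flux.range(1, 2)
                // Runs on the thread chosen by subscribeOn (the single scheduler).
                .map(i -> "upstream on " + Thread.currentThread().getName())
                // Everything below this line runs on a parallel scheduler worker.
                .publishOn(Schedulers.parallel())
                .map(s -> s + ", downstream on " + Thread.currentThread().getName())
                // Affects where the subscription and the source emission happen,
                // regardless of its position in the chain.
                .subscribeOn(Schedulers.single())
                .collectList()
                // Blocking is acceptable here only because the caller is a plain thread.
                .block();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In short, `subscribeOn` decides where the source starts emitting, while `publishOn` switches threads for the operators that follow it.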

Error handling

  • onError / onErrorReturn / onErrorResume
  • doOnError / doFinally
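
The operators listed above fall into two groups: fallbacks (`onErrorReturn`, `onErrorResume`) replace the error with data, while `doOnError` and `doFinally` only observe it. A minimal sketch, assuming `reactor-core` is on the classpath; the `failing()` source and the fallback values are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class ErrorHandlingSketch {

    /** Hypothetical source that emits 1 and 2, then fails. */
    static Flux<Integer> failing() {
        return Flux.just(1, 2)
                .concatWith(Flux.error(new IllegalStateException("boom")));
    }

    /** onErrorReturn: replace the error with a single fallback value. */
    static List<Integer> withReturn() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    /** onErrorResume: replace the error with another publisher. */
    static List<Integer> withResume() {
        return failing().onErrorResume(e -> Flux.just(-1, -2)).collectList().block();
    }

    /** doOnError and doFinally: side effects only, the error still reaches the subscriber. */
    static List<String> sideEffects() {
        List<String> events = new ArrayList<>();
        failing()
                .doOnError(e -> events.add("doOnError " + e.getMessage()))
                .doFinally(signal ->
                        events.add("doFinally sawError=" + (signal == SignalType.ON_ERROR)))
                .subscribe(
                        i -> events.add("onNext " + i),
                        e -> events.add("onError " + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withReturn());
        System.out.println(withResume());
        sideEffects().forEach(System.out::println);
    }
}
```

In every variant the original sequence ends at the error; a fallback continues with new data rather than resuming the failed source.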

Example

Add the dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>geektime.spring.reactor</groupId>
	<artifactId>simple-reactor-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>simple-reactor-demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application class

package geektime.spring.reactor.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@SpringBootApplication
@Slf4j
public class SimpleReactorDemoApplication implements ApplicationRunner {

	public static void main(String[] args) {
		SpringApplication.run(SimpleReactorDemoApplication.class, args);
	}

	@Override
	public void run(ApplicationArguments args) throws Exception {
		// Create the sequence 1..6.
		Flux.range(1, 6)
				// Run the operators below this point on an elastic scheduler thread.
				.publishOn(Schedulers.elastic())
				// Log how many elements each request() asks for.
				// Note: the position of this operator in the chain changes what it observes.
				.doOnRequest(n -> log.info("Request {} number", n))
				// Log "Publisher COMPLETE 1" once the sequence 1..6 has completed.
				.doOnComplete(() -> log.info("Publisher COMPLETE 1"))
				.map(i -> {
					// Log the thread on which map runs.
					log.info("Publish {}, {}", Thread.currentThread(), i);
					// Throws ArithmeticException when i == 3.
					return 10 / (i - 3);
//					return i;
				})
				// Log "Publisher COMPLETE 2" once the sequence 1..6 has completed.
				.doOnComplete(() -> log.info("Publisher COMPLETE 2"))
				// Subscribe and issue requests on the single scheduler thread.
				.subscribeOn(Schedulers.single())
				// On error, log it and continue with the fallback value -1.
				.onErrorResume(e -> {
					log.error("Exception {}", e.toString());
					return Mono.just(-1);
				})
//				.onErrorReturn(-1)
				.subscribe(i -> log.info("Subscribe {}: {}", Thread.currentThread(), i),
						e -> log.error("error {}", e.toString()),
						() -> log.info("Subscriber COMPLETE"),
						// Request only 4 elements.
						s -> s.request(4)
				);
		// Keep the calling thread alive long enough for the asynchronous pipeline to finish.
		Thread.sleep(2000);
	}
}

Console output

2020-01-15 16:26:04.248  INFO 17748 --- [       single-1] g.s.r.s.SimpleReactorDemoApplication     : Request 4 number
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Publish Thread[elastic-2,5,main], 1
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscribe Thread[elastic-2,5,main]: -5
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Publish Thread[elastic-2,5,main], 2
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscribe Thread[elastic-2,5,main]: -10
2020-01-15 16:26:04.250  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Publish Thread[elastic-2,5,main], 3
2020-01-15 16:26:04.252 ERROR 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Exception java.lang.ArithmeticException: / by zero
2020-01-15 16:26:04.253  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscribe Thread[elastic-2,5,main]: -1
2020-01-15 16:26:04.253  INFO 17748 --- [      elastic-2] g.s.r.s.SimpleReactorDemoApplication     : Subscriber COMPLETE
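
The values in this log can be reproduced with a stripped-down, single-threaded version of the pipeline. The sketch below assumes `reactor-core` is on the classpath and drops the schedulers and logging, so it only illustrates the data path: elements 1 and 2 map to -5 and -10, element 3 divides by zero, and the fallback -1 ends the stream. Neither "Publisher COMPLETE" message appears because the source fails before it can complete; "Subscriber COMPLETE" comes from the fallback `Mono` completing.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DemoValuesSketch {

    /** Same mapping and fallback as the demo, without schedulers. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.range(1, 6)
                // 10 / (1 - 3) = -5, 10 / (2 - 3) = -10, 10 / (3 - 3) throws.
                .map(i -> 10 / (i - 3))
                // Never fires here: the error arrives before the source completes.
                .doOnComplete(() -> events.add("upstream complete"))
                // Swap the error for a one-element fallback sequence.
                .onErrorResume(e -> Mono.just(-1))
                .subscribe(
                        i -> events.add("value " + i),
                        e -> events.add("error " + e),
                        () -> events.add("subscriber complete"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```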

Author: Soulboy