Reactor is a fourth-generation Reactive library for building non-blocking applications on the JVM based on the Reactive Streams Specification
1 |
|
Mono和Flux
Reactor中的发布者(Publisher
)由Flux
和Mono
两个类定义。一个Flux对象代表一个包含0..N个元素的响应式序列,而一个Mono对象代表一个包含零/一个(0..1)元素的结果。
既然是“数据流”的发布者,Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。
不过,这三种信号都不是一定要具备的:
- 首先,错误信号和完成信号都是终止信号,二者不可能同时共存;
- 如果没有发出任何一个元素值,而是直接发出完成/错误信号,表示这是一个空数据流;
- 如果没有错误信号和完成信号,那么就是一个无限数据流。
Flux模型
Mono模型
创建
simple
1 |
|
generate
1 |
|
create
1 |
|
push
handle
fromRunnable
1 |
|
fromCallable
1 |
|
fromSupplier
1 |
|
fromFuture
1 |
|
订阅
不订阅什么都不会发生(不会发出信号,因为没有订阅者发送request信号(org.reactivestreams.Subscription#request
))。
1 |
|
LambdaSubscriber
1 |
|
BaseSubscriber
1 |
|
操作符
通常情况下,我们需要对源发布者
发出的原始数据
进行多个阶段的处理
,并最终得到我们需要的结果数据
。这种感觉就像是一条流水线,从流水线的源头进入传送带的是原料,经过流水线上各个工位的处理,逐渐由原料变成半成品、零件、组件、成品,最终成为消费者需要的包装品。这其中,流水线源头的下料机就相当于源发布者
,消费者就相当于订阅者
,流水线上的一道道工序就相当于一个一个的操作符(Operator)
。每一个操作符对上游Publisher发出的数据进行相应的处理(Function等),然后生产新的数据,同时组成新的Publisher,以此类推。Flux和Mono都提供了丰富的操作符(operator)。
另外,操作符也是Publisher
InnerOperator
同步或异步
同步操作符,意味者一个元素一个元素的依次同步处理,生成的结果是顺序的。
异步操作符,意味着一个元素一个元素的依次异步处理,生成的结果是不保证顺序的。
转换
map
同步对元素进行函数处理生成新的元素
1 |
|
flatMap
异步对元素进行函数处理生成新的Publisher,所有的Publisher组成一个更大的Publisher。
不保证顺序
1 |
|
flatMapSequential
类同flatMap,并行消费,保证顺序
1 |
|
concatMap
类同flatMap,串行消费,保证顺序
1 |
|
合并
zip
zipWith
过滤
filter
同步
take
first
last
sample
skip
limitRequest
拆分
基于时间的操作
delayElement
timeout
doOn*
doOnNext
doOnError
doOnComplete
doOnSubscribe
doOnCancel
error-handling operators
timeout
onErrorReturn
onErrorResume
onErrorMap
doOnError
using
doFinally
retry
对于发生错误信号的流进行重试,retry对于上游Flux是采取的重订阅(re-subscribing)的方式,因此重试之后实际上是一个不同的序列了, 发出错误信号的序列仍然是终止了的。再一次从新订阅了原始的数据流。
1 |
|
输入如下:
1 |
|
回到同步的世界
Flux#blockFirst
Flux#blockLast
Flux#toIterable
Flux#toStream
Mono#block
Mono#toFuture
调度器与线程模型
Scheduler
调度器,类似Executor
当前线程
可重用的单线程
弹性线程池
固定大小的线程池
Schedulers
调度器工厂类
publishOn和subscribeOn
一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器),比如Flux.interval
默认使用固定大小线程池
Reactor 提供了两种在响应式链中调整调度器 Scheduler的方法:publishOn
和subscribeOn
。它们都接受一个 Scheduler
作为参数,从而可以改变调度器。但是publishOn
在链中出现的位置是有讲究的,而subscribeOn
则无所谓。
publishOn
会影响链中其后
的操作符。
subscribeOn
无论出现在什么位置,会影响从源头开始的执行环境,直到被publishOn
切换。
错误处理
背压(back pressure)
测试
StepVerifier
1 |
|
PublisherProbe
TestPublisher
调试
1 |
|
1 |
|