Reactor in Practice

Reactor is a fourth-generation reactive library for building non-blocking applications on the JVM, based on the Reactive Streams specification.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <!-- Testing -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Mono and Flux

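In Reactor, publishers are defined by two classes, Flux and Mono. A Flux represents a reactive sequence of 0..N elements, while a Mono represents a result of zero or one (0..1) element.

As publishers of a data stream, both Flux and Mono can emit three kinds of signals: an element value, an error signal, and a completion signal. The error and completion signals are both terminal: a completion signal tells downstream subscribers that the stream ended normally, while an error signal terminates the stream and passes the error on to downstream subscribers.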

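However, none of the three signals is mandatory:

  • The error signal and the completion signal are both terminal, so they can never occur in the same stream.
  • If the stream emits a completion or error signal without emitting any element first, it is an empty stream.
  • If the stream emits neither an error signal nor a completion signal, it is an infinite stream.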
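The three signals can be observed directly by subscribing with one callback per signal. The following is a minimal sketch, not code from the original article; the class name SignalDemo and the helper record are invented for illustration, and it assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class SignalDemo {

    // Subscribes to the given Flux and records every signal it receives, in order.
    // (Hypothetical helper; all sources used below emit synchronously.)
    static List<String> record(Flux<Integer> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                value -> signals.add("next:" + value),
                error -> signals.add("error:" + error.getMessage()),
                () -> signals.add("complete"));
        return signals;
    }

    public static void main(String[] args) {
        // Elements followed by a completion signal
        System.out.println(record(Flux.just(1, 2)));
        // Empty stream: only a completion signal
        System.out.println(record(Flux.empty()));
        // Empty stream: only an error signal
        System.out.println(record(Flux.error(new IllegalStateException("boom"))));
        // An error terminates the stream, so no completion signal follows it
        System.out.println(record(
                Flux.just(1).concatWith(Flux.error(new IllegalStateException("boom")))));
    }
}
```

In every case the recorded list ends with at most one terminal entry, either "complete" or "error:...", never both.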

Flux model

[Figure: Flux]

Mono model

[Figure: Mono]

Creating

simple

Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);

// From an array
Integer[] array = new Integer[]{1, 2, 3, 4, 5, 6};
Flux.fromArray(array);

// From a collection
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);

// From a Stream
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);

// Empty streams that emit only a completion signal
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());

// Empty streams that emit only an error signal
Flux.error(new Exception("some error"));
Mono.error(new Exception("some error"));

// An infinite stream that emits 0, 1, 2, ... every 200 ms
Flux.interval(Duration.ofMillis(200));

generate

<T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
<T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
<T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
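As an illustration of the second overload (state supplier plus generator), here is a minimal sketch that emits Fibonacci numbers. The class name GenerateDemo, the method fibonacci, and the choice of a long[] as state are assumptions made for this example. The generator is called once per requested element and may call sink.next at most once per call.

```java
import reactor.core.publisher.Flux;

public class GenerateDemo {

    // Emits the first `count` Fibonacci numbers, one per generator invocation.
    static Flux<Long> fibonacci(int count) {
        return Flux.<Long, long[]>generate(
                // Initial state: {current value, next value, number emitted so far}
                () -> new long[]{0L, 1L, 0L},
                (state, sink) -> {
                    if (state[2] == count) {
                        sink.complete();          // terminate once enough values were emitted
                        return state;
                    }
                    sink.next(state[0]);          // at most one next() per invocation
                    return new long[]{state[1], state[0] + state[1], state[2] + 1};
                });
    }

    public static void main(String[] args) {
        // Prints [0, 1, 1, 2, 3, 5, 8, 13]
        System.out.println(fibonacci(8).collectList().block());
    }
}
```

Returning a fresh array for each step keeps the state immutable from the generator's point of view; mutating a shared object would also work but is easier to get wrong.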

create

<T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
<T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
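A common use of create is bridging a callback-style API to a Flux. The sketch below assumes a hypothetical MessageSource class, invented purely for this example; only Flux.create, FluxSink and OverflowStrategy.BUFFER are actual Reactor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class CreateDemo {

    // Hypothetical callback-style source, invented for this example.
    static class MessageSource {
        private Consumer<String> onMessage = m -> { };
        private Runnable onClose = () -> { };

        void register(Consumer<String> onMessage, Runnable onClose) {
            this.onMessage = onMessage;
            this.onClose = onClose;
        }

        void publish(String message) { onMessage.accept(message); }

        void close() { onClose.run(); }
    }

    // Each message becomes an onNext signal; closing the source becomes onComplete.
    static Flux<String> bridge(MessageSource source) {
        return Flux.<String>create(
                sink -> source.register(sink::next, sink::complete),
                FluxSink.OverflowStrategy.BUFFER);   // buffer elements the subscriber has not requested yet
    }

    static List<String> demo() {
        MessageSource source = new MessageSource();
        List<String> received = new ArrayList<>();
        // The emitter lambda runs at subscription time and registers the callbacks.
        bridge(source).subscribe(received::add, e -> { }, () -> received.add("<complete>"));
        source.publish("a");
        source.publish("b");
        source.close();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [a, b, <complete>]
    }
}
```

Unlike generate, the sink handed to create may be called many times and from other threads, which is why an overflow strategy has to be chosen.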

push

handle

fromRunnable

    @Test
    public void testFromRunnable() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono.fromRunnable(() -> {
            try {
                logger.info("begin Runnable");
                TimeUnit.SECONDS.sleep(2);
                logger.info("end Runnable");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        })
                .subscribe(null, null, countDownLatch::countDown);
        logger.info("main over");
        countDownLatch.await(10, TimeUnit.SECONDS);
    }

fromCallable

    @Test
    public void testFromCallable() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono.fromCallable(() -> {
            try {
                logger.info("begin Callable");
                TimeUnit.SECONDS.sleep(2);
                logger.info("end Callable");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Reactor!";
        }).map(str -> {
            logger.info("map: " + str);
            return str + ", Welcome!";
        })
                .subscribe(logger::info, null, countDownLatch::countDown);
        logger.info("main over");
        countDownLatch.await(10, TimeUnit.SECONDS);
    }

fromSupplier

    @Test
    public void testFromSupplier() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono.fromSupplier(() -> {
            try {
                logger.info("begin Supplier");
                TimeUnit.SECONDS.sleep(2);
                logger.info("end Supplier");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Reactor!";
        }).map(str -> {
            logger.info("map: " + str);
            return str + ", Welcome!";
        })
                .subscribe(logger::info, null, countDownLatch::countDown);
        logger.info("main over");
        countDownLatch.await(10, TimeUnit.SECONDS);
    }

fromFuture

    private CompletableFuture<String> getCompletableFuture(String name) {
        logger.info("begin getCompletableFuture");
        final CompletableFuture<String> completableFuture = new CompletableFuture<>();
        // Simulate another thread completing the future (for example, the reply to an asynchronous Dubbo call)
        new Thread(() -> {
            logger.info("begin sleep");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("end sleep");
            completableFuture.complete("hello, " + name);
        }).start();
        logger.info("end getCompletableFuture");
        return completableFuture;
    }

    @Test
    public void testFromFuture() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono.fromFuture(getCompletableFuture("ray"))
                // From here on, the operators run on the thread that completes the future
                .map(str -> {
                    logger.info("map: " + str);
                    return str + ", welcome!";
                })
                .subscribe(logger::info, null, countDownLatch::countDown);
        logger.info("main over");
        countDownLatch.await(10, TimeUnit.SECONDS);
    }

Subscribing

Nothing happens until you subscribe: no signals are emitted, because no subscriber has sent a request signal (org.reactivestreams.Subscription#request).

// Subscribe and handle the normal data elements
subscribe(Consumer<? super T> consumer);
// Subscribe and handle both normal data elements and the error signal
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer);
// Subscribe and handle normal data elements, the error signal, and the completion signal
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);
// As above, plus logic to run when the subscription is established
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);
// Subscribe with a custom Subscriber. The four overloads above use LambdaSubscriber internally;
// a custom Subscriber usually extends BaseSubscriber
subscribe(Subscriber<? super T> actual);
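That nothing runs before subscription can be checked with a side-effect counter. This is a minimal sketch under the assumption that the source emits synchronously on the subscribing thread (true for Flux.range); the class name LazySubscribeDemo is invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LazySubscribeDemo {

    // Returns {side effects seen before subscribe, side effects seen after subscribe}.
    static int[] run() {
        AtomicInteger sideEffects = new AtomicInteger();

        // Assembling the chain only describes the pipeline; nothing is emitted yet.
        Flux<Integer> flux = Flux.range(1, 3)
                .doOnNext(i -> sideEffects.incrementAndGet());
        int before = sideEffects.get();

        // The three-argument overload: element, error, and completion handlers.
        flux.subscribe(
                i -> System.out.println("next: " + i),
                e -> System.err.println("error: " + e),
                () -> System.out.println("complete"));
        int after = sideEffects.get();

        return new int[]{before, after};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("before=" + result[0] + ", after=" + result[1]);   // before=0, after=3
    }
}
```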

LambdaSubscriber

package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;


/**
 * An unbounded Java Lambda adapter to {@link Subscriber}
 *
 * @param <T> the value type
 */
final class LambdaSubscriber<T>
		implements InnerConsumer<T>, Disposable {

	final Consumer<? super T>            consumer;
	final Consumer<? super Throwable>    errorConsumer;
	final Runnable                       completeConsumer;
	final Consumer<? super Subscription> subscriptionConsumer;
	final Context                        initialContext;

	volatile Subscription subscription;
	static final AtomicReferenceFieldUpdater<LambdaSubscriber, Subscription> S =
			AtomicReferenceFieldUpdater.newUpdater(LambdaSubscriber.class,
					Subscription.class,
					"subscription");

	/**
	 * Create a {@link Subscriber} reacting onNext, onError and onComplete. If no
	 * {@code subscriptionConsumer} is provided, the subscriber will automatically request
	 * Long.MAX_VALUE in onSubscribe, as well as an initial {@link Context} that will be
	 * visible by operators upstream in the chain.
	 *
	 * @param consumer A {@link Consumer} with argument onNext data
	 * @param errorConsumer A {@link Consumer} called onError
	 * @param completeConsumer A {@link Runnable} called onComplete with the actual
	 * context if any
	 * @param subscriptionConsumer A {@link Consumer} called with the {@link Subscription}
	 * to perform initial request, or null to request max
	 * @param initialContext A {@link Context} for this subscriber, or null to use the default
	 * of an {@link Context#empty() empty Context}.
	 */
	LambdaSubscriber(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Consumer<? super Subscription> subscriptionConsumer,
			@Nullable Context initialContext) {
		this.consumer = consumer;
		this.errorConsumer = errorConsumer;
		this.completeConsumer = completeConsumer;
		this.subscriptionConsumer = subscriptionConsumer;
		this.initialContext = initialContext == null ? Context.empty() : initialContext;
	}

	/**
	 * Create a {@link Subscriber} reacting onNext, onError and onComplete. If no
	 * {@code subscriptionConsumer} is provided, the subscriber will automatically request
	 * Long.MAX_VALUE in onSubscribe, as well as an initial {@link Context} that will be
	 * visible by operators upstream in the chain.
	 *
	 * @param consumer A {@link Consumer} with argument onNext data
	 * @param errorConsumer A {@link Consumer} called onError
	 * @param completeConsumer A {@link Runnable} called onComplete with the actual
	 * context if any
	 * @param subscriptionConsumer A {@link Consumer} called with the {@link Subscription}
	 * to perform initial request, or null to request max
	 */ //left mainly for the benefit of tests
	LambdaSubscriber(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Consumer<? super Subscription> subscriptionConsumer) {
		this(consumer, errorConsumer, completeConsumer, subscriptionConsumer, null);
	}

	@Override
	public Context currentContext() {
		return this.initialContext;
	}

	@Override
	public final void onSubscribe(Subscription s) {
		if (Operators.validate(subscription, s)) {
			this.subscription = s;
			if (subscriptionConsumer != null) {
				try {
					subscriptionConsumer.accept(s);
				}
				catch (Throwable t) {
					Exceptions.throwIfFatal(t);
					s.cancel();
					onError(t);
				}
			}
			else {
				s.request(Long.MAX_VALUE);
			}
		}
	}

	@Override
	public final void onComplete() {
		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
		if (s == Operators.cancelledSubscription()) {
			return;
		}
		if (completeConsumer != null) {
			try {
				completeConsumer.run();
			}
			catch (Throwable t) {
				Exceptions.throwIfFatal(t);
				onError(t);
			}
		}
	}

	@Override
	public final void onError(Throwable t) {
		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
		if (s == Operators.cancelledSubscription()) {
			Operators.onErrorDropped(t, this.initialContext);
			return;
		}
		if (errorConsumer != null) {
			errorConsumer.accept(t);
		}
		else {
			throw Exceptions.errorCallbackNotImplemented(t);
		}
	}

	@Override
	public final void onNext(T x) {
		try {
			if (consumer != null) {
				consumer.accept(x);
			}
		}
		catch (Throwable t) {
			Exceptions.throwIfFatal(t);
			this.subscription.cancel();
			onError(t);
		}
	}

	@Override
	@Nullable
	public Object scanUnsafe(Attr key) {
		if (key == Attr.PARENT) return subscription;
		if (key == Attr.PREFETCH) return Integer.MAX_VALUE;
		if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed();

		return null;
	}


	@Override
	public boolean isDisposed() {
		return subscription == Operators.cancelledSubscription();
	}

	@Override
	public void dispose() {
		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
		if (s != null && s != Operators.cancelledSubscription()) {
			s.cancel();
		}
	}
}

BaseSubscriber

package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.util.context.Context;

/**
 * A simple base class for a {@link Subscriber} implementation that lets the user
 * perform a {@link #request(long)} and {@link #cancel()} on it directly. As the targeted
 * use case is to manually handle requests, the {@link #hookOnSubscribe(Subscription)} and
 * {@link #hookOnNext(Object)} hooks are expected to be implemented, but they nonetheless
 * default to an unbounded request at subscription time. If you need to define a {@link Context}
 * for this {@link BaseSubscriber}, simply override its {@link #currentContext()} method.
 * <p>
 * Override the other optional hooks {@link #hookOnComplete()},
 * {@link #hookOnError(Throwable)} and {@link #hookOnCancel()}
 * to customize the base behavior. You also have a termination hook,
 * {@link #hookFinally(SignalType)}.
 * <p>
 * Most of the time, exceptions triggered inside hooks are propagated to
 * {@link #onError(Throwable)} (unless there is a fatal exception). The class is in the
 * {@code reactor.core.publisher} package, as this subscriber is tied to a single
 * {@link org.reactivestreams.Publisher}.
 *
 * @author Simon Baslé
 */
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,
                                                   Disposable {

	volatile Subscription subscription;

	static AtomicReferenceFieldUpdater<BaseSubscriber, Subscription> S =
			AtomicReferenceFieldUpdater.newUpdater(BaseSubscriber.class, Subscription.class, "subscription");

	/**
	 * Return current {@link Subscription}
	 * @return current {@link Subscription}
	 */
	protected Subscription upstream() {
		return subscription;
	}

	@Override
	public boolean isDisposed() {
		return subscription == Operators.cancelledSubscription();
	}

	/**
	 * {@link Disposable#dispose() Dispose} the {@link Subscription} by
	 * {@link Subscription#cancel() cancelling} it.
	 */
	@Override
	public void dispose() {
		cancel();
	}

	/**
	 * Hook for further processing of onSubscribe's Subscription. Implement this method
	 * to call {@link #request(long)} as an initial request. Values other than the
	 * unbounded {@code Long.MAX_VALUE} imply that you'll also call request in
	 * {@link #hookOnNext(Object)}.
	 * <p> Defaults to request unbounded Long.MAX_VALUE as in {@link #requestUnbounded()}
	 *
	 * @param subscription the subscription to optionally process
	 */
	protected void hookOnSubscribe(Subscription subscription){
		subscription.request(Long.MAX_VALUE);
	}

	/**
	 * Hook for processing of onNext values. You can call {@link #request(long)} here
	 * to further request data from the source {@link org.reactivestreams.Publisher} if
	 * the {@link #hookOnSubscribe(Subscription) initial request} wasn't unbounded.
	 * <p>Defaults to doing nothing.
	 *
	 * @param value the emitted value to process
	 */
	protected void hookOnNext(T value){
		// NO-OP
	}

	/**
	 * Optional hook for completion processing. Defaults to doing nothing.
	 */
	protected void hookOnComplete() {
		// NO-OP
	}

	/**
	 * Optional hook for error processing. Default is to call
	 * {@link Exceptions#errorCallbackNotImplemented(Throwable)}.
	 *
	 * @param throwable the error to process
	 */
	protected void hookOnError(Throwable throwable) {
		throw Exceptions.errorCallbackNotImplemented(throwable);
	}

	/**
	 * Optional hook executed when the subscription is cancelled by calling this
	 * Subscriber's {@link #cancel()} method. Defaults to doing nothing.
	 */
	protected void hookOnCancel() {
		//NO-OP
	}

	/**
	 * Optional hook executed after any of the termination events (onError, onComplete,
	 * cancel). The hook is executed in addition to and after {@link #hookOnError(Throwable)},
	 * {@link #hookOnComplete()} and {@link #hookOnCancel()} hooks, even if these callbacks
	 * fail. Defaults to doing nothing. A failure of the callback will be caught by
	 * {@link Operators#onErrorDropped(Throwable, reactor.util.context.Context)}.
	 *
	 * @param type the type of termination event that triggered the hook
	 * ({@link SignalType#ON_ERROR}, {@link SignalType#ON_COMPLETE} or
	 * {@link SignalType#CANCEL})
	 */
	protected void hookFinally(SignalType type) {
		//NO-OP
	}

	@Override
	public final void onSubscribe(Subscription s) {
		if (Operators.setOnce(S, this, s)) {
			try {
				hookOnSubscribe(s);
			}
			catch (Throwable throwable) {
				onError(Operators.onOperatorError(s, throwable, currentContext()));
			}
		}
	}

	@Override
	public final void onNext(T value) {
		Objects.requireNonNull(value, "onNext");
		try {
			hookOnNext(value);
		}
		catch (Throwable throwable) {
			onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
		}
	}

	@Override
	public final void onError(Throwable t) {
		Objects.requireNonNull(t, "onError");

		if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators
				.cancelledSubscription()) {
			//already cancelled concurrently
			Operators.onErrorDropped(t, currentContext());
			return;
		}


		try {
			hookOnError(t);
		}
		catch (Throwable e) {
			e = Exceptions.addSuppressed(e, t);
			Operators.onErrorDropped(e, currentContext());
		}
		finally {
			safeHookFinally(SignalType.ON_ERROR);
		}
	}

	@Override
	public final void onComplete() {
		if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators
				.cancelledSubscription()) {
			//we're sure it has not been concurrently cancelled
			try {
				hookOnComplete();
			}
			catch (Throwable throwable) {
				//onError itself will short-circuit due to the CancelledSubscription being set above
				hookOnError(Operators.onOperatorError(throwable, currentContext()));
			}
			finally {
				safeHookFinally(SignalType.ON_COMPLETE);
			}
		}
	}

	@Override
	public final void request(long n) {
		if (Operators.validate(n)) {
			Subscription s = this.subscription;
			if (s != null) {
				s.request(n);
			}
		}
	}

	/**
	 * {@link #request(long) Request} an unbounded amount.
	 */
	public final void requestUnbounded() {
		request(Long.MAX_VALUE);
	}

	@Override
	public final void cancel() {
		if (Operators.terminate(S, this)) {
			try {
				hookOnCancel();
			}
			catch (Throwable throwable) {
				hookOnError(Operators.onOperatorError(subscription, throwable, currentContext()));
			}
			finally {
				safeHookFinally(SignalType.CANCEL);
			}
		}
	}

	void safeHookFinally(SignalType type) {
		try {
			hookFinally(type);
		}
		catch (Throwable finallyFailure) {
			Operators.onErrorDropped(finallyFailure, currentContext());
		}
	}

	@Override
	public String toString() {
		return getClass().getSimpleName();
	}
}
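Based on the hooks shown in the source above, a custom subscriber that pulls elements one at a time might look like the following sketch. The class name OneByOneSubscriber and the rule of cancelling at the value 3 are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class OneByOneSubscriber extends BaseSubscriber<Integer> {

    final List<String> events = new ArrayList<>();

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        events.add("subscribed");
        request(1);                  // replace the default unbounded request with a single element
    }

    @Override
    protected void hookOnNext(Integer value) {
        events.add("next:" + value);
        if (value == 3) {
            cancel();                // stop early; hookOnCancel and hookFinally run afterwards
        } else {
            request(1);              // pull the next element only after handling this one
        }
    }

    @Override
    protected void hookFinally(SignalType type) {
        events.add("finally:" + type.name());
    }

    static List<String> demo() {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        Flux.range(1, 10).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        // [subscribed, next:1, next:2, next:3, finally:CANCEL]
        System.out.println(demo());
    }
}
```

Because the subscriber controls how many elements it requests, this is also the simplest way to apply backpressure by hand.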

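Operators

Usually we need to process the raw data emitted by the source publisher in several stages before arriving at the result we need. It works like an assembly line: raw material enters the conveyor belt at the head of the line, is processed at each station along the way, gradually turning into semi-finished goods, parts, components, and finished products, and finally becomes the packaged goods the consumer wants. The feeder at the head of the line corresponds to the source publisher, the consumer corresponds to the subscriber, and each processing step on the line corresponds to an operator. Each operator processes the data emitted by its upstream Publisher (with a Function or similar), produces new data, and in doing so forms a new Publisher, and so on down the chain. Both Flux and Mono provide a rich set of operators.

In addition, an operator is itself a Publisher.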

InnerOperator

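Synchronous or asynchronous

A synchronous operator processes elements one at a time, synchronously and in order, so its results preserve the source order.

An asynchronous operator processes each element asynchronously, so the order of its results is not guaranteed.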

Transforming

map

Synchronously applies a function to each element to produce a new element.

StepVerifier.create(
                Flux.range(0, 6)
                        .map(i -> i * 2)
                        .doOnNext(System.out::println))
                .expectNext(0, 2, 4, 6, 8, 10).verifyComplete();

flatMap

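Asynchronously applies a function to each element to produce a new Publisher, then merges all of these inner Publishers into one larger Publisher.

Ordering is not guaranteed.

[Figure: flatMap for Flux]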

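// One possible output order: d/a/e/b/f/c
StepVerifier.create(
        Flux.just("abc", "def")
                // "\\s*" matches the empty string, so each word is split into single characters
                .flatMap(i -> Flux.fromArray(i.split("\\s*"))
                        .delayElements(Duration.ofMillis(100)))
                .doOnNext(System.out::println))
        .expectNextCount(6).verifyComplete();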

flatMapSequential

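Like flatMap: the inner Publishers are subscribed to concurrently, but the results are emitted in source order.

[Figure: flatMapSequential]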

// a/b/c/d/e/f
StepVerifier.create(
        Flux.just("abc", "def")
                .flatMapSequential(i -> Flux.fromArray(i.split("\\s*"))
                        .delayElements(Duration.ofMillis(100)))
                .doOnNext(System.out::println))
        .expectNextCount(6).verifyComplete();

concatMap

Like flatMap, but the inner Publishers are subscribed to one at a time, so order is preserved.

[Figure: concatMap]

// a/b/c/d/e/f
StepVerifier.create(
        Flux.just("abc", "def")
                .concatMap(i -> Flux.fromArray(i.split("\\s*"))
                        .delayElements(Duration.ofMillis(100)))
                .doOnNext(System.out::println))
        .expectNextCount(6).verifyComplete();

Combining

zip

zipWith

Filtering

filter

Synchronous.

take

first

last

sample

skip

limitRequest

Splitting

Time-based operations

delayElement

timeout

doOn*

doOnNext

doOnError

doOnComplete

doOnSubscribe

doOnCancel

error-handling operators

timeout

onErrorReturn

onErrorResume

onErrorMap

doOnError

using

doFinally

retry

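retry retries a stream that has emitted an error signal. It does so by re-subscribing to the upstream Flux, so what you get after a retry is actually a different sequence: the sequence that emitted the error signal stays terminated, and the original source is subscribed to again from the start.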

Flux.range(1, 6)
    .map(i -> 10 / (3 - i))
    .retry(1)
    .subscribe(System.out::println, System.err::println);
Thread.sleep(100);  // make sure the sequence has finished

The output is as follows:

5
10
5
10
java.lang.ArithmeticException: / by zero

Back to the synchronous world

Flux#blockFirst

Flux#blockLast

Flux#toIterable

Flux#toStream

Mono#block

Mono#toFuture
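A brief sketch of how these bridging methods might be used follows. The class name BlockingDemo is invented for the example. Note that Reactor rejects the block* calls with an IllegalStateException when they are made from one of its non-blocking threads (for example a Schedulers.parallel() worker), so they belong at the edge of an application or in tests.

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BlockingDemo {

    // Blocks until the first element arrives.
    static Integer first() {
        return Flux.range(1, 5).blockFirst();
    }

    // Blocks until the sequence completes, waiting at most one second.
    static Integer last() {
        return Flux.range(1, 5).blockLast(Duration.ofSeconds(1));
    }

    // Consumes the Flux as a blocking java.util.stream.Stream.
    static List<Integer> viaStream() {
        return Flux.range(1, 5).toStream().collect(Collectors.toList());
    }

    // Blocks until the Mono emits its value (returns null for an empty Mono).
    static String viaBlock() {
        return Mono.just("hello").block();
    }

    // Converts the Mono to a CompletableFuture and waits for it.
    static String viaFuture() {
        return Mono.just("hello").toFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(first() + " " + last() + " " + viaStream()
                + " " + viaBlock() + " " + viaFuture());
    }
}
```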

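Schedulers and the threading model

Scheduler

A scheduler, similar to an Executor.

Current thread (Schedulers.immediate())

Reusable single thread (Schedulers.single())

Elastic thread pool (Schedulers.elastic())

Fixed-size thread pool (Schedulers.parallel())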

Schedulers

A factory class for schedulers.

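publishOn and subscribeOn

Some operators use a specific scheduler by default (and usually let you switch to another one). For example, Flux.interval runs on the fixed-size thread pool (Schedulers.parallel()) by default.

Reactor offers two ways to switch the Scheduler inside a reactive chain: publishOn and subscribeOn. Both take a Scheduler as an argument and change the execution context. Where publishOn appears in the chain matters, whereas the position of subscribeOn does not.

publishOn affects the operators that come after it in the chain.

subscribeOn, wherever it appears, affects the execution context from the source onward, until a publishOn switches it.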
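The effect of the two operators can be made visible by recording which thread each stage runs on. The sketch below is one way to do that; the class name SchedulerDemo and the scheduler names "source" and "downstream" are arbitrary, and the dedicated schedulers are created only so that the thread names are easy to recognise.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerDemo {

    // Returns {thread name seen before publishOn, thread name seen after publishOn}.
    static String[] run() {
        Scheduler source = Schedulers.newSingle("source");
        Scheduler downstream = Schedulers.newSingle("downstream");
        AtomicReference<String> beforePublishOn = new AtomicReference<>();
        AtomicReference<String> afterPublishOn = new AtomicReference<>();
        try {
            Flux.range(1, 3)
                    // Runs on the subscribeOn scheduler, even though subscribeOn is declared last
                    .doOnNext(i -> beforePublishOn.set(Thread.currentThread().getName()))
                    .publishOn(downstream)
                    // Everything after publishOn runs on the publishOn scheduler
                    .doOnNext(i -> afterPublishOn.set(Thread.currentThread().getName()))
                    .subscribeOn(source)
                    .blockLast();          // wait for completion before reading the results
        } finally {
            source.dispose();
            downstream.dispose();
        }
        return new String[]{beforePublishOn.get(), afterPublishOn.get()};
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("before publishOn: " + names[0]);
        System.out.println("after publishOn:  " + names[1]);
    }
}
```

The first name should start with "source" and the second with "downstream"; the numeric suffix depends on how many threads the factory has created.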

Error handling

Backpressure

Testing

StepVerifier

private Flux<Integer> generateFluxFrom1To6() {
        return Flux.just(1, 2, 3, 4, 5, 6);
    }
    private Mono<Integer> generateMonoWithError() {
        return Mono.error(new Exception("some error"));
    }
    @Test
    public void testViaStepVerifier() {
        StepVerifier.create(generateFluxFrom1To6())
                .expectNext(1, 2, 3, 4, 5, 6)
                .expectComplete()
                .verify();
        StepVerifier.create(generateMonoWithError())
                .expectErrorMessage("some error")
                .verify();
    }

PublisherProbe

TestPublisher

Debugging

// Enable debug mode globally for all operators
Hooks.onOperatorDebug();
// Enable debugging at a specific checkpoint
checkpoint();
// Add a description; the stack trace is omitted
checkpoint("description");
// Add a description and keep the stack trace
checkpoint("description", true);

References

(4)Reactor 3快速上手——响应式Spring的道法术器

Reactor 3 Reference Guide