实践r2dbc-mysql

r2dbc-mysql

ReactorNettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
    public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
        requireNonNull(exchangeable, "exchangeable must not be null");

        return Mono.<Flux<T>>create(sink -> {
            if (!isConnected()) {
                exchangeable.subscribe(request -> {
                    if (request instanceof Disposable) {
                        ((Disposable) request).dispose();
                    }
                }, e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST));
                sink.error(ClientExceptions.exchangeClosed());
                return;
            }

            Flux<T> responses = responseProcessor
                .doOnSubscribe(ignored -> exchangeable.subscribe(this::emitNextRequest,
                    e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)))
                .handle(exchangeable)
                .doOnTerminate(() -> {
                    exchangeable.dispose();
                    requestQueue.run();
                });
// 放入队列
            requestQueue.submit(RequestTask.wrap(exchangeable, sink, OperatorUtils.discardOnCancel(responses)
                .doOnDiscard(ReferenceCounted.class, RELEASE)
                .doOnCancel(exchangeable::dispose)));
        }).flatMapMany(identity());
    }

RequestTask.java

1
2
3
4
// 应答回来后发送响应数据
void run() {
        sink.success(supplier);
    }