@Overridepublic<T>Flux<T>exchange(FluxExchangeable<T>exchangeable){requireNonNull(exchangeable,"exchangeable must not be null");returnMono.<Flux<T>>create(sink->{if(!isConnected()){exchangeable.subscribe(request->{if(requestinstanceofDisposable){((Disposable)request).dispose();}},e->requests.emitError(e,Sinks.EmitFailureHandler.FAIL_FAST));sink.error(ClientExceptions.exchangeClosed());return;}Flux<T>responses=responseProcessor.doOnSubscribe(ignored->exchangeable.subscribe(this::emitNextRequest,e->requests.emitError(e,Sinks.EmitFailureHandler.FAIL_FAST))).handle(exchangeable).doOnTerminate(()->{exchangeable.dispose();requestQueue.run();});// 放入队列requestQueue.submit(RequestTask.wrap(exchangeable,sink,OperatorUtils.discardOnCancel(responses).doOnDiscard(ReferenceCounted.class,RELEASE).doOnCancel(exchangeable::dispose)));}).flatMapMany(identity());}