packagereactor.core.publisher;importjava.util.Objects;importjava.util.function.BiFunction;importjava.util.function.Function;importorg.reactivestreams.Publisher;importreactor.core.CoreSubscriber;importreactor.core.Scannable;importreactor.util.annotation.Nullable;/**
* A decorating {@link Flux} {@link Publisher} that exposes {@link Flux} API over an
* arbitrary {@link Publisher}. Useful to create operators which return a {@link Flux}.
*
* @param <I> delegate {@link Publisher} type
* @param <O> produced type
*/publicabstractclassFluxOperator<I,O>extendsFlux<O>implementsScannable{protectedfinalFlux<?extendsI>source;/**
* Build a {@link FluxOperator} wrapper around the passed parent {@link Publisher}
*
* @param source the {@link Publisher} to decorate
*/protectedFluxOperator(Flux<?extendsI>source){this.source=Objects.requireNonNull(source);}@Override@NullablepublicObjectscanUnsafe(Attrkey){if(key==Attr.PREFETCH)returngetPrefetch();if(key==Attr.PARENT)returnsource;returnnull;}}
packagereactor.core.publisher;importorg.reactivestreams.Publisher;importreactor.core.CorePublisher;importreactor.core.CoreSubscriber;importreactor.core.Scannable;importreactor.util.annotation.Nullable;abstractclassInternalFluxOperator<I,O>extendsFluxOperator<I,O>implementsScannable,OptimizableOperator<O,I>{@NullablefinalOptimizableOperator<?,I>optimizableOperator;/**
* Build a {@link InternalFluxOperator} wrapper around the passed parent {@link Publisher}
*
* @param source the {@link Publisher} to decorate
*/protectedInternalFluxOperator(Flux<?extendsI>source){super(source);this.optimizableOperator=sourceinstanceofOptimizableOperator?(OptimizableOperator)source:null;}@Override@SuppressWarnings("unchecked")publicfinalvoidsubscribe(CoreSubscriber<?superO>subscriber){OptimizableOperatoroperator=this;while(true){subscriber=operator.subscribeOrReturn(subscriber);if(subscriber==null){// null means "I will subscribe myself", returning...return;}OptimizableOperatornewSource=operator.nextOptimizableSource();if(newSource==null){operator.source().subscribe(subscriber);return;}operator=newSource;}}@NullablepublicabstractCoreSubscriber<?superI>subscribeOrReturn(CoreSubscriber<?superO>actual);@OverridepublicfinalCorePublisher<?extendsI>source(){returnsource;}@OverridepublicfinalOptimizableOperator<?,?extendsI>nextOptimizableSource(){returnoptimizableOperator;}@Override@NullablepublicObjectscanUnsafe(Attrkey){if(key==Attr.PREFETCH)returngetPrefetch();if(key==Attr.PARENT)returnsource;returnnull;}}
packagereactor.core.publisher;importjava.util.Objects;importjava.util.function.Function;importorg.reactivestreams.Subscription;importreactor.core.CorePublisher;importreactor.core.CoreSubscriber;importreactor.core.Fuseable;importreactor.util.annotation.Nullable;/**
* Maps the values of the source publisher one-on-one via a mapper function.
*
* @param <T> the source value type
* @param <R> the result value type
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/finalclassFluxMap<T,R>extendsInternalFluxOperator<T,R>{finalFunction<?superT,?extendsR>mapper;/**
* Constructs a FluxMap instance with the given source and mapper.
*
* @param source the source Publisher instance
* @param mapper the mapper function
*
* @throws NullPointerException if either {@code source} or {@code mapper} is null.
*/FluxMap(Flux<?extendsT>source,Function<?superT,?extendsR>mapper){super(source);this.mapper=Objects.requireNonNull(mapper,"mapper");}@Override@SuppressWarnings("unchecked")publicCoreSubscriber<?superT>subscribeOrReturn(CoreSubscriber<?superR>actual){if(actualinstanceofFuseable.ConditionalSubscriber){Fuseable.ConditionalSubscriber<?superR>cs=(Fuseable.ConditionalSubscriber<?superR>)actual;returnnewMapConditionalSubscriber<>(cs,mapper);}returnnewMapSubscriber<>(actual,mapper);}staticfinalclassMapSubscriber<T,R>implementsInnerOperator<T,R>{finalCoreSubscriber<?superR>actual;finalFunction<?superT,?extendsR>mapper;booleandone;Subscriptions;MapSubscriber(CoreSubscriber<?superR>actual,Function<?superT,?extendsR>mapper){this.actual=actual;this.mapper=mapper;}@OverridepublicvoidonSubscribe(Subscriptions){if(Operators.validate(this.s,s)){this.s=s;actual.onSubscribe(this);}}@OverridepublicvoidonNext(Tt){if(done){Operators.onNextDropped(t,actual.currentContext());return;}Rv;try{v=Objects.requireNonNull(mapper.apply(t),"The mapper returned a null value.");}catch(Throwablee){Throwablee_=Operators.onNextError(t,e,actual.currentContext(),s);if(e_!=null){onError(e_);}else{s.request(1);}return;}actual.onNext(v);}@OverridepublicvoidonError(Throwablet){if(done){Operators.onErrorDropped(t,actual.currentContext());return;}done=true;actual.onError(t);}@OverridepublicvoidonComplete(){if(done){return;}done=true;actual.onComplete();}@Override@NullablepublicObjectscanUnsafe(Attrkey){if(key==Attr.PARENT)returns;if(key==Attr.TERMINATED)returndone;returnInnerOperator.super.scanUnsafe(key);}@OverridepublicCoreSubscriber<?superR>actual(){returnactual;}@Overridepublicvoidrequest(longn){s.request(n);}@Overridepublicvoidcancel(){s.cancel();}}staticfinalclassMapConditionalSubscriber<T,R>implementsFuseable.ConditionalSubscriber<T>,InnerOperator<T,R>{finalFuseable.ConditionalSubscriber<?superR>actual;finalFunction<?superT,?extendsR>mapper;booleandone;Subscriptions;MapConditionalSubscriber(Fuseable.ConditionalSubscriber<?superR>actual,Function<?superT,?extendsR>mapper){this.actual=actual;this.mapper=mapper;}@OverridepublicvoidonSubscribe(Subscriptions){if(Operators.validate(this.s,s)){this.s=s;actual.onSubscribe(this);}}@OverridepublicvoidonNext(Tt){if(done){Operators.onNextDropped(t,actual.currentContext());return;}Rv;try{v=Objects.requireNonNull(mapper.apply(t),"The mapper returned a null value.");}catch(Throwablee){Throwablee_=Operators.onNextError(t,e,actual.currentContext(),s);if(e_!=null){onError(e_);}else{s.request(1);}return;}actual.onNext(v);}@OverridepublicbooleantryOnNext(Tt){if(done){Operators.onNextDropped(t,actual.currentContext());returntrue;}Rv;try{v=Objects.requireNonNull(mapper.apply(t),"The mapper returned a null value.");returnactual.tryOnNext(v);}catch(Throwablee){Throwablee_=Operators.onNextError(t,e,actual.currentContext(),s);if(e_!=null){done=true;actual.onError(e_);returntrue;}else{returnfalse;}}}@OverridepublicvoidonError(Throwablet){if(done){Operators.onErrorDropped(t,actual.currentContext());return;}done=true;actual.onError(t);}@OverridepublicvoidonComplete(){if(done){return;}done=true;actual.onComplete();}@Override@NullablepublicObjectscanUnsafe(Attrkey){if(key==Attr.PARENT)returns;if(key==Attr.TERMINATED)returndone;returnInnerOperator.super.scanUnsafe(key);}@OverridepublicCoreSubscriber<?superR>actual(){returnactual;}@Overridepublicvoidrequest(longn){s.request(n);}@Overridepublicvoidcancel(){s.cancel();}}}
packagereactor.core.publisher;importorg.reactivestreams.Publisher;importreactor.core.CorePublisher;importreactor.core.CoreSubscriber;importreactor.core.Scannable;importreactor.util.annotation.Nullable;/**
* A decorating {@link Mono} {@link Publisher} that exposes {@link Mono} API over an
* arbitrary {@link Publisher} Useful to create operators which return a {@link Mono}.
*
* @param <I> delegate {@link Publisher} type
* @param <O> produced type
*/abstractclassInternalMonoOperator<I,O>extendsMonoOperator<I,O>implementsScannable,OptimizableOperator<O,I>{@NullablefinalOptimizableOperator<?,I>optimizableOperator;protectedInternalMonoOperator(Mono<?extendsI>source){super(source);this.optimizableOperator=sourceinstanceofOptimizableOperator?(OptimizableOperator)source:null;}@Override@SuppressWarnings("unchecked")publicfinalvoidsubscribe(CoreSubscriber<?superO>subscriber){OptimizableOperatoroperator=this;while(true){subscriber=operator.subscribeOrReturn(subscriber);if(subscriber==null){// null means "I will subscribe myself", returning...return;}OptimizableOperatornewSource=operator.nextOptimizableSource();if(newSource==null){operator.source().subscribe(subscriber);return;}operator=newSource;}}@NullablepublicabstractCoreSubscriber<?superI>subscribeOrReturn(CoreSubscriber<?superO>actual);@OverridepublicfinalCorePublisher<?extendsI>source(){returnsource;}@OverridepublicfinalOptimizableOperator<?,?extendsI>nextOptimizableSource(){returnoptimizableOperator;}}
packagereactor.core.publisher;importjava.util.Objects;importjava.util.function.Function;importreactor.core.CoreSubscriber;importreactor.core.Fuseable;/**
* Maps the values of the source publisher one-on-one via a mapper function.
*
* @param <T> the source value type
* @param <R> the result value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/finalclassMonoMap<T,R>extendsInternalMonoOperator<T,R>{finalFunction<?superT,?extendsR>mapper;/**
* Constructs a StreamMap instance with the given source and mapper.
*
* @param source the source Publisher instance
* @param mapper the mapper function
* @throws NullPointerException if either {@code source} or {@code mapper} is null.
*/MonoMap(Mono<?extendsT>source,Function<?superT,?extendsR>mapper){super(source);this.mapper=Objects.requireNonNull(mapper,"mapper");}@Override@SuppressWarnings("unchecked")publicCoreSubscriber<?superT>subscribeOrReturn(CoreSubscriber<?superR>actual){if(actualinstanceofFuseable.ConditionalSubscriber){Fuseable.ConditionalSubscriber<?superR>cs=(Fuseable.ConditionalSubscriber<?superR>)actual;returnnewFluxMap.MapConditionalSubscriber<>(cs,mapper);}returnnewFluxMap.MapSubscriber<>(actual,mapper);}}
InnerOperator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
packagereactor.core.publisher;importreactor.util.context.Context;/**
*
* @param <I> input operator consumed type
* @param <O> output operator produced type
*
* @author Stephane Maldini
*/interfaceInnerOperator<I,O>extendsInnerConsumer<I>,InnerProducer<O>{@OverridedefaultContextcurrentContext(){returnactual().currentContext();}}