

ReactiveX中转换操作时这样子描述的Transforming Observables : Operators that transform items that are emitted by an Observable,其含义就是将需要发射的Observable通过使用Transforming Operators这种操作相关的函数修饰符转换成真正需要发射的Observable


map:transform the items emitted by an Observable by applying a function to each item
map通过实现Function 接口中 apply方法将每一项所需要的发射Observable数据转化为相应实际所需的Observable

private fun operatorsMap() =
         Observable.just(10, 20, 30).map {
            (it * 2).toString()
        }.subscribe {
            logIMessage("operatorsMap", it)
 * 运行结果:
 * com.ypz.rxjavademo I/operatorsMap: 20
 * com.ypz.rxjavademo I/operatorsMap: 40
 * com.ypz.rxjavademo I/operatorsMap: 60
 * 源码如下:
 * */
 * Returns an Observable that applies a specified function to each item emitted 
 * by the source ObservableSource and emits the results of these function applications.
 * @param <R> the output type
 * @param mapper  a function to apply to each item emitted by the ObservableSource
 * @return an Observable that emits the items from the source ObservableSource, transformed by the specified function
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
 * A functional interface that takes a value and returns another value, 
 * possibly with a different type and allows throwing a checked exception.
 * @param <T> the input value type
 * @param <R> the output value type
public interface Function<T, R> {
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
    R apply(@NonNull T t) throws Exception;



flatMap:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

private fun operatorsFlatMap() {
    logIMessage("operators-flatMap", "直接转换")
    Observable.just(1, 2, 3).flatMap {
        Observable.just(it * 2 - 1)
    }.subscribe {
        logIMessage("operators-flatMap", "直接转换value:$it")
    logIMessage("operators-flatMap", "直接转换加函数变换")
    val function = Function<Int, Observable<Int>> { value ->
        logIMessage("operators-flatMap", "需要转换的Value:$value")
        Observable.range(value * 10, 2)
    val biFunction = BiFunction<Int, Int, Int> { initValue, changeValue ->
        logIMessage("operators-flatMap", "转换前数值:$initValue")
        logIMessage("operators-flatMap", "转换后数值:$changeValue")
        initValue + changeValue
    just(1, 2, 3).observeOn(Schedulers.io())
    		 .flatMap(function, biFunction)
    		 .subscribe {logIMessage("operators-flatMap", "直接转换加函数变换value:$it")}
 *  运行结果:
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换value:1
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换value:3
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换value:5
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换
 * com.ypz.rxjavademo I/operators-flatMap: 需要转换的Value:1
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:1
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:10
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:11
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:1
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:11
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:12
 * com.ypz.rxjavademo I/operators-flatMap: 需要转换的Value:2
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:2
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:20
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:22
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:2
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:21
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:23
 * com.ypz.rxjavademo I/operators-flatMap: 需要转换的Value:3
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:3
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:30
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:33
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:3
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:31
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:34
 * 源码如下:
 * */

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
public interface Function<T, R> {
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
    R apply(@NonNull T t) throws Exception;

public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        BiFunction<? super T, ? super U, ? extends R> resultSelector) {
    return flatMap(mapper, resultSelector, false, bufferSize(), bufferSize());
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <R> the result type
public interface BiFunction<T1, T2, R> {

     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @return the result value
     * @throws Exception on error
    R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;

重点探讨第二种直接转换加函数变换的写法:结合示例代码和源码可以看到直接转换的核心还是通过Function这一个思维去实现的,而转换以及函数变换主用场景运用于:当业务场景不仅仅需要需要直接将元数据直接转换,并且在转换后准备发射数据的过程中,需要将元数据以及转换数据进行一个比对或者在进行一个函数操作来保证最终所需转换的发射数据是符合业务场景。从源码中可以得出函数变换是通过对BiFunction实现去达到结果,注意在函数变换中如果变换不符合正常代码运行的业务逻辑则会抛出异常终止整个事件流数据的发射,需要谨记在BiFunctionT1 T2 R分别代表转换前元数据、转换后元数据、发射最终所需的转换数据类型。



private fun operatorsConcatMap() =
        just(1, 2, 3).concatMap {
            Observable.range(it * 10, 2)
        }.subscribe {
            logIMessage("operators-concatMap", "value$it")
 * 运行结果如下:
 * com.ypz.rxjavademo I/operators-concatMap: value10
 * com.ypz.rxjavademo I/operators-concatMap: value11
 * com.ypz.rxjavademo I/operators-concatMap: value20
 * com.ypz.rxjavademo I/operators-concatMap: value21
 * com.ypz.rxjavademo I/operators-concatMap: value30
 * com.ypz.rxjavademo I/operators-concatMap: value31
 * 源码如下
 * */
 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);

 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @param prefetch
 *            the number of elements to prefetch from the current Observable
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    if (this instanceof ScalarCallable) {
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        return ObservableScalarXMap.scalarXMap(v, mapper);
    return RxJavaPlugins.onAssembly(new ObservableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));


concatMap VS flatMap

在这里引用一篇RxJava Observable tranformation: concatMap() vs flatMap()中写道的描述:
The flatMap() method creates a new Observable by applying a function that you supply to each item emitted by the original Observable, where that function is itself an Observable that emits items, and then merges the results of that function applied to every item emitted by the original Observable, emitting these merged results.
Note that flatMap() may interleave the items emitted by the Observables that result from transforming the items emitted by the source Observable.
If it is important that these items not be interleaved, you can instead use the similar concatMap() method.
As you can see, the two functions are very similar and the subtle difference is how the output is created (after the mapping function is applied . flatMap() uses MERGE operator while concatMap() uses CONCAT operator meaning that the last one cares about the order of the elements, so keep an eye on that if you need ordering ????.



cast操作符:Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified

private fun operatorsCast() = Observable.just(1, 2, "string")
                    logIMessage("operators-cast", "value$it")
                    logIMessage("operators-cast", "errorMessage:${it.message
                            ?: "unknown Error Message"}")
 * 运行结果如下:
 * com.ypz.rxjavademo I/operators-cast: value1
 * com.ypz.rxjavademo I/operators-cast: value2
 * com.ypz.rxjavademo I/operators-cast: errorMessage:Cannot cast java.lang.String to java.lang.Integer
 * 源码如下:
 * */
 * Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified
 * type.
 * @param <U> the output value type cast to
 * @param clazz
 *            the target class type that {@code cast} will cast the items emitted by the source ObservableSource
 *            into before emitting them from the resulting ObservableSource
 * @return an Observable that emits each item from the source ObservableSource after converting it to the
 *         specified type
 * @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
public final <U> Observable<U> cast(final Class<U> clazz) {
    ObjectHelper.requireNonNull(clazz, "clazz is null");
    return map(Functions.castFunction(clazz));
 * Returns a function that cast the incoming values via a Class object.
 * @param <T> the input value type
 * @param <U> the output and target type
 * @param target the target class
 * @return the new Function instance
public static <T, U> Function<T, U> castFunction(Class<U> target) {
    return new CastToClass<T, U>(target);

static final class ArrayListCapacityCallable<T> implements Callable<List<T>> {
    final int capacity;

    ArrayListCapacityCallable(int capacity) {
        this.capacity = capacity;

    public List<T> call() throws Exception {
        return new ArrayList<T>(capacity);



groupBy: divide an Observable into a set of Observables that each emit a different subset of items from the original Observable。

    private fun operatorsGroupBy() =
            range(1, 10).groupBy(object : Function<Int, String> {
                override fun apply(t: Int): String {
                    return if (t%2==0) "偶数" else "奇数"
            }).subscribe {
                logIMessage("operators-groupBy", "分组key:${it.key ?: "key is null"}")
                if (it.key.equals("奇数", true)) {
                    it.subscribe {
                        logIMessage("operators-groupBy", "奇数$it")
 * 运行结果如下:
 * com.ypz.rxjavademo I/operators-groupBy: 分组key:奇数
 * com.ypz.rxjavademo I/operators-groupBy: 奇数1
 * com.ypz.rxjavademo I/operators-groupBy: 分组key:偶数
 * com.ypz.rxjavademo I/operators-groupBy: 奇数3
 * om.ypz.rxjavademo I/operators-groupBy: 奇数5
 * com.ypz.rxjavademo I/operators-groupBy: 奇数7
 * com.ypz.rxjavademo I/operators-groupBy: 奇数9
 * */
 * 源码如下:
 * */
     * Groups the items emitted by an {@code ObservableSource} according to a specified criterion, and emits these
     * grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservableSource} allows only a single
     * {@link Observer} during its lifetime and if this {@code Observer} calls dispose() before the
     * source terminates, the next emission by the source having the same key will trigger a new
     * {@code GroupedObservableSource} emission.
     * <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
     * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
     * {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
     * discard their buffers by applying an operator like {@link #ignoreElements} to them.
     * @param keySelector
     *            a function that extracts the key for each item
     * @param <K>
     *            the key type
     * @return an {@code ObservableSource} that emits {@link GroupedObservable}s, each of which corresponds to a
     *         unique key value and each of which emits those items from the source ObservableSource that share that
     *         key value
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) {
        return groupBy(keySelector, (Function)Functions.identity(), false, bufferSize());
 * An {@link Observable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}.
 * <p>
 * <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
 * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
 * {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they
 * may discard their buffers by applying an operator like {@link Observable#take take}{@code (0)} to them.
 * @param <K>
 *            the type of the key
 * @param <T>
 *            the type of the items emitted by the {@code GroupedObservable}
 * @see Observable#groupBy(io.reactivex.functions.Function)
public abstract class GroupedObservable<K, T> extends Observable<T> {

    final K key;

     * Constructs a GroupedObservable with the given key.
     * @param key the key
    protected GroupedObservable(@Nullable K key) {
        this.key = key;

     * Returns the key that identifies the group of items emitted by this {@code GroupedObservable}.
     * @return the key that the items emitted by this {@code GroupedObservable} were grouped by
    public K getKey() {
        return key;



window操作符:periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time。

window count(设置window size)

window count的作用是设置窗口的大小,每一个窗口都有一个固定的大小容纳一定量的Observables

    private fun operatorsWindowCount() {
        var time = 0
        val tag = "operators-WindowCount"
        Observable.range(1, 10).window(2).subscribe(
                object : Observer<Observable<Int>> {
                    override fun onComplete() {  logIMessage(tag, "onComplete")}
                    override fun onSubscribe(d: Disposable) {}
                    override fun onNext(t: Observable<Int>) {
                        logIMessage(tag, "onNext")
                        time += 1
                        if (time % 2 == 0) Thread.sleep(3000)
                        t.subscribe {
                            logIMessage("operators-WindowCount", "onNextValue$it")
                        } }
                    override fun onError(e: Throwable) {}
     * 运行结果如下
     * com.ypz.rxjavademo I/operators-WindowCount: onNext
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue1
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue2
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue3
     * com.ypz.rxjavademo I/operators-WindowCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue4
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue5
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue6
     *  com.ypz.rxjavademo I/operators-WindowCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue7
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue8
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue9
     *  com.ypz.rxjavademo I/operators-WindowCount: onComplete
     * 相关源码如下:
     * */
     * Returns an Observable that emits windows of items it collects from the source ObservableSource. The resulting
     * ObservableSource emits windows every {@code skip} items, each containing no more than {@code count} items. When
     * the source ObservableSource completes or encounters an error, the resulting ObservableSource emits the current window
     * and propagates the notification from the source ObservableSource.
     * @param count
     *            the maximum size of each window before it should be emitted
     * @param skip
     *            how many items need to be skipped before starting a new window. Note that if {@code skip} and
     *            {@code count} are equal this is the same operation as {@link #window(long)}.
     * @return an Observable that emits windows every {@code skip} items containing at most {@code count} items
     *         from the source ObservableSource
     * @throws IllegalArgumentException if either count or skip is non-positive
    public final Observable<Observable<T>> window(long count, long skip) {
        return window(count, skip, bufferSize());
    public final Observable<Observable<T>> window(long count, long skip) {
        return window(count, skip, bufferSize());
    public final Observable<Observable<T>> window(long count, long skip, int bufferSize) {
        ObjectHelper.verifyPositive(count, "count");
        ObjectHelper.verifyPositive(skip, "skip");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableWindow<T>(this, count, skip, bufferSize));

结合运行效果以及源码可以得出:window count的用法中还有一个skip参数,关于count和skip的对比关系可以参照buffer中的count以及skip的对比,关系原理是一致。它是决定是否完整的截取数据组装window还是重复或者跳过丢失数据的方式组装window,其中最为主要的ObservableWindow这个类实现了如何组装数据的功能以及决定如何发射数据功能。

window timeSpan 定时收集数据功能

window timeSpan原理图如下:

    private fun operatorsWindowTimespan() {
        val logTag = "operators-WindowTimespan"
                .interval(1, TimeUnit.SECONDS)
                .window(3, TimeUnit.SECONDS)
    private fun getBaseWindowTimeObserver(tag: String): Observer<Observable<Long>> =
            object : Observer<Observable<Long>> {
                override fun onComplete() { logIMessage(tag, "onComplete") }

                override fun onSubscribe(d: Disposable) {}

                override fun onNext(t: Observable<Long>) {
                    logIMessage(tag, "onNext")
                    t.subscribe { logIMessage(tag, "onNextValue$it") }

                override fun onError(e: Throwable) {}
     * 运行效果如下:
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue0
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue1
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue2
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * om.ypz.rxjavademo I/operators-WindowTimespan: onNextValue3
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue4
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue5
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue6
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue7
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue8
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue9
     * com.ypz.rxjavademo I/operators-WindowTimespan: onComplete
     * 相关源码:
     * */
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
        return window(timespan, unit, Schedulers.computation(), Long.MAX_VALUE, false);
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
            Scheduler scheduler, long count, boolean restart) {
        return window(timespan, unit, scheduler, count, restart, bufferSize());
    public final Observable<Observable<T>> window(
            long timespan, TimeUnit unit, Scheduler scheduler,
            long count, boolean restart, int bufferSize) {
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.verifyPositive(count, "count");
        return RxJavaPlugins.onAssembly(new ObservableWindowTimed<T>(this, timespan, timespan, unit, scheduler, count, bufferSize, restart));


timeSpan 以及count的配合使用


    private fun operatorsWindowTimesCompeteCount() {
        val tag = "operators-WindowTimesCompeteCount"
                .range(1, 6)
                .window(4, TimeUnit.SECONDS,3)
                .subscribe(object : Observer<Observable<Int>> {
                    override fun onComplete() {
                        logIMessage(tag, "onComplete")

                    override fun onSubscribe(d: Disposable) {}

                    override fun onNext(t: Observable<Int>) {
                        logIMessage(tag, "onNext")
                        t.subscribe { logIMessage(tag, "onNextValue$it") }

                    override fun onError(e: Throwable) {}
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue1
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue2
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue3
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue4
     * com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue5
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue6
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onComplete
     * 相关源码如下
     * */
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
            long count) {
        return window(timespan, unit, Schedulers.computation(), count, false);
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
            Scheduler scheduler, long count, boolean restart) {
        return window(timespan, unit, scheduler, count, restart, bufferSize());
    public final Observable<Observable<T>> window(
            long timespan, TimeUnit unit, Scheduler scheduler,
            long count, boolean restart, int bufferSize) {
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.verifyPositive(count, "count");
        return RxJavaPlugins.onAssembly(
        		new ObservableWindowTimed<T>(
        			this, timespan, timespan, unit, scheduler, count, bufferSize, restart));



buffer操作符:periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time


    private fun getBaseOperatorsBufferObserver(): Observer<List<Int>> =
            object : Observer<List<Int>> {
                override fun onComplete() {


                override fun onSubscribe(d: Disposable) {


                override fun onNext(t: List<Int>) {
                    logIMessage("BaseOperatorsBufferObserver", "forEachValue$t")


                override fun onError(e: Throwable) {

    private fun operatorsBuffer() =
                    .range(1, 10).buffer(2)
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3, 4]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5, 6]
     * */
     * 源码如下:
     * */
     * @param count
     *            the maximum number of items in each buffer before it should be emitted
     * @return an Observable that emits connected, non-overlapping buffers, each containing at most
     *         {@code count} items from the source ObservableSource
     * @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
    public final Observable<List<T>> buffer(int count) {
        return buffer(count, count);
     * @param count
     *            the maximum size of each buffer before it should be emitted
     * @param skip
     *            how many items emitted by the source ObservableSource should be skipped before starting a new
     *            buffer. Note that when {@code skip} and {@code count} are equal, this is the same operation as
     *            {@link #buffer(int)}.
     * @return an Observable that emits buffers for every {@code skip} item from the source ObservableSource and
     *         containing at most {@code count} items
     * @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
    public final Observable<List<T>> buffer(int count, int skip) {
        return buffer(count, skip, ArrayListSupplier.<T>asCallable());
    public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
        ObjectHelper.verifyPositive(count, "count");
        ObjectHelper.verifyPositive(skip, "skip");
        ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
        return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier));





    private fun operatorsBuffer_skipComparativeCount(count: Int, skip: Int) =
                    .range(1, 6)
                    .buffer(count, skip)



operatorsBuffer_skipComparativeCount(2, 2)
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3, 4]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5, 6]
     * */



operatorsBuffer_skipComparativeCount(3, 2)
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2, 3]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3, 4, 5]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5, 6]
     * */




operatorsBuffer_skipComparativeCount(2, 3)
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[4, 5]
     * */
 operatorsBuffer_skipComparativeCount(1, 2)
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5]
     * */

每一次移动的角标可以:(m-1)*skip = n 其中m代表移动次数n为移动角标


scan操作符:apply a function to each item emitted by an Observable, sequentially, and emit each successive value。

    private fun operatorsScan() {
        var message = "从0加到1是:"
        Observable.range(1, 10).scan { t1: Int, t2: Int ->
            message = "从0加到${t2}是:"
        }.subscribe {
            logIMessage("operatorsScan", "$message$it")
     * 运行结果如下
     * com.ypz.rxjavademo I/operatorsScan: 从0加到1是:1
     * com.ypz.rxjavademo I/operatorsScan: 从0加到2是:3
     * com.ypz.rxjavademo I/operatorsScan: 从0加到3是:6
     * com.ypz.rxjavademo I/operatorsScan: 从0加到4是:10
     * com.ypz.rxjavademo I/operatorsScan: 从0加到5是:15
     * com.ypz.rxjavademo I/operatorsScan: 从0加到6是:21
     * com.ypz.rxjavademo I/operatorsScan: 从0加到7是:28
     * com.ypz.rxjavademo I/operatorsScan: 从0加到8是:36
     * com.ypz.rxjavademo I/operatorsScan: 从0加到9是:45
     * com.ypz.rxjavademo I/operatorsScan: 从0加到10是:55
     * 源码如下:
     * */
     * Returns an Observable that applies a specified accumulator function to the first 
     * item emitted by a source ObservableSource, 
     * then feeds the result of that function along with the second item emitted by the source
     * ObservableSource into the same function, and so on 
     * until all items have been emitted by the source ObservableSource,
     * emitting the result of each of these iterations.
     * @param accumulator
     *            an accumulator function to be invoked on each item emitted 
     *            by the source ObservableSource, whose result will be emitted 
     *            to {@link Observer}s via {@link Observer#onNext onNext} 
     *            and used in the next accumulator call
     * @return an Observable that emits the results of each call to the accumulator function
    public final Observable<T> scan(BiFunction<T, T, T> accumulator) {
        ObjectHelper.requireNonNull(accumulator, "accumulator is null");
        return RxJavaPlugins.onAssembly(new ObservableScan<T>(this, accumulator));
public final class ObservableScan<T> extends AbstractObservableWithUpstream<T, T> {
    final BiFunction<T, T, T> accumulator;
    public ObservableScan(ObservableSource<T> source, BiFunction<T, T, T> accumulator) {
        this.accumulator = accumulator;

    public void subscribeActual(Observer<? super T> t) {
        source.subscribe(new ScanObserver<T>(t, accumulator));

    static final class ScanObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        final BiFunction<T, T, T> accumulator;

        Disposable upstream;

        T value;

        boolean done;

        ScanObserver(Observer<? super T> actual, BiFunction<T, T, T> accumulator) {
            this.downstream = actual;
            this.accumulator = accumulator;

        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;


