RxJava2极速入门——Rxjava操作符详解之转换操作符

RxJava操作符——转换操作符

ReactiveX中转换操作时这样子描述的Transforming Observables : Operators that transform items that are emitted by an Observable,其含义就是将需要发射的Observable通过使用Transforming Operators这种操作相关的函数修饰符转换成真正需要发射的Observable
转换操作符常见分类如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符

map操作符

map:transform the items emitted by an Observable by applying a function to each item
map通过实现Function 接口中 apply方法将每一项所需要的发射Observable数据转化为相应实际所需的Observable
原理作用图:
RxJava2极速入门——Rxjava操作符详解之转换操作符
示例代码以及相关源码如下

private fun operatorsMap() =
         Observable.just(10, 20, 30).map {
            (it * 2).toString()
        }.subscribe {
            logIMessage("operatorsMap", it)
        }
/**
 * 运行结果:
 * com.ypz.rxjavademo I/operatorsMap: 20
 * com.ypz.rxjavademo I/operatorsMap: 40
 * com.ypz.rxjavademo I/operatorsMap: 60
 * 源码如下:
 * */
 /**
 * Returns an Observable that applies a specified function to each item emitted 
 * by the source ObservableSource and emits the results of these function applications.
 * @param <R> the output type
 * @param mapper  a function to apply to each item emitted by the ObservableSource
 * @return an Observable that emits the items from the source ObservableSource, transformed by the specified function
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
/**
 * A functional interface that takes a value and returns another value, 
 * possibly with a different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

结合示例代码以及运行结果可以的出map操作符将每一个需要发射数据通过apply的实现将元数据进行一些加工操作,将其转换为其他数据。
注意:map操作符中的实现时,应该采用一套针对相应业务场景的数学转换规则避免产生不可期望结果以及转换异常

flatMap操作符

flatMap:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
flatMap作用:通过將每一个Observable变换为一组Observables,然后发射每一个Observable。
原理作用流程如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
结合上图以及作用解析不难看出,flatMap的转换作用是将一些元数据进行加工操作,由于是merge操作所以在并发上先后顺序可能不一致,转换方法以及先后顺序会结合示例代码以及源码一起分析,相关代码如下:

private fun operatorsFlatMap() {
    logIMessage("operators-flatMap", "直接转换")
    Observable.just(1, 2, 3).flatMap {
        Observable.just(it * 2 - 1)
    }.subscribe {
        logIMessage("operators-flatMap", "直接转换value:$it")
    }
    logIMessage("operators-flatMap", "直接转换加函数变换")
    val function = Function<Int, Observable<Int>> { value ->
        logIMessage("operators-flatMap", "需要转换的Value:$value")
        Observable.range(value * 10, 2)
    }
    val biFunction = BiFunction<Int, Int, Int> { initValue, changeValue ->
        logIMessage("operators-flatMap", "转换前数值:$initValue")
        logIMessage("operators-flatMap", "转换后数值:$changeValue")
        initValue + changeValue
    }
    just(1, 2, 3).observeOn(Schedulers.io())
    		 .subscribeOn(AndroidSchedulers.mainThread())
    		 .flatMap(function, biFunction)
    		 .subscribe {logIMessage("operators-flatMap", "直接转换加函数变换value:$it")}
}
/**
 *  运行结果:
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换value:1
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换value:3
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换value:5
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换
 * com.ypz.rxjavademo I/operators-flatMap: 需要转换的Value:1
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:1
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:10
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:11
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:1
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:11
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:12
 * com.ypz.rxjavademo I/operators-flatMap: 需要转换的Value:2
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:2
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:20
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:22
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:2
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:21
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:23
 * com.ypz.rxjavademo I/operators-flatMap: 需要转换的Value:3
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:3
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:30
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:33
 * com.ypz.rxjavademo I/operators-flatMap: 转换前数值:3
 * com.ypz.rxjavademo I/operators-flatMap: 转换后数值:31
 * com.ypz.rxjavademo I/operators-flatMap: 直接转换加函数变换value:34
 * 源码如下:
 * */

edulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        BiFunction<? super T, ? super U, ? extends R> resultSelector) {
    return flatMap(mapper, resultSelector, false, bufferSize(), bufferSize());
}
/**
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <R> the result type
 */
public interface BiFunction<T1, T2, R> {

    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @return the result value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}

结合第一个直接转换的示例代码以及相关运行结果源码中可以看到,其转换的核心是通过实现Function这一接口思维来实现对数据的转换的操作。
重点探讨第二种直接转换加函数变换的写法:结合示例代码和源码可以看到直接转换的核心还是通过Function这一个思维去实现的,而转换以及函数变换主用场景运用于:当业务场景不仅仅需要需要直接将元数据直接转换,并且在转换后准备发射数据的过程中,需要将元数据以及转换数据进行一个比对或者在进行一个函数操作来保证最终所需转换的发射数据是符合业务场景。从源码中可以得出函数变换是通过对BiFunction实现去达到结果,注意在函数变换中如果变换不符合正常代码运行的业务逻辑则会抛出异常终止整个事件流数据的发射,需要谨记在BiFunctionT1 T2 R分别代表转换前元数据、转换后元数据、发射最终所需的转换数据类型。

concatMap操作符

concatMapflatMap类似,但是没有merge操作所以,顺序是能够保证一致也就是在并发线程中操作时线程是安全的。
concatMap中为了保证线程安全的操作采用了concat操作避免了merge操作导致的并发问题。
其原理作用流程如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
在其流程图中可以的出,采用了concat操作不管函数变换过程中发生了什么其数据发射都会依照原数据发射的顺序的流程来进行数据的发射。

private fun operatorsConcatMap() =
        just(1, 2, 3).concatMap {
            Observable.range(it * 10, 2)
        }.subscribe {
            logIMessage("operators-concatMap", "value$it")
        }
/**
 * 运行结果如下:
 * com.ypz.rxjavademo I/operators-concatMap: value10
 * com.ypz.rxjavademo I/operators-concatMap: value11
 * com.ypz.rxjavademo I/operators-concatMap: value20
 * com.ypz.rxjavademo I/operators-concatMap: value21
 * com.ypz.rxjavademo I/operators-concatMap: value30
 * com.ypz.rxjavademo I/operators-concatMap: value31
 * 源码如下
 * */
/**
 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

/**
 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @param prefetch
 *            the number of elements to prefetch from the current Observable
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}

在示例代码以及运行结果可以出,concatMapFlatMap对发射的数据进行转换时候其核心还是Functionapply的实现,在concatMap能够实现线程安全这一方法其实得益于ObservableScalarXMap.scalarXMap的实现。其通过实现核心静态类ScalarXMapObservablesubscribeActual方法去保证线程的安全。

concatMap VS flatMap

在这里引用一篇RxJava Observable tranformation: concatMap() vs flatMap()中写道的描述:
The flatMap() method creates a new Observable by applying a function that you supply to each item emitted by the original Observable, where that function is itself an Observable that emits items, and then merges the results of that function applied to every item emitted by the original Observable, emitting these merged results.
Note that flatMap() may interleave the items emitted by the Observables that result from transforming the items emitted by the source Observable.
If it is important that these items not be interleaved, you can instead use the similar concatMap() method.
As you can see, the two functions are very similar and the subtle difference is how the output is created (after the mapping function is applied . flatMap() uses MERGE operator while concatMap() uses CONCAT operator meaning that the last one cares about the order of the elements, so keep an eye on that if you need ordering ????.

在其描述中可以的出:
flatMap是通过实现其apply为原始Observable中每一个需要发射的数据,通过apply进行函数变换并返回一个可发射的Observable,然后通过merge操作将函数变换所得的Observable合并在一起,然后发送这些结果。
flatMap由于牵涉到merge操作可能会导致Observable发射交错,如果不想Observable交错可以使用concatMap避免数据错误。因为concatMap使用的是concat操作只关心函数变换后的Observable顺序发射
以下结合merge以及concat原理图对比:
RxJava2极速入门——Rxjava操作符详解之转换操作符
RxJava2极速入门——Rxjava操作符详解之转换操作符
结合上述两图其实就可以很清晰的得出为什么concat能够保证线程安全merge不能保证线程安全了。

cast操作符

cast操作符:Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified
type.在发射之前强制将Observable发射的所有数据转换为指定类型。
原理作用图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
示例代码以及相关部分源码如下:

private fun operatorsCast() = Observable.just(1, 2, "string")
        .cast(Integer::class.java)//订阅之后才能发起强转
        .subscribe(
                {
                    logIMessage("operators-cast", "value$it")
                },
                {
                    logIMessage("operators-cast", "errorMessage:${it.message
                            ?: "unknown Error Message"}")
                })
/**
 * 运行结果如下:
 * com.ypz.rxjavademo I/operators-cast: value1
 * com.ypz.rxjavademo I/operators-cast: value2
 * com.ypz.rxjavademo I/operators-cast: errorMessage:Cannot cast java.lang.String to java.lang.Integer
 * 源码如下:
 * */
/**
 * Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified
 * type.
 * @param <U> the output value type cast to
 * @param clazz
 *            the target class type that {@code cast} will cast the items emitted by the source ObservableSource
 *            into before emitting them from the resulting ObservableSource
 * @return an Observable that emits each item from the source ObservableSource after converting it to the
 *         specified type
 * @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> cast(final Class<U> clazz) {
    ObjectHelper.requireNonNull(clazz, "clazz is null");
    return map(Functions.castFunction(clazz));
}
/**
 * Returns a function that cast the incoming values via a Class object.
 * @param <T> the input value type
 * @param <U> the output and target type
 * @param target the target class
 * @return the new Function instance
 */
public static <T, U> Function<T, U> castFunction(Class<U> target) {
    return new CastToClass<T, U>(target);
}

static final class ArrayListCapacityCallable<T> implements Callable<List<T>> {
    final int capacity;

    ArrayListCapacityCallable(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public List<T> call() throws Exception {
        return new ArrayList<T>(capacity);
    }
}

在示例代码运行结果以及相关源码中可以看到,cast操作符通过castFunction的实现强制将发射的数据转换为实际所需的数据的类型,但是会出现ClassCastException这一异常,所以在使用cast操作符时候要确保数据转换是没有问题的最好也把onError给实现了。如果不实现onError则会运行过程中很大程度上会出现RuntimeException原因在于类型转换异常的时候onError事件并没有实现所导致的。

groupBy操作符

groupBy: divide an Observable into a set of Observables that each emit a different subset of items from the original Observable。
从官网的描述中可以得出,将原来的Observable看成是一个集合,通过groupBy的实现将这个集合分割成不同的子集。
原理作用图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
从图片中得出groupBy作用就是将Observable转换成不同分组的Observables
示例代码以及相关源码如下:

    private fun operatorsGroupBy() =
            range(1, 10).groupBy(object : Function<Int, String> {
                override fun apply(t: Int): String {
                    return if (t%2==0) "偶数" else "奇数"
                }
            }).subscribe {
                logIMessage("operators-groupBy", "分组key:${it.key ?: "key is null"}")
                if (it.key.equals("奇数", true)) {
                    it.subscribe {
                        logIMessage("operators-groupBy", "奇数$it")
                    }
                }
            }
/**
 * 运行结果如下:
 * com.ypz.rxjavademo I/operators-groupBy: 分组key:奇数
 * com.ypz.rxjavademo I/operators-groupBy: 奇数1
 * com.ypz.rxjavademo I/operators-groupBy: 分组key:偶数
 * com.ypz.rxjavademo I/operators-groupBy: 奇数3
 * om.ypz.rxjavademo I/operators-groupBy: 奇数5
 * com.ypz.rxjavademo I/operators-groupBy: 奇数7
 * com.ypz.rxjavademo I/operators-groupBy: 奇数9
 * */
/**
 * 源码如下:
 * */
    /**
     * Groups the items emitted by an {@code ObservableSource} according to a specified criterion, and emits these
     * grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservableSource} allows only a single
     * {@link Observer} during its lifetime and if this {@code Observer} calls dispose() before the
     * source terminates, the next emission by the source having the same key will trigger a new
     * {@code GroupedObservableSource} emission.
     * <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
     * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
     * {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
     * discard their buffers by applying an operator like {@link #ignoreElements} to them.
     *
     * @param keySelector
     *            a function that extracts the key for each item
     * @param <K>
     *            the key type
     * @return an {@code ObservableSource} that emits {@link GroupedObservable}s, each of which corresponds to a
     *         unique key value and each of which emits those items from the source ObservableSource that share that
     *         key value
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) {
        return groupBy(keySelector, (Function)Functions.identity(), false, bufferSize());
    }
    /**
 * An {@link Observable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}.
 * <p>
 * <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
 * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
 * {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they
 * may discard their buffers by applying an operator like {@link Observable#take take}{@code (0)} to them.
 *
 * @param <K>
 *            the type of the key
 * @param <T>
 *            the type of the items emitted by the {@code GroupedObservable}
 * @see Observable#groupBy(io.reactivex.functions.Function)
 */
public abstract class GroupedObservable<K, T> extends Observable<T> {

    final K key;

    /**
     * Constructs a GroupedObservable with the given key.
     * @param key the key
     */
    protected GroupedObservable(@Nullable K key) {
        this.key = key;
    }

    /**
     * Returns the key that identifies the group of items emitted by this {@code GroupedObservable}.
     *
     * @return the key that the items emitted by this {@code GroupedObservable} were grouped by
     */
    @Nullable
    public K getKey() {
        return key;
    }
}

结合源码、示例代码以及其运行结果可以看出在示例代码中使用对[1,10]区间的元素进行奇偶数区分,注意其执行顺序,每个分组回调仅仅回调一次,分组回调顺序与元数据发射顺序有关,各分组第一个发射数据在元数据数据排序有关,谁先则哪个分组先回调。源码通过对Functtion的实现进行分组,Function实现返回值作为分组的key,以key作为唯一标识进行分组。

window操作符

window操作符:periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time。
每隔一段时间,将这段时间内观察到的Observable组装到一个窗口,发射这些窗口。
原理作用图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符

window count(设置window size)

window count的作用是设置窗口的大小,每一个窗口都有一个固定的大小容纳一定量的Observables
其原理作用图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
结合上图的所写的示例代码以及相关源码:

    private fun operatorsWindowCount() {
        var time = 0
        val tag = "operators-WindowCount"
        Observable.range(1, 10).window(2).subscribe(
                object : Observer<Observable<Int>> {
                    override fun onComplete() {  logIMessage(tag, "onComplete")}
                    override fun onSubscribe(d: Disposable) {}
                    override fun onNext(t: Observable<Int>) {
                        logIMessage(tag, "onNext")
                        time += 1
                        if (time % 2 == 0) Thread.sleep(3000)
                        t.subscribe {
                            logIMessage("operators-WindowCount", "onNextValue$it")
                        } }
                    override fun onError(e: Throwable) {}
                })
    }
    /**
     * 运行结果如下
     * com.ypz.rxjavademo I/operators-WindowCount: onNext
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue1
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue2
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue3
     * com.ypz.rxjavademo I/operators-WindowCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue4
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue5
     * com.ypz.rxjavademo I/operators-WindowCount: onNextValue6
     *  com.ypz.rxjavademo I/operators-WindowCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue7
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue8
     *  com.ypz.rxjavademo I/operators-WindowCount: onNextValue9
     *  com.ypz.rxjavademo I/operators-WindowCount: onComplete
     * 相关源码如下:
     * */
       /**
     * Returns an Observable that emits windows of items it collects from the source ObservableSource. The resulting
     * ObservableSource emits windows every {@code skip} items, each containing no more than {@code count} items. When
     * the source ObservableSource completes or encounters an error, the resulting ObservableSource emits the current window
     * and propagates the notification from the source ObservableSource.
     * @param count
     *            the maximum size of each window before it should be emitted
     * @param skip
     *            how many items need to be skipped before starting a new window. Note that if {@code skip} and
     *            {@code count} are equal this is the same operation as {@link #window(long)}.
     * @return an Observable that emits windows every {@code skip} items containing at most {@code count} items
     *         from the source ObservableSource
     * @throws IllegalArgumentException if either count or skip is non-positive
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<Observable<T>> window(long count, long skip) {
        return window(count, skip, bufferSize());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<Observable<T>> window(long count, long skip) {
        return window(count, skip, bufferSize());
    }
        @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<Observable<T>> window(long count, long skip, int bufferSize) {
        ObjectHelper.verifyPositive(count, "count");
        ObjectHelper.verifyPositive(skip, "skip");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableWindow<T>(this, count, skip, bufferSize));
    }

结合运行效果以及源码可以得出:window count的用法中还有一个skip参数,关于count和skip的对比关系可以参照buffer中的count以及skip的对比,关系原理是一致。它是决定是否完整的截取数据组装window还是重复或者跳过丢失数据的方式组装window,其中最为主要的ObservableWindow这个类实现了如何组装数据的功能以及决定如何发射数据功能。

window timeSpan 定时收集数据功能

window timeSpan原理图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符

    private fun operatorsWindowTimespan() {
        val logTag = "operators-WindowTimespan"
        Observable
                .interval(1, TimeUnit.SECONDS)
                .take(10)
                .window(3, TimeUnit.SECONDS)
                .subscribe(getBaseWindowTimeObserver(logTag))
    }
   //后续跟TimeSpan有关的都是使用这个Observer 
    private fun getBaseWindowTimeObserver(tag: String): Observer<Observable<Long>> =
            object : Observer<Observable<Long>> {
                override fun onComplete() { logIMessage(tag, "onComplete") }

                override fun onSubscribe(d: Disposable) {}

                override fun onNext(t: Observable<Long>) {
                    logIMessage(tag, "onNext")
                    t.subscribe { logIMessage(tag, "onNextValue$it") }
                }

                override fun onError(e: Throwable) {}
            }
    /**
     * 运行效果如下:
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue0
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue1
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue2
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * om.ypz.rxjavademo I/operators-WindowTimespan: onNextValue3
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue4
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue5
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue6
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue7
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue8
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNext
     * com.ypz.rxjavademo I/operators-WindowTimespan: onNextValue9
     * com.ypz.rxjavademo I/operators-WindowTimespan: onComplete
     * 相关源码:
     * */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.COMPUTATION)
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
        return window(timespan, unit, Schedulers.computation(), Long.MAX_VALUE, false);
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
            Scheduler scheduler, long count, boolean restart) {
        return window(timespan, unit, scheduler, count, restart, bufferSize());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<Observable<T>> window(
            long timespan, TimeUnit unit, Scheduler scheduler,
            long count, boolean restart, int bufferSize) {
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.verifyPositive(count, "count");
        return RxJavaPlugins.onAssembly(new ObservableWindowTimed<T>(this, timespan, timespan, unit, scheduler, count, bufferSize, restart));
    }

结合运行结果以及源码可以的timeSpan的作用就是每隔一段时间将观察到的数据进行组装进入一个window然后将其发射出去,其核心源码就是的得益于ObservableWindowTimed的实现。

timeSpan 以及count的配合使用

timeSpan以及count的配合使用就是固定每隔一段时间将观察到的数据封装到固定大小的window中去,当观察到的数据量大于window的size的时候,则取最前面的数据量刚刚等于window的size,其余的则数据则封装到下一个window中去以此类推。
其原理图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
示例代码以及源码如下:

    private fun operatorsWindowTimesCompeteCount() {
        val tag = "operators-WindowTimesCompeteCount"
        Observable
                .range(1, 6)
                .window(4, TimeUnit.SECONDS,3)
                .subscribe(object : Observer<Observable<Int>> {
                    override fun onComplete() {
                        logIMessage(tag, "onComplete")
                    }

                    override fun onSubscribe(d: Disposable) {}

                    override fun onNext(t: Observable<Int>) {
                        logIMessage(tag, "onNext")
                        t.subscribe { logIMessage(tag, "onNextValue$it") }
                    }

                    override fun onError(e: Throwable) {}
                })
    }
    /**
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue1
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue2
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue3
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue4
     * com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue5
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNextValue6
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onNext
     *  com.ypz.rxjavademo I/operators-WindowTimesCompeteCount: onComplete
     * 相关源码如下
     * */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.COMPUTATION)
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
            long count) {
        return window(timespan, unit, Schedulers.computation(), count, false);
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<Observable<T>> window(long timespan, TimeUnit unit,
            Scheduler scheduler, long count, boolean restart) {
        return window(timespan, unit, scheduler, count, restart, bufferSize());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<Observable<T>> window(
            long timespan, TimeUnit unit, Scheduler scheduler,
            long count, boolean restart, int bufferSize) {
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.verifyPositive(count, "count");
        return RxJavaPlugins.onAssembly(
        		new ObservableWindowTimed<T>(
        			this, timespan, timespan, unit, scheduler, count, bufferSize, restart));
    }

结合示例代码以及运行代码,不难得出timeSpan和count的搭配使用是固定观测时间、固定的窗口的大小组装数据然后发射窗口,timeSpan和count的搭配使用是基于ObservableWindowTimed的实现所得到的其中一种window效果。

buffer操作符

buffer操作符:periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
与Window相似通过使用活动窗口将需要发射的Observable封装到固定大小的活动窗口,然后发射这些窗口。
原理作用图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符

buffer基本使用

	//这是个BaseObserver后面skip和count的比较都会用到
    private fun getBaseOperatorsBufferObserver(): Observer<List<Int>> =
            object : Observer<List<Int>> {
                override fun onComplete() {

                }

                override fun onSubscribe(d: Disposable) {

                }

                override fun onNext(t: List<Int>) {
                    logIMessage("BaseOperatorsBufferObserver", "forEachValue$t")

                }

                override fun onError(e: Throwable) {

                }
            }
            
    private fun operatorsBuffer() =
            Observable
                    .range(1, 10).buffer(2)
                    .subscribe(getBaseOperatorsBufferObserver())
    /**
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3, 4]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5, 6]
     * */
     
    /**
     * 源码如下:
     * */
     
    /**
     * @param count
     *            the maximum number of items in each buffer before it should be emitted
     * @return an Observable that emits connected, non-overlapping buffers, each containing at most
     *         {@code count} items from the source ObservableSource
     * @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<List<T>> buffer(int count) {
        return buffer(count, count);
    }
    /**
     * @param count
     *            the maximum size of each buffer before it should be emitted
     * @param skip
     *            how many items emitted by the source ObservableSource should be skipped before starting a new
     *            buffer. Note that when {@code skip} and {@code count} are equal, this is the same operation as
     *            {@link #buffer(int)}.
     * @return an Observable that emits buffers for every {@code skip} item from the source ObservableSource and
     *         containing at most {@code count} items
     * @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<List<T>> buffer(int count, int skip) {
        return buffer(count, skip, ArrayListSupplier.<T>asCallable());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
        ObjectHelper.verifyPositive(count, "count");
        ObjectHelper.verifyPositive(skip, "skip");
        ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
        return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier));
    }

结合示例代码以及其运行结果、源码分析可得buffer将每一个发射的数据根据count以及skip的大小进行组装进入一个list集合并按顺序将这些list返回其最终的实现是通过实现其内部的ArrayListSupplier.asCallable()然后返回需要注意的是count和skip必须大于0否则会出现异常

buffer中skip和count的比较

为什么要分析skip和count比较?

因为skipcountbuffer中缺一不可,buffer作用就像一个活动窗口收集数据,既然是活动窗口那么count决定了窗口的sizeskip决定了窗口是否要跳过元数据角标去获取数据
在分析count与skip的大小比较时先来一段Base代码:

    private fun operatorsBuffer_skipComparativeCount(count: Int, skip: Int) =
            Observable
                    .range(1, 6)
                    .buffer(count, skip)
                    .subscribe(getBaseOperatorsBufferObserver())

当count等于skip时

当count等于skip时,遍历所有窗口以及输出发现其所有数据都是按照原数据顺序以及skip多小进行平均移动截取数据。也就是平均分段截取数据集。
示例代码以及运行结果如下:

operatorsBuffer_skipComparativeCount(2, 2)
    /**
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3, 4]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5, 6]
     * */

当count大于skip时

先来看看示例代码以及运行结果

operatorsBuffer_skipComparativeCount(3, 2)
    /**
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2, 3]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3, 4, 5]
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5, 6]
     * */

在示例代码中可以明显看到count大于skip时,可以看到数据的截取是重复的。
将元数据每一项看出是一个列表,
第一次输出对应的0、1、2的角标,
第二次输出对应的2、3、4的角标,
第三次次输出对应的4、5的角标,
每一次移动的起始角标都是取自上一次移动的窗口末位元素对应的角标。直至可被截取元素的size小于活动的窗口size

当count小于skip时

先来看看示例代码以及运行结果

operatorsBuffer_skipComparativeCount(2, 3)
    /**
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1, 2]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[4, 5]
     * */
 
 operatorsBuffer_skipComparativeCount(1, 2)
    /**
     * 运行结果如下:
     * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[1]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[3]
	 * com.ypz.rxjavademo I/BaseOperatorsBufferObserver: forEachValue[5]
     * */

在示例代码中可以明显看到count小于skip时,可以看到数据的截取是丢失的。
以count为2和skip为3的示例将元数据看出是一个列表,
第一次输出对应的0、1的角标,
第二次输出对应的3、4的角标,
第一次移动的起始角标是0
第二移动的起始角标是3
再结合count为1以及skip为2的运行结果可得:
每一次移动的角标可以:(m-1)*skip = n 其中m代表移动次数n为移动角标
直到n的角标大于等于元素集的size的时候停止截取

scan操作符

scan操作符:apply a function to each item emitted by an Observable, sequentially, and emit each successive value。
通过实现apply进行函数变换达到累计效果
scan原理作用图如下:
RxJava2极速入门——Rxjava操作符详解之转换操作符
以下是示例代码作用图:
RxJava2极速入门——Rxjava操作符详解之转换操作符
示例代码以及相关源码如下:

    private fun operatorsScan() {
        var message = "从0加到1是:"
        Observable.range(1, 10).scan { t1: Int, t2: Int ->
            message = "从0加到${t2}是:"
            t1+t2
        }.subscribe {
            logIMessage("operatorsScan", "$message$it")
        }
    }
    /**
     * 运行结果如下
     * com.ypz.rxjavademo I/operatorsScan: 从0加到1是:1
     * com.ypz.rxjavademo I/operatorsScan: 从0加到2是:3
     * com.ypz.rxjavademo I/operatorsScan: 从0加到3是:6
     * com.ypz.rxjavademo I/operatorsScan: 从0加到4是:10
     * com.ypz.rxjavademo I/operatorsScan: 从0加到5是:15
     * com.ypz.rxjavademo I/operatorsScan: 从0加到6是:21
     * com.ypz.rxjavademo I/operatorsScan: 从0加到7是:28
     * com.ypz.rxjavademo I/operatorsScan: 从0加到8是:36
     * com.ypz.rxjavademo I/operatorsScan: 从0加到9是:45
     * com.ypz.rxjavademo I/operatorsScan: 从0加到10是:55
     * 源码如下:
     * */
    /**
     * Returns an Observable that applies a specified accumulator function to the first 
     * item emitted by a source ObservableSource, 
     * then feeds the result of that function along with the second item emitted by the source
     * ObservableSource into the same function, and so on 
     * until all items have been emitted by the source ObservableSource,
     * emitting the result of each of these iterations.
     * @param accumulator
     *            an accumulator function to be invoked on each item emitted 
     *            by the source ObservableSource, whose result will be emitted 
     *            to {@link Observer}s via {@link Observer#onNext onNext} 
     *            and used in the next accumulator call
     * @return an Observable that emits the results of each call to the accumulator function
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> scan(BiFunction<T, T, T> accumulator) {
        ObjectHelper.requireNonNull(accumulator, "accumulator is null");
        return RxJavaPlugins.onAssembly(new ObservableScan<T>(this, accumulator));
    }
    
public final class ObservableScan<T> extends AbstractObservableWithUpstream<T, T> {
    final BiFunction<T, T, T> accumulator;
    public ObservableScan(ObservableSource<T> source, BiFunction<T, T, T> accumulator) {
        super(source);
        this.accumulator = accumulator;
    }

    @Override
    public void subscribeActual(Observer<? super T> t) {
        source.subscribe(new ScanObserver<T>(t, accumulator));
    }

    static final class ScanObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        final BiFunction<T, T, T> accumulator;

        Disposable upstream;

        T value;

        boolean done;

        ScanObserver(Observer<? super T> actual, BiFunction<T, T, T> accumulator) {
            this.downstream = actual;
            this.accumulator = accumulator;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this);
            }
        }
     }
   }

结合作用图以及源码其实可以得到scan的作用就是一个函数累计的作用通过不断将上一步的计算结果传入的当前进行计算返回返回计算结果,其核心计算过程通过对BiFunction的实现,然后利用其ObservableScan去元数据在事件发送过程通过其内部实现去调用BiFunction进行数据转换。需要谨记在scan中的BiFunction中t1代表上一次的计算结果,但如果是第一次计算则会为0,t2代表当前需要发射的元数据。

总结

map是通过对Function中apply的实现达到函数转换的效果。
flatMap是通过对Function中apply的实现达到函数转换的效果,但是如果需要对转换后的函数在做一次转换或者观察操作则需要实现BiFunction,Function在BiFunction前调用;flatMap操作时merge操作线程存在不安全。
concatMap是通过Function中apply的实现达到函数变换的效果,对比与flatMap由于其concat操作,能够保证线程安全。
cast是通过Function中内部castFunction以及ArrayListCapacityCallable的实现,但是其底层还是通过map去实现的。有明确类型转换时且不需要使用数据计算转换的时候建议使用cast,但必须实现onError避免出现类型转换异常。
groupBy是通过Fuction中apply实现得到数据进行分组的作用,注意每一组分组只会回调一次,每一组分组回调顺序与其组内第一个成员所以元数据集合中所在角标有关,谁角标最小最先回调。
window观测数据将数据封装进入一个个窗口并发射出去,可定义窗口大小,定时观测等用法注意分别有两个实现的Observer,不要混淆其实现的原理。
buffer类似window的效果,buffer的实现是通过其内部ArrayListSupplier通过声明并实现Function以及Callable这两个接口而达到的效果,注意skip和count的大小关系要适当使用避免数据丢失或重复的问题。
scan一种函数累计的效果,通过对BiFunction的实现以及其内部封装好的ObservableScan这一个类的实现所达到的函数累计效果,注意在首次计算是要清楚知道t1为0时应该如何处理,当实现除法相关的函数累计效果应当避免某一回调参数为0,以此避免算术异常。
注意:使用任何转换操作符进行转换操作最好实现onError的传递的事件避免出现运行异常导致程序奔溃。
最后附上源码地址