RxJava2.0-操作符-Creating用法
简介
操作符是RxJava中最重要的一部分,主要分为以下几类:创建操作符(Creating ),转换操作符(Transforming),过滤操作符(Filtering),组合操作符(Combining),辅助操作符(Observable
Utility),条件操作符(Conditional and Boolean),数学运算操作符(Mathematical
and Aggregate),转化操作符(Convert),链接操作符(Connectable)背压操作符(Backpressure)等;可以根据需要实现的效果,依次调用相应的操作符;
Creating操作符的主要方法分别为:
create():在subscribe()方法中通过emitter发送事件;
just():可以发送1个或者多个对象;多个对象通过循环遍历,顺序发送事件
from():常用的有fromArray和fromIterable,通过循环遍历,顺序发送事件;
range():range和rangeLong,从start到start + count通过循环遍历,顺序发送事件;
timer():发送延时事件
interval():interval(long initialDelay, long period, TimeUnit unit) 从0到initialDelay,然后每隔period时间,发送一次事件;
我们依次来了解各个方法的用法;
1)create()方法
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {//emitter发射器,
emitter.onNext("Hello World I ");
emitter.onComplete();
//emitter.onError(new Throwable("null"));
emitter.onNext("I");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(tag, "onSubscribe");
//销毁资源,后续的方法不在执行;
//d.dispose();
}
@Override
public void onNext(String s) {
Log.e(tag, s);
}
@Override
public void onError(Throwable t) {
Log.e(tag, "onError");
}
@Override
public void onComplete() {
Log.e(tag, "onComplete");
}
});
运行结果
emitter可以发送多个onNext()方法,一旦发送onComplete()或者onError()方法之后,Disposable将关闭,后续的操作将不再发送;
2)just()参数最多有10,根据先后顺序依次发送;
Observable.just("hhhh", "hhh").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(tag, s);
}
});
运行结果
Consumer接收单一值的接口;subscribe(Consumer<? super T> onNext);从LambdaObserver类中
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
可以看出只有onNext.accept(t)才会调用;也就是说只接收发送的onNext事件才会调用accept()方法;
3)from():通过循环遍历,顺序发送事件;
Integer[] arr = {1, 2, 3, 4};
Observable.fromArray(arr).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(tag, String.valueOf(integer));
}
});
List<String> list = new ArrayList<>();
list.add("Hello");
list.add("World");
//遍历一遍集合,发送
Observable.fromIterable(list).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(tag, s);
}
});
运行结果
4)range()从start到start + count通过循环遍历,顺序发送事件;
Observable.range(10, 3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(tag, String.valueOf(integer));
}
});
运行结果
5)timer()发送延时事件,内部默认指定Schedulers.computation(),也可以指定Scheduler ;ComputationScheduler内部维持一个线程池;
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(tag, "accept timer");
}
});
6)interval()从0到initialDelay,然后每隔period时间,发送一次事件,内部默认指定Schedulers.computation(),也可以指定Scheduler
Observable.interval(1, 5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(tag, String.valueOf(aLong));
}
});
运行结果
以上是Creating的常用的主要方法,其内部原理可参考RxJava2.0-Observable原理分析之Create操作符;如有问题,请多指教!