RxJava2.x之转换操作符
参考资料 https://mcxiaoke.gitbooks.io/rxdocs/content/
1.buffer
根据缓冲容量大小发送新的观察者对象,接收的是缓冲区元素组成的集合体。
Observable.just("111", "222", "333", "444", "555") .buffer(3) .subscribe(new Observer<List<String>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<String> strings) { Log.d("buffer", "onNext: 接收的数据--->" + strings.size() + " " + strings); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); /** * 输出结果 * 03-27 14:24:30.048 14508-14508/com.example.testlink D/buffer: onNext: 接收的数据--->3 [111, 222, 333] * 03-27 14:24:30.048 14508-14508/com.example.testlink D/buffer: onNext: 接收的数据--->2 [444, 555] */
如果上游发射对象是这样的:
int[] is = new int[]{1,2,3,4,5,6,7}; Observable.fromArray(is).buffer(3)
那么下游接收的数据是:
/** * 03-27 14:33:14.358 14596-14596/com.example.testlink D/buffer: onNext: 接收的数据--->1 [[[email protected]] */
结果最终的输出集合体大小是1,内容是一个对象地址值。因为上游只发射一个数据过来。
2.Map
可以将任意发射数据转换成任意数据类型或对象,灵活度高。
Observable.just("100").map(new Function<String, Integer >() { @Override public Integer apply(String s) throws Exception { Log.d("map", "apply: " + s); return 10086; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.d("map", "onNext:接收的数据---> " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); /** * 将上游发射的 String 类型 转换成 integer 数据类型后发射给下游接收。 * 03-27 15:36:07.138 15066-15066/com.example.testlink D/apply: apply: 100 * 03-27 15:36:07.138 15066-15066/com.example.testlink D/map: onNext:接收的数据---> 10086 */
3.flatMap
同样是类型转换,与Map相比的区别就是:Map转换后发射的数据类型可以是任意,但是flatMap转换后反射出去的是一个新的观察者对象:ObservableSource。
List<String> list2 = new ArrayList<>(); list2.add("资源a"); list2.add("资源b"); list2.add("资源c"); list2.add("资源d"); user user2 = new user("yy", "20", list2); Observable.just(user2).flatMap(new Function<user, ObservableSource<user>>() { @Override public ObservableSource<user> apply(user s) throws Exception { Log.d("flatMap", "apply: " + s.age); return Observable.just(new user(s.age)); } }).subscribe(new Observer<user>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(user o) { Log.d("flatMap", "onNext:接收的数据---> " + o.name); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); /** * 03-27 15:58:52.458 15434-15434/? D/flatMap: apply: 20 * 03-27 15:58:52.458 15434-15434/? D/flatMap: onNext:接收的数据---> pp */
简单user类:
public class user { public String name; public String age; public List<String> nets; public user(String name, String age, List<String> nets) { this.age = age; this.name = name; this.nets = nets; } public user(String age) { this.age = age; this.name = "pp"; this.nets = null; } }
上面逻辑是将一个user对象同过flatMap拿到发射user对象的age,赋值给一个新的user对象,最后打印新的user的name。当然也可以转换成另外的ObservableSource类型数据。
4.GroupBy
直接字面上的意思理解就是分组。根据某种条件将发射出去的数据进行排序。
Observable.just(0, 1, 2, 3, 4, 5, 6)
.groupBy(new Function<Integer, Boolean>() {
@Override
public Boolean apply(Integer integer) throws Exception {
return integer % 2 == 0;
}
}).subscribe(new Observer<GroupedObservable<Boolean, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(GroupedObservable<Boolean, Integer> object) {
// Log.d("groupBy", "accept1:接收的数据类型---> " + object.getKey());
object.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("groupBy", "accept2:接收的数据类型---> "+object.getKey() +" --> "+ integer);
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/**
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 0
* com.example.testlink D/groupBy: accept2:接收的数据类型---> false --> 1
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 2
* com.example.testlink D/groupBy: accept2:接收的数据类型---> false --> 3
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 4
* com.example.testlink D/groupBy: accept2:接收的数据类型---> false --> 5
* com.example.testlink D/groupBy: accept2:接收的数据类型---> true --> 6
*/
5.scan
看官方解释:
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source * ObservableSource, then feeds the result of that function along with the second item emitted by the source * ObservableSource into the same function, and so on until all items have been emitted by the source ObservableSource, * emitting the result of each of these iterations. * <p>
这个意识大概说返回一个观察对象,这个对象通过某种方法先应用在第一个发射数据,然后将这个结果应用在发射的第二项数据上,直到最后一个发射数据。假定给他定义的方法是加法,每次发射的数据源是int 值,那么最终的结果就是全部发射数据的和了。
Observable.just(1, 2, 3, 4, 5) .scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.d("scan", "integer: " + integer + " integer2= " + integer2); return integer + integer2; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { Log.d("scan", "onNext: " + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); /** * /com.example.testlink D/scan: onNext: 1 /com.example.testlink D/scan: integer: 1 integer2= 2 /com.example.testlink D/scan: onNext: 3 /com.example.testlink D/scan: integer: 3 integer2= 3 /com.example.testlink D/scan: onNext: 6 /com.example.testlink D/scan: integer: 6 integer2= 4 /com.example.testlink D/scan: onNext: 10 /com.example.testlink D/scan: integer: 10 integer2= 5 /com.example.testlink D/scan: onNext: 15 */
这里第一次打印没有走加法运算,而是直接到next里面去了,第二次才开始走加法运算,第一个参数是这个方法产生的结果,第二个参数是发射的数据源。
6.window
Observable.just(1, 2, 3, 4, 5, 6, 7) .window(5) .subscribe(new Observer<Observable<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Observable<Integer> ob) { ob.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d("window", "accept: 接受的数据----> " + integer); } }); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); /** * 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 1 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 2 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 3 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 4 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 5 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 6 03-27 17:42:07.418 16433-16433/com.example.testlink D/window: accept: 接受的数据----> 7 */
emmm,看到这个日志是懵的,没看出什么特别的地方。查阅一下文档后从下面这个图来理解的话,window作用是根据count的数量将发射的数据源分组。
上面就是一些常用到的转换的操作符,列举的例子都是很简单的举例。当然这并不是所有的转换操作符都在这儿,没有列举的,碰到的在以后慢慢探索。
每天进步一点点!U_QAQ_U