如何在调用rx.Observable.sample()时接收最后的序列发射?
我正在从远程URL读取文件并使用RxJava报告下载进度。文件编写器Observable发出一系列DownloadProgress对象。由于很多项目正在发射,我使用Observable.sample()来管理背压。这非常有效 - UI更新以恒定的速率进行,并且没有背压问题,但最后的进度更新几乎总是被忽略。如何在调用rx.Observable.sample()时接收最后的序列发射?
我想收到Observable序列中的最后一项,以便我可以用最终进度更新UI。确保始终发出Observable序列中最后一项的最佳方法是什么?
Observable<Response> fileReader =
Rx.okHttpGetRequest(url);
OkHttpResponseWriter fileWriter =
Rx.okHttpResponseWriter(outFile);
Subscription subscription = fileReader.flatMap(fileWriter)
.sample(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<DownloadProgress>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(DownloadProgress progress) {
// I want to receive an update every one second
// I also want to always receive the last progress update
}
});
另外,使用buffer
代替sample
,将让你发射的“额外“最后的人。我建议这样做,这样你就不会失去任何可组合性,也不会为将来的开发人员(包括未来的你)带来麻烦。
private Subscriber<List<Integer>> loggingSubscriber2 = new SimpleSubscriber<List<Integer>>() {
@Override
public void onNext(List<Integer> integers) {
Log.v(TAG, String.valueOf(integers.get(integers.size() - 1)));
}
};
private void startObservableTests() {
Observable<Integer> fileObserver = Observable.create(
new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 1; i <= 9; i++) {
subscriber.onNext(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
fileObserver.buffer(500, TimeUnit.MILLISECONDS).subscribe(loggingSubscriber2);
}
这是一个非常聪明的解决方案,我永远不会想到。我发布了一个使用'window'而不是'buffer'的fancier版本,但它基本上是相同的技术。 – weefbellington 2014-10-02 13:40:45
这是我结束了:
private void startObservableTests() {
Observable<Integer> fileObserver = Observable.create(
new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 1; i <= 9; i++) {
subscriber.onNext(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
fileObserver.sample(500, TimeUnit.MILLISECONDS).subscribe(new SuperSubscriber(fileObserver));
}
private class SuperSubscriber extends Subscriber<Integer> {
Observable obs;
public SuperSubscriber(Observable<Integer> fileObserver) {
obs = fileObserver;
}
@Override
public void onCompleted() {
obs.last().subscribe(new SimpleSubscriber<Integer>() {
@Override
public void onNext(Integer o) {
Log.v(TAG, "final value was " + o);
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer o) {
Log.v(TAG, "got a " + o);
}
}
这是输出:
10-01 15:18:36.893 V/TAG (26129): got a 5
10-01 15:18:38.214 V/TAG (26129): final value was 9
以@Travis发布的优秀解决方案为基础,以下是我最终的代码。代替buffer
,我组合使用window
与switchOnNext
做一些进一步的处理:
Observable<Response> fileReader =
Rx.okHttpGetRequest(fileInfo.getUrl());
OkHttpResponseWriter fileWriter =
Rx.okHttpResponseWriter(outFile, totalBytesRead);
Subscription subscription = Observable.switchOnNext(fileReader
.flatMap(fileWriter)
.window(1, TimeUnit.SECONDS))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<DownloadProgress>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(DownloadProgress progress) {
// always emits the most recent DownloadProgress, even the last one!
}
});
说明:
-
fileReader
需要URL,创建一个HTTP请求并发出Response
-
fileReader.flatMap(fileWriter)
取Response
并将字节流写入磁盘,发出一系列DownloadProgress
对象 -
window(1, TimeUnit.Seconds)
发出List<DownloadProgress>
对象每一秒,然后包装并重新发出他们为Observable<DownloadProgress>
-
Observable.switchOnNext()
注意到最近发出Observable<DownloadProgress>
并发出最后DownloadProgress
对象序列中
我得到一个MissingBackpressureException – user949884 2015-02-24 12:00:05
我有什么想法吗? – user949884 2015-02-24 12:23:56
就处理得'onCompleted' ?如果我理解正确,您需要最后一次更新来处理完成的下载? – 2014-10-01 16:35:20
'DownloadProgress'对象包含我想要向用户公开的信息,包括从文件中读取的总字节数。 'onCompleted'通知用户序列已完成,但不会重新发送序列中的最后一个项目。假设从该值读取数据至关重要。 – weefbellington 2014-10-01 19:04:17