如何在调用rx.Observable.sample()时接收最后的序列发射?

如何在调用rx.Observable.sample()时接收最后的序列发射?

问题描述:

我正在从远程URL读取文件并使用RxJava报告下载进度。文件编写器Observable发出一系列DownloadProgress对象。由于很多项目正在发射,我使用Observable.sample()来管理背压。这非常有效 - UI更新以恒定的速率进行,并且没有背压问题,但最后的进度更新几乎总是被忽略。如何在调用rx.Observable.sample()时接收最后的序列发射?

我想收到Observable序列中的最后一项,以便我可以用最终进度更新UI。确保始终发出Observable序列中最后一项的最佳方法是什么?

Observable<Response> fileReader = 
     Rx.okHttpGetRequest(url); 
OkHttpResponseWriter fileWriter = 
     Rx.okHttpResponseWriter(outFile); 

Subscription subscription = fileReader.flatMap(fileWriter) 
     .sample(1, TimeUnit.SECONDS) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Subscriber<DownloadProgress>() { 
      @Override 
      public void onCompleted() {} 
      @Override 
      public void onError(Throwable e) {} 
      @Override 
      public void onNext(DownloadProgress progress) { 
       // I want to receive an update every one second 
       // I also want to always receive the last progress update 
      } 
     }); 
+1

就处理得'onCompleted' ?如果我理解正确,您需要最后一次更新来处理完成的下载? – 2014-10-01 16:35:20

+1

'DownloadProgress'对象包含我想要向用户公开的信息,包括从文件中读取的总字节数。 'onCompleted'通知用户序列已完成,但不会重新发送序列中的最后一个项目。假设从该值读取数据至关重要。 – weefbellington 2014-10-01 19:04:17

另外,使用buffer代替sample,将让你发射的“额外“最后的人。我建议这样做,这样你就不会失去任何可组合性,也不会为将来的开发人员(包括未来的你)带来麻烦。

private Subscriber<List<Integer>> loggingSubscriber2 = new SimpleSubscriber<List<Integer>>() { 
    @Override 
    public void onNext(List<Integer> integers) { 
     Log.v(TAG, String.valueOf(integers.get(integers.size() - 1))); 
    } 
}; 

private void startObservableTests() { 
    Observable<Integer> fileObserver = Observable.create(
      new Observable.OnSubscribe<Integer>() { 
       @Override 
       public void call(Subscriber<? super Integer> subscriber) { 
        for (int i = 1; i <= 9; i++) { 
         subscriber.onNext(i); 
         try { 
          Thread.sleep(100); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        subscriber.onCompleted(); 
       } 
      }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 

    fileObserver.buffer(500, TimeUnit.MILLISECONDS).subscribe(loggingSubscriber2); 
} 
+0

这是一个非常聪明的解决方案,我永远不会想到。我发布了一个使用'window'而不是'buffer'的fancier版本,但它基本上是相同的技术。 – weefbellington 2014-10-02 13:40:45

这是我结束了:

private void startObservableTests() { 

    Observable<Integer> fileObserver = Observable.create(
      new Observable.OnSubscribe<Integer>() { 
       @Override 
       public void call(Subscriber<? super Integer> subscriber) { 
        for (int i = 1; i <= 9; i++) { 
         subscriber.onNext(i); 
         try { 
          Thread.sleep(100); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        subscriber.onCompleted(); 
       } 
      }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 


    fileObserver.sample(500, TimeUnit.MILLISECONDS).subscribe(new SuperSubscriber(fileObserver)); 
} 

private class SuperSubscriber extends Subscriber<Integer> { 

    Observable obs; 

    public SuperSubscriber(Observable<Integer> fileObserver) { 
     obs = fileObserver; 
    } 

    @Override 
    public void onCompleted() { 
     obs.last().subscribe(new SimpleSubscriber<Integer>() { 
      @Override 
      public void onNext(Integer o) { 
       Log.v(TAG, "final value was " + o); 
      } 
     }); 
    } 

    @Override 
    public void onError(Throwable e) { 

    } 

    @Override 
    public void onNext(Integer o) { 
     Log.v(TAG, "got a " + o); 
    } 
} 

这是输出:

10-01 15:18:36.893 V/TAG (26129): got a 5 
10-01 15:18:38.214 V/TAG (26129): final value was 9 

以@Travis发布的优秀解决方案为基础,以下是我最终的代码。代替buffer,我组合使用windowswitchOnNext做一些进一步的处理:

Observable<Response> fileReader = 
    Rx.okHttpGetRequest(fileInfo.getUrl()); 
OkHttpResponseWriter fileWriter = 
    Rx.okHttpResponseWriter(outFile, totalBytesRead); 
Subscription subscription = Observable.switchOnNext(fileReader 
     .flatMap(fileWriter) 
     .window(1, TimeUnit.SECONDS)) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Subscriber<DownloadProgress>() { 
      @Override 
      public void onCompleted() {} 
      @Override 
      public void onError(Throwable e) {} 
      @Override 
      public void onNext(DownloadProgress progress) { 
       // always emits the most recent DownloadProgress, even the last one! 
      } 
}); 

说明:

  • fileReader需要URL,创建一个HTTP请求并发出Response
  • fileReader.flatMap(fileWriter)Response并将字节流写入磁盘,发出一系列DownloadProgress对象
  • window(1, TimeUnit.Seconds)发出List<DownloadProgress>对象每一秒,然后包装并重新发出他们为Observable<DownloadProgress>
  • Observable.switchOnNext()注意到最近发出Observable<DownloadProgress>并发出最后DownloadProgress对象序列中
+0

我得到一个MissingBackpressureException – user949884 2015-02-24 12:00:05

+0

我有什么想法吗? – user949884 2015-02-24 12:23:56