使用RxJava更新并保留现有数据

使用RxJava更新并保留现有数据

问题描述:

我正在研究将我的(Android)应用程序中的某些逻辑转换为使用RxJava,但是我正在努力想出一种方法来执行一些更高级的逻辑。使用RxJava更新并保留现有数据

用例如下:我想向用户展示一种Feed。 Feed包含来自不同来源的项目,例如消息,文章等。由于API限制,应用程序本身必须收集各个资源并将其显示在一个列表中。是

例如,假设我在饲料项目如下:

class FeedItem { 
    Type feedItem; // Type of the item, e.g. article, message, etc. 
    ... 
} 

目前,该饲料是建立在一个单独的线程,并且UI使用监听器当饲料已更新通知。为了让你了解它是如何完成的,下面是一些(伪)Java代码(线程和其他管理代码为了清楚起见而被省略)。

class FeedProducer { 
    List<FeedItem> currentData = new ArrayList(); 

    public void refreshData() { 
     for (FeedSource source: getFeedSources()) { 
      Type sourceType = source.getType(); 
      // Remove existing items 
      currentData.removeIf(item -> item.feedItem.equals(sourceType)); 
      List<FeedItem> newItems = source.produceItems(); 
      // Add the new items 
      currentData.addAll(newItems); 
      // Notify the UI things have changed 
      notifyDataChanged(currentData); 
     } 
     // Notify the UI we are done loading 
     notifyLoadingComplete(); 
    } 
} 

这种方法refreshData()将在每次用户想要刷新数据时被调用。这样,可以仅更新一些源,而其他源保持不变(例如通过更改返回值getFeedSources())。

这些来源也在应用程序的其他部分单独使用;我已经将它们转换为Observables那里。这使事情变得更容易,例如如果数据库发生更改,则Observable简单地将更改推送到UI。

因此,我的问题是如何(优雅地)将这些可观察的源代码合并到一个Observable中,但是前面的结果存在一个“全局”状态。我研究过各种结合运营商,但还没有找到我需要的。我很抱歉,如果我忽略了一些显而易见的事情,因为我对RxJava相当陌生。

天真的办法是,来电保存最后一个列表,并给它作为参数,当您正在请求新的数据:

public class ReactiveMultipleSources { 

    // region Classes 
    public enum SourceType { 
     TYPE_ARTICLE, 
     TYPE_MESSAGE, 
     TYPE_VIDEO 
    } 

    public static class Feed { 
     private SourceType sourceType; 
     private String content; 

     Feed(SourceType sourceType, String content) { 
      this.sourceType = sourceType; 
      this.content = content; 
     } 

     SourceType getSourceType() { 
      return sourceType; 
     } 
    } 
    // endregion 

    public static void main(String[] args) throws InterruptedException { 
     final List<Feed>[] currentList = new List[]{new ArrayList()}; 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 
     System.out.println(); 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 


    } 

    private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) { 
     return Observable.fromIterable(getSourceTypes()) 
       .observeOn(Schedulers.io()) 
       // Get List<Feed> forEach sourceType 
       .concatMap(ReactiveMultipleSources::getFeedItemsBySourceType) 
       .observeOn(Schedulers.computation()) 
       // Get list of "List of Feed for sourceType", = List<List<Feed>> 
       .toList() 
       .map(lists -> { 
        for (List<Feed> list : lists) { 
         SourceType sourceType = list.get(0).getSourceType(); 
         // Remove items of currentFeed whose sourceType has new List<Feed> 
         currentFeed.removeIf(temp -> temp.getSourceType() == sourceType); 
         // Add new items 
         currentFeed.addAll(list); 
        } 
        return currentFeed; 
       }) 
       .toObservable(); 
    } 

    // region Helper 
    private static List<SourceType> getSourceTypes() { 
     return new ArrayList<>(Arrays.asList(SourceType.values())); 
    } 

    private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) { 
     String content; 
     if (sourceType == SourceType.TYPE_ARTICLE) 
      content = "article "; 
     else if (sourceType == SourceType.TYPE_MESSAGE) 
      content = "message "; 
     else if (sourceType == SourceType.TYPE_VIDEO) 
      content = "video "; 
     else 
      content = "article "; 

     Feed feed1 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed2 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed3 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed4 = new Feed(sourceType, content + createRandomInt()); 

     return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4)); 
    } 

    // For simulating different items each time List<Feed> is required 
    private static int createRandomInt() { 
     return ThreadLocalRandom.current().nextInt(0, 21); 
    } 
    // endregion 
} 

输出示例:

article 19 
article 15 
article 18 
article 18 
message 3 
message 2 
message 9 
message 1 
video 19 
video 17 
video 18 
video 11 

article 0 
article 4 
article 18 
article 15 
message 11 
message 16 
message 16 
message 4 
video 1 
video 7 
video 20 
video 2 

如果您有3个单独的任务返回[0, 1],[10, 11][20, 21],您希望将它们合并到一个列表中。在这种情况下,您可以使用zip操作。

public class TestRx { 
    public static void main(String[] args) { 
     // some individual observables. 
     Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1)); 
     Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11)); 
     Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21)); 

     Observable.zip(observable1, observable2, observable3, 
       new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() { 
        @Override 
        public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) { 
         // TODO: Remove existing items 

         // merge all lists 
         List<Integer> mergedList = new ArrayList<>(); 
         mergedList.addAll(list1); 
         mergedList.addAll(list2); 
         mergedList.addAll(list3); 
         return mergedList; 
        } 
       }) 
       .subscribe(new Observer<List<Integer>>() { 
        @Override 
        public void onNext(List<Integer> mergedList) { 
         System.out.println(mergedList); 
         // TODO: notifyDataChanged(mergedList) 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.out.println(throwable.toString()); 
         // TODO: handle exceptions 
        } 

        @Override 
        public void onCompleted() { 
         // TODO: notifyLoadingComplete() 
        } 
       }); 
    } 
} 

因此,它打印像这样[0, 1, 10, 11, 20, 21]