使用RxJava更新并保留现有数据
问题描述:
我正在研究将我的(Android)应用程序中的某些逻辑转换为使用RxJava,但是我正在努力想出一种方法来执行一些更高级的逻辑。使用RxJava更新并保留现有数据
用例如下:我想向用户展示一种Feed。 Feed包含来自不同来源的项目,例如消息,文章等。由于API限制,应用程序本身必须收集各个资源并将其显示在一个列表中。是
例如,假设我在饲料项目如下:
class FeedItem {
Type feedItem; // Type of the item, e.g. article, message, etc.
...
}
目前,该饲料是建立在一个单独的线程,并且UI使用监听器当饲料已更新通知。为了让你了解它是如何完成的,下面是一些(伪)Java代码(线程和其他管理代码为了清楚起见而被省略)。
class FeedProducer {
List<FeedItem> currentData = new ArrayList();
public void refreshData() {
for (FeedSource source: getFeedSources()) {
Type sourceType = source.getType();
// Remove existing items
currentData.removeIf(item -> item.feedItem.equals(sourceType));
List<FeedItem> newItems = source.produceItems();
// Add the new items
currentData.addAll(newItems);
// Notify the UI things have changed
notifyDataChanged(currentData);
}
// Notify the UI we are done loading
notifyLoadingComplete();
}
}
这种方法refreshData()
将在每次用户想要刷新数据时被调用。这样,可以仅更新一些源,而其他源保持不变(例如通过更改返回值getFeedSources()
)。
这些来源也在应用程序的其他部分单独使用;我已经将它们转换为Observables那里。这使事情变得更容易,例如如果数据库发生更改,则Observable简单地将更改推送到UI。
因此,我的问题是如何(优雅地)将这些可观察的源代码合并到一个Observable中,但是前面的结果存在一个“全局”状态。我研究过各种结合运营商,但还没有找到我需要的。我很抱歉,如果我忽略了一些显而易见的事情,因为我对RxJava相当陌生。
答
天真的办法是,来电保存最后一个列表,并给它作为参数,当您正在请求新的数据:
public class ReactiveMultipleSources {
// region Classes
public enum SourceType {
TYPE_ARTICLE,
TYPE_MESSAGE,
TYPE_VIDEO
}
public static class Feed {
private SourceType sourceType;
private String content;
Feed(SourceType sourceType, String content) {
this.sourceType = sourceType;
this.content = content;
}
SourceType getSourceType() {
return sourceType;
}
}
// endregion
public static void main(String[] args) throws InterruptedException {
final List<Feed>[] currentList = new List[]{new ArrayList()};
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
System.out.println();
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
}
private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) {
return Observable.fromIterable(getSourceTypes())
.observeOn(Schedulers.io())
// Get List<Feed> forEach sourceType
.concatMap(ReactiveMultipleSources::getFeedItemsBySourceType)
.observeOn(Schedulers.computation())
// Get list of "List of Feed for sourceType", = List<List<Feed>>
.toList()
.map(lists -> {
for (List<Feed> list : lists) {
SourceType sourceType = list.get(0).getSourceType();
// Remove items of currentFeed whose sourceType has new List<Feed>
currentFeed.removeIf(temp -> temp.getSourceType() == sourceType);
// Add new items
currentFeed.addAll(list);
}
return currentFeed;
})
.toObservable();
}
// region Helper
private static List<SourceType> getSourceTypes() {
return new ArrayList<>(Arrays.asList(SourceType.values()));
}
private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) {
String content;
if (sourceType == SourceType.TYPE_ARTICLE)
content = "article ";
else if (sourceType == SourceType.TYPE_MESSAGE)
content = "message ";
else if (sourceType == SourceType.TYPE_VIDEO)
content = "video ";
else
content = "article ";
Feed feed1 = new Feed(sourceType, content + createRandomInt());
Feed feed2 = new Feed(sourceType, content + createRandomInt());
Feed feed3 = new Feed(sourceType, content + createRandomInt());
Feed feed4 = new Feed(sourceType, content + createRandomInt());
return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4));
}
// For simulating different items each time List<Feed> is required
private static int createRandomInt() {
return ThreadLocalRandom.current().nextInt(0, 21);
}
// endregion
}
输出示例:
article 19
article 15
article 18
article 18
message 3
message 2
message 9
message 1
video 19
video 17
video 18
video 11
article 0
article 4
article 18
article 15
message 11
message 16
message 16
message 4
video 1
video 7
video 20
video 2
答
如果您有3个单独的任务返回[0, 1]
,[10, 11]
和[20, 21]
,您希望将它们合并到一个列表中。在这种情况下,您可以使用zip
操作。
public class TestRx {
public static void main(String[] args) {
// some individual observables.
Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1));
Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11));
Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21));
Observable.zip(observable1, observable2, observable3,
new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
// TODO: Remove existing items
// merge all lists
List<Integer> mergedList = new ArrayList<>();
mergedList.addAll(list1);
mergedList.addAll(list2);
mergedList.addAll(list3);
return mergedList;
}
})
.subscribe(new Observer<List<Integer>>() {
@Override
public void onNext(List<Integer> mergedList) {
System.out.println(mergedList);
// TODO: notifyDataChanged(mergedList)
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.toString());
// TODO: handle exceptions
}
@Override
public void onCompleted() {
// TODO: notifyLoadingComplete()
}
});
}
}
因此,它打印像这样[0, 1, 10, 11, 20, 21]
。