RxJava多线程与境界 - 不正确的线程

RxJava多线程与境界 - 不正确的线程

问题描述:

背景RxJava多线程与境界 - 不正确的线程

我用我的应用程序内的境界境界的访问。当数据加载后,它会经历强烈的处理,因此处理发生在后台线程上。

正在使用的编码模式是工作单元模式,Realm只存在于DataManager下的存储库中。这里的想法是每个存储库可以有不同的数据库/文件存储解决方案。

我已经试过

下面是一些类似的代码,我在我的FooRespository类的例子。

这里的想法是获得Realm的一个实例,用于查询感兴趣对象的领域,返回它们并关闭领域实例。请注意,这是同步的,并在最后将对象从Realm复制到非托管状态。

public Observable<List<Foo>> getFoosById(List<String> fooIds) { 

    Realm realm = Realm.getInstance(fooRealmConfiguration); 

    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class); 

    for(String id : fooIds) { 

     findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id); 
     findFoosByIdQuery.or(); 
    } 

    return findFoosByIdQuery 
      .findAll() 
      .asObservable() 
      .doOnUnsubscribe(realm::close) 
      .filter(RealmResults::isLoaded) 
      .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos)))); 
} 

该代码后面结合经由RxJava使用与重处理代码:

dataManager.getFoosById(foo) 
      .flatMap(this::processtheFoosInALongRunningProcess) 
      .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc 
      .subscribe(tileChannelSubscriber); 

阅读该文档之后,我相信的是,在上述应该工作,因为它不是异步的,因此不需要循环线程。我在同一个线程中获得领域的实例,因此它不在线程之间传递,也不是对象。

当上面的执行,我得到不正确的线程

境界接入问题

。领域对象只能在创建它们的线程*问 。

这看起来不对。我唯一能想到的就是Realm实例池正在让我使用主线程从另一个进程创建的现有实例。

+0

它会工作,如果你试图做一些像'Observable.flatMap {dataManager.getFoosById(foo)''? – wint

+0

你的意思是平面图我张贴在内部的整个链或只是第一部分? –

+0

我不是专家,但我认为你需要从'Schedulers.io'的相同线程获取Realm对象吗?大概是这样 '''Observable.flatMap(dataManager.getFoosById(富)) .flatMap(这:: processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io())//可能是Schedulers.computation()等 .subscribe (tileChannelSubscriber);''' 虽然 – wint

凯所以

return findFoosByIdQuery 
     .findAll() 
     .asObservable() 

这发生在UI线程,因为这是你从最初的

.subscribeOn(Schedulers.io()) 
叫它

Aaaaand然后你在Schedulers.io()上修补它们。

不,这不是一回事!

虽然我不喜欢复制的从零拷贝数据库的方法,您目前的做法是百病之因realmResults.asObservable()滥用的问题,所以这里是为您的代码应该是什么搅局:

public Observable<List<Foo>> getFoosById(List<String> fooIds) { 
    return Observable.defer(() -> { 
     try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works 
      RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class); 
      for(String id : fooIds) { 
       findFoosByIdQuery.equalTo(FooFields.ID, id); 
       findFoosByIdQuery.or(); // please guarantee this works? 
      } 
      RealmResults<Foo> results = findFoosByIdQuery.findAll(); 
      return Observable.just(realm.copyFromRealm(results)); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 
+0

顺便说一句,值得注意的是另一个解决方案不起作用,因为asObservable()会在非循环后台线程上崩溃。只是在说'。 – EpicPandaForce

+0

谢谢你。在我的思考中,我并不孤单,RealmResults的asObservable()的真正意义是什么? –

+1

在我的文章中有一个例子https://medium.com/@Zhuinden/how-to-use-realm-for-android-like-a-champ-and-how-to-tell-if-youre-这样做,它-错ac4f66b7f149#。当你到达那里时,你会看到它(简短的回答是,在looper线程(即UI线程)上,它会自动添加一个更改侦听器,通知您并在取消订阅时将其删除) – EpicPandaForce

请注意,您正在所有RxJava处理管道之外创建实例。因此,在主线程(或任何线程上,当您拨打getFoosById()时,

仅仅因为该方法返回一个Observable并不意味着它运行在另一个线程上只有最后创建的Observable的处理管道您getFoosById()方法的声明在正确的线程(filter(),在flatMap()和所有调用者进行的处理)。

因此,你必须确保getFoosById()呼叫通过Schedulers.io()使用的线程上已经完成运行。实现这一

一种方式是通过使用Observable.defer()

Observable.defer(() -> dataManager.getFoosById(foo)) 
      .flatMap(this::processtheFoosInALongRunningProcess) 
      .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc 
      .subscribe(tileChannelSubscriber); 
+1

Wolfram,很高兴你看到了这一点。我会在早上尝试。这段代码可以很容易地添加到位于调用者和存储库之间的DataManager中。 –