RxJava - 如何启动订阅的lambda函数内的异步网络调用? Observable
下面是我的确切代码。注意foo(o)在订阅lambda函数内被调用。这给Android提供了严格的模式错误。现在我当然可以在这里分解一个线程,但这是Rxjava做事的方式?我现在不得不再次考虑线程吗?想在这里知道我最好的选择。我怎么能这样做RxJava的方式,使其他部分的代码可以订阅新的事件,这将返回网络呼叫响应?所以基本上,我通过异步响应将一种类型的事件MyObj转换为NetObj响应事件。RxJava - 如何启动订阅的lambda函数内的异步网络调用? Observable
public static final PublishSubject<MyObj> myObjSubject = PublishSubject.create();
public static final Observable<MyObj> observable = myObjSubject.asObservable();
protected CompositeSubscription myCompositeSubs = new CompositeSubscription();
myCompositeSubs.add(
observable.subscribe((MyObj o) -> {
// `. Update UI with MyObject and
// 2. Kick off network call which will return JsonResponse type object which I would like to possibly process here and also publish to let others respond to it.
foo(o); // $$$$$ million dollar question what if I am kicking off something long running here?
}));
private void foo(MyObj o){
// long running request. How to kick off long running from here?
JsonRespons resp = RestAdapter.makeQuest(o.url);
// I want this JsonResponse object to be an even others can listen for. I don't want to just process it right here.
}
更新:只是为了阐明我正在尝试做的是侦听类(Android Fragment)中的数据事件MyObject。一旦我得到了这个,我想做两件事情,用MyObject同步更新UI,这是我想要消费的东西,而不仅仅是转换。但我也想开始一个长期的请求。一个返回JsonObject响应的网络调用。这可能会在它返回时处理相同的片段,或者我可能希望其他类能够监听此事件。所以我并没有单独观察MyObject来进行网络调用并获得响应。我需要做两个操作,一个同步另一个异步。
这是使用RxJava运算符(如flatMap
)的绝佳机会。如果您需要更多文档,请参阅here,但flatMap
允许您带一种类型的Observable(您的Observable<MyObj>
)并返回另一种类型的Observable(在这种情况下,它可以是Observable<Void>
或Observable<Boolean>
,如果您想知道您的长时间运行请求是否完成)。
看看这个代码:
Subscription sub = observable.flatMap((MyObj o) -> {
// this will be run on your subscribeOn thread -- in background
return Observable.just(foo(o));
}).subscribe(aVoid -> {
// we have completed both operations here, emissions will be
// on our observeOn() thread
});
如果您foo(MyObj)
功能现在结构为:
private Void foo(MyObj o) {
// long running stuff
return null;
}
它使返回类型Void
,而不是void
,使我们能够很重要使用flatMap
运算符并获取返回的Observable<Void>
。
您所有的长期运行foo()
操作将在您的subscribeOn()
线程中运行,与您最初的观察到一起,并在操作完成后,它会发出Void
项目到您subscribe()
您observeOn()
线程(这可能是你的主要如果你想通知UI,则在Android中进行线程)。
更新,以允许在flatMap调用UI工作:
注:这是假定第一observable
定义subscribeOn()
在后台线程和observeOn()
到AndroidSchedulers.mainThread()
Subscription sub = observable.flatMap((MyObj o) -> {
// << do stuff with MyObj to UI HERE >>
return Observable.defer(() -> Observable.just(foo(o)))
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}).subscribe(aVoid -> {
// foo() has completed, we're back on the UI thread
});
如果你想用JsonResponse做一些事情,你可以改变foo()
函数返回该函数而不是Void
,那么你就可以使用的结果在UI线程上。
更新2:更改Observable.create()
到Observable.defer()
使其更清洁。
只是为了澄清我在一个类(Android Fragment),我想以这种方式处理此问题,以便我可以在接收MyObject时采取一些同步操作,但也希望进行异步调用,并在完成时或出错时执行其他操作。 – FunctionallyReactive
我有单身人士与各种观察员和主题,但让我们看看是否我返回在这一类内工作的Objservable.just(foo),但如果我想让响应在片段外部可见,该怎么办?例如,我想要映射到JSonResponse。所以外面的课我希望他们能够消费JSonResponse听。外来者听众如何利用JSonResponse对象? – FunctionallyReactive
所以我希望Fragment能够对MyObj采取一些操作,并启动这个异步操作,它将返回一个JsonResponse。它是我想要能够倾听或观察的那种反应。我可能想要在将它踢出或者其他地方的片段中处理它。所以我说的是单个类将使用MyObj类来更新UI,并且它将启动异步。一旦返回,我希望它可以发布,或者有一种方法来监听流的JsonResponse类型。 – FunctionallyReactive
看起来像这种类型的问题可能已经在这里讨论:https://github.com/ReactiveX/RxJava/issues/1574 – FunctionallyReactive