如何合并多个rxjs BehaviourSubjects
我正在构建一个Angular2应用程序,并且有两个BehaviourSubjects
,我想将它们合并为一个订阅。我提出了两个http请求,并且希望在两人都回来时发起一个事件。我在看forkJoin
vs combineLatest
。看起来combineLatest会在任何一个behvaviorSubjects被更新时触发,而forkJoin只会在所有behaviorsSubjects被更新后触发。它是否正确?必须有一个普遍接受的模式,这不是吗?如何合并多个rxjs BehaviourSubjects
编辑
这里是我的angular2组件订阅我的behaviorSubjects的一个示例:
export class CpmService {
public cpmSubject: BehaviorSubject<Cpm[]>;
constructor(private _http: Http) {
this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
}
getCpm(id: number): void {
let params: URLSearchParams = new URLSearchParams();
params.set('Id', id.toString());
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.subscribe(_cpm => {
this.cpmSubject.subscribe(cpmList => {
//double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0)) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
})
});
}
}
这里是我的组件的订阅的一个片段:
this._cpmService.cpmSubject.subscribe(cpmList => {
doSomeWork();
});
但是相反在单个订阅上触发doSomeWork()我只想在cpmSubject和fooSubject触发时触发doSomeWork()。
你可以使用zip
- 运算符,它的工作原理类似于combineLatest或forkJoin,但只有当两个流都发出触发:http://reactivex.io/documentation/operators/zip.html
zip
和combineLatest
之间的区别是: 邮编将只会引发”并行“,而combineLatest
将触发任何更新并发出每个流的最新值。 因此,假设下列2个流:
streamA => 1--2--3
streamB => 10-20-30
与zip
:
- “1,10”
- “2,20”
- “3,30”
与combineLatest
:
- “1,10”
- “2,10”
- “2,20”
- “3,20”
- “3,30”
这里也是一个活例子:
const a = new Rx.Subject();
const b = new Rx.Subject();
Rx.Observable.zip(a,b)
.subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
.subscribe(x => console.log("combineLatest: " + x.join(", ")));
a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
另一个注意事项:永远不会订阅订阅。 做这样的事情,而不是:
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.withLatestFrom(this.cpmSubject)
.subscribe([_cpm, cpmList] => {
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0)) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
});
zip和combineLatest有什么区别? – cobolstinks
我已更新答案 – olsn
谢谢,详细的答案。我试图尝试一下,但我没有在我的Rx.Observable对象上找到zip方法。 – cobolstinks
一个http请求不能直接返回'BehaviorSubject' - 我猜想,你是'nexting' HTTP的响应每到一个'BehaviorSubject'或者甚至订阅'Subject'到'get/post/put'? – olsn
@olsn是的,我订阅了http响应,并将我的主题与他们在服务类中的回复联系起来 – cobolstinks
行为主题的公共访问是反模式。请使用带有“as Observable”的downcast的getter。所以你不能使用服务外的下一个电话 - >分离问题 –