根据特定条件在RxJs中加入两个可观察对象流
我有两个对象流,即帐户和余额。根据特定条件在RxJs中加入两个可观察对象流
我需要合并(加入)两个流根据id
和account_id
var accounts = Rx.Observable.from([
{ id: 1, name: 'account 1' },
{ id: 2, name: 'account 2' },
{ id: 3, name: 'account 3' },
]);
var balances = Rx.Observable.from([
{ account_id: 1, balance: 100 },
{ account_id: 2, balance: 200 },
{ account_id: 3, balance: 300 },
]);
什么是预期:
var results = [
{ id: 1, name: 'account 1', balance: 100},
{ id: 2, name: 'account 2', balance: 200},
{ id: 3, name: 'account 3', balance: 300},
];
是与RxJs这可行吗?
要清楚我知道如何用普通的js/lodash或类似的东西来做到这一点。在我的情况下,我从Angular Http Module获得这些流,所以我问在这种情况下我是否可以从RxJs中受益
对于您的某条评论,您的示例是模拟来自Angular Http调用的流。
所以不是:
var accounts = Rx.Observable.from([
{ id: 1, name: 'account 1' },
{ id: 2, name: 'account 2' },
{ id: 3, name: 'account 3' },
]);
var balances = Rx.Observable.from([
{ account_id: 1, balance: 100 },
{ account_id: 2, balance: 200 },
{ account_id: 3, balance: 300 },
]);
我宁愿说它是:
var accounts = Rx.Observable.of([
{ id: 1, name: 'account 1' },
{ id: 2, name: 'account 2' },
{ id: 3, name: 'account 3' },
]);
var balances = Rx.Observable.of([
{ account_id: 1, balance: 100 },
{ account_id: 2, balance: 200 },
{ account_id: 3, balance: 300 },
]);
为什么:from
将逐一发出的每个项目,of
会散发出整个数组我猜你的http响应是整个阵列。
这就是说,你可能想实现的是:
const { Observable } = Rx;
// simulate HTTP requests
const accounts$ = Rx.Observable.of([
{ id: 1, name: 'account 1' },
{ id: 2, name: 'account 2' },
{ id: 3, name: 'account 3' }
]);
const balances$ = Rx.Observable.of([
{ account_id: 1, balance: 100 },
{ account_id: 2, balance: 200 },
{ account_id: 3, balance: 300 }
]);
// utils
const joinArrays = (accounts, balances) =>
accounts
.map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance }));
const findBalanceByAccountId = (balances, id) =>
balances.find(balance => balance.account_id === id) || { balance: 0 };
const print = (obj) => JSON.stringify(obj, null, 2)
// use forkJoin to start both observables at the same time and not wait between every request
Observable
.forkJoin(accounts$, balances$)
.map(([accounts, balances]) => joinArrays(accounts, balances))
.do(rslt => console.log(print(rslt)))
.subscribe();
输出:
[
{
"id": 1,
"name": "account 1",
"balance": 100
},
{
"id": 2,
"name": "account 2",
"balance": 200
},
{
"id": 3,
"name": "account 3",
"balance": 300
}
]
这里有一个工作Plunkr:https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview
编辑1: 在数组上编写结果的工作可能不是性能的最佳想法,而不是返回数组,可能尝试返回一个具有该键的帐户ID。这样,你可以简单地删除findBalanceByAccountId
功能,并具有更快的应用程序(这里只修改后的代码):
const balances$ = Rx.Observable.of({
1: { account_id: 1, balance: 100 },
2: { account_id: 2, balance: 200 },
3: { account_id: 3, balance: 300 }
});
// utils
const joinArrays = (accounts, balances) =>
accounts
.map(account => Object.assign(
{},
account,
{ balance: balances[account.id].balance }
));
谢谢,但不幸的是,你的假设是不正确的:s,实际上,我将数据视为流的对象'Observable
可以使用的GroupBy操作者ID加入两个项目:
Rx.Observable.merge(accounts, balances.map(({account_id, balance})=>({id: account_id, balance})))
.groupBy(accountInfo => accountInfo.id)
.flatMap(accountInfo$ => accountInfo$.scan((acc, info) => Object.assign(acc, info), {})
.filter(accountInfo => accountInfo.name && accountInfo.balance)
.take(1)
).subscribe(console.log)
待办事项你想要结果,一旦两个流都结束或建立它们随着时间的推移? – Maxime
如果您要发布示例代码,这将有助于更好地理解问题。如前所述,目前还不清楚为什么不直接在数组上进行连接,而不是从流内部进行连接。但无论如何,是的,可以做你提到的加入。参看http://*.com/questions/43332674/rxjs-to-combine-attributes-from-triples-to-a-table/43333630#43333630其中显示了如何迭代构建一个结构 – user3743222
@Maxime,在我的情况下完成后,但是如果在流程中有办法做到这一点,那么也可以使用 – amd