根据特定条件在RxJs中加入两个可观察对象流

问题描述:

我有两个对象流,即帐户和余额。根据特定条件在RxJs中加入两个可观察对象流

我需要合并(加入)两个流根据idaccount_id

var accounts = Rx.Observable.from([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' }, 
]); 

var balances = Rx.Observable.from([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 }, 
]); 

什么是预期:

var results = [ 
    { id: 1, name: 'account 1', balance: 100}, 
    { id: 2, name: 'account 2', balance: 200}, 
    { id: 3, name: 'account 3', balance: 300}, 
]; 

是与RxJs这可行吗?

要清楚我知道如何用普通的js/lodash或类似的东西来做到这一点。在我的情况下,我从Angular Http Module获得这些流,所以我问在这种情况下我是否可以从RxJs中受益

+1

待办事项你想要结果,一旦两个流都结束或建立它们随着时间的推移? – Maxime

+0

如果您要发布示例代码,这将有助于更好地理解问题。如前所述,目前还不清楚为什么不直接在数组上进行连接,而不是从流内部进行连接。但无论如何,是的,可以做你提到的加入。参看http://*.com/questions/43332674/rxjs-to-combine-attributes-from-triples-to-a-table/43333630#43333630其中显示了如何迭代构建一个结构 – user3743222

+0

@Maxime,在我的情况下完成后,但是如果在流程中有办法做到这一点,那么也可以使用 – amd

对于您的某条评论,您的示例是模拟来自Angular Http调用的流。

所以不是:

var accounts = Rx.Observable.from([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' }, 
]); 

var balances = Rx.Observable.from([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 }, 
]); 

我宁愿说它是:

var accounts = Rx.Observable.of([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' }, 
]); 

var balances = Rx.Observable.of([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 }, 
]); 

为什么:from将逐一发出的每个项目,of会散发出整个数组我猜你的http响应是整个阵列。

这就是说,你可能想实现的是:

const { Observable } = Rx; 

// simulate HTTP requests 
const accounts$ = Rx.Observable.of([ 
    { id: 1, name: 'account 1' }, 
    { id: 2, name: 'account 2' }, 
    { id: 3, name: 'account 3' } 
]); 

const balances$ = Rx.Observable.of([ 
    { account_id: 1, balance: 100 }, 
    { account_id: 2, balance: 200 }, 
    { account_id: 3, balance: 300 } 
]); 

// utils 
const joinArrays = (accounts, balances) => 
    accounts 
    .map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance })); 

const findBalanceByAccountId = (balances, id) => 
    balances.find(balance => balance.account_id === id) || { balance: 0 }; 

const print = (obj) => JSON.stringify(obj, null, 2) 

// use forkJoin to start both observables at the same time and not wait between every request 
Observable 
    .forkJoin(accounts$, balances$) 
    .map(([accounts, balances]) => joinArrays(accounts, balances)) 
    .do(rslt => console.log(print(rslt))) 
    .subscribe(); 

输出:

[ 
    { 
    "id": 1, 
    "name": "account 1", 
    "balance": 100 
    }, 
    { 
    "id": 2, 
    "name": "account 2", 
    "balance": 200 
    }, 
    { 
    "id": 3, 
    "name": "account 3", 
    "balance": 300 
    } 
] 

这里有一个工作Plunkr:https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview

编辑1: 在数组上编写结果的工作可能不是性能的最佳想法,而不是返回数组,可能尝试返回一个具有该键的帐户ID。这样,你可以简单地删除findBalanceByAccountId功能,并具有更快的应用程序(这里只修改后的代码)

const balances$ = Rx.Observable.of({ 
    1: { account_id: 1, balance: 100 }, 
    2: { account_id: 2, balance: 200 }, 
    3: { account_id: 3, balance: 300 } 
}); 

// utils 
const joinArrays = (accounts, balances) => 
    accounts 
    .map(account => Object.assign(
     {}, 
     account, 
     { balance: balances[account.id].balance } 
    )); 
+0

谢谢,但不幸的是,你的假设是不正确的:s,实际上,我将数据视为流的对象'Observable '而不是单个数组的流'可观察' – amd

可以使用的GroupBy操作者ID加入两个项目:

Rx.Observable.merge(accounts, balances.map(({account_id, balance})=>({id: account_id, balance}))) 
.groupBy(accountInfo => accountInfo.id) 
.flatMap(accountInfo$ => accountInfo$.scan((acc, info) => Object.assign(acc, info), {}) 
    .filter(accountInfo => accountInfo.name && accountInfo.balance) 
    .take(1) 
).subscribe(console.log)