RxJs笛卡尔积

RxJs笛卡尔积

问题描述:

动机:RxJs笛卡尔积

目标是写一个刮库,允许关系/归一化的数据的提取。

为此目的,有:

  • 源流:发射cheerio文件:要么通过抓取的URL或通过确定范围父cheerio文档(例如cheerio文件 - >多个cheerio文件,一个用于在每立亲本)
  • 表流:发射表的行,订阅源数据流的组合和从给定的文件cheerio

由于复杂的数据需要多个表中提取数据,每个源数据流可能订阅了多个需要时间,因为它包含的副作用(爬行)

例被共享:博客

在一个简单的博客有可能是帖子,作者和类别;基础数据结构是例如

Post: {id, author, title, text} 
Author: {id, name} 
Category: {id, name} 
author_post: {author_id, post_id} 
post_category: {post_id, category_id} 

要重新从刮HTML的数据结构,我们创建三个源流:

  1. 后:美联储发布网址,返回岗位cheerio文件
  2. post.author:孩子的帖子,按照指向作者的超链接发出作者的欢乐文档
  3. post.category:子帖子,为帖子中列出的每个类别返回一个cheerio文档(例如'.categories li')

要重新创建post_category表,每个帖子必须与属于该帖子的每个类别(== carthesian product)组合。

我的实际问题更加人为化,因为子流已经发出了他们自己的cheerio文档以及他们每个父母的文档,即{post:cheerio,author:cheerio}。

组合流的问题只有例如,兄弟姐妹。

我也不能通过发射父的所有儿童一个流的内部(例如{交,作者,类别})规避分组的问题,作为更复杂的数据结构需要由祖父母

(I分组可以提供一个例子,如果需要的话,但是这样做已经够长了)。

问题

这是不可能使用GROUPBY和zip结合热观测组。

groupBy发出的GroupObservables在创建后立即发出它们的值,并且zip等待所有压缩的observables发出GroupObservables意味着缺少zip函数运行之前发出的任何值。

问题

我如何组和zip热观测不失值?时间信息(例如,在下一个父母发出之前发出的所有孩子)不能被依赖,因为儿童可能异步解决(爬行)。

更多信息:

我可以想像它的最好的是这样的:

Child1和CHILD2映射父的版本,C1C2是分组Child1和CHILD2母公司的结果,计算笛卡尔这些组内的产品。

Parent: -1------2-------3-------- 

Child1: --a------b--c------------ 

Child2: ---1-2----3---4---------- 

C1C2: --a1-a2---b3-c3-b4-c4---- 

形成笛卡尔产品本身是没有问题的,因为一个简单的实现(从问题#807 RxJs服用,不能发布更多的链接)

function xprod (o1, o2) { 
    return o1.concatMap(x => o2, (x, y) => [x, y]); 
}; 

问题不遗漏任何值。

编辑jsbin显示了一个简单的情况:1父母,2孩子。

参考

+0

改进的描述,但是你仍然在我看来,太多的问题说明及实施细则混合('流可能订阅多次' ,父母,孩子,兄弟姐妹等)。我仍然不明白你想要的输出是什么。这是表流吗?这是您提供的基础数据结构吗? – user3743222

+0

在任何情况下,如果你有html文档(文章),并且你想要生成'id,author,title,text','id',你可能会生成'title'和'text'我想你没有问题得到解决。为了获得'author'(只有一个通过post right?),你需要获取其他一些html文档并提取作者信息。那是对的吗?这将处理底层数据结构中的第一个表。 “类别”一样,但有几个类别的邮寄。您只需在获取一些html文档后提取的每个类别。如果这就是这样,不需要笛卡尔产品。 – user3743222

+0

如果你确实想做'author'和'category'的笛卡尔积,我的答案仍然有效。稍后更多。请首先确认我的理解是正确的。 – user3743222

我想出了一个解决方案是一个天真的实现分组操作的返回ReplaySubjects。虽然该解决方案是专门针对我的要求,一般的要点可能会有所帮助:

Observable.prototype.splitBy = function(keySelector) { 
    const parentObservable = this; 
    let group, lastKey; 
    return Observable.create(observable => { 
    return parentObservable.subscribe(
     value => { 
     const currentKey = keySelector(value); 
     if(currentKey === lastKey) { 
      group.next(value); 
     } else { 
      if(group) group.complete(); 
      group = new ReplaySubject(); 
      observable.next(group); 
      group.key = currentKey; 
      group.next(value); 
     } 
     lastKey = currentKey; 
     }, 
     error => observable.error(error), 
     completed => { 
     group.complete(); 
     observable.complete(); 
     }); 
    }); 
}; 

我在这里添加其他答案,什么是我对你的问题的新认识。如果我误解了,我会稍后删除它。

我的理解是:

  • 存在父可观察到的,其产生的值
  • 那些值被用于通过child1生成另一个可观察到的,其代表从异步操作值的序列($。获得,或任何)
  • 相同的孩子2
  • 亲本的一个值,因此产生两个观测值,你想这两个观测值的笛卡尔乘积。

该代码应该做的:

var parent = Rx.Observable.interval(500) 
    .take(3) 
    .publish(); 
var Obs3Val = function (x, delay) { 
    return Rx.Observable.interval(delay) 
    .map(function(y){return "P"+x+":"+"C" + y;}) 
    .take(3); 
}; 
function emits(who){ 
    return function (x) {console.log(who + " emits " + x);}; 
} 

var child1 = parent.map(x => Obs3Val(x, 200)); 
var child2 = parent.map(x => Obs3Val(x, 100)); 

Rx.Observable.zip(child1, child2) 
      .concatMap(function(zipped){ 
    var o1 = zipped[0], o2 = zipped[1]; 
o1.subscribe(emits("child1")); 
o2.subscribe(emits("child2")); 
    return o1.concatMap(x => o2, (x, y) => [x, y]); 
}) 
    .subscribe(function(v) { 
       console.log('cartesian product : ' +v); 
       }); 

parent.subscribe(); 

parent.connect(); 

jsbin:https://jsbin.com/revurohoce/1/edit?js,consolehttps://jsbin.com/wegayacede/edit?js,console

登录:

"child2 emits P0:C0" 
"child1 emits P0:C0" 
"child2 emits P0:C1" 
"cartesian product : P0:C0,P0:C0" 
"child2 emits P0:C2" 
"child1 emits P0:C1" 
"cartesian product : P0:C0,P0:C1" 
"cartesian product : P0:C0,P0:C2" 
"child2 emits P1:C0" 
"child1 emits P0:C2" 
"cartesian product : P0:C1,P0:C0" 
"child1 emits P1:C0" 
"child2 emits P1:C1" 
"cartesian product : P0:C1,P0:C1" 
"child2 emits P1:C2" 
"cartesian product : P0:C1,P0:C2" 
"child1 emits P1:C1" 
"cartesian product : P0:C2,P0:C0" 
"cartesian product : P0:C2,P0:C1" 
"child1 emits P1:C2" 
"child2 emits P2:C0" 
"cartesian product : P0:C2,P0:C2" 
"child1 emits P2:C0" 
"child2 emits P2:C1" 
"child2 emits P2:C2" 
"child1 emits P2:C1" 
"cartesian product : P1:C0,P1:C0" 
"cartesian product : P1:C0,P1:C1" 
"child1 emits P2:C2" 
"cartesian product : P1:C0,P1:C2" 
"cartesian product : P1:C1,P1:C0" 
"cartesian product : P1:C1,P1:C1" 
"cartesian product : P1:C1,P1:C2" 
"cartesian product : P1:C2,P1:C0" 
"cartesian product : P1:C2,P1:C1" 
"cartesian product : P1:C2,P1:C2" 
"cartesian product : P2:C0,P2:C0" 
"cartesian product : P2:C0,P2:C1" 
"cartesian product : P2:C0,P2:C2" 
"cartesian product : P2:C1,P2:C0" 
"cartesian product : P2:C1,P2:C1" 
"cartesian product : P2:C1,P2:C2" 
"cartesian product : P2:C2,P2:C0" 
"cartesian product : P2:C2,P2:C1" 
"cartesian product : P2:C2,P2:C2" 
+0

嘿,感谢您的耐心。我已更新我的问题以提供更多信息 - 抱歉这么久。据我所知,你的代码工作,因为孩子是自主的observables而不是父亲observable的映射版本(请参阅我的示例jsbin链接) –