一种更好的方式,从一个可观察到另一个

一种更好的方式,从一个可观察到另一个

问题描述:

我已经创建了测试Observables两个函数传递数据,每个返回Observable:这样一种更好的方式,从一个可观察到另一个

foo() { 
    return new Observable(observer => { 
    let i = 0; 
    setInterval(() => { 
     if(i === 10) { 
     observer.complete(); 
     } else { 
     observer.next(i); 
     i++; 
     } 
    }, 1000); 
    // If I call observer.complete() here then it never completes 
    }); 
} 

bar(fooResult) { 
    return new Observable(observer => { 
    let j = 0; 
    setInterval(() => { 
     if(fooResult) { 
     observer.next('j -> '+j+' plus fooResult '+JSON.stringify(fooResult)); 
     observer.complete(); 
     } else { 
     observer.next(j); 
     j++; 
     } 
    }, 2000); 
    }); 
} 

,并利用其中的一部分:

let fooResult = []; 

    // Testing observables... 
    this.exampleProductService.foo().subscribe(
     (res) => { 
     console.log('foo next() -> '+res); 
     fooResult.push(res); 
     }, 
     (err) => { 
     console.error('foo error: '+JSON.stringify(err)); 
     }, 
    () => { 
     console.log('foo finished'); 
     this.exampleProductService.bar(fooResult).subscribe(
      (res) => { 
      console.log('bar next() -> '+res); 
      }, 
      (err) => { 
      console.error('bar error: '+JSON.stringify(err)); 
      }, 
     () => { 
      console.log('bar finished'); 
      } 
     ); 
     } 
    ); 

提出问题的人:

  1. 有没有更好的方法来传递数据完成另一个函数的Observable,该函数也返回一个Observable?建立一个数组似乎麻烦,我不能做以下为Observablecomplete callback部分不传递参数一样progressUpdateonError

    (complete) => { this.exampleProductService.bar(complete).// rest of code } 
    
  2. 我试图指派第一功能的结果一个变量,然后传递该变量,但如预期的那样,我得到了一个Observable,而不是我想要的结果。

  3. 关于我如何进行上述操作,有什么不正确的吗?

感谢

附:这是一个Angular 2应用程序!

我认为你的功能有点过于复杂。例如,在工厂功能已经可用的情况下,不要使用构造函数,在这种情况下,请指定intervaltimer,尽管在这种情况下它会更加冗长。

其次,bar函数并没有什么实际意义,因为您似乎正在等待您已经完成的某些已完成的任务(因为您没有订阅它,直到完成由foo生成的Observable)。

我重构根据您陈述的目标等待一个Observable完成之前开始第二个,同时使用第二个第一个的结果。

// Factory function to emit 10 items, 1 every second 
function foo() { 
    return Observable.interval(1000) 
    .take(10); 
} 

// Lifts the passed in value into an Observable and stringfys it 
function bar(fooResult) { 
    return Rx.Observable.of(fooResult) 
    .map(res => JSON.stringify(fooResult)) 
} 

现在使用它们时,你会怎么做,而不是:

上面我
foo() 
    // Log the side effects of foo 
    .do(
    x => console.log(`foo next() -> ${x}`), 
    err => console.error(`foo error: ${JSON.stringify(err)}`), 
    () => console.log('foo finished') 
) 
    // Collect the results from foo and only emit when foo completes 
    .reduce((total, diff) => [...total, diff], []) 

    // Pass the result from the reduce on to bar 
    .concatMap(fooResult => bar(fooResult)) 

    //Subscribe to the results of bar 
    .subscribe(
    res => console.log(`bar next() -> ${res}`), 
    err => console.error(`bar error: ${JSON.stringify(err)}`), 
    () => console.log('bar finished') 
); 

通知我也摆脱全球性的状态,这是函数式编程的诅咒的。哪里有可能你的状态应该被本地化到流中。

看到这里的工作示例:http://jsbin.com/pexayohoho/1/edit?js,console

+0

感谢feedbacl保罗。在这种情况下,您如何特别摆脱全球状态? – Katana24

+0

另外 - 你能解释一下这条线多一点吗? .reduce((total,diff)=> [... total,diff],[])从文档中,它充当Observable上的累加器,并在我完成时返回一个值。我明白,但可选的种子提供(差异)是什么,以及[...总,差异]是什么意思 - 我以前从来没有见过这样的:全局状态 – Katana24

+1

,不再有一个全局数组叫做' fooResult'。 re:语法,'[... total,diff]'是'total.concat([diff])'的ES6短手,基本上它会创建一个新的数组副本加上新的项目以避免变化现有的项目。 'diff'是进入的新值,而不是可选的种子,它是作为一个空数组'[]'提供的。这是否回答你的问题? – paulpdaniels

我不确定你想要达到什么效果,但这里有一个jsbin that试图复制你的代码。

几点需要注意:

您FOO()是非常容易使用Observable.timer和。取()操作符来创建。您的酒吧()可以由另一个计时器与地图和.takeWhile()运算符一起使用。

至于最后一次订阅(完整部分),它只会打印'foo finished',但没有别的,因为它订阅了一个已经终止的非缓冲序列。

+0

感谢您的反馈梅厄 - 我试图实现是通过从一个可观察到另一个输出的方式。我喜欢你的例子中foo部分的简洁性 – Katana24