将无限流的无限流转换为无限流 - 活性X

将无限流的无限流转换为无限流 - 活性X

问题描述:

如何实现活性x(理想情况下是RxJava或RxJs中的示例)?将无限流的无限流转换为无限流 - 活性X

a |-a-------------------a-----------a-----------a---- 
s1 |-x-x-x-x-x-x -| (subscribe) 
s2      |-x-x-x-x-x-| (subscribe) 
s2            |-x-x-x-x-x-| (subscribe) 
... 
sn 
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe) 

a是触发事件的每一个应为无穷流S的一部分的有限流sn同时能够订阅每个sn流(为了做到求和操作)事件的无限流,但在同时保持流S为无限。

编辑:为了更具体我提供了我在Kotlin寻找的实现。 每10秒发射一个事件,映射到4个事件的共享有限流。元流是flatMap - 进入正常的无限流。我利用doAfterNext额外订阅每个有限流并打印出结果。

/** Creates a finite stream with events 
* $ch-1 - $ch-4 
*/ 
fun createFinite(ch: Char): Observable<String> = 
     Observable.interval(1, TimeUnit.SECONDS) 
       .take(4) 
       .map({ "$ch-$it" }).share() 

fun main(args: Array<String>) { 

    var ch = 'A' 

    Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
      .map { createFinite(ch++) } 
      .doAfterNext { 
       it 
         .count() 
         .subscribe({ c -> println("I am done. Total event count is $c") }) 
      } 
      .flatMap { it } 
      .subscribe { println("Just received [$it] from the infinite stream ") } 

    // Let main thread wait forever 
    CountDownLatch(1).await() 
} 

但是我不确定这是否是'纯粹的RX'方式。

+1

这看起来像'concatMap',但从问题如何将每个事件映射到一组N个内部源的问题还不清楚。 – akarnokd

+1

也许增加一个你迄今为止尝试过的例子,那会让我们更清楚你想要完成什么。 – paulpdaniels

+0

http://i0.kym-cdn.com/photos/images/original/000/173/576/Wat8.jpg - 我在阅读标题 – inf

你不清楚你想如何计算。如果你正在做一个总数,那么就没有必要做内部认购:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it } 
     .doOnNext(counter.incrementAndget()) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

在另一方面,如果你需要为每个中间观察到的一个数,那么你可以在里面移动计数在flatMap()并打印出计数和复位它完成:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it 
        .doOnNext(counter.incrementAndget() 
        .doOnCompleted({ long ctr = counter.getAndSet(0) 
             println("I am done. Total event count is $ctr") 
            }) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

这是不是很实用,但是这种报告往往打破正常流。