将无限流的无限流转换为无限流 - 活性X
问题描述:
如何实现活性x(理想情况下是RxJava或RxJs中的示例)?将无限流的无限流转换为无限流 - 活性X
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)
a
是触发事件的每一个应为无穷流S
的一部分的有限流sn
同时能够订阅每个sn
流(为了做到求和操作)事件的无限流,但在同时保持流S
为无限。
编辑:为了更具体我提供了我在Kotlin寻找的实现。 每10秒发射一个事件,映射到4个事件的共享有限流。元流是flatMap
- 进入正常的无限流。我利用doAfterNext
额外订阅每个有限流并打印出结果。
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}
但是我不确定这是否是'纯粹的RX'方式。
答
你不清楚你想如何计算。如果你正在做一个总数,那么就没有必要做内部认购:
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.flatMap { it }
.doOnNext(counter.incrementAndget())
.subscribe { println("Just received [$it] from the infinite stream ") }
在另一方面,如果你需要为每个中间观察到的一个数,那么你可以在里面移动计数在flatMap()
并打印出计数和复位它完成:
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.flatMap { it
.doOnNext(counter.incrementAndget()
.doOnCompleted({ long ctr = counter.getAndSet(0)
println("I am done. Total event count is $ctr")
})
.subscribe { println("Just received [$it] from the infinite stream ") }
这是不是很实用,但是这种报告往往打破正常流。
这看起来像'concatMap',但从问题如何将每个事件映射到一组N个内部源的问题还不清楚。 – akarnokd
也许增加一个你迄今为止尝试过的例子,那会让我们更清楚你想要完成什么。 – paulpdaniels
http://i0.kym-cdn.com/photos/images/original/000/173/576/Wat8.jpg - 我在阅读标题 – inf