积累和重置数据流中的值
我在使用RxJS进行无功编程,偶然发现了一些我不确定如何解决的问题。积累和重置数据流中的值
比方说,我们实施自动售货机。您插入一枚硬币,选择一个项目,然后机器分配一个项目并返回更改。我们假定价格总是1美分,因此插入一个季度(25美分)应该返还24美分,依此类推。
“棘手”的部分是,我希望能够处理用户插入2个硬币,然后选择一个项目的情况。或者选择一个物品而不插入硬币。
将插入的硬币和选定项目实现为流似乎很自然。然后,我们可以在这两个操作之间引入某种依赖关系 - 合并或压缩或组合最新的操作。
但是,我很快就遇到了一个问题,我希望硬币能积累起来,直到物品被分配,但没有进一步分配。 AFAIU,这意味着我不能使用sum
或scan
,因为在某个时刻没有办法“重置”以前的积累。
下面是一个例子图:
coins: ---25---5-----10------------|->
acc: ---25---30----40------------|->
items: ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:------------29------39------|->
和对应的代码:
this.getCoinsStream()
.scan(function(sum, current) { return sum + current })
.combineLatest(this.getSelectedItemsStream())
.subscribe(function(cents, item) {
dispenseItem(item);
dispenseChange(cents - 1);
});
25和5美分插入并然后选择 “foo” 的项目。积累硬币,然后结合最新的将导致“foo”与“30”(这是正确的)相结合,然后是“bar”与“40”(这是不正确的;应该是“bar”和“10”)。
我查看了all of the methods进行分组和筛选,没有看到任何可以使用的东西。
我可以使用的另一种解决方案是分别累积硬币。但这导致状态流的外面,我真的想避免:
var centsDeposited = 0;
this.getCoinsStream().subscribe(function(cents) {
return centsDeposited += cents;
});
this.getSelectedItemsStream().subscribe(function(item) {
dispenseItem(item);
dispenseChange(centsDeposited - 1);
centsDeposited = 0;
});
此外,这不允许进行互相依赖的数据流,如等待硬币被插入,直到选定的动作可以返回一个项目。
我是否缺少已有的方法?实现这种目标的最佳方式是什么 - 直到需要将它们与另一个流合并为止,并且在第一个流中至少等待一个值,然后将它与第二个流中的值合并之前,才会累积值?
您可以使用您的扫描/ combineLatest方法,然后完成了first
随后进行了repeat
所以流,它“重新开始”的流,但您的观察员将无法看到它。
var coinStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#add5'), 'click').map(5),
Rx.Observable.fromEvent($('#add10'), 'click').map(10),
Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);
var selectedStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
Rx.Observable.fromEvent($('#sprite'), 'click').map('sprite')
);
var $selection = $('#selection');
var $change = $('#change');
function dispense(selection) {
$selection.text('Dispensed: ' + selection);
console.log("Dispensing Drink: " + selection);
}
function dispenseChange(change) {
$change.text('Dispensed change: ' + change);
console.log("Dispensing Change: " + change);
}
var dispenser = coinStream.scan(function(acc, delta) { return acc + delta; }, 0)
.combineLatest(selectedStream,
function(coins, selection) {
return {coins : coins, selection : selection};
})
//Combine latest won't emit until both Observables have a value
//so you can safely get the first which will be the point that
//both Observables have emitted.
.first()
//First will complete the stream above so use repeat
//to resubscribe to the stream transparently
//You could also do this conditionally with while or doWhile
.repeat()
//If you only will subscribe once, then you won't need this but
//here I am showing how to do it with two subscribers
.publish();
//Dole out the change
dispenser.pluck('coins')
.map(function(c) { return c - 1;})
.subscribe(dispenseChange);
//Get the selection for dispensation
dispenser.pluck('selection').subscribe(dispense);
//Wire it up
dispenser.connect();
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
Ahha,所以诀窍是用“first()”结束流,然后用'repeat()'结束“resume”?有趣。但也有点混乱。我期望'repeat()'无限地重复上一个值(这是找到的第一个组合)。另一方面,如果重复重新开始整个流,那么这是有道理的。非常酷的方法,而且根本不明显:)谢谢。 – kangax
正确。一种想法是,每次购买汽水时,你实际上都开始了一项新的交易(在购买新的硬币之前不会完成新的硬币)。您真的只对第一时间感兴趣,因为用户已经推动了选择并付款。之后,你想“重复”或重新开始一个新的交易(注意,如果这很让人困惑,你也可以看看'while'和'doWhile',它们做类似的事情,可能更熟悉) – paulpdaniels
一般来说有以下方程组:
inserted_coins :: independent source
items :: independent source
accumulated_coins :: sum(inserted_coins)
accumulated_paid :: sum(price(items))
change :: accumulated_coins - accumulated_paid
coins_in_machine :: when items : 0, when inserted_coins : sum(inserted_coins) starting after last emission of item
难的是coins_in_machine
。您需要根据两个来源的排放来切换源观测值。
function emits (who) {
return function (x) { console.log([who, ": "].join(" ") + x);};
}
function sum (a, b) {return a + b;}
var inserted_coins = Rx.Observable.fromEvent(document.getElementById("insert"), 'click').map(function (x) {return 15;});
var items = Rx.Observable.fromEvent(document.getElementById("item"), 'click').map(function (x) {return "snickers";});
console.log("running");
var accumulated_coins = inserted_coins.scan(sum);
var coins_in_machine =
Rx.Observable.merge(
items.tap(emits("items")).map(function (x) {return {value : x, flag : 1};}),
inserted_coins.tap(emits("coins inserted ")).map(function (x) {return {value : x, flag : 0};}))
.distinctUntilChanged(function(x){return x.flag;})
.flatMapLatest(function (x) {
switch (x.flag) {
case 1 :
return Rx.Observable.just(0);
case 0 :
return inserted_coins.scan(sum, x.value).startWith(x.value);
}
}
).startWith(0);
coins_in_machine.subscribe(emits("coins in machine"));
jsbin:http://jsbin.com/mejoneteyo/edit?html,js,console,output
[更新]
说明:
- 我们合并insert_coins流和项目流,而一个标志附加到他们知道哪一个当我们在合并流中收到一个值时发出的两个数字
- 当它是物流发射时,我们想把0放在
coins_in_machine
。当它是insert_coins
时,我们希望对输入值进行求和,因为这个和会代表机器中新的硬币数量。这意味着insert_coins
的定义在之前定义的逻辑下从一个流切换到另一个流。该逻辑是在switchMapLatest
中实施的。 - 我使用
switchMapLatest
而不是switchMap
,否则coins_in_machine
流将继续接收来自前转换流的发射,即重复发射,因为最后只有两个流进出我们切换。如果可以的话,我会说这是一个接近并转换我们需要的东西。 -
switchMapLatest
必须返回一个流,所以我们跳火圈,使发射0和never
两端流(并且不阻止计算机,作为使用repeat
运营商将在这种情况下) - 我们通过一些跳额外的箍让
inserted_coins
发出我们想要的值。我的第一个实施是inserted_coins.scan(sum,0)
,并且从来没有工作。关键和我发现非常棘手的是,当我们到达流程中的那一点时,inserted_coins
已经发出了其中一个值,这是总和的一部分。该值是作为flatMapLatest
的一个参数传递的值,但它不再在源中,因此在获得该值之后调用scan
,因此需要从flatMapLatest
中获取该值并重新构建正确的行为。
大问题和有趣的解决方案,谢谢!因此,使用'distinctUntilChanged'并通过标志区分流是一种选择。但该死......这真的很难理解:/ – kangax
增加了一些解释。是的,这不是微不足道的,花了几个小时才能找出我最初的幼稚执行过程中出了什么问题,但在处理热的高阶可观察事件时要小心排放时间的见解是一个很好的方法。 – user3743222
不知道我为什么需要'never()。startWith()',在这种情况下,'just'应该做同样的事情,因为下游永远不会“看到”中间流的完成。 – paulpdaniels
您还可以使用Window组合在一起的多个硬币事件,并使用项目选择作为窗口边界。
接下来我们可以使用zip来获取项目值。
通知我们立即尝试发出物品。因此用户在决定物品之前必须插入硬币。
注意,为了安全起见,我决定发布selectedStream
和dispenser
,我们不想引发竞争条件,在构建查询时事件触发,并且拉链变得不平衡。这将是一种非常罕见的情况,但请注意,当我们的消息来源是冷观察者时,他们几乎一开始订阅就会开始生成,我们必须使用Publish
来保护自己。
(无耻窃取paulpdaniels示例代码)。
var coinStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#add5'), 'click').map(5),
Rx.Observable.fromEvent($('#add10'), 'click').map(10),
Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);
var selectedStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
Rx.Observable.fromEvent($('#sprite'), 'click').map('Sprite')
).publish();
var $selection = $('#selection');
var $change = $('#change');
function dispense(selection) {
$selection.text('Dispensed: ' + selection);
console.log("Dispensing Drink: " + selection);
}
function dispenseChange(change) {
$change.text('Dispensed change: ' + change);
console.log("Dispensing Change: " + change);
}
// Build the query.
var dispenser = Rx.Observable.zip(
coinStream
.window(selectedStream)
.flatMap(ob => ob.reduce((acc, cur) => acc + cur, 0)),
selectedStream,
(coins, selection) => ({coins : coins, selection: selection})
).filter(pay => pay.coins != 0) // Do not give out items if there are no coins.
.publish();
var dispose = new Rx.CompositeDisposable(
//Dole out the change
dispenser
.pluck('coins')
.map(function(c) { return c - 1;})
.subscribe(dispenseChange),
//Get the selection for dispensation
dispenser
.pluck('selection')
.subscribe(dispense),
//Wire it up
dispenser.connect(),
selectedStream.connect()
);
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
我不知道,但你可以看看http://jsbin.com/vimito/edit?js,console – xgrommx
我忘了加盟模式的http:// jsbin .com/hikata/3 /编辑?js,控制台,输出或合并/地图/扫描http://jsbin.com/nuvuka/edit?js,console,output – xgrommx
抱歉打扰你,但你可以请看看在https://*.com/questions/44434944。任何帮助,将不胜感激。 –