阿卡流滑动窗口来控制减少发射由SourceQueue下沉
更新:我把我的问题在test project来解释我的意思详细阿卡流滑动窗口来控制减少发射由SourceQueue下沉
============== ================================================== =====
我有一个Akka源contiune从数据库表中读取,groupby一些关键然后减少它。但是,在我应用reduce函数后,似乎数据永远不会发送到sink,因为上游始终有数据到来,所以它会保持减少。
我读了一些文章,并尝试分组内嵌和滑动,但它不工作,因为我认为,它只将消息分组到较大的部分,但从来没有使上游暂停和发射沉没。以下是在阿卡流2.5.2
的源代码减少代码:
source = source
.groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
.sliding(3, 1)
.mapConcat(i -> i)
.mapConcat(i -> i)
.groupBy(2000000, i -> i.getEntityName())
.map(i -> new Pair<>(i.getEntityName(), i))
.reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
.map(i -> i.second())
.mergeSubstreams();
水槽和运行:
Sink<Object, CompletionStage<Done>> sink =
Sink.foreach(i -> System.out.println(i))
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materIalizer);
我自己也尝试.takeWhile(断言);我使用定时器来切换谓词值true和false,但它似乎只会将第一个开关设置为false,当我切换回true时,它不会重新启动上游。
请提前帮助我,谢谢!
============================================== ===
更新有关的元素类型
添加
信息我想: 我有一流的呼叫
SystemCodeTracking
包含2个属性(id, entityName)
我有对象的列表:
(1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")
我想GROUPBY的entityName再总结的ID,因此,我希望看到的结果如下
("table1" 1+4),("table3", 3+5),("table2", 2)
我现在正在做的代码现在下面
source .groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName) .map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId())) .scan(....)
我的问题是更多的关于如何建立扫描inital状态 我应该怎么办?
scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))
所以,你想要什么,如果我明白了一切很好是:
- 第一,按ID
- 然后按时间窗口,该时间窗口内,总结所有的
systemCodeTracking.getId()
对于第一部分,您需要groupBy
。第二部分groupedWithin
。但是,它们的工作方式并不相同:第一个会给你子流,而第二个会给你一个列表流。
因此,我们必须以不同的方式处理它们。
首先,让我们写一个减速器为名单:
private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception {
if (list.isEmpty()) {
throw new Exception();
} else {
SystemCodeTracking building = list.get(0);
building.setId(0L);
list.forEach(next -> building.setId(building.getId() + next.getId()));
return building;
}
}
所以对于列表中的每个元素,我们增加building.id
获得当整个名单已经走过我们想要的值。
现在你只需要做
FiniteDuration sec = FiniteDuration.apply(1, TimeUnit.SECONDS)
Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
.groupBy(20000, SystemCodeTracking::getEntityName) // group by name
.groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS) // for a given name, group by time window (or by packs of 100)
.filterNot(List::isEmpty) // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer)
.map(this::reduceList) // reduce each list to sum the ids
.log("====== doing reduceing ") // log each passing element using akka logger, rather than `System.out.println`
.mergeSubstreams() // merge back all elements with different names
再一次,对于java风格感到遗憾,我真的更习惯于scala代码。 –
@Cynille真的很感谢你的回答,它效果很好。我在更多的java风格上对groupedWithin做了一些修改;-)。请接受它,我也会接受这个答案。唯一的问题是,我认为我不能使用原生减少功能来实现这一点。但无论如何谢谢你再次! – zt1983811
@Cynille为什么拒绝? .groupedWithin(100,sec)不会在java中编译,它应该是这个.groupedWithin(100,FiniteDuration.create(10,TimeUnit.SECONDS))我想。 – zt1983811
你为什么把'mapConcat'后您的分组流? –
因为groupedWithin和sliding会给我一个对象列表,但我只需要逐一减少它。我应该减少整个名单吗? – zt1983811
是的。您应该使用其中一个(滑动或分组的内向),然后减少每个列表元素。我相信你真正想要的是分组在内,它恰好是一个时间窗口(最大尺寸)。 –