阿卡流滑动窗口来控制减少发射由SourceQueue下沉

问题描述:

更新:我把我的问题在test project来解释我的意思详细阿卡流滑动窗口来控制减少发射由SourceQueue下沉

============== ================================================== =====

我有一个Akka源contiune从数据库表中读取,groupby一些关键然后减少它。但是,在我应用reduce函数后,似乎数据永远不会发送到sink,因为上游始终有数据到来,所以它会保持减少。

我读了一些文章,并尝试分组内嵌和滑动,但它不工作,因为我认为,它只将消息分组到较大的部分,但从来没有使上游暂停和发射沉没。以下是在阿卡流2.5.2

的源代码减少代码:

source = source 
    .groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS)) 
    .sliding(3, 1) 
    .mapConcat(i -> i) 
    .mapConcat(i -> i) 
    .groupBy(2000000, i -> i.getEntityName()) 
    .map(i -> new Pair<>(i.getEntityName(), i)) 
    .reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;}) 
    .map(i -> i.second()) 
    .mergeSubstreams(); 

水槽和运行:

Sink<Object, CompletionStage<Done>> sink = 
     Sink.foreach(i -> System.out.println(i)) 
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left()); 
run.run(materIalizer); 

我自己也尝试.takeWhile(断言);我使用定时器来切换谓词值true和false,但它似乎只会将第一个开关设置为false,当我切换回true时,它不会重新启动上游。

请提前帮助我,谢谢!

============================================== ===

更新有关的元素类型

添加

信息我想: 我有一流的呼叫SystemCodeTracking包含2个属性(id, entityName)

我有对象的列表:(1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")

我想GROUPBY的entityName再总结的ID,因此,我希望看到的结果如下

("table1" 1+4),("table3", 3+5),("table2", 2) 

我现在正在做的代码现在下面

source 
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName) 
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId())) 
.scan(....) 

我的问题是更多的关于如何建立扫描inital状态 我应该怎么办?

scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId())) 
+1

你为什么把'mapConcat'后您的分组流? –

+0

因为groupedWithin和sliding会给我一个对象列表,但我只需要逐一减少它。我应该减少整个名单吗? – zt1983811

+1

是的。您应该使用其中一个(滑动或分组的内向),然后减少每个列表元素。我相信你真正想要的是分组在内,它恰好是一个时间窗口(最大尺寸)。 –

所以,你想要什么,如果我明白了一切很好是:

  • 第一,按ID
  • 然后按时间窗口,该时间窗口内,总结所有的systemCodeTracking.getId()

对于第一部分,您需要groupBy。第二部分groupedWithin。但是,它们的工作方式并不相同:第一个会给你子流,而第二个会给你一个列表流。

因此,我们必须以不同的方式处理它们。

首先,让我们写一个减速器为名单:

private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception { 
    if (list.isEmpty()) { 
     throw new Exception(); 
    } else { 
     SystemCodeTracking building = list.get(0); 
     building.setId(0L); 
     list.forEach(next -> building.setId(building.getId() + next.getId())); 
     return building; 
    } 
} 

所以对于列表中的每个元素,我们增加building.id获得当整个名单已经走过我们想要的值。

现在你只需要做

FiniteDuration sec = FiniteDuration.apply(1, TimeUnit.SECONDS) 
Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source 
    .groupBy(20000, SystemCodeTracking::getEntityName) // group by name 
    .groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS) // for a given name, group by time window (or by packs of 100) 
    .filterNot(List::isEmpty)       // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer) 
    .map(this::reduceList)        // reduce each list to sum the ids 
    .log("====== doing reduceing ")     // log each passing element using akka logger, rather than `System.out.println` 
    .mergeSubstreams()         // merge back all elements with different names 
+0

再一次,对于java风格感到遗憾,我真的更习惯于scala代码。 –

+0

@Cynille真的很感谢你的回答,它效果很好。我在更多的java风格上对groupedWithin做了一些修改;-)。请接受它,我也会接受这个答案。唯一的问题是,我认为我不能使用原生减少功能来实现这一点。但无论如何谢谢你再次! – zt1983811

+0

@Cynille为什么拒绝? .groupedWithin(100,sec)不会在java中编译,它应该是这个.groupedWithin(100,FiniteDuration.create(10,TimeUnit.SECONDS))我想。 – zt1983811