如何在使用渠道的golang流水线阶段批量处理项目?
问题描述:
我读管道在线教程,并试图构建经营这样一个阶段 - 将它们发送到了陈 如何在使用渠道的golang流水线阶段批量处理项目?
- 批了传入事件中的10个批次的每个在5秒内没有看到10个事件,将我们收到的数量合并并发送给他们,关闭出局并返回。
但是,我不知道第一个选择案例会是什么样子。尝试了很多事情,但无法通过这个。 任何指针非常感谢!
func BatchEvents(inChan <- chan *Event) <- chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for event := range inChan {
select {
case -WHAT GOES HERE?-:
if i < batchSize {
comboEvent.data = append(comboEvent.data, event.data)
i++;
} else {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i=0;
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
out <- &comboEvent
// stop after
return
}
}
}()
return out
}
答
而不是做一个范围在通道中,你的第一选择的情况下应该是从该通道,具有无限循环中的整个事情。
func BatchEvents(inChan <-chan *Event) <-chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for {
select {
case event, ok := <-inChan:
if !ok {
return
}
comboEvent.data = append(comboEvent.data, event.data)
i++
if i == batchSize {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i = 0
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
if i > 0 {
out <- &comboEvent
}
// stop after
return
}
}
}()
return out
}
请注意,'select's是__NOT__按优先级排序。你必须人为地产生一组优先选择。请参阅https://*.com/questions/11117382/priority-in-go-select-statement-workaround – RayfenWindspear