带循环链接的TPL块?

问题描述:

我使用TPL很多,并有大量数据流管道结构。
作为管道网络的一部分,我想将一些数据写入azure blob存储。我们有很多数据,因此我们有4存储帐户,我们希望在它们之间均匀分配数据。带循环链接的TPL块?

希望继续使用数据流管道模式,因此我想实现一个SourceBlock,如果我将它链接到几个目标模块,它将使用循环法将消息发送给它们。 BufferBlock不够好,因为他正在将消息发送到接受它的第一个块,并假设所有目标块都具有大的有界容量 - 所有消息都将转到第一个目标块。 BroadcastBlock也不好,因为我不想重复。

有什么建议吗?实现与圆形robing行为的界面似乎并不那么简单,我想知道是否有更简单的解决方案呢?或者我不熟悉TPL的任何扩展?

您知道有可能性link the blocks with a predicate?这是样品非常简单,没有得到很好的测试的解决方案:

var buffer = new BufferBlock<int>(); 
var consumer1 = new ActionBlock<int>(i => Console.WriteLine($"First: {i}")); 
var consumer2 = new ActionBlock<int>(i => Console.WriteLine($"Second: {i}")); 
var consumer3 = new ActionBlock<int>(i => Console.WriteLine($"Third: {i}")); 
var consumer4 = new ActionBlock<int>(i => Console.WriteLine($"Forth: {i}")); 

buffer.LinkTo(consumer1, i => Predicate(0)); 
buffer.LinkTo(consumer2, i => Predicate(1)); 
buffer.LinkTo(consumer3, i => Predicate(2)); 
buffer.LinkTo(consumer4, i => Predicate(3)); 
buffer.LinkTo(DataflowBlock.NullTarget<int>()); 

for (var i = 0; i < 10; ++i) 
{ 
    buffer.Post(i); 
} 
buffer.Completion.Wait(); 

输出之一:

Third: 2 
First: 0 
Forth: 3 
Second: 1 
Second: 5 
Second: 9 
Third: 6 
Forth: 7 
First: 4 
First: 8 

这到底是怎么回事是你保持操作的数量,如果当前适合消费者,我们只是增加这一点。请注意,为避免内存问题,您仍应至少链接一次没有任何谓词的块(另外,最好使用监视丢失消息的块来测试循环法)。

+0

感谢VMAtm。我从来没有试过通过谓词,我从你的样本中学到了很多东西。仍然不确定它适用于我的情况。但也许我错过了一些东西,因为还没有完全理解它。所以有一个问题 - 你写了“*请注意,为了避免内存问题,你至少应该链接一个没有谓词的块。”你能解释一下你的记忆问题吗? – Zorik

+0

正如我所说的,这段代码没有经过适当的测试,也许有一种情况,那么所有的谓词都返回false。之后,该消息将保留在“BufferBlock”中,直到程序结束。为“坏”消息创建一个块以离开biffer是一种常见的做法。 'NullTarget'只是简单地忽略了这个消息,但是你可以使用任何你喜欢的块并记录所有的谓词缺失。 – VMAtm

+0

现在明白了,谢谢! – Zorik