从方法返回BlockingCollection作为IEnumerable

问题描述:

我想从BlockingCollection支持的方法返回IEnumerable。代码模式是:从方法返回BlockingCollection作为IEnumerable

public IEnumerable<T> Execute() { 
    var results = new BlockingCollection<T>(10); 
    _ExecuteLoad(results); 
    return results.GetConsumingEnumerable(); 
} 

private void _ExecuteLoad<T>(BlockingCollection<T> results) { 
    var loadTask = Task.Factory.StartNew(() => 
    { 
     //some async code that adds items to results 
     results.CompleteAdding(); 
    }); 
} 

public void Consumer() { 
    var count = Execute().Count(); 
} 

问题是从Execute()返回的枚举总是空的。我看到的所有例子都在Task中迭代BlockingCollection。这似乎在这种情况下不起作用。

有没有人知道我要去哪里错了?


为了让事情更清楚些,我粘贴了我正在执行的代码来填充集合。也许有什么导致这个问题在这里?

Task.Factory.StartNew(() => 
{ 
    var continuationRowKey = ""; 
    var continuationParitionKey = ""; 
    var action = HttpMethod.Get; 
    var queryUri = _GetTableQueryUri(tableServiceUri, tableName, query, continuationParitionKey, continuationRowKey, timeout); 
    while (true) 
    { 
     using (var request = GetRequest(queryUri, null, action.Method, azureAccountName, azureAccountKey)) 
     { 
      request.Method = action; 
      request.RequestUri = queryUri; 

      using (var client = new HttpClient()) 
      { 
       var sendTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); 
       using (var response = sendTask.Result) 
       { 
        continuationParitionKey = // stuff from headers 
        continuationRowKey = // stuff from headers 

        var streamTask = response.Content.ReadAsStreamAsync(); 
        using (var stream = streamTask.Result) 
        { 
         using (var reader = XmlReader.Create(stream)) 
         { 
          while (reader.Read()) 
          { 
           if (reader.NodeType == XmlNodeType.Element && reader.Name == "entry" && reader.NamespaceURI == "http://www.w3.org/2005/Atom") 
           { 
            results.Add(XNode.ReadFrom(reader) as XElement); 
           } 
          } 
          reader.Close(); 
         } 
        } 
       } 
      } 

      if (continuationParitionKey == null && continuationRowKey == null) 
       break; 

      queryUri = _GetTableQueryUri(tableServiceUri, tableName, query, continuationParitionKey, continuationRowKey, timeout); 
     } 
    } 
    results.CompleteAdding(); 
}); 
+1

它并不能解决问题,但如果你只是直接使用Result来调用'... Async()'方法的话就没有多少意义。 – porges 2012-02-22 08:33:02

+0

你的代码原始代码适合我。你有没有试过在你的'Task'中调试代码,看看它为什么从不调用'Add()'?因为这是最可能的解释。 – svick 2012-02-22 11:45:36

当您完成向集合添加项目时,您需要致电results.CompleteAdding()

如果你不这样做,枚举将永远不会结束,Count()将永远不会返回。

除此之外,您发布的代码是正确的。

+0

我应该补充一点。问题是该方法立即返回,并且枚举总是空的。 – 2012-02-22 05:32:06

+0

'Count()'将阻塞,直到枚举完成。你有没有尝试在'CompleteAdding()'上放置一个断点? – 2012-02-22 06:11:53

+0

感谢您的回答。事实证明,我有一个无关的错误,导致CompleteAdding不被调用。谢谢! – 2012-02-22 19:00:43