基于MongoDB分布式存储进行MapReduce并行查询

之前的文章中介绍了如何基于Mongodb进行关系型数据的分布式存储,有了存储就会牵扯到查询。虽然用普通的方式也可以进行查询,但今天要介绍的是如何使用MONGODB中提供的MapReduce功能进行查询。
有关MongoDb的MapReduce之前我写过一篇文章 Mongodb Mapreduce 初窥

今天介绍如何基于sharding机制进行mapreduce查询。在MongoDB的官方文档中,这么一句话:

ShardedEnvironments
Inshardedenvironments,dataprocessingofmap/reduceoperationsrunsinparallelonallshards.


即: map/reduce操作会并行运行在所有的shards上。
下面我们就用之前这篇文章中白搭建的环境来构造mapreduce查询:

首先要说的是,基于sharding的mapreduce与非sharding的数据在返回结构上有一些区别,我目前注意到的主要是不支持定制式的json格式的返回数据,也就是下面方式可能会出现问题:

return{count:total};


注意:上面的情况目前出现在了我的测试环境下,如下图:

基于MongoDB分布式存储进行MapReduce并行查询

就需要改成 return count;

下面是测试代码,首先是按帖子id来查询相应数量(基于分组查询实例方式):

基于MongoDB分布式存储进行MapReduce并行查询基于MongoDB分布式存储进行MapReduce并行查询
publicpartialclassgetfile:System.Web.UI.Page
{

publicMongoMongo{get;set;}


publicIMongoDatabaseDB
{
get
{
returnthis.Mongo["dnt_mongodb"];
}
}

///<summary>
///Setsupthetestenvironment.YoucaneitheroverridethisOnInittoaddcustominitialization.
///</summary>
publicvirtualvoidInit()
{
stringConnectionString="Server=10.0.4.85:27017;ConnectTimeout=30000;ConnectionLifetime=300000;MinimumPoolSize=512;MaximumPoolSize=51200;Pooled=true";
if(String.IsNullOrEmpty(ConnectionString))
thrownewArgumentNullException("Connectionstringnotfound.");
this.Mongo=newMongo(ConnectionString);
this.Mongo.Connect();
}
stringmapfunction="function(){\n"+
"if(this._id=='548111'){emit(this._id,1);}\n"+
"};";

stringreducefunction="function(key,current){"+
"varcount=0;"+
"for(variincurrent){"+
"count+=current[i];"+
"}"+
"returncount;\n"+
"};";


protectedvoidPage_Load(objectsender,EventArgse)
{
Init();

varmrb
=DB["posts1"].MapReduce();//attach_gfstream.files
intgroupCount=0;
using(varmr=mrb.Map(mapfunction).Reduce(reducefunction))
{
foreach(Documentdocinmr.Documents)
{
groupCount
=int.Parse(doc["value"].ToString());
}
}
this.Mongo.Disconnect();
}
}


下面是运行时的查询结果,如下:

基于MongoDB分布式存储进行MapReduce并行查询


接着演示一下如何把查询到的帖子信息返回并装入list集合,这里只查询ID为548110和548111两个帖子:

基于MongoDB分布式存储进行MapReduce并行查询基于MongoDB分布式存储进行MapReduce并行查询
stringmapfunction="function(){\n"+
"if(this._id=='548110'||this._id=='548111'){emit(this,1);}\n"+
"};";

stringreducefunction="function(doc,current){"+
"returndoc;\n"+
"};";

protectedvoidPage_Load(objectsender,EventArgse)
{
Init();

varmrb
=DB["posts1"].MapReduce();//attach_gfstream.files
List<Document>postDoc=newList<Document>();
using(varmr=mrb.Map(mapfunction).Reduce(reducefunction))
{
foreach(Documentdocinmr.Documents)
{
postDoc.Add((Document)doc[
"value"]);
}
}
this.Mongo.Disconnect();
}


下面是运行时的查询结果,如下:

基于MongoDB分布式存储进行MapReduce并行查询

上面的map/reduce方法还有许多写法,如果大家感兴趣可以看一下如下这些链接:
http://cookbook.mongodb.org/patterns/unique_items_map_reduce/
http://www.mongodb.org/display/DOCS/MapReduce

以及之前我写的这篇文章:http://www.cnblogs.com/daizhj/archive/2010/06/10/1755761.html


当然在mongos进行map/reduce运算时,会生成一些临时文件,如下图:
基于MongoDB分布式存储进行MapReduce并行查询

我猜这些临时文件可能会对再次查询系统时的性能有一些提升(但目前未观察到)。

当然对于mongodb的gridfs系统(可使用它搭建分布式文件存储系统,我之前在这篇文章中已介绍过,我也做了测试,但遗憾的是并未成功,它经常会报一些错误,比如:

ThuSep0912:09:29Assertionfailure_grabclient\parallel.cpp461


看来mapreduce程序链接到mongodb上时,会产生一些问题,但不知道是不是其自身稳定性的原因,还是我的机器环境设置问题(内存或配置的64位系统mongos与32位的client连接问题)。

好了,今天的文章就先到这里了。


原文链接:http://www.cnblogs.com/daizhj/archive/2010/09/09/1822264.html

BLOG: http://daizhj.cnblogs.com/

作者:daizhj,代震军