HDFS、MapReduce设计概念、基础架构、Python代码实现、常用命令(三)

咱们来看看对特别大的文件统计,整个过程是如何分拆的。
大家想想词频统计的过程,如果是单机完成,我们需要做的事情是维护一个计数器字典,对每次出现的词,词频+1.但是当数据量非常大的时候,没办法在内存中维护这么大的一个字典,我们就要换一种思路来完成这个任务了,也就是我们所谓的map-reduce过程。

大体的过程画成图是下面这个样子:
HDFS、MapReduce设计概念、基础架构、Python代码实现、常用命令(三)
大概是分成下面几个环节:

  1. map阶段
    主要完成key-value对生成,这里是每看到一个单词,就输出(单词,1)的kv对
  2. 排序阶段
    对刚才的kv对进行排序,这样相同单词就在一块儿了
  3. Reduce阶段
    对同一个单词的次数进行汇总,得到(词,频次)对
    Map阶段代码
    流程大家都看清楚啦,咱们来看看用代码如何实现,你猜怎么着,有了hadoop streaming,咱们可以用python脚本完成map和reduce的过程,然后把整个流程跑起来!

比如咱们map阶段要做的就是把每一个单词和出现1次的信息输出来!所以我们写一个mapper.py文件,具体内容如下:

#coding: utf-8
#!/usr/bin/env python

import sys

# 从标准输入过来的数据
for line in sys.stdin:
    # 把首位的空格去掉
    line = line.strip()
    # 把这一行文本切分成单词(按照空格)
    words = line.split()
    # 对见到的单词进行次数标注(出现1次)
    for word in words:
        print '%s\t%s' % (word, 1)

排序阶段
中间会有一个对上述结果进行排序的过程,以保证所有相同的单词都在一起,不过不用担心,这个过程是系统会自动完成的,因此不用我们编写额外的代码。
Reduce阶段
接下来就是对map排序后的结果进行汇总了,这个阶段我们可以用一个reducer.py的python脚本来完成,具体完成的任务,就是:

对于读入的(单词,1)对

如果这个单词还没有结束(排序后所有相同的单词都在一起了),我们就对单词的次数+1
如果遇到新单词了,那重新开始对新单词计数
基于上面的想法,我们可以完成以下的reducer.py脚本:

#coding: utf-8
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# 依旧是标准输入过来的数据
for line in sys.stdin:
    # 去除左右空格
    line = line.strip()

    # 按照tab键对行切分,得到word和次数1
    word, count = line.split('\t', 1)

    # 你得到的1是一个字符串,需要对它进行类型转化
    try:
        count = int(count)
    except ValueError:
        #如果不能转成数字,输入有问题,调到下一行
        continue

    # 如果本次读取的单词和上一次一样,对次数加1
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # 输出统计结果
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# 不要忘了最后一个词哦,也得输出结果
if current_word == word:
    print '%s\t%s' % (current_word, current_count)