Monday, January 21, 2013

[hadoop] Writing Hadoop MapReduce programs in pure Python

What a tongue-twister of a title :). Hadoop is written in Java, but Java is not the only language you can use to run programs on Hadoop. The first concept to keep in mind, though, is that the execution environment is the Java VM. So the Python programs introduced in the Hadoop documentation are, at execution time, translated into a Java jar by Jython and then handed to the Hadoop framework to run. By extension, any language capable of being turned into a Java jar should be able to produce programs that the Hadoop framework can execute.

Back to Python. Anyone used to (C)Python is probably not comfortable with Jython (I'm not). The two really do differ in ways I never managed to adapt to (the same goes for IronPython). That's why Mr. Noll wrote up how to run pure Python on the Hadoop framework.

What he exploits is the Hadoop Streaming API's ability to send and receive streamed data, together with the fact that Hadoop can talk to stdin and stdout. Treating those two as the channel ties the Hadoop framework to the Python runtime. The thing to keep straight from here on is that mapper.py and reducer.py live on the local machine, while the data lives inside the Hadoop environment (HDFS); put either one in the wrong place and the job won't run.
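
To make the plumbing concrete, here is a minimal local simulation (pure Python, no Hadoop involved) of what Streaming does with the two scripts prepared below. The file paths are the ones used later in this article; treat it as a sketch of the data flow, not as how you actually launch a Streaming job.

#!/usr/bin/env python
import subprocess

# feed one input file to the mapper over stdin, exactly as Streaming would
with open('/tmp/gutenberg/pg20417.txt') as src:
    mapped = subprocess.check_output(['/home/hduser/mapper.py'],
                                     stdin=src, universal_newlines=True)

# between the two steps Hadoop's shuffle phase sorts map output by key;
# a plain sorted() stands in for it here
shuffled = '\n'.join(sorted(mapped.splitlines())) + '\n'

# hand the sorted stream to the reducer over stdin and collect its stdout
reducer = subprocess.Popen(['/home/hduser/reducer.py'],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                           universal_newlines=True)
counts, _ = reducer.communicate(shuffled)
print(counts)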

Preparing the environment:

  • A Hadoop environment: follow the earlier article and prepare a single-node setup
  • The following programs:
    • mapper.py, saved as /home/hduser/mapper.py (remember to chmod +x)

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # emit a count of 1 for every word found
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

    • reducer.py, saved as /home/hduser/reducer.py (remember to chmod +x)

#!/usr/bin/env python

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
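
As an aside, the same reducer logic can be written more compactly with itertools.groupby. This variant is my own sketch rather than a copy from the tutorial; it relies on exactly the same sorted, tab-delimited input:

#!/usr/bin/env python

from itertools import groupby
from operator import itemgetter
import sys

def read_pairs(stream):
    # yield [word, count] pairs, one per tab-delimited input line
    for line in stream:
        yield line.rstrip('\n').split('\t', 1)

# groupby works here for the same reason as the IF-switch above:
# Hadoop sorts map output by key before the reducer sees it
for word, group in groupby(read_pairs(sys.stdin), key=itemgetter(0)):
    try:
        total = sum(int(count) for _, count in group)
        print('%s\t%d' % (word, total))
    except ValueError:
        # a count was not a number; silently drop this group
        pass

Both versions stream: neither holds more than the current word's running total in memory at any time.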


Mr. Noll is also kind enough to suggest a small trick for testing mapper.py and reducer.py first, so that if a run doesn't turn out as expected you won't go blaming everyone else.

# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1

hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
The     1
Project 1
Gutenberg       1
EBook   1
of      1
[...]
(you get the idea)

There you have it; the programs really are that simple. Next, let's get them running, which means preparing material. As in the earlier test, download three books from Project Gutenberg; get the plain-text, UTF-8 encoded version of each ("Plain Text UTF-8").

Put the text files into the /tmp/gutenberg directory.
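
If you would rather script the download, something like the following works. The cache/epub URL pattern is an assumption about Project Gutenberg's layout (only the three file names are taken from this walkthrough), so verify the links in a browser if the fetch fails.

#!/usr/bin/env python
# fetch the three ebooks used below into /tmp/gutenberg
import os

try:
    from urllib.request import urlretrieve   # Python 3
except ImportError:
    from urllib import urlretrieve            # Python 2

dest = '/tmp/gutenberg'
if not os.path.isdir(dest):
    os.makedirs(dest)

# ebook ids matching pg20417.txt, pg4300.txt and pg5000.txt below
for book_id in (20417, 4300, 5000):
    # assumed URL pattern -- double-check against gutenberg.org
    url = 'https://www.gutenberg.org/cache/epub/%d/pg%d.txt' % (book_id, book_id)
    target = os.path.join(dest, 'pg%d.txt' % book_id)
    print('fetching %s -> %s' % (url, target))
    urlretrieve(url, target)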

Next, these three files have to be copied into HDFS (that is, the Hadoop environment's file store):

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Then comes the moment to run the MapReduce job. It is a single command, with plenty of options of course (the -file options ship the two scripts out to the cluster nodes). Do not break it across lines!

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

When the run finishes, the results end up in the HDFS directory given by -output (shown further below). A sample run looks like this:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob:  map 0%  reduce 0%
[...] INFO streaming.StreamJob:  map 43%  reduce 0%
[...] INFO streaming.StreamJob:  map 86%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 33%
[...] INFO streaming.StreamJob:  map 100%  reduce 70%
[...] INFO streaming.StreamJob:  map 100%  reduce 77%
[...] INFO streaming.StreamJob:  map 100%  reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$

The finished results are in HDFS at /user/hduser/gutenberg-output. Use these commands to take a look at what's inside:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        2
"A_     1
"Absoluti       1
[...]
hduser@ubuntu:/usr/local/hadoop$
