MapReduce with Hadoop via Python, with Examples
In this article I will set up two examples of running MapReduce jobs on Hadoop with Python. The Apache Hadoop framework is originally meant for Java; however, with the Hadoop streaming API we can implement the map and reduce functions in Python. In particular, the input and output of the MapReduce functions are handled by the standard input/output streams STDIN and STDOUT, and we use the Python objects sys.stdin and sys.stdout to do the trick.

In addition, the scripts should work with a data stream similar to the following

cat document | ./mapper.py | sort -k1,1 | ./reducer.py > output
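The sort -k1,1 step in this pipeline stands in for Hadoop's shuffle-and-sort phase: it places all pairs with the same key next to each other, which is exactly what the reducer relies on. A minimal sketch of the effect in Python (the sample pairs are made up for illustration):

```python
# Mapper output arrives in document order; sorting by key groups
# identical words together so a reducer can aggregate them in one pass.
mapper_output = ["brown\t1", "the\t1", "fox\t1", "the\t1"]
shuffled = sorted(mapper_output)  # what `sort -k1,1` does on the shell
print(shuffled)
```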
In other words, the scripts communicate through STDIN and STDOUT: they use sys.stdin and sys.stdout to read and write data, and Hadoop will take care of all other matters.

# Word count problem

The word count problem is to count how many times each word occurs in a document.

mapper.py

The mapper function reads data from STDIN and outputs key and value pairs, one "word\t1" pair per word. The code of mapper.py is given as the following

#!/usr/bin/env python
import sys

def main():
    # Emit "<word>\t1" for every word on every input line.
    for line in sys.stdin:
        words = line.strip().split()
        for word in words:
            print("%s\t1" % word)

if __name__ == "__main__":
    main()
Make sure that mapper.py is executable by running the command chmod a+x mapper.py
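It is worth sanity-checking the mapper logic locally before involving Hadoop at all. The snippet below replays the same tokenize-and-emit loop on an in-memory sample instead of sys.stdin; the sample lines are made up:

```python
# Replay mapper.py's loop on a list of lines instead of sys.stdin.
sample = ["the quick fox", "the dog"]
pairs = []
for line in sample:
    for word in line.strip().split():
        pairs.append("%s\t1" % word)
print(pairs)
```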
reducer.py

The reducer function reads from STDIN the sorted key-value pairs produced by the mapper, aggregates the count of each word, and writes the results to STDOUT. The code of reducer.py is shown as the following
#!/usr/bin/env python
import sys

def main():
    curword = None
    curcount = 0
    # The input is sorted by key, so all pairs for a word arrive consecutively.
    for line in sys.stdin:
        word, count = line.strip().split('\t')
        if curword is None:
            curword = word
            curcount = int(count)
            continue
        if curword == word:
            curcount += int(count)
        else:
            print("%s\t%d" % (curword, curcount))
            curword = word
            curcount = int(count)
    # Flush the last word (unless the input was empty).
    if curword is not None:
        print("%s\t%d" % (curword, curcount))

if __name__ == '__main__':
    main()
Make sure that reducer.py is executable by running the command chmod a+x reducer.py
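Before submitting anything to Hadoop, the whole word-count pipeline can be dry-run in plain Python. The sketch below chains the mapper logic, the sort, and the reducer logic in memory; the sample document is made up for illustration:

```python
# Dry-run of the streaming pipeline: map -> sort -> reduce, all in memory.
document = ["the fox and the dog", "the fox"]

# Map: emit (word, 1) for every word.
mapped = []
for line in document:
    for word in line.strip().split():
        mapped.append((word, 1))

# Shuffle/sort: group identical keys together.
mapped.sort()

# Reduce: sum the counts of each run of equal keys.
counts = {}
for word, count in mapped:
    counts[word] = counts.get(word, 0) + count
print(counts)
```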
Now start Hadoop with the command hstart.
Use the command jps to check that everything is fine with your Hadoop installation. In particular, you should have the following servers up and running
8257 Jps
6214 NameNode
6550 ResourceManager
6424 SecondaryNameNode
6651 NodeManager
8172 DataNode
If some of the servers are missing, stop Hadoop with hstop, reformat the HDFS with the command hdfs namenode -format and start Hadoop again with hstart. If the problem persists, stop Hadoop with hstop, delete the temporary directory hadoop.tmp.dir, which is specified in the configuration file located (in my case) at /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop/core-site.xml, then reformat with hdfs namenode -format and start Hadoop with hstart.

Now we are ready to run the word count example. First, let’s download a big text file with the following command

curl -O http://www.gutenberg.org/files/5000/5000-8.txt

The curl command ships with macOS; on a Linux machine you might want to use an alternative such as wget.
Move the data file to the Hadoop file system (HDFS) with the following command

hadoop fs -put 5000-8.txt /

Make sure that the file is in HDFS with the command hadoop fs -ls /
Now run the streaming job with the following command
hadoop jar \
/usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files ./mapper.py,./reducer.py \
-mapper mapper.py -reducer reducer.py \
-input /5000-8.txt -output /tmp
In case there is something wrong with the code, you should delete the result directory from HDFS and run the above command again. Remove the previous result with the command hadoop fs -rm -r /tmp

Check the result with the command hadoop fs -cat /tmp/part-00000
# Line count problem

The line count problem is to count the number of lines in a document, which can also be accomplished with a MapReduce heuristic. Basically, the mapper
function reads the data line by line and emits 1 for each line, and the reducer
function sums up the emitted values.
mapper.py

The mapper function is given as the following

#!/usr/bin/env python
import sys

def main():
    # Emit a constant "1" for every input line; the line content is ignored.
    for _ in sys.stdin:
        print("1")

if __name__ == "__main__":
    main()
reducer.py

The reducer function is given as the following

#!/usr/bin/env python
import sys

def main():
    # Count the number of values emitted by the mapper.
    total = 0
    for _ in sys.stdin:
        total += 1
    print("%d" % total)

if __name__ == '__main__':
    main()
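As with word count, the line-count job can be dry-run in plain Python. The sketch below pairs the mapper stage (one "1" per line) with the reducer stage (a running sum); the sample document is made up:

```python
# Line count as map/reduce: emit one "1" per line, then sum the emissions.
document = ["first line", "second line", "third line"]
emitted = ["1" for _ in document]        # mapper stage
total = sum(int(v) for v in emitted)     # reducer stage
print(total)
```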