MapReduce with Hadoop via Python, with Examples
In this article I will set up two examples of running MapReduce jobs on Hadoop with Python. The Apache Hadoop framework is originally meant for Java; however, with the Hadoop streaming API we can implement the MapReduce functions in Python. In particular, the input and output of the MapReduce functions are handled through the standard input/output streams STDIN and STDOUT, so we use the Python objects sys.stdin and sys.stdout to do the trick.
In addition, the scripts should work with a data stream similar to the following:
cat document | ./mapper.py | sort -k1,1 | ./reducer.py > output
The scripts use sys.stdin and sys.stdout to read and write data; Hadoop will take care of other matters.

# Word count problem

The mapper function is implemented in mapper.py. It reads data from STDIN and outputs key/value pairs, one word per line together with a count of 1. mapper.py is given as the following:

#!/usr/bin/env python
import sys

def main():
    # Read lines from STDIN and emit a "word<TAB>1" pair for each word
    for line in sys.stdin:
        words = line.strip().split()
        for word in words:
            print "%s\t1" % word

if __name__ == "__main__":
    main()
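Since the mapper only talks to STDIN and STDOUT, it can be tested locally before involving Hadoop at all. A quick sanity check, assuming mapper.py is executable in the current directory:

echo "foo bar foo" | ./mapper.py

This should print foo, bar and foo again, each followed by a tab and a 1, one pair per line.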
Make sure that mapper.py is executable by running the command chmod a+x mapper.py.

The reducer function is implemented in reducer.py. It reads data from STDIN, which carries the sorted output of the mapper function, processes it, and writes the result to STDOUT. reducer.py is shown as the following:
#!/usr/bin/env python
import sys

def main():
    curword = None
    curcount = 0
    for line in sys.stdin:
        word, count = line.strip().split('\t')
        if curword is None:
            # First word seen
            curword = word
            curcount = int(count)
            continue
        if curword == word:
            # Same word as on the previous line: accumulate its count
            curcount += int(count)
        else:
            # A new word begins: emit the finished word and reset
            print "%s\t%d" % (curword, curcount)
            curword = word
            curcount = int(count)
    # Emit the last word, if there was any input at all
    if curword is not None:
        print "%s\t%d" % (curword, curcount)

if __name__ == '__main__':
    main()
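The whole chain can likewise be simulated locally with the pipeline shown at the beginning of this article. A minimal sketch, assuming both scripts are executable in the current directory:

echo "b a b" | ./mapper.py | sort -k1,1 | ./reducer.py

This should print a	1 followed by b	2. Note that the sort step is essential: the reducer assumes that identical words arrive on consecutive lines, which is exactly what Hadoop's shuffle and sort phase guarantees between the map and reduce stages.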
Make sure that reducer.py is executable by running the command chmod a+x reducer.py.

Start Hadoop with the command hstart (in my setup, an alias that runs Hadoop's start scripts). Use the command jps to check if everything is fine with your Hadoop installation. In particular, you should have the following servers up and running:
8257 Jps
6214 NameNode
6550 ResourceManager
6424 SecondaryNameNode
6651 NodeManager
8172 DataNode
If the DataNode does not show up, stop Hadoop with hstop, format the file system with hdfs namenode -format, and start again with hstart. If that does not help, stop Hadoop with hstop, delete the directory given by the property <name>hadoop.tmp.dir</name>, which is specified in the configuration file located (in my case) at /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop/core-site.xml, then run hdfs namenode -format and hstart again.
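For reference, the property in core-site.xml looks similar to the following; the value shown here is only an illustration, your directory will differ:

<property>
  <name>hadoop.tmp.dir</name>
  <value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
  <description>A base for other temporary directories.</description>
</property>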
First, let's download a big text file with the following command:

curl -O http://www.gutenberg.org/files/5000/5000-8.txt
As the command curl is used here on macOS, you might want to use an alternative such as wget on a Linux machine.
Move the data file to the Hadoop file system (HDFS) with the following command:
hadoop fs -put 5000-8.txt /
Make sure that the file is in HDFS with the command hadoop fs -ls /.
Now run the MapReduce job with the Hadoop streaming jar:
hadoop jar \
/usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files ./mapper.py,./reducer.py \
-mapper mapper.py -reducer reducer.py \
-input /5000-8.txt -output /tmp
In case something is wrong with the code, you should delete the output directory from HDFS before running the above command again. Remove the previous output with the command hadoop fs -rm -r /tmp.
Check the result with the command hadoop fs -cat /tmp/part-00000.
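For instance, to peek at the most frequent words, you can pipe the result through sort and head; a sketch, assuming the job wrote its output to /tmp as above:

hadoop fs -cat /tmp/part-00000 | sort -t$'\t' -k2,2nr | head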
# Line count problem
The line count problem is to count the number of lines in a document, which can be accomplished with a simple MapReduce scheme: the mapper function reads the data line by line and emits a 1 for each line, and the reducer function sums up the emitted values.
The mapper function mapper.py is given as the following:
#!/usr/bin/env python
import sys

def main():
    # Emit a 1 for every line read from STDIN
    for line in sys.stdin:
        print "1"

if __name__ == "__main__":
    main()
The reducer function reducer.py is given as the following:
#!/usr/bin/env python
import sys

def main():
    # Count the number of lines received from the mappers
    total = 0
    for line in sys.stdin:
        total += 1
    print "%d" % total

if __name__ == '__main__':
    main()
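As before, the chain can be verified locally without Hadoop; a minimal check, assuming both scripts are executable (no sort step is needed here, since the mapper emits no keys):

printf "first\nsecond\nthird\n" | ./mapper.py | ./reducer.py

This should print 3.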