MapReduce with Hadoop via Python, with Examples
Introduction
In this article I will set up two examples of running MapReduce jobs on Hadoop with Python. The Apache Hadoop framework is originally meant for Java. However, with the Hadoop streaming API we can implement MapReduce functions in Python. In particular, the input and output of the MapReduce functions are handled through the standard input/output streams STDIN and STDOUT; in Python we use sys.stdin and sys.stdout to do the trick.
Word count problem
- Word count is a canonical problem: count the occurrences of each word in a document.
- The mapper function takes in the raw text file and converts it into a collection of key-value pairs. Each key is a word, and each key (word) is emitted with a value of 1.
- The reducer function summarizes all key-value pairs, combining the values of pairs that share the same key. The result is a list of unique keys, each with its count of appearances, as the toy example below illustrates.
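To make this concrete, here is how a one-line input flows through the two stages (tabs separate keys from values; the text is a made-up toy example):

input line:      good morning good

mapper output:   good	1
                 morning	1
                 good	1

after shuffle:   good	1
                 good	1
                 morning	1

reducer output:  good	2
                 morning	1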
Implement the MapReduce functions for word count
- With the Hadoop streaming API, we aim to write one Python script acting as the mapper and another acting as the reducer.
- In addition, the scripts should work in a data stream similar to the following, where sort -k1,1 sorts the stream by its first tab-separated field (the key) and thus plays the role of Hadoop's shuffle step:

cat document | ./mapper.py | sort -k1,1 | ./reducer.py > output

- Hadoop is implemented in Java and is meant for Java. However, with the Hadoop streaming package we can write our own MapReduce functions in Python. There are a few good blog posts about using the Hadoop streaming package with Python.
- Here we provide a step-by-step tutorial on running a Python MapReduce program on Hadoop on macOS.
- The magic is the Hadoop streaming API, which lets data pass in and out of Hadoop through STDIN and STDOUT.
- We will be using Python's sys.stdin and sys.stdout to read and write data. Hadoop will take care of everything else.
Implement the mapper function
- The first thing we need to do is write a mapper function; let us call the script mapper.py.
- The script will read a partition of the data (typically a section of a file) from the standard input stream STDIN and output key-value pairs.
- An example mapper.py is given as follows:
#!/usr/bin/env python
import sys

def main():
    # Hadoop feeds each mapper a chunk of the input file on STDIN.
    for line in sys.stdin:
        words = line.strip().split()
        for word in words:
            # Emit a tab-separated key-value pair: the word and a count of 1.
            print("%s\t1" % word)

if __name__ == "__main__":
    main()
- It is worth noting that the first line (the shebang) is important: it tells the system how to run the script when Hadoop invokes it.
- Make sure mapper.py is executable by running chmod a+x mapper.py. You can then test it locally, as shown below.
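Before involving Hadoop at all, it is worth sanity-checking the mapper from the shell (the sample sentence is made up):

echo "good morning good" | ./mapper.py

which should print:

good	1
morning	1
good	1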
Shuffle procedure
- The operation between the mapper and the reducer is sometimes called the shuffle; we will simulate it locally below.
- All key-value pairs are sorted according to the key before being sent to the reducers.
- All pairs with the same key are sent to the same reducer.
- The shuffle process is important for two reasons:
  - As a reducer reads the data stream, once it comes across a key different from the previous one, it knows the previous key will never appear again and can emit its final count.
  - If all key-value pairs share the same key, they all end up at a single reducer; there is no parallelization in that case.
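Outside of Hadoop, the shuffle step can be approximated with the Unix sort command from the pipeline above; sort -k1,1 orders the mapper output by its first tab-separated field, i.e. by key:

echo "good morning good" | ./mapper.py | sort -k1,1

which yields:

good	1
good	1
morning	1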
Implement the reducer function
- Now we are going to implement reducer.py.
- The script reads the shuffled output of the mapper function from the standard input stream STDIN, processes the data, and writes the result to STDOUT.
- An example reducer.py is shown as follows:
#!/usr/bin/env python
import sys

def main():
    curword = None
    curcount = 0
    # The shuffle guarantees that pairs with the same key arrive consecutively.
    for line in sys.stdin:
        word, count = line.strip().split('\t')
        if word == curword:
            curcount += int(count)
        else:
            # A new key means the previous key is finished; emit its total.
            if curword is not None:
                print("%s\t%d" % (curword, curcount))
            curword = word
            curcount = int(count)
    # Emit the total of the last key, if there was any input at all.
    if curword is not None:
        print("%s\t%d" % (curword, curcount))

if __name__ == '__main__':
    main()
- Make sure reducer.py is executable by running chmod a+x reducer.py. A quick local test of the full pipeline follows.
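With both scripts executable, the whole pipeline from the beginning of this section can be exercised locally:

echo "good morning good" | ./mapper.py | sort -k1,1 | ./reducer.py

which should print:

good	2
morning	1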
Submit the MapReduce program to the Hadoop cluster
Start local Hadoop server
- If you have followed my instructions to set up Hadoop on macOS, you can start the Hadoop server with the command hstart.
- Use the command jps to check that everything is fine with your Hadoop installation. In particular, you should have the following services up and running:

8257 Jps
6214 NameNode
6550 ResourceManager
6424 SecondaryNameNode
6651 NodeManager
8172 DataNode
- You might come across the problem that the DataNode is missing from the jps list, which means it failed to start. To bring the DataNode up, you can:
  - Stop all Hadoop services with hstop.
  - Format the HDFS filesystem with hdfs namenode -format.
  - Restart all Hadoop services with hstart.
- If the above does not work, or you do not care about the data on HDFS, you can:
  - Stop all Hadoop services with hstop.
  - Delete all files under the directory given by the hadoop.tmp.dir property, which is specified in the configuration file located (in my case) at /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop/core-site.xml.
  - Format the HDFS filesystem with hdfs namenode -format.
  - Restart all Hadoop services with hstart.
- The latter alternative works, at least for me.
Copy data files to HDFS
- First, let's download a big text file with the following command:

curl -O http://www.gutenberg.org/files/5000/5000-8.txt

The curl command ships with macOS; on a Linux machine you might want to use an alternative such as wget.

- Move the data file to the Hadoop file system with the following command:

hadoop fs -put 5000-8.txt /

- Make sure that the file is in HDFS with the command:

hadoop fs -ls /
Submit job to Hadoop
- Now we submit the job to Hadoop by calling the Hadoop streaming jar with the following command:
hadoop jar \
/usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files ./mapper.py,./reducer.py \
-mapper mapper.py -reducer reducer.py \
-input /5000-8.txt -output /tmp
- In case something goes wrong with the code, you should delete the result directory from HDFS before running the above command again. Remove the previous result with the command:

hadoop fs -rm -r /tmp

- Check the result with the command:

hadoop fs -cat /tmp/part-00000
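Each line of the output is a tab-separated word and its total count, in the format produced by reducer.py; schematically (the words and numbers here are made up for illustration):

some_word	42
another_word	7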
Line count problem
The line count problem is to count the number of lines in a document, which can be accomplished with a similar MapReduce approach. Basically, the mapper function reads the data line by line and emits a 1 for each line, and the reducer function sums up the emitted values.
Implement mapper.py
The mapper function is given as follows:

#!/usr/bin/env python
import sys

def main():
    # Emit a "1" for every line read from standard input.
    for line in sys.stdin:
        print("1")

if __name__ == "__main__":
    main()
Implement reducer.py
The reducer function is given as follows:

#!/usr/bin/env python
import sys

def main():
    total = 0
    # Every incoming line corresponds to one "1" emitted by a mapper.
    for line in sys.stdin:
        total += 1
    print("%d" % total)

if __name__ == '__main__':
    main()
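As with word count, the scripts can be checked locally before submission; no sort step is needed here, since the reducer simply counts its input lines:

cat 5000-8.txt | ./mapper.py | ./reducer.py

The submission command mirrors the earlier one; the output directory /tmp_lc below is only an illustrative name, and any unused HDFS path will do:

hadoop jar \
/usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files ./mapper.py,./reducer.py \
-mapper mapper.py -reducer reducer.py \
-input /5000-8.txt -output /tmp_lc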