Mapreduce with Hadoop via Python with Examples

Introduction

In this article I will walk through two examples of running mapreduce functions on Hadoop with Python. The Apache Hadoop framework is originally meant for Java. However, with the Hadoop streaming API we can implement mapreduce functions in Python. In particular, the input and output of the mapreduce functions are handled by the standard input/output streams STDIN and STDOUT, which we access in Python through sys.stdin and sys.stdout.

Word count problem

  1. Word count is a canonical problem: count the occurrences of each word in a document.
  2. The mapper function takes in the raw text file and converts it into a collection of key-value pairs. Each key is a word, and each key (word) is emitted with a value of 1.
  3. The reducer function summarizes the key-value pairs by combining the values that share the same key. The result is a list of unique keys, each with its count of appearances, as illustrated in the small example below.
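
As a small illustration (the input line here is just a made-up example, not data from the article), the intermediate and final key-value pairs look roughly like this:

    # a single input line
    to be or not to be

    # mapper output: one (word, 1) pair per word
    to   1
    be   1
    or   1
    not  1
    to   1
    be   1

    # reducer output after grouping by key: each unique word with its count
    be   2
    not  1
    or   1
    to   2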

Implement the mapreduce function for word count

  1. With the Hadoop streaming API, we aim to write one Python script acting as the mapper and one Python script acting as the reducer.
  2. In addition, the scripts should work in a data stream similar to the following

    cat document | ./mapper.py | sort -k1,1 | ./reducer.py > output

  3. Hadoop is implemented in Java and is meant for Java. However, with the Hadoop streaming package we can write our own mapreduce functions in Python. There are a few good blog posts about using the Hadoop streaming package with Python, for example,
    1. Writing a Hadoop mapreduce program in Python
    2. Performance analysis for scaling up R computation using Hadoop
    3. Python mapreduce on Hadoop - a beginners tutorial
  4. Here we provide a step-by-step tutorial on running a Python mapreduce program on Hadoop on macOS.
  5. The magic is the Hadoop streaming API, which lets data pass between Hadoop and our scripts through STDIN and STDOUT.
  6. We will use Python's sys.stdin and sys.stdout to read and write the data; Hadoop will take care of everything else.

Implement mapper function

#!/usr/bin/env python
import sys

def main():
  # read the raw text from STDIN, one line at a time
  for line in sys.stdin:
    words = line.strip().split()
    # emit one tab-separated (word, 1) pair per word
    for word in words:
      print("%s\t1" % word)

if __name__ == "__main__":
  main()
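
It is a good idea to test the mapper on its own before involving Hadoop. Assuming the script is saved as mapper.py and made executable with chmod +x mapper.py, a quick local check could be

    echo "to be or not to be" | ./mapper.py

which should print one tab-separated word-and-1 pair per input word.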

Shuffle procedure

Between the map and reduce phases, Hadoop sorts and groups the mapper output by key, so that all values belonging to the same key arrive at the same reducer one after another. In the local pipeline shown earlier, this shuffle step is simulated by sort -k1,1.
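
For example, piping the mapper output through sort places identical words on adjacent lines (same made-up input as above), which is exactly the property the reducer below relies on:

    echo "to be or not to be" | ./mapper.py | sort -k1,1

    be   1
    be   1
    not  1
    or   1
    to   1
    to   1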

Implement reducer function

#!/usr/bin/env python
import sys

def main():
  curword = None
  curcount = 0
  # the input is sorted by key, so identical words arrive on adjacent lines
  for line in sys.stdin:
    word, count = line.strip().split('\t')
    count = int(count)
    if curword is None:
      # first word of the stream
      curword = word
      curcount = count
    elif curword == word:
      # same word as on the previous line: accumulate its count
      curcount += count
    else:
      # a new word starts: emit the finished word and reset the counter
      print("%s\t%d" % (curword, curcount))
      curword = word
      curcount = count
  # emit the last word, provided there was any input at all
  if curword is not None:
    print("%s\t%d" % (curword, curcount))

if __name__ == '__main__':
  main()
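
With both scripts in place, the complete local pipeline from the beginning of this section can be dry-run end to end; on the same made-up input it reproduces the counts from the earlier illustration:

    echo "to be or not to be" | ./mapper.py | sort -k1,1 | ./reducer.py

    be   2
    not  1
    or   1
    to   2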

Submit the mapreduce program to Hadoop cluster

Start local Hadoop server

  1. If you have followed my instructions to set up Hadoop on macOS, you can start the Hadoop server with the command hstart.
  2. Use the command jps to check whether everything is fine with your Hadoop installation. In particular, you should have the following services up and running

    8257 Jps
    6214 NameNode
    6550 ResourceManager
    6424 SecondaryNameNode
    6651 NodeManager
    8172 DataNode
    
  3. You might come across the problem that the DataNode is missing from the jps output, which means it did not start properly. To start the DataNode, you can
    1. Stop all Hadoop services with hstop.
    2. Format the HDFS filesystem with hdfs namenode -format.
    3. Restart all Hadoop services with hstart.
  4. If the above does not work, or you don’t care about the data on the HDFS, you can
    1. Stop all Hadoop services with hstop.
    2. Delete all files in the directory given by hadoop.tmp.dir, which is specified in the configuration file located (in my case) at /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop/core-site.xml.
    3. Format the HDFS filesystem with hdfs namenode -format.
    4. Restart all Hadoop services with hstart.
  5. The latter alternative works, at least for me; the corresponding command sequence is sketched below.
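
For reference, the reset described in step 4 boils down to something like the following. Note that hstart and hstop are the helper aliases from my setup post, and the hadoop.tmp.dir path below is only an assumption (the Hadoop default is /tmp/hadoop-<username>); check the value in your own core-site.xml before deleting anything.

    hstop                           # stop all Hadoop services
    rm -rf /tmp/hadoop-$(whoami)    # assumed hadoop.tmp.dir; adjust to your configuration
    hdfs namenode -format           # re-format the HDFS filesystem
    hstart                          # restart all Hadoop services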

Copy data files to HDFS

  1. First, let’s download a big text file with the following command

    curl -O http://www.gutenberg.org/files/5000/5000-8.txt

    As the command curl is used here on macOS, you might want to use an alternative, e.g. wget, on other Linux machines.

  2. Move the data file to the Hadoop file system with the following command

    hadoop fs -put 5000-8.txt /

  3. Make sure that you have the file in the HDFS with the command hadoop fs -ls /

Submit job to Hadoop

hadoop jar \
     /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar  \
     -files ./mapper.py,./reducer.py \
     -mapper mapper.py -reducer reducer.py \
     -input /5000-8.txt -output /tmp
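
Once the job has finished, the result can be read back from HDFS. Keep in mind that Hadoop refuses to run a job whose -output directory already exists, so choose a fresh directory for each run; the result files are typically named part-*, so something like the following prints the word counts:

    hadoop fs -cat /tmp/part-*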

Line count problem

The line count problem is to count the number of lines in a document, which can also be accomplished with a simple mapreduce scheme. Basically, the mapper function reads the data line by line and emits a 1 for each line, and the reducer function sums up the emitted values.

Implement mapper.py

The mapper function is given as follows.

#!/usr/bin/env python
import sys

def main():
  # emit a 1 for every input line; the content of the line itself is not needed
  for line in sys.stdin:
    print("1")

if __name__ == "__main__":
  main()

Implement reducer.py

The reducer function is given as follows.

#!/usr/bin/env python
import sys

def main():
  # count the number of lines received on STDIN
  total = 0
  for line in sys.stdin:
    total += 1
  print("%d" % total)

if __name__ == '__main__':
  main()
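
These two scripts can be tested locally in the same way as the word count example; no sort step is needed because every key emitted by the mapper is identical. Assuming the text file downloaded earlier sits in the current directory:

    cat 5000-8.txt | ./mapper.py | ./reducer.py

This should print the number of lines in the file. Submitting the job to the Hadoop cluster works with the same hadoop jar streaming command as before, pointing -mapper and -reducer at these two scripts and choosing a fresh -output directory.
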
Hongyu Su 15 August 2015