Compute TF-IDF with Hadoop Python
This article is an extension of my earlier article about Hadoop Python with the streaming API. I will use the same dataset as in the previous article. In addition, I generate four other sets of data by sampling lines randomly from 5000-8.txt. Before running Hadoop MapReduce, these datasets need to be uploaded to HDFS with the command hadoop fs -put 5000* /
Take a look at HDFS with hadoop fs -ls /
Hongyu-MacBook-Air:simpleExample su$ hadoop fs -ls /
Found 5 items
-rw-r--r-- 1 su supergroup 1428841 2015-08-16 12:13 /5000-5.txt
-rw-r--r-- 1 su supergroup 447003 2015-08-16 12:13 /5000-6.txt
-rw-r--r-- 1 su supergroup 904220 2015-08-16 12:13 /5000-7.txt
-rw-r--r-- 1 su supergroup 1428841 2015-08-16 12:13 /5000-8.txt
-rw-r--r-- 1 su supergroup 395608 2015-08-16 12:13 /5000-9.txt
In this article, I will be computing the TF-IDF from these files.
mapper function
#!/usr/bin/env python
import sys
import os

def tfmapper():
    # For every word on every input line, emit "word<TAB>filename<TAB>1".
    # Hadoop streaming exposes the current input file path through the
    # mapreduce_map_input_file environment variable; fall back to 'noname'
    # when the variable is not set (e.g. when testing locally).
    for line in sys.stdin:
        words = line.strip().split()
        for word in words:
            print "%s\t%s\t1" % (word, os.getenv('mapreduce_map_input_file', 'noname'))

if __name__ == '__main__':
    tfmapper()
reducer function
#!/usr/bin/env python
import sys

def tfreducer():
    # Map output arrives sorted, so all counts for the same (word, filename)
    # pair are consecutive and can be summed in a single pass.
    curprefix = None
    curcount = None
    for line in sys.stdin:
        word, filename, count = line.strip().split('\t')
        prefix = '%s\t%s' % (word, filename)
        if curprefix is None:
            curprefix = prefix
            curcount = int(count)
        elif curprefix == prefix:
            curcount += int(count)
        else:
            print "%s\t%s" % (curprefix, curcount)
            curprefix = prefix
            curcount = int(count)
    if curprefix is not None:
        print "%s\t%s" % (curprefix, curcount)

if __name__ == '__main__':
    tfreducer()
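Before deploying, the two scripts can be sanity-checked locally by simulating the streaming pipeline with ordinary shell pipes. This is only a rough check: outside Hadoop the mapreduce_map_input_file variable is not set, so the filename column falls back to 'noname'.
cat 5000-8.txt | python tfmapper.py | sort | python tfreducer.py | head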
When deploying the MapReduce functions on Hadoop, we can use a wildcard in the input filename so that all files matching the wildcard are sent to the input stream. In particular, we can submit the above MapReduce job with the following command
hadoop jar \
/usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files ./tfmapper.py,./tfreducer.py \
-mapper tfmapper.py \
-reducer tfreducer.py \
-input /5000-* \
-output /tmp
Take a look at the output with hadoop fs -cat /tmp/par*|head -50
Hongyu-MacBook-Air:tfidf su$ hadoop fs -cat /tmp/par*|head -50
"(Lo)cra" hdfs://localhost:9000/5000-7.txt 1
"(Lo)cra" hdfs://localhost:9000/5000-5.txt 1
"(Lo)cra" hdfs://localhost:9000/5000-8.txt 1
"1490 hdfs://localhost:9000/5000-8.txt 1
"1490 hdfs://localhost:9000/5000-5.txt 1
"1490 hdfs://localhost:9000/5000-9.txt 1
Each line in the output has the format word-file-count.
Document frequency (DF) of a word \(w_i\) is by definition the ratio between the number of documents having word \(w_i\) and the total number of documents.
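Putting these pieces together, a common formulation (assuming the standard logarithmic inverse document frequency) is
\[ \mathrm{df}(w_i) = \frac{|\{d : w_i \in d\}|}{N}, \qquad \mathrm{tfidf}(w_i, d) = \mathrm{tf}(w_i, d) \cdot \log\frac{1}{\mathrm{df}(w_i)} \]
where \(N\) is the total number of documents and \(\mathrm{tf}(w_i, d)\) is the per-file word count computed in the previous step.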
We will use the result file from the last step. In particular, we copy and rename the result file within HDFS with hadoop fs -cp /tmp/part-00000 /tf
mapper function
The mapper function will read each record from the above result and append a 1 to the end of each record. An example mapper function is given as follows
#!/usr/bin/env python
import sys

def dfmapper():
    # Pass each TF record through unchanged and append a count of 1,
    # producing "word<TAB>filename<TAB>tfcount<TAB>1".
    for line in sys.stdin:
        print "%s\t1" % line.strip()

if __name__ == '__main__':
    dfmapper()
reducer function
For each word, the reducer function reads the corresponding records into a buffer and counts the number of documents containing that word. It then outputs every record from the buffer with the document count appended to the end.
#!/usr/bin/env python
import sys

def dfreducer():
    # Records for the same word arrive consecutively. Buffer them, count how
    # many documents contain the word, then emit each buffered record with
    # that document count appended.
    curword = None
    curcount = None
    space = []
    for line in sys.stdin:
        word, filename, wordcount, count = line.strip().split('\t')
        prefix = "%s\t%s\t%s" % (word, filename, wordcount)
        if curword is None:
            curword = word
            curcount = int(count)
            space.append(prefix)
        elif curword == word:
            curcount += int(count)
            space.append(prefix)
        else:
            for item in space:
                print "%s\t%d" % (item, curcount)
            curword = word
            curcount = int(count)
            space = [prefix]
    for item in space:
        print "%s\t%d" % (item, curcount)

if __name__ == '__main__':
    dfreducer()
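As with the first job, this step can be sanity-checked locally before submission by piping the TF result through the two scripts, for example
hadoop fs -cat /tf | python dfmapper.py | sort | python dfreducer.py | head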
Now, we need to submit the MapReduce job to Hadoop, specifying the mapper function dfmapper.py and the reducer function dfreducer.py, as well as the input data file in HDFS. Since the /tmp output directory from the previous job still exists and Hadoop will not overwrite an existing output directory, remove it first with hadoop fs -rm -r /tmp. The following command can be used to submit the job
hadoop jar \
/usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files ./dfmapper.py,./dfreducer.py \
-mapper dfmapper.py -reducer dfreducer.py \
-input /tf -output /tmp
As a result, each line of the output file has the format word-file-tfcount-dfcount
Hongyu-MacBook-Air:tfidf su$ hadoop fs -cat /tmp/par*
"(Lo)cra" hdfs://localhost:9000/5000-5.txt 1 3
"(Lo)cra" hdfs://localhost:9000/5000-8.txt 1 3
"(Lo)cra" hdfs://localhost:9000/5000-7.txt 1 3
"1490 hdfs://localhost:9000/5000-8.txt 1 3
"1490 hdfs://localhost:9000/5000-5.txt 1 3
"1490 hdfs://localhost:9000/5000-9.txt 1 3
"1498," hdfs://localhost:9000/5000-9.txt 1 3
"1498," hdfs://localhost:9000/5000-8.txt 1 3
With the result from the last step, it is straightforward to compute the TF-IDF of a word. For a Hadoop implementation, we just need a mapper function that computes the value for each record and a reducer function that does not perform any operation.
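A minimal sketch of such a mapper is given below. The script name tfidfmapper.py and the constant TOTAL_DOCS are illustrative assumptions, not part of the jobs above: it presumes the total number of documents (5 here) is known in advance and applies the logarithmic IDF from the formula above to each word-file-tfcount-dfcount record.
#!/usr/bin/env python
# tfidfmapper.py -- hypothetical final step, a sketch rather than verified code.
# Reads "word<TAB>file<TAB>tfcount<TAB>dfcount" and emits "word<TAB>file<TAB>tfidf".
import sys
import math

TOTAL_DOCS = 5  # assumption: the five 5000-*.txt files uploaded earlier

def tfidfmapper():
    for line in sys.stdin:
        word, filename, tfcount, dfcount = line.strip().split('\t')
        df = float(dfcount) / TOTAL_DOCS           # document frequency as a ratio
        tfidf = int(tfcount) * math.log(1.0 / df)  # tf * log(1/df)
        print "%s\t%s\t%f" % (word, filename, tfidf)

if __name__ == '__main__':
    tfidfmapper()
A pass-through command such as cat can serve as the do-nothing reducer when submitting this job with the streaming jar.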