Spark via Python: basic setup, count lines, and word counts

This post shows how to set up Spark for Python. In particular, it walks through the steps to set up Spark on an interactive cluster at the University of Helsinki, Finland. It then works through two simple but classical problems, counting lines in a file and counting words, together with the solution code.

Set up Spark services

  1. Create a directory for the project

    mkdir mySpark

  2. Download the latest Spark package from the Spark homepage. On a Linux machine, use the following command

    wget http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz

  3. Unpack the .tgz package with the following command

    tar -xvvf spark-1.4.1-bin-hadoop2.6.tgz

  4. Configure Spark for the local cluster by listing the worker machines in the file ./spark-1.4.1-bin-hadoop2.6/conf/slaves. In my case the configuration file looks like this

    # A Spark Worker will be started on each of the machines listed below.
    node180
    node182
    node183
    
  5. Start Spark services on the predefined nodes of the cluster with the following command

    ./spark-1.4.1-bin-hadoop2.6/sbin/start-all.sh

    You can check the running status of the Spark service from the master node by starting a command-line browser. For example, on a Linux system you can use lynx to open http://localhost on port 8080 with the following command

    lynx http://localhost:8080

    As a result, you should be able to see some status information; in my case it looks like the following

     Spark Master at spark://ukko178:7077 (Spark 1.4.1)

       * URL: spark://ukko178:7077
       * REST URL: spark://ukko178:6066 (cluster mode)
       * Workers: 3
       * Cores: 48 Total, 0 Used
       * Memory: 91.2 GB Total, 0.0 B Used
       * Applications: 0 Running, 0 Completed
       * Drivers: 0 Running, 0 Completed
       * Status: ALIVE

     Workers

       Worker Id                                 Address             State  Cores        Memory
       worker-20150726121716-86.50.20.184-40008  86.50.20.184:40008  ALIVE  16 (0 Used)  30.4 GB (0.0 B Used)
       worker-20150726121717-86.50.20.183-37887  86.50.20.183:37887  ALIVE  16 (0 Used)  30.4 GB (0.0 B Used)
       worker-20150726121718-86.50.20.181-51810  86.50.20.181:51810  ALIVE  16 (0 Used)  30.4 GB (0.0 B Used)

     Running Applications

       Application ID   Name   Cores   Memory per Node   Submitted Time   User   State   Duration
       (none)

     Completed Applications

       (none)

Example: count lines
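
The first version below is a plain script: it builds its own SparkConf and SparkContext pointing at the master started above, reads the README file shipped with Spark, and adds up the lengths of all of its lines.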

from pyspark import SparkContext
from pyspark import SparkConf
# configuration
APP_NAME = 'my python script'
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster('spark://ukko178:7077')
sc = SparkContext(conf=conf)
# core part of the script
lines = sc.textFile("../spark-1.4.1-bin-hadoop2.6/README.md")
lineLengths = lines.map(lambda s: len(s))
lineLengths.persist()
totalLength = lineLengths.reduce(lambda a,b:a+b)
# output results
print totalLength
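
The same logic can be wrapped in a function and protected by the usual __main__ guard, so that the file can be shipped to the cluster as a standalone job:
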
from pyspark import SparkContext
from pyspark import SparkConf
def count_lines():
  # configuration
  APP_NAME = 'count lines'
  conf = SparkConf().setAppName(APP_NAME)
  conf = conf.setMaster('spark://ukko178:7077')
  sc = SparkContext(conf=conf)
  # core part of the script
  lines = sc.textFile("../spark-1.4.1-bin-hadoop2.6/README.md")
  lineLengths = lines.map(lambda s: len(s))
  lineLengths.persist()
  totalLength = lineLengths.reduce(lambda a,b:a+b)
  # output results
  print totalLength
if __name__ == '__main__':
  count_lines()
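
Assuming the script above is saved as count_lines.py (the file name here is just an example), it can be submitted to the standalone cluster with the spark-submit script that ships with Spark; since the master URL is already set in the SparkConf, no extra flags are needed

    ./spark-1.4.1-bin-hadoop2.6/bin/spark-submit count_lines.py

If you prefer to avoid lambda expressions, the map and reduce steps can also be passed as ordinary named functions, which is convenient once the per-record logic grows beyond a one-liner:
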
from pyspark import SparkContext
from pyspark import SparkConf
def count_lines_functioncall():
  # configuration
  APP_NAME = 'count lines'
  conf = SparkConf().setAppName(APP_NAME)
  conf = conf.setMaster('spark://ukko178:7077')
  sc = SparkContext(conf=conf)
  # core part of the script
  lines = sc.textFile("../spark-1.4.1-bin-hadoop2.6/README.md")
  lineLength = lines.map(count_lines_single)
  totalLength = lineLength.reduce(reducer)
  # output results
  print totalLength
def count_lines_single(lines):
  return len(lines)
def reducer(length1,length2):
  return length1+length2
if __name__ == '__main__':
  count_lines_functioncall()
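
Note that the reduce step in these scripts adds up line lengths, i.e., it yields the total number of characters in the file; if you literally want the number of lines, the count action on the same lines RDD returns it directly

    numLines = lines.count()
    print numLines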

Example: word counts
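
The first version uses lambda expressions: flatMap splits each line into words, map turns every word into a (word, 1) pair, and reduceByKey adds up the counts per word.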

from pyspark import SparkContext
from pyspark import SparkConf
def word_count_lambdaexpression():
  # configuration
  APP_NAME = 'word count'
  conf = SparkConf().setAppName(APP_NAME)
  conf = conf.setMaster('spark://ukko178:7077')
  sc = SparkContext(conf=conf)
  # core part of the script
  lines = sc.textFile("../spark-1.4.1-bin-hadoop2.6/README.md")
  words = lines.flatMap(lambda x: x.split(' '))
  pairs = words.map(lambda x: (x,1))
  count = pairs.reduceByKey(lambda x,y: x+y)
  # output results
  for x in count.collect():
    print x
if __name__ == '__main__':
  word_count_lambdaexpression()
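
The same word count can be written with named functions in place of the lambda expressions:
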
from pyspark import SparkContext
from pyspark import SparkConf
def word_count_functioncall():
  # configuration
  APP_NAME = 'word count'
  conf = SparkConf().setAppName(APP_NAME)
  conf = conf.setMaster('spark://ukko178:7077')
  sc = SparkContext(conf=conf)
  # core part of the script
  lines = sc.textFile("../spark-1.4.1-bin-hadoop2.6/README.md")
  table = lines.flatMap(flatmapper).map(mapper).reduceByKey(reducer)
  for x in table.collect():
    print x
def flatmapper(lines):
  return lines.split(' ')
def mapper(word):
  return (word,1)
def reducer(a, b):
  return a+b
if __name__ == '__main__':
  word_count_functioncall()
Hongyu Su 26 July 2015