# Spark with Python: collaborative filtering
The algorithm implemented for collaborative filtering (CF) in Spark MLlib is Alternating Least Squares (ALS) with weighted-λ-regularization. The algorithm is described in the research paper ‘Large-scale Parallel Collaborative Filtering for the Netflix Prize’. I would assume better algorithms exist out there; however, ALS is the one implemented in MLlib, so be it. I will just sketch the general idea of the algorithm below; interested readers can consult the paper and other external references for a comprehensive story.
The loss function used in ALS is the mean squared error (its square root is the RMSE reported in the tables below), defined as
\(\mathcal{L}(R,U,M) = \frac{1}{n}\sum_{i,j}\left(r_{i,j} - \langle u_i, m_j\rangle\right)^2\),
where \(n\) is the number of entries in the rating matrix \(R\).
Combining this loss with a weighted regularization term, the objective of ALS can be formulated as
\(\underset{U,M}{\min}\ \frac{1}{n}\sum_{i,j}\left(r_{i,j} - \langle u_i, m_j\rangle\right)^2 + \lambda \left(\sum_{i} n_{u_i} \lVert u_i\rVert^2 + \sum_{j} n_{m_j} \lVert m_j\rVert^2\right)\),
where \(\lambda\) is the regularization parameter that controls the balance between the loss term and the regularization term, \(n_{u_i}\) is the number of movies rated by user \(i\), and \(n_{m_j}\) is the number of users that rated movie \(j\).
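To make the alternating updates concrete, here is a minimal NumPy sketch of one half-sweep of ALS-WR: with \(M\) fixed, each user vector \(u_i\) has a closed-form least-squares solution over the movies that user rated. The toy matrix, variable names, and parameter values are all mine for illustration; this is not the MLlib implementation.

```python
import numpy as np

# Toy dense rating matrix; 0 marks "no rating". All values here are made up.
R = np.array([[5., 3., 0., 1.],
              [4., 0., 0., 1.],
              [1., 1., 0., 5.],
              [0., 0., 5., 4.]])
rank, lmbda = 2, 0.01
U = np.random.rand(R.shape[0], rank)   # user factors
M = np.random.rand(R.shape[1], rank)   # movie factors

# One half-sweep: with M fixed, solve for each u_i from
# (M_i^T M_i + lambda * n_{u_i} I) u_i = M_i^T r_i, restricted to rated movies.
for i in range(R.shape[0]):
    rated = R[i] > 0                   # movies rated by user i
    n_ui = rated.sum()                 # n_{u_i}: number of movies user i rated
    Mi = M[rated]                      # factors of the rated movies
    A = Mi.T.dot(Mi) + lmbda * n_ui * np.eye(rank)
    b = Mi.T.dot(R[i, rated])
    U[i] = np.linalg.solve(A, b)       # closed-form least-squares update
# the symmetric update for M (with U fixed) follows the same pattern
```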
## General information
Each record in the MovieLens rating file has the format

```
UserID::MovieID::Rating::Timestamp
```

The status of the Spark cluster can be checked from the web UI, e.g. with

```
lynx http://localhost:8080
```

To give the Spark daemons and workers enough memory, the following lines are added to `conf/spark-env.sh`:

```
export SPARK_DAEMON_MEMORY=8g
export SPARK_WORKER_MEMORY=8g
export SPARK_DAEMON_JAVA_OPTS="-Xms8g -Xmx8g"
export SPARK_JAVA_OPTS="-Xms8g -Xmx8g"
```

By default Spark writes its temporary files to `/tmp/`, which is almost full on our system. The solution is to specify another temporary directory for Spark by adding the following line to `conf/spark-env.sh`:

```
export SPARK_LOCAL_DIRS='/cs/work/group/urenzyme/workspace/SparkViaPython/tmp/'
```
Statistics of the dataset (MovieLens 1M)
Name | Number |
---|---|
Ratings | 1000209 |
Training | 978241 |
Test | 21968 |
Parameter selection
Rank | \(\lambda\) | Iteration | RMSE |
---|---|---|---|
30 | 0.1 | 10 | 0.812829467087 |
30 | 0.1 | 15 | 0.805940272574 |
30 | 0.1 | 20 | 0.802684279113 |
30 | 0.01 | 10 | 0.630386656389 |
30 | 0.01 | 15 | 0.625034196609 |
30 | 0.01 | 20 | 0.622905635641 |
30 | 0.001 | 10 | 0.631097769333 |
30 | 0.001 | 15 | 0.623913185952 |
30 | 0.001 | 20 | 0.619115901472 |
20 | 0.1 | 10 | 0.81763450096 |
20 | 0.1 | 15 | 0.809997696948 |
20 | 0.1 | 20 | 0.806740178999 |
20 | 0.01 | 10 | 0.687371500823 |
20 | 0.01 | 15 | 0.684068044569 |
20 | 0.01 | 20 | 0.682312433409 |
20 | 0.001 | 10 | 0.689387829401 |
20 | 0.001 | 15 | 0.682992017723 |
20 | 0.001 | 20 | 0.68042871175 |
10 | 0.1 | 10 | 0.829041851 |
10 | 0.1 | 15 | 0.82183469039 |
10 | 0.1 | 20 | 0.819218398722 |
10 | 0.01 | 10 | 0.761779803655 |
10 | 0.01 | 15 | 0.757920617128 |
10 | 0.01 | 20 | 0.756874383345 |
10 | 0.001 | 10 | 0.759740415789 |
10 | 0.001 | 15 | 0.757288020603 |
10 | 0.001 | 20 | 0.75646083268 |
Best parameters
Rank | \(\lambda\) | Iteration | RMSE |
---|---|---|---|
30 | 0.001 | 20 | 0.619115901472 |
Taking the best parameters on the training data, the performance on the training and test sets is listed in the following table.
 | ALS | Mean imputation |
---|---|---|
Training | 0.62 | 1.12 |
Test | 1.18 | 1.12 |
Taking the second-best parameters on the training data, the performance on the training and test sets is listed in the following table.
 | ALS | Mean imputation |
---|---|---|
Training | 0.62 | 1.12 |
Test | 1.19 | 1.12 |
Taking the third-best parameters on the training data, the performance on the training and test sets is listed in the following table.
 | ALS | Mean imputation |
---|---|---|
Training | 0.62 | 1.12 |
Test | 0.96 | 1.12 |
It seems that we should not overfit the training data :scream: :angry: But the question is how to select the best parameters without relying on the test data. I guess the most common way is to perform cross-validation rather than the single training/test split used above; a sketch is given below.
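For what it's worth, here is a hedged sketch of how 5-fold cross-validation could look with the RDD API, reusing the `training` RDD, `sc`, and the imports (`sqrt`, `add`, `ALS`) from the script at the end of this post; the fold count and the fixed parameter setting are arbitrary choices of mine.

```python
# Sketch: 5-fold cross-validation RMSE for one parameter setting.
folds = training.randomSplit([0.2] * 5, seed=0)
rmses = []
for k in range(5):
    heldOut = folds[k]                                    # validation fold
    trainCv = sc.union([folds[j] for j in range(5) if j != k])
    model = ALS.train(trainCv, rank=30, iterations=20, lambda_=0.001)
    preds = model.predictAll(heldOut.map(lambda x: (x[0], x[1])))
    pairs = preds.map(lambda x: ((x[0], x[1]), x[2])) \
                 .join(heldOut.map(lambda x: ((x[0], x[1]), x[2]))).values()
    n = float(heldOut.count())
    rmses.append(sqrt(pairs.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / n))
print "5-fold CV RMSE: %.4f" % (sum(rmses) / len(rmses))
```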
Statistics of the dataset (MovieLens 10M)
Name | Number |
---|---|
Ratings | 10000054 |
Training | 9786084 |
Test | 213970 |
Parameter selection
Rank | \(\lambda\) | Iteration | RMSE |
---|---|---|---|
30 | 0.1 | 10 | 0.789595015096 |
30 | 0.1 | 15 | 0.783871458306 |
30 | 0.1 | 20 | 0.781022279169 |
30 | 0.01 | 10 | 0.629501460062 |
30 | 0.01 | 15 | 0.624860412768 |
30 | 0.01 | 20 | 0.622747739521 |
30 | 0.001 | 10 | 0.625608789618 |
30 | 0.001 | 15 | 0.619326932555 |
30 | 0.001 | 20 | 0.615837106427 |
20 | 0.1 | 10 | 0.787573198049 |
20 | 0.1 | 15 | 0.783031125788 |
20 | 0.1 | 20 | 0.781263906207 |
20 | 0.01 | 10 | 0.670921064045 |
20 | 0.01 | 15 | 0.667651507438 |
20 | 0.01 | 20 | 0.66548013614 |
20 | 0.001 | 10 | 0.668088452457 |
20 | 0.001 | 15 | 0.663146686623 |
20 | 0.001 | 20 | 0.661223585465 |
10 | 0.1 | 10 | 0.791031639862 |
10 | 0.1 | 15 | 0.785616978954 |
10 | 0.1 | 20 | 0.784268636801 |
10 | 0.01 | 10 | 0.7284122791 |
10 | 0.01 | 15 | 0.725975535523 |
10 | 0.01 | 20 | 0.725448043578 |
10 | 0.001 | 10 | 0.724738139253 |
10 | 0.001 | 15 | 0.722580933105 |
10 | 0.001 | 20 | 0.721248563089 |
Performance on training and test sets
 | ALS | Mean imputation |
---|---|---|
Training | 0.62 | 1.06 |
Test | 0.98 | 1.06 |
As you can see, ALS actually improves the RMSE over mean imputation :+1: :v:
The complete PySpark script is listed below. First, set up the Spark environment:

```python
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
import itertools
from math import sqrt
import sys
from operator import add

# set up Spark environment
APP_NAME = "Collaborative filtering for movie recommendation"
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster('spark://ukko160:7077')
sc = SparkContext(conf=conf)
```

Each rating record is parsed with the `parseRating` function, which is defined as

```python
def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp.
    The key (userId % 10, movieId % 10) is used below to split the data
    into training and test sets.
    """
    fields = line.strip().split("::")
    return (int(fields[0]) % 10, int(fields[1]) % 10), \
           (int(fields[0]), int(fields[1]), float(fields[2]))
```

Reading in the data:

```python
# read in data; the path of the rating file is taken from the command line
filename = sys.argv[1]
data = sc.textFile(filename)
ratings = data.map(parseRating)
numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
print "--- %d ratings from %d users for %d movies\n" % (numRatings, numUsers, numMovies)
```
The data are then split into training and test sets according to the key, and a grid search is performed over the ALS parameters:

```python
# split the data: records whose key is (0, 0) or (0, 1) form the test set
numPartitions = 10
training = ratings.filter(lambda r: not (r[0][0] <= 0 and r[0][1] <= 1)) \
                  .values().repartition(numPartitions).cache()
test = ratings.filter(lambda r: r[0][0] <= 0 and r[0][1] <= 1).values().cache()
numTraining = training.count()
numTest = test.count()
print "ratings:\t%d\ntraining:\t%d\ntest:\t\t%d\n" % (numRatings, numTraining, numTest)

# grid of parameters for model selection
ranks = [10, 20, 30]
lambdas = [0.1, 0.01, 0.001]
numIters = [10, 15, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

# train one model per parameter combination and keep the best one;
# note that the RMSE used for selection is computed on the training set
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    predictions = model.predictAll(training.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
                                       .join(training.map(lambda x: ((x[0], x[1]), x[2]))) \
                                       .values()
    validationRmse = sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(numTraining))
    print rank, lmbda, numIter, validationRmse
    if validationRmse < bestValidationRmse:
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter
print bestRank, bestLambda, bestNumIter, bestValidationRmse
print "ALS on train:\t\t%.2f" % bestValidationRmse

# baseline on the training set: impute the mean rating for every entry
meanRating = training.map(lambda x: x[2]).mean()
baselineRmse = sqrt(training.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTraining)
print "Mean imputation:\t\t%.2f" % baselineRmse

# predict test ratings with the best model
try:
    predictions = bestModel.predictAll(test.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
                                       .join(test.map(lambda x: ((x[0], x[1]), x[2]))) \
                                       .values()
    testRmse = sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(numTest))
except Exception as myerror:
    print myerror
    # fall back to predicting zero for every test rating
    testRmse = sqrt(test.map(lambda x: (x[2] - 0) ** 2).reduce(add) / float(numTest))
print "ALS on test:\t%.2f" % testRmse

# baseline on the test set: impute the mean training rating
meanRating = training.map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
print "Mean imputation:\t%.2f" % baselineRmse

# shut down Spark
sc.stop()
```
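As a quick usage example, the trained model can also score an individual user-movie pair before shutting Spark down; the IDs below are made up:

```python
# predict the rating of user 1 for movie 42 with the best model
print bestModel.predict(1, 42)
```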