Movie_Recommendation_using_pyspark.py
#!/usr/bin/env python
# A recommender built on Spark's ALS, applied to the MovieLens movie-rating data
# Edit: Han Xiaoyang ([email protected])
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
def parseRating(line):
    """
    MovieLens ratings are formatted as userId::movieId::rating::timestamp;
    parse one line into (last digit of timestamp, (userId, movieId, rating)).
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))
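
# Illustrative example, assuming the MovieLens 1M line format described above:
#   parseRating("1::1193::5::978300760")  ->  (0, (1, 1193, 5.0))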
def parseMovie(line):
    """
    The movies file is formatted as movieId::movieTitle;
    parse one line into (int id, title text).
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]
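
# Illustrative example:
#   parseMovie("1::Toy Story (1995)")  ->  (1, "Toy Story (1995)")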
def loadRatings(ratingsFile):
    """
    Load the user's personal ratings from a local file
    (same userId::movieId::rating::timestamp format as ratings.dat).
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    with open(ratingsFile, 'r') as f:
        # Keep only entries with a positive rating
        ratings = [r for r in (parseRating(line)[1] for line in f) if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings
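
# Note: the personal ratings file goes through parseRating, so it must use the
# same userId::movieId::rating::timestamp format. The recommendation step at
# the end of this script queries the model as user 0, so the personal ratings
# are assumed to be recorded under userId 0.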
def computeRmse(model, data, n):
    """
    Compute the root mean squared error (RMSE) of the model's
    predictions against the n observed ratings in data.
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
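
# computeRmse implements sqrt( sum((prediction - rating)^2) / n ): predictions
# and observed ratings are joined on their (userId, movieId) key, and since
# predictAll returns Rating(user, product, rating) records, x[2] is the score.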
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " +
              "Movie_Recommendation_using_pyspark.py movieLensDataDir personalRatingsFile")
        sys.exit(1)

    # Set up the Spark environment
    conf = SparkConf() \
        .setAppName("MovieLensALS") \
        .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)
    # Load the personal ratings
    myRatings = loadRatings(sys.argv[2])
    myRatingsRDD = sc.parallelize(myRatings, 1)
    movieLensHomeDir = sys.argv[1]

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

    # movies is a dict of movieId -> movieTitle
    movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

    numRatings = ratings.count()
    numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
    numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
    print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))
    # Split the whole dataset by the last digit of the timestamp into a training
    # set (60%), a validation set (20%), and a test set (20%). Each resulting
    # set is an RDD of (userId, movieId, rating).
    numPartitions = 4
    training = ratings.filter(lambda x: x[0] < 6) \
        .values() \
        .union(myRatingsRDD) \
        .repartition(numPartitions) \
        .cache()
    validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
        .values() \
        .repartition(numPartitions) \
        .cache()
    test = ratings.filter(lambda x: x[0] >= 8).values().cache()

    numTraining = training.count()
    numValidation = validation.count()
    numTest = test.count()
    print("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))
    # Train models over a small hyperparameter grid and compare them on the
    # validation set
    ranks = [8, 12]
    lambdas = [0.1, 10.0]
    numIters = [10, 20]
    bestModel = None
    bestValidationRmse = float("inf")
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1
    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        # ALS.train takes (ratings, rank, iterations, lambda_) positionally
        model = ALS.train(training, rank, numIter, lmbda)
        validationRmse = computeRmse(model, validation, numValidation)
        print("RMSE (validation) = %f for the model trained with " % validationRmse +
              "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))
        if validationRmse < bestValidationRmse:
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter
    # Evaluate the model that did best on the validation set against the test set
    testRmse = computeRmse(bestModel, test, numTest)
    print("The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda)
          + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse))
    # Take as the baseline a model that always predicts the mean rating
    meanRating = training.union(validation).map(lambda x: x[2]).mean()
    baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
    improvement = (baselineRmse - testRmse) / baselineRmse * 100
    print("The best model improves the baseline by %.2f%%." % improvement)
    # Personalized recommendations (for a single user)
    myRatedMovieIds = set([x[1] for x in myRatings])
    # Iterating over the movies dict yields movieIds; keep only unrated movies
    candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
    # Predict user 0's rating for every candidate movie
    predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
    recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]
    print("Movies recommended for you:")
    for i in range(len(recommendations)):
        print("%2d: %s" % (i + 1, movies[recommendations[i][1]]))

    # Clean up
    sc.stop()
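
# Example invocation, following the usage string above (both paths are
# placeholders for your local MovieLens data directory and ratings file):
#   spark-submit --driver-memory 2g Movie_Recommendation_using_pyspark.py \
#       /path/to/ml-1m /path/to/personalRatings.txt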