forked from HanXiaoyang/pyspark-recommendation-demo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuserBasedRecommender.py
176 lines (140 loc) · 5.11 KB
/
userBasedRecommender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#-*- coding:utf8 -*-
# pySpark实现的基于用户的协同过滤
# 使用的余弦相似度
import sys
from collections import defaultdict
from itertools import combinations
import random
import numpy as np
import pdb
from pyspark import SparkContext
def parseVectorOnUser(line):
    '''
    Parse one "user|item|rating" record, keyed by user.

    Returns (user_id, (item_id, rating)) with the rating converted to float;
    any fields after the third are ignored.
    '''
    fields = line.split("|")
    user_id, item_id, raw_rating = fields[0], fields[1], fields[2]
    return user_id, (item_id, float(raw_rating))
def parseVectorOnItem(line):
    '''
    Parse one "user|item|rating" record, keyed by item.

    Returns (item_id, (user_id, rating)) with the rating converted to float;
    any fields after the third are ignored.
    '''
    fields = line.split("|")
    user_id, item_id, raw_rating = fields[0], fields[1], fields[2]
    return item_id, (user_id, float(raw_rating))
def sampleInteractions(item_id, users_with_rating, n):
    '''
    Down-sample an item's raters when the item has more than n of them.

    users_with_rating: any iterable of (user_id, rating) pairs.
    Returns (item_id, list_of_pairs); the list holds a random sample of n
    pairs when there were more than n, otherwise all of them.
    '''
    # groupByKey yields an iterable that is not a sequence, and random.sample
    # requires one, so materialize the raters first.
    users = list(users_with_rating)
    if len(users) > n:
        return item_id, random.sample(users, n)
    return item_id, users
def findUserPairs(item_id, users_with_rating):
    '''
    Return the first pair of users who both rated item_id.

    users_with_rating: iterable of (user_id, rating) pairs for one item.
    Returns ((user1_id, user2_id), (user1_rating, user2_rating)) built from
    the first combination produced by itertools.combinations, or None when
    fewer than two raters are present. item_id is not used.

    NOTE(review): only ONE pair per item is produced, so with a 1:1 map most
    co-ratings are lost; enumerating every pair needs a flatMap over all
    combinations.
    '''
    first_pair = next(combinations(users_with_rating, 2), None)
    if first_pair is None:
        return None
    first, second = first_pair
    return (first[0], second[0]), (first[1], second[1])
def calcSim(user_pair, rating_pairs):
    '''
    Compute the cosine similarity of one user pair from their co-ratings.

    rating_pairs: iterable of (rating_of_user1, rating_of_user2), one entry
    per item both users rated.
    Returns (user_pair, (cosine_similarity, number_of_co_rated_items)).
    '''
    sum_xx, sum_xy, sum_yy, n = 0.0, 0.0, 0.0, 0
    for rating_pair in rating_pairs:
        # Builtin float: the np.float alias was removed in NumPy 1.24.
        x = float(rating_pair[0])
        y = float(rating_pair[1])
        sum_xx += x * x
        sum_yy += y * y
        sum_xy += x * y
        n += 1
    # cosine() divides by the product of its last two arguments, so pass the
    # vector norms (square roots of the sums of squares).
    cos_sim = cosine(sum_xy, np.sqrt(sum_xx), np.sqrt(sum_yy))
    return user_pair, (cos_sim, n)
def cosine(dot_product, rating_norm_squared, rating2_norm_squared):
    '''
    Cosine similarity of two vectors A and B:
        dotProduct(A, B) / (norm(A) * norm(B))

    Despite their names, the last two arguments are multiplied directly, so
    callers must pass the norms themselves (not squared norms).
    Returns 0.0 when the norm product is zero.
    '''
    norm_product = rating_norm_squared * rating2_norm_squared
    if not norm_product:
        return 0.0
    return dot_product / float(norm_product)
def keyOnFirstUser(user_pair, item_sim_data):
    '''
    Re-key a user-pair record on its first user.

    ((user1_id, user2_id), data) -> (user1_id, (user2_id, data)).
    Only the first user becomes the key; no mirrored record is produced for
    the second user here.
    '''
    first_user, second_user = user_pair
    neighbor_entry = (second_user, item_sim_data)
    return first_user, neighbor_entry
def nearestNeighbors(user, users_and_sims, n):
    '''
    Keep the n neighbours most similar to user.

    users_and_sims: iterable of (neighbor_id, (similarity, co_raters_count)).
    Returns (user, list) ordered by similarity, highest first, truncated to n.
    '''
    # sorted() accepts any iterable (groupByKey's ResultIterable has no
    # .sort()) and leaves the caller's data unmodified.
    ranked = sorted(users_and_sims, key=lambda x: x[1][0], reverse=True)
    return user, ranked[:n]
def topNRecommendations(user_id, user_sims, users_with_rating, n):
    '''
    Recommend up to n items to user_id from its neighbours' ratings.

    Each candidate item is scored by the similarity-weighted mean of the
    neighbours' ratings for it:  sum(sim * rating) / sum(sim).
    Items that user_id has already rated are excluded, as are items whose
    similarity weights sum to zero.

    user_sims: iterable of (neighbor_id, (similarity, co_raters_count)).
    users_with_rating: mapping user_id -> iterable of (item_id, rating).
    Returns (user_id, [item_id, ...]) ordered by score, highest first.
    '''
    already_rated = set(item for (item, _) in users_with_rating.get(user_id, ()))
    totals = defaultdict(float)
    sim_sums = defaultdict(float)
    for (neighbor, (sim, _count)) in user_sims:
        neighbor_items = users_with_rating.get(neighbor)
        if not neighbor_items:
            continue
        for (item, rating) in neighbor_items:
            if item in already_rated:
                continue
            # Accumulate per ITEM (not per neighbour) so the ranking is of items.
            totals[item] += sim * rating
            sim_sums[item] += sim
    # Normalize; skip zero weight sums to avoid ZeroDivisionError.
    scored_items = [(total / sim_sums[item], item)
                    for item, total in totals.items() if sim_sums[item]]
    scored_items.sort(reverse=True)
    return user_id, [item for (_, item) in scored_items[:n]]
if __name__ == "__main__":
    if len(sys.argv) < 3:
        # sys.stderr.write works on Python 2 and 3 alike (`print >>` is 2-only).
        sys.stderr.write("Usage: PythonUserCF <master> <file>\n")
        sys.exit(-1)
    sc = SparkContext(sys.argv[1], "PythonUserCF")
    lines = sc.textFile(sys.argv[2])

    # Sparse item-user matrix, each item's raters down-sampled to at most 500:
    #   item_id -> [(user_1, rating), (user_2, rating), ...]
    # list() materializes groupByKey's iterable so it can be sampled/indexed.
    item_user_pairs = lines.map(parseVectorOnItem).groupByKey().map(
        lambda p: sampleInteractions(p[0], list(p[1]), 500)).cache()

    # All co-ratings of every user pair:
    #   (user1_id, user2_id) -> [(rating1, rating2), (rating1, rating2), ...]
    # flatMap emits EVERY pair of raters per item (a plain map over
    # findUserPairs keeps only the first pair). Sorting the raters first
    # gives each pair a single (smaller_id, larger_id) orientation, so all
    # co-ratings of two users land under one key.
    pairwise_users = item_user_pairs.filter(
        lambda p: len(p[1]) > 1).flatMap(
        lambda p: [((u1[0], u2[0]), (u1[1], u2[1]))
                   for u1, u2 in combinations(sorted(p[1]), 2)]).groupByKey()

    # Cosine similarity per pair, then each user's 50 nearest neighbours:
    #   user_id -> [(neighbor_id, (similarity, co_raters_count)), ...]
    # Each pair is keyed on BOTH users so the neighbour relation is symmetric.
    user_sims = (pairwise_users
                 .map(lambda p: calcSim(p[0], p[1]))
                 .flatMap(lambda p: [keyOnFirstUser(p[0], p[1]),
                                     keyOnFirstUser((p[0][1], p[0][0]), p[1])])
                 .groupByKey()
                 .mapValues(list)
                 .map(lambda p: nearestNeighbors(p[0], p[1], 50)))

    # Each user's rating history, broadcast to every worker:
    #   user_id -> [(item_id_1, rating_1), (item_id_2, rating_2), ...]
    ui_dict = (lines.map(parseVectorOnUser)
               .groupByKey()
               .mapValues(list)
               .collectAsMap())
    uib = sc.broadcast(ui_dict)

    # Top-N recommendations per user:
    #   user_id -> [item1, item2, item3, ...]
    user_item_recs = user_sims.map(
        lambda p: topNRecommendations(p[0], p[1], uib.value, 100)).collect()

    sc.stop()