7.4 RHadoop实现基于MapReduce的协同过滤算法

问题

RHadoop如何实现基于MapReduce的协同过滤算法?

引言

由于RHadoop的rmr2包对Hadoop操作有一些特殊性,所以代码实现有一定难度。需要深入学习的同学,请多尝试并思考MapReduce算法中的key/value值的设计。

7.4.1 基于物品推荐的协同过滤算法介绍

越来越多的互联网应用,都开始设计自己的推荐系统。推荐算法主要使用协同过滤算法,根据用户活跃度和物品流行度,可以分为基于用户的协同过滤算法和 基于物品的协同过滤算法。

  • 基于用户的协同过滤算法,是给用户推荐和他兴趣相似的其他用户喜欢的物品。
  • 基于物品的协同过滤算法,是给用户推荐和他之前喜欢的物品相似的物品。

基于物品的协同过滤算法,是目前广泛使用的一种推荐算法,像Netflix、YouTube、 Amazon等很多的互联网公司都在使用。设计推荐算法主要分为两步:

  1. 计算物品之间的相似度
  2. 根据物品的相似度和用户的历史行为给用户生成推荐列表

为了介绍算法模型,我们选择一组很小的测试数据集。该测试数据集摘自《Mahout In Action》一书第49页,原数据集中第8行,“3,101,2.5” 改为 “3,101,2.0”,数据集中每行3个字段,依次是用户ID、物品ID、对物品的评分。在服务上创建数据文件 small.csv。

~ vi small.csv   #用vi编辑数据文件
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.0
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

7.4.2 R语言本地程序实现

首先,通过R语言实现基于物品的协同过滤算法, 与RHadoop实现进行对比。这里我使用《Mahout In Action》一书第6章介绍的分步式基于物品的协同过滤算法进行实现。算法的思想,分为3步实现:

  1. 建立物品的同现矩阵
  2. 建立用户对物品的评分矩阵
  3. 矩阵计算推荐结果

1. 建立物品的同现矩阵

按用户分组,找到每个用户所选的物品单独出现计数及两两一组计数。例如:用户ID为3的用户,分别给101、104、105、107这4个物品打分。

(1) (101,101),(104,104),(105,105),(107,107),单独出现计算各加1。

(2) (101,104),(101,105),(101,107),(104,105),(104,107),(105,107),两个一组计数各加1。

(3) 最后,把所有用户的计算结果求和,生成一个三角矩阵,再补全三角矩阵,就建立了物品的同现矩阵。如下面矩阵所示:

 [101] [102] [103] [104] [105] [106] [107]
[101]   5     3     4     4     2     2     1
[102]   3     3     3     2     1     1     0
[103]   4     3     4     3     1     2     0
[104]   4     2     3     4     2     2     1
[105]   2     1     1     2     2     1     1
[106]   2     1     2     2     1     2     0
[107]   1     0     0     1     1     0     1

2. 建立用户对物品的评分矩阵

按用户分组,找到每个用户所选的物品及评分,例如,用户ID为3的用户,分别给(3,101,2.0)、(3,104,4.0)、(3,105,4.5)、(3,107,5.0)这4个物品打分。

(1) 找到物品评分(3,101,2.0),(3,104,4.0),(3,105,4.5),(3,107,5.0)。 (2) 建立用户对物品的评分矩阵。

     U3
[101] 2.0
[102] 0.0
[103] 0.0
[104] 4.0
[105] 4.5
[106] 0.0
[107] 5.0

3. 矩阵计算推荐结果

同现矩阵 * 评分矩阵 = 推荐结果,如图7-1所示。推荐给ID为3的用户的结果是(103,24.5), (102,18.5), (106,16.5)。

矩阵乘法

图7-1 矩阵乘法(摘自《Mahout In Action》)

按照上面的描述过程,我们用R语言代码进行实现:

> library(plyr)  # 加载plyr包
> train<-read.csv(file="small.csv",header=FALSE)  # 读取数据集
> names(train)<-c("user","item","pref")
> train  # 查看train数据集
  user item pref
1 1 101 5.0
2 1 102 3.0
3 1 103 2.5
4 2 101 2.0
5 2 102 2.5
6 2 103 5.0
7 2 104 2.0
8 3 101 2.0
9 3 104 4.0
10 3 105 4.5
11 3 107 5.0
12 4 101 5.0
13 4 103 3.0
14 4 104 4.5
15 4 106 4.0
16 5 101 4.0
17 5 102 3.0
18 5 103 2.0
19 5 104 4.0
20 5 105 3.5
21 5 106 4.0

> usersUnique<-function(){  # 计算用户列表
+     users<-unique(train$user)
+     users[order(users)]
+ }

> itemsUnique<-function(){  #计算商品列表方法
+     items<-unique(train$item)
+     items[order(items)]
+ }

> users<-usersUnique()  # 用户列表
> users
[1] 1 2 3 4 5

> items<-itemsUnique()    # 商品列表
> items
[1] 101 102 103 104 105 106 107


> index<-function(x) which(items %in% x)    #建立商品列表索引
> data<-ddply(train,.(user,item,pref),summarize,idx=index(item))

> data
 user item pref idx
1 1 101 5.0 1
2 1 102 3.0 2
3 1 103 2.5 3
4 2 101 2.0 1
5 2 102 2.5 2
6 2 103 5.0 3
7 2 104 2.0 4
8 3 101 2.0 1
9 3 104 4.0 4
10 3 105 4.5 5
11 3 107 5.0 7
12 4 101 5.0 1
13 4 103 3.0 3
14 4 104 4.5 4
15 4 106 4.0 6
16 5 101 4.0 1
17 5 102 3.0 2
18 5 103 2.0 3
19 5 104 4.0 4
20 5 105 3.5 5
21 5 106 4.0 6

> cooccurrence<-function(data){  #同现矩阵
+     n<-length(items)
+     co<-matrix(rep(0,n*n),nrow=n)
+     for(u in users){
+         idx<-index(data$item[which(data$user==u)])
+         m<-merge(idx,idx)
+         for(i in 1:nrow(m)){
+             co[m$x[i],m$y[i]]=co[m$x[i],m$y[i]]+1
+         }
+     }
+     return(co)
+ }

> recommend<-function(udata=udata,co=coMatrix,num=0){  #推荐算法
+     n<-length(items)+     pref<-rep(0,n)
+     pref[udata$idx]<-udata$pref
+     
+     userx<-matrix(pref,nrow=n)  # 用户评分矩阵
+
+     
+     r<-co %*% userx  # 同现矩阵*评分矩阵
+     
+     r[udata$idx]<-0  # 推荐结果排序
+     idx<-order(r,decreasing=TRUE)
+     topn<-data.frame(user=rep(udata$user[1],length(idx)),item=items[idx],val=r[idx])
+     topn<-topn[which(topn$val>0),]
+     
+     if(num>0){  # 推荐结果取前num个
+         topn<-head(topn,num)
+     }
+     
+     return(topn)  #返回结果
+ }

> co<-cooccurrence(data)  # 生成同现矩阵
> co
    [,1] [,2] [,3] [,4] [,5] [,6] [,7]
[1,]  5    3    4    4    2    2    1
[2,]  3    3    3    2    1    1    0
[3,]  4    3    4    3    1    2    0
[4,]  4    2    3    4    2    2    1
[5,]  2    1    1    2    2    1    1
[6,]  2    1    2    2    1    2    0
[7,]  1    0    0    1    1    0    1

> recommendation<-data.frame()   # 计算推荐结果
> for(i in 1:length(users)){
+     udata<-data[which(data$user==users[i]),]
+     recommendation<-rbind(recommendation,recommend(udata,co,0))
+ }

> recommendation  # 输出推荐结果
  user item val
1 1 104 33.5
2 1 106 18.0
3 1 105 15.5
4 1 107 5.0
5 2 106 20.5
6 2 105 15.5
7 2 107 4.0
8 3 103 24.5
9 3 102 18.5
10 3 106 16.5
11 4 102 37.0
12 4 105 26.0
13 4 107 9.5
14 5 107 11.5

7.4.3 R基于Hadoop分步式程序实现

R语言实现的MapReduce算法,可以基于R的数据对象实现,不必如Java一样使用文本存储。算法思想同上面R语言实现思想,略有复杂,具体如下。

(1) 建立物品的同现矩阵,首先是按用户分组,得到所有物品出现的组合列表;其次是对物品组合列表进行计数,建立物品的同现矩阵。

(2) 建立用户对物品的评分矩阵。

(3) 合并同现矩阵和评分矩阵。

(4) 计算推荐结果列表。

(5) 按输入格式得到推荐评分列表。

通过MapReduce实现时,所有操作都要使用Map和Reduce的任务完成,程序实现过程略有变化,如图7-2所示。下面按照上面列出来的5个步骤,进行矩阵分析。

分布式程序设计

图7-2 分布式程序设计(摘自《Mahout In Action》)

1. 建立物品的同现矩阵

1) 按用户分组,得到所有物品出现的组合列表。

  • key:物品列表向量
  • val:物品组合向量
$key
[1] 101 101 101 101 101 101 101 101 101 101 101 101 101 101 101 102 102 102 102
[20] 102 102 102 103 103 103 103 103 103 103 103 103 103 103 104 104 104 104 104
[39] 104 104 104 104 104 104 104 105 105 105 105 106 106 106 106 107 107 107 107
[58] 101 101 101 101 101 101 102 102 102 102 102 102 103 103 103 103 103 103 104
[77] 104 104 104 104 104 105 105 105 105 105 105 106 106 106 106 106 106

$val
[1] 101 102 103 101 102 103 104 101 104 105 107 101 103 104 106 101 102 103 101
[20] 102 103 104 101 102 103 101 102 103 104 101 103 104 106 101 102 103 104 101
[39] 104 105 107 101 103 104 106 101 104 105 107 101 103 104 106 101 104 105 107
[58] 101 102 103 104 105 106 101 102 103 104 105 106 101 102 103 104 105 106 101
[77] 102 103 104 105 106 101 102 103 104 105 106 101 102 103 104 105 106

2) 对物品组合列表进行计数,建立物品的同现矩阵

  • key:物品列表向量
  • val:同现矩阵的数据框值(item,item,Freq)

矩阵格式要与 "2. 建立用户对物品的评分矩阵" 的格式一致,把异构的两种数据源合并为同一种数据格式,为 "3. 合并同现矩阵和评分矩阵" 做数据基础。

$key
[1] 101 101 101 101 101 101 101 102 102 102 102 102 102 103 103 103 103 103 103
[20] 104 104 104 104 104 104 104 105 105 105 105 105 105 105 106 106 106 106 106
[39] 106 107 107 107 107

$val
k v freq
1 101 101 5
2 101 102 3
3 101 103 4
4 101 104 4
5 101 105 2
6 101 106 2
7 101 107 1
8 102 101 3
9 102 102 3
10 102 103 3
11 102 104 2# 省略部分输出

2. 建立用户对物品的评分矩阵

  • key:物品列表
  • val:用户对物品打分矩阵
$key
[1] 101 101 101 101 101 102 102 102 103 103 103 103 104 104 
$val
item user pref
1 101 1 5.0
2 101 2 2.0
3 101 3 2.0
4 101 4 5.0
5 101 5 4.0
6 102 1 3.0
7 102 2 2.5
8 102 5 3.0
9 103 1 2.5
10 103 2 5.0
11 103 4 3.0
12 103 5 2.0
13 104 2 2.0
14 104 3 4.0
15 104 4 4.5

3. 合并同现矩阵和评分矩阵

这一步操作是MapReduce中比较特殊的,因为数据源是两个异构数据源,进行MapReduce的操作。在第二步时,我们已经把两种格式合并为一样的,第三步使用equijoin这个rmr2包的函数,进行矩阵合并。

  • key:NULL
  • val:合并的数据框
$key
NULL
$val
k.l v.l freq.l item.r user.r pref.r
1 103 101 4 103 1 2.5
2 103 102 3 103 1 2.5
3 103 103 4 103 1 2.5
4 103 104 3 103 1 2.5
5 103 105 1 103 1 2.5
6 103 106 2 103 1 2.5
7 103 101 4 103 2 5.0
8 103 102 3 103 2 5.0
9 103 103 4 103 2 5.0
10 103 104 3 103 2 5.0
11 103 105 1 103 2 5.0
12 103 106 2 103 2 5.0
13 103 101 4 103 4 3.0

4. 计算推荐结果列表

把第三步中的矩阵,进行合并计算,得到推荐结果列表。

  • key:物品列表
  • val:推荐结果数据框
$key
[1] 101 101 101 101 101 101 101 101 101 101 101 101 101 101 101 101 101 101


$val
k.l v.l user.r v
1 101 101 1 25.0
2 101 101 2 10.0
3 101 101 3 10.0
4 101 101 4 25.0
5 101 101 5 20.0
6 101 102 1 15.0
7 101 102 2 6.0
8 101 102 3 6.0
9 101 102 4 15.0
10 101 102 5 12.0
11 101 103 1 20.0
12 101 103 2 8.0
13 101 103 3 8.0
14 101 103 4 20.0
15 101 103 5 16.0
16 101 104 1 20.0
17 101 104 2 8.0
18 101 104 3 8.0

# 省略部分输出

5. 按输入格式得到推荐评分列表

对推荐结果列表,进行排序处理,输出排序后的推荐结果。

  • key:用户ID
  • val:推荐结果数据框
$key
[1] 1 1 1 1 1 1 1 2 2 2 2 2 2 2 3 3 3 3 3 3 3 4 4 4 4 4 4 4 5 5 5 5 5 5 5

$val
user item pref
1 1 101 44.0
2 1 103 39.0
3 1 104 33.5
4 1 102 31.5
5 1 106 18.0
6 1 105 15.5
7 1 107 5.0
8 2 101 45.5
9 2 103 41.5
10 2 104 36.0
11 2 102 32.5
12 2 106 20.5
13 2 105 15.5
14 2 107 4.0
15 3 101 40.0
16 3 104 38.0
17 3 105 26.0
18 3 103 24.5
19 3 102 18.5
20 3 106 16.5
21 3 107 15.5
22 4 101 63.0
23 4 104 55.0
24 4 103 53.5
25 4 102 37.0
26 4 106 33.0
27 4 105 26.0
28 4 107 9.5
29 5 101 68.0
30 5 104 59.0
31 5 103 56.5
32 5 102 42.5
33 5 106 34.5
34 5 105 32.0
35 5 107 11.5

6. rmr2使用提示:

1) rmr.options(backend = ‘hadoop’)

这里backend有两个值,hadoop,local。hadoop是默认值,使用hadoop环境运行程序。local是一个本地测试的设置,已经不建议再使用。我在开发时,试过local设置,运行速度非常快,模拟了hadoop的运行环境。但是,local模式下的代码,不能和hadoop模式下完全兼容,变动也比较大,因此不建议大家使用。

2) equijoin(…,outer=c(‘left’))

这里outer包括了4个值,c(“”, “left”, “right”, “full”),非常像数据库中两个表的join操作

3) keyval(k,v)

用于MapReduce的操作需要key和valve保存数据。如果直接输出或者输出的未加key,会有一个警告:Converting to.dfs argument to keyval with a NULL key。再上一节中,rmr2的例子中就有类似的情况,请大家注意修改代码。

> to.dfs(1:10)
Warning message:
In to.dfs(1:10) : Converting to.dfs argument to keyval with a NULL key

下面展示完成的运行代码:

> library(rmr2)  # 加载rmr2包
> train<-read.csv(file="small.csv",header=FALSE)  # 输入数据文件
> names(train)<-c("user","item","pref")
> rmr.options(backend = 'hadoop')  # 使用rmr的hadoop格式,hadoop是默认设置。
> train.hdfs = to.dfs(keyval(train$user,train))  # 把数据集存入HDFS
> from.dfs(train.hdfs)    #打印计算结果

#STEP 1, 建立物品的同现矩阵
# 1) 按用户分组,得到所有物品出现的组合列表。
> train.mr<-mapreduce(
+  train.hdfs,
+  map = function(k, v) {
+    keyval(k,v$item)
+  }
+  ,reduce=function(k,v){
+    m<-merge(v,v)
+    keyval(m$x,m$y)
+  }
+ )
> from.dfs(train.mr)    #打印计算结果

# 2) 对物品组合列表进行计数,建立物品的同现矩阵
> step2.mr<-mapreduce(
+  train.mr,
+  map = function(k, v) {
+    d<-data.frame(k,v)
+    d2<-ddply(d,.(k,v),count)
+    key<-d2$k
+    val<-d2
+    keyval(key,val)
+  }
+)
> from.dfs(step2.mr)    #打印计算结果

# 2. 建立用户对物品的评分矩阵
> train2.mr<-mapreduce(
+  train.hdfs,
+  map = function(k, v) {
+    #df<-v[which(v$user==3),]
+    df<-v
+    key<-df$item
+    val<-data.frame(item=df$item,user=df$user,pref=df$pref)
+    keyval(key,val)
+  }
+)
> from.dfs(train2.mr)    #打印计算结果

#3. 合并同现矩阵和评分矩阵
> eq.hdfs<-equijoin(
+  left.input=step2.mr,
+  right.input=train2.mr,
+  map.left=function(k,v){
+    keyval(k,v)
+  },
+  map.right=function(k,v){
+    keyval(k,v)
+  },
+  outer = c("left")
+)
> from.dfs(eq.hdfs)    #打印计算结果

# 4. 计算推荐结果列表
> cal.mr<-mapreduce(
+  input=eq.hdfs,
+  map=function(k,v){
+    val<-v
+    na<-is.na(v$user.r)
+    if(length(which(na))>0) val<-v[-which(is.na(v$user.r)),]
+    keyval(val$k.l,val)
+  }
+  ,reduce=function(k,v){
+    val<-ddply(v,.(k.l,v.l,user.r),summarize,v=freq.l*pref.r)
+    keyval(val$k.l,val)
+  }
+)
> from.dfs(cal.mr)    #打印计算结果

# 5. 按输入格式得到推荐评分列表
> result.mr<-mapreduce(
+  input=cal.mr,
+  map=function(k,v){
+    keyval(v$user.r,v)
+  }
+  ,reduce=function(k,v){
+    val<-ddply(v,.(user.r,v.l),summarize,v=sum(v))
+    val2<-val[order(val$v,decreasing=TRUE),]
+    names(val2)<-c("user","item","pref")
+    keyval(val2$user,val2)
+  }
+)
> from.dfs(result.mr)   #打印计算结果

本节提供了R语言基于MapReduce的方法实现协同过滤算法的一种思路,算法可能不是最优的,希望大家有时间写出更好的算法来!随着R语言及Hadoop的发展,相信会有越来越多的算法应用会使用这种方式!

如果想对比Java基于MapReduce实现基于物品的协同过滤算法,可以参考作者的博客文章用“Hadoop构建电影推荐系统”和“Mahout分步式程序开发 基于物品的协同过滤ItemCF”。

results matching ""

    No results matching ""