基于Spark-ALS的协同过滤算法推荐

基于Spark-ALS的协同过滤算法推荐

1.协同过滤介绍

协同过滤是一种根据用户对各种产品的交互与评分来推荐新产品的推荐系统技术。
协同过滤吸引人的地方就在于它只需要输入一系列用户/产品的交互记录,协同过滤算法能够根据这些交互记录知道哪些产品之间比较相似(因为相同的用户与它们发生了交互)以及哪些用户之间比较相似,然后可以作出新的推荐。
交互记录分为“显式”反馈(例如在购物网站上进行评分)和“隐式”反馈(例如用户访问了一个产品的页面但是没有对产品评分)。

2.显示反馈与隐式反馈

推荐系统依赖不同类型的输入数据,最方便的是高质量的显式反馈数据,它们包含用户对感兴趣商品明确的评价。例如,收集的用户对电影评价的星星等级数据。但是显式反馈数据不一定总是找得到,因此推荐系统可以从更丰富的隐式反馈信息中推测用户的偏好。
隐式反馈类型包括购买历史、浏览历史、搜索模式甚至鼠标动作。例如,购买同一个作者许多书的用户可能喜欢这个作者。
许多研究都集中在处理显式反馈,然而在很多应用场景下,应用程序重点关注隐式反馈数据。因为可能用户不愿意评价商品或者由于系统限制我们不能收集显式反馈数据。在隐式模型中,一旦用户允许收集可用的数据,在客户端并不需要额外的显式数据。

3.协同过滤的常用算法——交替最小二乘(ALS)

在矩阵分解(matrix factorization)中使用的一种算法。有一个稀疏矩阵,假设这个矩阵是低阶的,可以分解成两个小矩阵相乘。然后交替对两个小矩阵使用最小二乘法,算出这两个小矩阵,就可以估算出稀疏矩阵缺失的值。

算法内容

基于Spark-ALS的协同过滤算法推荐

矩阵分解模型

用户对物品的打分行为可以表示成一个评分矩阵A(m*n),表示m个用户对n各物品的打分情况。如下表所示:
基于Spark-ALS的协同过滤算法推荐
其中,A(i,j)表示用户i对物品的打分。但是,用户不会对所有物品打分,表中”?”表示用户没有打分的情况,所以这个矩阵A很多元素都是空的,我们称其为“缺失值”。协同过滤提出了一种支持不完整评分矩阵的矩阵分解方法,不用对评分矩阵进行估值填充。在推荐系统中,我们希望得到用户对所有物品的打分情况,如果用户没有对一个物品打分,那么就需要预测用户是否会对该物品打分,以及会打多少分。这就是所谓的“矩阵补全”。

ALS 的核心假设是:打分矩阵A是近似低秩的,即一个m∗n的打分矩阵 A 可以用两个小矩阵X(m∗k)和Y(n∗k)的乘积来近似
我们把打分理解成相似度,那么“打分矩阵A(m∗n)”就可以由“用户喜好特征矩阵X(m∗k)”和“产品特征矩阵Y(n∗k)”的乘积。
我们使用用户喜好特征矩阵X(m∗k)中的第i个用户的特征向量Xi,和产品特征矩阵Y(n∗k)第j个产品的特征向量vj来预测打分矩阵A(m∗n)中的aij。我们可以得出一下的矩阵分解模型的损失函数为:
基于Spark-ALS的协同过滤算法推荐

在上面的公式中,rui表示评分数据集中用户i对产品j的真实评分,另外一部分表示用户i的特征向量(转置)*产品j的特征向量(这里可以得到预测的i对j的评分)后面一部分是为了防止过拟合,加入正则化项。为了找到使得矩阵X和Y的乘积尽可能地逼近R,采用最小化平方误差损失函数。
至此,协同过滤问题转化为一个优化问题。引用ALS:先固定Y,由上式求解X,然后再固定求得的X,求解Y,如此交替执行直到误差满足阈值条件或者到达迭代次数上限。

推导过程

推导过程: 先随机生成Y,固定之,对损失函数L(X, Y)在xu上求偏导,并令其导数=0:

基于Spark-ALS的协同过滤算法推荐

同理,由对称性得:
基于Spark-ALS的协同过滤算法推荐

如此交替执行直到误差满足阈值条件或者到达迭代次数上限。

隐式反馈模型

以上针对显式反馈,对于隐式反馈的情形,损失函数如下:
基于Spark-ALS的协同过滤算法推荐

其中pui表示用户X对商品Y的偏好。只有pui的话会单纯的认为如果用户购买了商品,表示用户喜欢该商品,否则表示用户不喜欢该商品。但是,用户购买一个商品也并不一定是用户喜欢它,可能是作为礼物才购买的。因此我们需要一个新的信任等级来显示用户偏爱某个商品,引入变量Cui,它衡量了我们观察到pui的信任度。rui表示动作的频率,比如购买、加购物车、收藏、点击的次数,或者观看/收听视频/音频的时长等等,α是置信度系数。按照这种方式,我们存在最小限度的信任度,并且随着我们观察到的正偏向的证据越来越多,信任度也会越来越大。
推导过程类似显式反馈的公式推导,结果如下:
基于Spark-ALS的协同过滤算法推荐

4.算法实战

ALS流程图:

基于Spark-ALS的协同过滤算法推荐以商品推荐为例进行,使用的数据按照MLlib中ALS算法固有的数据格式。其中源码如下:这里用户和商品分别用代号表示,然后rating就是用户对于该商品的评分。
case class Rating @Since(“0.8.0”) (
@Since(“0.8.0”) user: Int,
@Since(“0.8.0”) product: Int,
@Since(“0.8.0”) rating: Double)
数据集格式如下:
用户代号 商品代号 用户对该商品的评分
1 11 2
1 12 3
1 13 1
1 14 0
1 15 1
2 11 1
2 12 2
2 13 2
2 14 1
2 15 4

ALS算法的第二步就是数据建模,其实在MLlib算法库中有可以直接使用的训练算法,ALS.tran方法源码如下:
def train(
ratings: RDD[Rating], //需要训练的数据集
rank: Int, //模型中隐藏因子数
iterations: Int, //算法中迭代次数
lambda: Double, //ALS中的正则化参数
blocks: Int, //并行计算的block数(-1为自动配置)
alpha: Double, //ALS隐式反馈变化率用于控制每次拟合修正的幅度
seed: Long //加载矩阵的随机数
): MatrixFactorizationModel = {
new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.{SparkConf, SparkContext}
object CollaborativeFilter {
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
.setAppName(“CollaborativeFilter”)
.setMaster(“local”)
val sc=new SparkContext(conf) //设置数据集
val data=sc.textFile(“C:\Users\Desktop\d.txt”)
val ratings=data.map(.split(" ") match { //处理数据
case Array(user,item,rate) => //转化数据集
Rating(user.toInt,item.toInt,rate.toDouble) //将数据集转化为专用的Rating
})
val rank=2 //设置隐藏因子
val numIterations=2 //设置迭代次数
val model=ALS.train(ratings,rank,numIterations,0.01) //进行模型训练
val rs=model.recommendProducts(2,1) //为用户2推荐一个商品
rs.foreach(println) //打印推荐结果
val result:Double=rs(0).rating //预测的评分结果
val realilty=data.map(
.split(" ") match {
case Array(user,item,rate) =>
Rating(user.toInt,item.toInt,rate.toDouble)
}).map(num=>{
if(num.use r== 2 && num.product == 15)
num.rating //返回实际评分结果
else 0
}).foreach(num=>{
if(num!=0)
println(“对15号商品预测的准确率为:”+(1-(math.abs(result-num)/1)))
})
}
}
运行结果:
Rating(2,15,3.97662718480575)
对15号商品预测的准确率为:0.9941567962014375
根据结果显示,为第二个用户优先推荐了编号为15的物品,同时预测的评分为3.97662718480575,而实际值为4,准确率为0.9941567962014375