利用Spark构建分布式电影协同过滤推荐系统
一、数据采集
使用MovieLen的开放数据集作为数据源,包含了6000个用户对4000个电影的评分数据,大概有100万条评分数据。数据集也可以从这个网址下载。数据集一共有3个文件:
movie.date(电影ID::电影名称::标签)
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
rating.dat(用户id::电影ID::评分::时间戳)
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
user.dat(用户ID::性别::年龄::Occupation::Zip-code)
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
二、数据处理
使用spark的Mlib库需要添加依赖,主要需要注释掉scope这一行,不然还是会报错
object mllib is not a member of package org.apache.spark
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.0</version>
<!-- <scope>runtime</scope> -->
</dependency>
1.读取评分数据和电影数据
package Sparkstreaming
import org.apache.spark.{SparkConf,SparkContext}
object Movie {
def main(args: Array[String]): Unit = {
//设置spark环境
val conf=new SparkConf().setMaster("local[2]").setAppName("movie")
val sc=new SparkContext(conf)
//读取电影,转成map类型
val movie=sc.textFile("D:\\Python\\ml-1m\\movies.dat").map{
line=>
val str=line.split("::")
//获取电影ID和电影名称
(str(0).toInt,str(1))
}.collect().toMap
//println(movie)
//读取评分数据
val rate=sc.textFile("D:\\Python\\ml-1m\\ratings.dat").map{
line=>
val str=line.split("::")
//生成根据时间戳排列的用户电影评分数据(0,4344,1974,1.0)
val rating=Rating(str(0).toInt,str(1).toInt,str(2).toDouble)
val timestamp=str(3).toLong % 10
(timestamp,rating)
}
//rate.foreach(print)
val moviecount=rate.map(_._3).distinct().count()
val usercount=rate.map(_._2).distinct().count()
val totalcount=rate.count()
println("一共有"+totalcount+"条数据,其中电影条数为"+moviecount+"用户数为"+usercount)
}
}
结果:
2.划分测试集和训练集,计算均方误差RMSE 得出最佳参数:
// 划分训练集和测试集(训练集60%,测试集40%)
val splits = rate.randomSplit(Array(0.6, 0.4), seed =2)
val training= splits(0).repartition(4).cache().map(_._2)
val test = splits(1).repartition(4).cache().map(_._2)
//定义函数计算均方误差RMSE
def Rmse(model: MatrixFactorizationModel,data:RDD[Rating]):
Double={
val predict=model.predict(data.map(x=>(x.user,x.product)))map{x=>((x.user,x.product),x.rating)}
val actual=data.map(x=>((x.user,x.product),x.rating))
val predictrate=predict.join(actual).values
math.sqrt(predictrate.map(x=>(x._1-x._2)*(x._1-x._2)).mean())
}
//测试合适的参数
val ranks=List(8,12)//矩阵的秩
val regus=List(1.0,10.0)//正则系数
val iters=List(10,20)//迭代次数
var bestmodel:Option[MatrixFactorizationModel]=None
var bestRmse=Double.MaxValue
var bestrank=0
var bestregu = -1.0
var bestiter = -1
for (rank<-ranks;regu<-regus;iter<-iters){
val model=ALS.train(training,rank,iter,regu)
val valiRmse=Rmse(model,test)
if (valiRmse<bestRmse){
bestmodel=Some(model)
bestRmse=valiRmse
bestrank=rank
bestregu=regu
bestiter=iter
}
}
val testRmse=Rmse(bestmodel.get,test)
println("bestrank:"+bestrank+",bestiter:"+bestiter+",bestregu:"+bestregu+",testRmse:"+testRmse)
过程可能有的久,结果:
所以轶是8,正则系数是1.0,迭代次数是10,均方误差为1.35
3.使用该模型为用户进行推荐
//使用该模型为用户推荐电影
val userid=1
// 求出第一个用户所有评分过的电影
val user1RatedMovieIds = rate.filter(_._2.user==userid).map(_._2.product).collect.toSeq
// 求出第一个用户没有评分过的电影
val cands = sc.parallelize(movie.keys.filter(!user1RatedMovieIds.contains(_)).toSeq)
// 使用模型来对潜在推荐电影进行评分,并按照预测评分进行排序,取前10
val recommendations = model.predict(cands.map((userid,_))).collect.sortBy(- _.rating).take(10)
var i =1
println("的推荐结果为:")
recommendations.foreach{rec => println("%2d".format(i)+": "+movie(rec.product)+", predictRating: "+rec.rating);i+=1}
结果:
完整代码:
package Sparkstreaming
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
object Movie {
def main(args: Array[String]): Unit = {
//设置spark环境
val conf=new SparkConf().setMaster("local[2]").setAppName("movie")
val sc=new SparkContext(conf)
sc.setLogLevel("WARN")
//读取电影,转成map类型
val movie=sc.textFile("D:\\Python\\ml-1m\\movies.dat").map{
line=>
val str=line.split("::")
//获取电影ID和电影名称
(str(0).toInt,str(1))
}.collect().toMap
//println(movie)
//读取评分数据
val rate=sc.textFile("D:\\Python\\ml-1m\\ratings.dat").map{
line=>
val str=line.split("::")
//生成根据时间戳排列的用户电影评分数据(0,4344,1974,1.0)
val rating=Rating(str(0).toInt,str(1).toInt,str(2).toDouble)
val timestamp=str(3).toLong % 10
(timestamp,rating)
}
//rate.foreach(print)
val moviecount=rate.map(_._2.product).distinct().count()
val usercount=rate.map(_._2.user).distinct().count()
val totalcount=rate.count()
println("一共有"+totalcount+"条数据,其中电影条数为"+moviecount+"用户数为"+usercount)
// 划分训练集和测试集(训练集60%,测试集40%)
val splits = rate.randomSplit(Array(0.6, 0.4), seed =2)
val training= splits(0).repartition(4).cache().map(_._2)
val test = splits(1).repartition(4).cache().map(_._2)
//定义函数计算均方误差RMSE
def Rmse(model: MatrixFactorizationModel,data:RDD[Rating]):
Double={
val predict=model.predict(data.map(x=>(x.user,x.product)))map{x=>((x.user,x.product),x.rating)}
val actual=data.map(x=>((x.user,x.product),x.rating))
val predictrate=predict.join(actual).values
math.sqrt(predictrate.map(x=>(x._1-x._2)*(x._1-x._2)).mean())
}
//测试合适的参数
val ranks=List(8,12)//矩阵的秩
val regus=List(1.0,10.0)//正则系数
val iters=List(10,20)//迭代次数
var bestmodel:Option[MatrixFactorizationModel]=None
var bestRmse=Double.MaxValue
var bestrank=0
var bestregu = -1.0
var bestiter = -1
for (rank<-ranks;regu<-regus;iter<-iters){
val model=ALS.train(training,rank,iter,regu)
val valiRmse=Rmse(model,test)
if (valiRmse<bestRmse){
bestmodel=Some(model)
bestRmse=valiRmse
bestrank=rank
bestregu=regu
bestiter=iter
}
val testRmse=Rmse(bestmodel.get,test)
println("bestrank:"+bestrank+",bestiter:"+bestiter+",bestregu:"+bestregu+",testRmse:"+testRmse)
//bestrank:8,bestiter:10,bestregu:1.0,testRmse:1.352360900171576
//使用该模型为用户推荐电影
val userid=1
// 求出第一个用户所有评分过的电影
val user1RatedMovieIds = rate.filter(_._2.user==userid).map(_._2.product).collect.toSeq
// 求出第一个用户没有评分过的电影
val cands = sc.parallelize(movie.keys.filter(!user1RatedMovieIds.contains(_)).toSeq)
// 使用模型来对潜在推荐电影进行评分,并按照预测评分进行排序,取前10
val recommendations = model.predict(cands.map((userid,_))).collect.sortBy(- _.rating).take(10)
var i =1
println("推荐结果为:")
recommendations.foreach{rec => println("%2d".format(i)+": "+movie(rec.product)+", predictRating: "+rec.rating);i+=1}
}
}
}