Spark ALS算法进行矩阵分解,U * V = Q
如果数据不是运行在集群上,而是运行在本地,为了保证内存充足,在启动spark-shell时需要指定参数--driver-memory 6g。
-
- 数据集
艺术家点播数据集:
用户和艺术家的关系是通过其他行动隐含提现出来的,例如播放歌曲或专辑,而不是通过显式的评分或者点赞得到的。这被称为隐式反馈数据。现在的家用电视点播也是这样,用户一般不会主动评分。
数据集下载地址是http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
-
- 数据处理
Spark MLib的ALS算法实现有一个小缺点:它要求用户和产品的ID必须是数值型,并且是32位非负整数,这意味着大于Integer.MAX_VALUE(2147483647)的ID是非法的。我们首先看看数据集是否满足要求:
scala> val rawUserArtistData = sc.textFile("D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt")
rawUserArtistData: org.apache.spark.rdd.RDD[String] = D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
res0: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
scala> rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
res1: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)
过滤数据
val rawArtistData = sc.textFile("hdfs:///user/ds/artist_data.txt")
val artistByID = rawArtistData.flatMap { line =>
val (id, name) = line.span(_ != '\t')
if (name.isEmpty) {
None
} else {
try {
Some((id.toInt, name.trim))
} catch {
case e: NumberFormatException => None
}
}
}
-
- 训练算法
首先将数据转换成Rating
import org.apache.spark.mllib.recommendation._
val bArtistAlias = sc.broadcast(artistAlias)
val trainData = rawUserArtistData.map { line =>
val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
val finalArtistID =
bArtistAlias.value.getOrElse(artistID, artistID)
Rating(userID, finalArtistID, count)
}.cache()
val
model =
ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
//
查看特征变量:
model.userFeatures.mapValues(_.mkString(", ")).first()
//我们可以对此用户做出5个推荐:
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)
-
- 选择超参数
计算AUC这部分代码没有试。AUC(Area Under ROC Curve)是ROC(Receiver Operating Characteristic,受试者工作特征)线,它源于二战中用于敌机检测的雷达信号分析技术。在非均等代价下,ROC曲线不能直接反映出学习器的期望总体代价,而“代价曲线”则可达到该目的。
机器学习常涉及两类参数:一类是算法的参数,亦称“超参数”,数目常在10以内;另一类是模型的参数,数目可能很多。前者通常是由人工设定多个参数候选值后产生模型,后者则是通过学习来产生多个候选模型。
ALS.trainImplicit()的参数包括以下几个:rank
模型的潜在因素的个数,即“用户-特征”和“产品-特征”矩阵的列数;一般来说,它也是矩阵的阶。iterations
矩阵分解迭代的次数;迭代的次数越多,花费的时间越长,但分解的结果可能会更好。lambda 标准的过拟合参数;值越大越不容易产生过拟合,但值太大会降低分解的准确度。lambda取较大的值看起来结果要稍微好一些。alpha 控制矩阵分解时,被观察到的“用户-产品”交互相对没被观察到的交互的权重。40是最初ALS论文的默认值,这说明了模型在强调用户听过什么时的表现要比强调用户没听过什么时要好。
-
- 应用(产生推荐)
val someUsers = allData.map(_.user).distinct().take(100)
val someRecommendations =
someUsers.map(userID => model.recommendProducts(userID, 5))
someRecommendations.map(
recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")
).foreach(println)
-
- Rating数据代码
def run(filename: String) {
println("mlALS is running!")
val data = getData(filename)
println(s"dataLen:${ data.count}")
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("newUserId")
.setItemCol("newMovieId")
.setRatingCol("newRating");
val split = data.randomSplit(Array(0.8, 0.2))
val model = als.fit(split(0));
println(s"userCol name:${ model.getUserCol}")
val predictions = model.transform(split(1))
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("newRating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
val userCommd = model.recommendForAllUsers(10)
userCommd.show()
}
def getData(filename: String) :Dataset[Row] = {
val sqlContext = new SQLContext(sc.sparkContext)
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> filename, "header" -> "true"))
df.columns.map(col => println(s"col:${col}"))
val newdf = df.select(df.col("userId").cast(IntegerType).as("newUserId"),
df.col("movieId").cast(IntegerType).as("newMovieId"),
df.col("rating").cast(DoubleType).as("newRating"),
df.col("timestamp").cast(LongType).as("newTimeStamp"))
newdf
}
-
- 数据正则化
val dataFrame = sqlContext.read.format("libsvm").load("data/libsvm.txt")
// L1正则化
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures")
// 设置 L1正则化
.setP(1.0)
// 正则化转换
val l1NormData = normalizer.transform(dataFrame)
// L2正则化
val l2InfNormData = normalizer.transform(dataFrame, normalizer.p -> 2)
l2InfNormData.foreach(println)