Spark 第七章 Spark 机器学习库实验手册

文章导航

«返回课程汇总页面

【实验手册版本】

当前版本号v20200509

版本修改说明
v20200509初始化版本

实验前准备

  1. 确保虚拟机连上网络,安装 Python 依赖库 numpy
sudo pip3 install numpy

实验7.1:基本概念-本地向量、转换器、评估器、参数

【实验名称】 实验7.1:基本概念-本地向量、转换器、评估器、参数

【实验目的】

理解本地向量、转换器、评估器、参数

【实验原理】

【实验环境】

【实验资源】

实验数据下载

https://pan.baidu.com/s/1zQZya1OrIXNfGuh7YinKTQ#提取码jfsy

【实验步骤】

  1. 阅读并运行以下代码,请对输出label,prob,prediction的意义进行说明。
from __future__ import print_function

# $example on$
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# $example off$
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("EstimatorTransformerParamExample")\
        .getOrCreate()

    # label 列为标记,features 是特征向量
    training = spark.createDataFrame([
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

    # 创建一个逻辑回归算法,算法是 Estimator.
    # maxIter 最大迭代次数,regParam 是正则化参数
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    # Print out the parameters, documentation, and any default values.
    # print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    # 算法是一个Estimator,学习训练数据以后,会返回一个模型,模型是Transformer
    model1 = lr.fit(training)

    #下面两行去掉注释,会打印model1的相关参数
    #print("Model 1 was fit using parameters: ")
    #print(model1.extractParamMap())

    # paramMaps可以调整算法的参数,是字典类型
    paramMap = {lr.maxIter: 20}
    paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
    #调整了算法正则系数regParam,和判断概率阈值
    paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

    #你可以通过合并字典修改参数。
    paramMap2 = {lr.probabilityCol: "probability"}  # 这个可以修改预测列名称
    paramMapCombined = paramMap.copy()
    paramMapCombined.update(paramMap2)

    # 调整参数以后再学习一个新的模型
    # paramMapCombined overrides all parameters set earlier via lr.set* methods.
    model2 = lr.fit(training, paramMapCombined)
     #下面两行去掉注释,会打印model2的相关参数
    #print("Model 2 was fit using parameters: ")
    #print(model2.extractParamMap())

    # 测试数据
    test = spark.createDataFrame([
        (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
        (0.0, Vectors.dense([3.0, 2.0, -0.1])),
        (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

    predictions = (model1.transform(test),model2.transform(test))


    for prediction in predictions:
        result = prediction.select("features", "label", "probability", "prediction").collect()
        for row in result:
            print("features=%s, label=%s -> prob=%s, prediction=%s "
              % (row.features, row.label, row.probability, row.prediction))
        print("\n")

    spark.stop()

实验7.2 :逻辑回归算法

【实验名称】 实验7.2 :逻辑回归算法

【实验目的】

掌握Pipeline、逻辑回归的用法

【实验原理】

【实验环境】

【实验步骤】

  1. 阅读并运行以下代码,请对代码输出的意义进行说明。
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from pyspark.mllib.linalg import Vectors

#密集向量
dv = Vectors.dense(2, 5, 8)
#print(dv[1])

#稀疏向量
sv=Vectors.sparse(4, (1, 2, 3), (3, 5, 7))
#print(sv[0])

from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Tokenizer,HashingTF
from pyspark.ml.classification import LogisticRegression
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

#分词 Transformer
tokenizer = Tokenizer(inputCol="text", outputCol="words")


tokenizer.transform(training).show()

#hash分桶及词频率统计 Transformer
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

#逻辑回归算法,算法是一个Estimator


#v=hashingTF.transform(tokenizer.transform(training)).select('features').first()
#for val in v.features.toArray():
#    idx=idx+1
#    if val!=0:
#        print('%d:%f'%(idx,val))


lr = LogisticRegression(maxIter=10, regParam=0.001,threshold=0.5)
#Pipeline 连接 Transformer 和 Estimator
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
#训练出模型,模型是 Transformer
model = pipeline.fit(training)

#测试数据(不含标签)
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

#模型对测试数据进行预测,得出预测结果(DataFrame)
prediction = model.transform(test)

selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

实验7.3 :K-Means 算法

【实验名称】 实验7.3 :K-Means 算法

【实验目的】

掌握K-Means的用法

【实验原理】

K-Means

选择K个点作为初始质心

repeat

until 簇不发生变化或达到最大迭代次数。

【实验环境】

【实验资源】

实验数据下载

https://pan.baidu.com/s/1zQZya1OrIXNfGuh7YinKTQ#提取码jfsy

【实验步骤】

  1. 阅读并运行以下代码,请对代码输出的意义进行说明。
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

#读取数据
dataset = spark.read.format("libsvm").load("file:/opt/spark/data/mllib/sample_kmeans_data.txt")

#算法
kmeans = KMeans().setK(2).setSeed(1)

#训练数据得出模型
model = kmeans.fit(dataset)


# Make predictions
predictions = model.transform(dataset)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

实验7.4 :协同过滤算法

【实验名称】 实验7.4 :协同过滤算法

【实验目的】

掌握基于Spark MLLib的协同过滤。

【实验原理】

ALS 是交替最小二乘 (alternating least squares)的简称。 ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。 从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。 用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。 假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵,其中的元素表示第u个User对第i个Item的评分。 在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这么大的数据量已经是很难处理了。 另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。矩阵中所缺失的评分,又叫做missing item。

ALS算法举例说明:

(1) 下面的矩阵R表示:观众对电影的喜好,即:打分的情况。注意:实际情况下,这个矩阵可能非非常庞大,并且是一个稀疏矩阵。 矩阵R

(2) 这时,我们可以把这个大的稀疏矩阵R,拆分成两个小一点的矩阵:U和V。通过U和V来近似表示R,如下图

其中:  U矩阵代表:用户的特征,包括三个维度:性格,文化程度,兴趣爱好

 V矩阵代表:电影的特征,也包括三个维度:性格,文化程度,兴趣爱好

(3) 这样,U和V的乘积,近似表示R。

(4) 但是,这样的表示是存在误差的,因为对于一个U矩阵来说,我们并不可能说(性格,文化程度,兴趣爱好)这三个属性就代表着一个人对一部电影评价全部的属性,比如还有地域等因素。这个误差,我们用RMSE(均方根误差)表示。

【实验环境】

【实验资源】

实验数据下载

https://pan.baidu.com/s/1zQZya1OrIXNfGuh7YinKTQ#提取码jfsy

【实验步骤】

  1. 阅读并运行以下用户协同过滤算法和物品协同过滤算法代码,请对代码输出的意义进行说明。
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("UserBasedExample")\
        .getOrCreate()

sc = spark.sparkContext

# 读入数据
data = sc.textFile("file:/home/hadoop/spark-exp/exp7/ratingdata.txt")

# // MatrixEntry代表:矩阵中的一行
# // 使用模式匹配
# MatrixEntry
parseData = data.map(lambda x:x.split(","))

# * 这里的每一项都是一个(i: Long, j: Long, value: Double) 指示行列值的元组tuple。
# * 其中i是行坐标,j是列坐标,value是值。*/
def func(x):
    arr=x.split(",")
    if(len(arr) == 3):
         return MatrixEntry(int(arr[0]), int(arr[1]), float(arr[2]))
parseData = data.map(func)

# CoordinateMatrix是Spark MLLib中专门保存user_item_rating这种数据样本的
ratings = CoordinateMatrix(parseData)

"""
由于CoordinateMatrix没有columnSimilarities方法,所以我们需要将其转换成RowMatrix矩阵,调用他的columnSimilarities计算其相似性
RowMatrix的方法columnSimilarities是计算,列与列的相似度,现在是user_item_rating,需要转置(transpose)成item_user_rating,这样才是用户的相似
"""

matrix = ratings.transpose().toRowMatrix()
# 计算用户的相似性,并输出
similarities = matrix.columnSimilarities()
print("用户相似性矩阵")
li = similarities.entries.sortBy(lambda x:x.i).collect()
for x in li:
    print(str(x.i) + "->" + str(x.j) + "->" + str(x.value))

ratingOfUser1 =ratings.entries.filter(lambda x:x.i ==1).map(lambda x:(x.j,x.value)).sortBy(lambda x:x[0])
print("\n")
for s in ratingOfUser1.collect():
    print(s)

allRatingValOfUser1=ratingOfUser1.map(lambda x:x[1]).collect()
avgRatingOfUser1 = sum(allRatingValOfUser1)/len(allRatingValOfUser1)
print("\n" + str(avgRatingOfUser1))

otherRatingsToItem1=ratings.entries.filter(lambda x:(x.i !=1 and x.j==101)).map(lambda x:(x.i,x.j,x.value)).sortBy(lambda x:x[0])

for s in otherRatingsToItem1.collect():
    print(s)

weights =similarities.entries.filter(lambda x:x.i==1).sortBy(lambda x:x.value).map(lambda x:(x.i,x.j,x.value)).collect()

for s in weights:
    print(s)
  1. 阅读并运行以下ASL过滤算法代码,请对代码输出的意义进行说明。
#设置这个选项打印所有的值,如果不使用Jupytor Notebook,请注释下面2行
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from pyspark.ml.recommendation import ALS
df = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0), (0, 3, 3.0), (1, 3, 4.0)],
    ["user", "item", "rating"])
# rank 特征数量k,默认是10
als = ALS(rank=5, maxIter=5, seed=0)
model = als.fit(df)

print('# 用户特征向量U')
model.userFactors.orderBy("id").collect()

print('# 物品特征向量V')
model.itemFactors.orderBy("id").collect()

test = spark.createDataFrame([(0, 2), (1, 0), (2, 0),(2, 3)], ["user", "item"])
predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
print('# 对于未知的用户物品推荐度预测结果')
for i in predictions:
    print(i)

user_recs = model.recommendForAllUsers(4)


print(user_recs.where(user_recs.user == 0).select("recommendations.item", "recommendations.rating").collect())


item_recs = model.recommendForAllItems(3)
print(item_recs.where(item_recs.item == 2).select("recommendations.user", "recommendations.rating").collect())


user_subset = df.where(df.user == 2)
user_subset_recs = model.recommendForUserSubset(user_subset, 3)
print(user_subset_recs.select("recommendations.item", "recommendations.rating").first())


item_subset = df.where(df.item == 0)
item_subset_recs = model.recommendForItemSubset(item_subset, 3)
print(item_subset_recs.select("recommendations.user", "recommendations.rating").first())