第七章 Spark 机器学习库实验手册

2020-05-09
9分钟阅读时长

【版本】

当前版本号v20200509

版本修改说明
v20200509初始化版本

实验前准备

  1. 确保虚拟机连上网络,安装 Python 依赖库 numpy
sudo pip3 install numpy

实验7.1:基本概念-本地向量、转换器、评估器、参数

【实验名称】 实验7.1:基本概念-本地向量、转换器、评估器、参数

【实验目的】

理解本地向量、转换器、评估器、参数

【实验原理】

  • (1)本地向量:Mllib支持两种类型的本地向量:密集向量(dense)和稀疏向量(sparse)。密集向量只有一个浮点数组组成,而一个稀疏向量必须有索引和一个浮点向量组成。例如,(2.1,3.2,4.3)代表一个密集向量。(3,[1.1,2.3],[5.6,4.3,4.4])代表一个稀疏向量。

  • (2)Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。 技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame

  • (3)Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生成一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。比如,一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

  • (4)Parameter:所有Transformers和Estimators现在共享一个用于指定参数的通用API

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm或pyspark交互命令

【实验资源】

实验数据下载

https://pan.baidu.com/s/1zQZya1OrIXNfGuh7YinKTQ#提取码jfsy

【实验步骤】

  1. 阅读并运行以下代码,请对输出label,prob,prediction的意义进行说明。
from __future__ import print_function

# $example on$
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# $example off$
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("EstimatorTransformerParamExample")\
        .getOrCreate()

    # label 列为标记,features 是特征向量
    training = spark.createDataFrame([
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

    # 创建一个逻辑回归算法,算法是 Estimator.
    # maxIter 最大迭代次数,regParam 是正则化参数
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    # Print out the parameters, documentation, and any default values.
    # print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    # 算法是一个Estimator,学习训练数据以后,会返回一个模型,模型是Transformer
    model1 = lr.fit(training)

    #下面两行去掉注释,会打印model1的相关参数
    #print("Model 1 was fit using parameters: ")
    #print(model1.extractParamMap())

    # paramMaps可以调整算法的参数,是字典类型
    paramMap = {lr.maxIter: 20}
    paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
    #调整了算法正则系数regParam,和判断概率阈值
    paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

    #你可以通过合并字典修改参数。
    paramMap2 = {lr.probabilityCol: "probability"}  # 这个可以修改预测列名称
    paramMapCombined = paramMap.copy()
    paramMapCombined.update(paramMap2)

    # 调整参数以后再学习一个新的模型
    # paramMapCombined overrides all parameters set earlier via lr.set* methods.
    model2 = lr.fit(training, paramMapCombined)
     #下面两行去掉注释,会打印model2的相关参数
    #print("Model 2 was fit using parameters: ")
    #print(model2.extractParamMap())

    # 测试数据
    test = spark.createDataFrame([
        (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
        (0.0, Vectors.dense([3.0, 2.0, -0.1])),
        (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

    predictions = (model1.transform(test),model2.transform(test))


    for prediction in predictions:
        result = prediction.select("features", "label", "probability", "prediction").collect()
        for row in result:
            print("features=%s, label=%s -> prob=%s, prediction=%s "
              % (row.features, row.label, row.probability, row.prediction))
        print("\n")

    spark.stop()

实验7.2 :逻辑回归算法

【实验名称】 实验7.2 :逻辑回归算法

【实验目的】

掌握Pipeline、逻辑回归的用法

【实验原理】

  • (1)Pipeline:将Pipeline多个Transformers和Estimators 链在一起以指定ML工作流程。

  • (2)逻辑回归:在线性回归增加了一个函数g(z),能够把连续值映射到几个离散的数据,如:0、1等。

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm或pyspark交互命令

【实验步骤】

  1. 阅读并运行以下代码,请对代码输出的意义进行说明。
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from pyspark.mllib.linalg import Vectors

#密集向量
dv = Vectors.dense(2, 5, 8)
#print(dv[1])

#稀疏向量
sv=Vectors.sparse(4, (1, 2, 3), (3, 5, 7))
#print(sv[0])

from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Tokenizer,HashingTF
from pyspark.ml.classification import LogisticRegression
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

#分词 Transformer
tokenizer = Tokenizer(inputCol="text", outputCol="words")


tokenizer.transform(training).show()

#hash分桶及词频率统计 Transformer
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

#逻辑回归算法,算法是一个Estimator


#v=hashingTF.transform(tokenizer.transform(training)).select('features').first()
#for val in v.features.toArray():
#    idx=idx+1
#    if val!=0:
#        print('%d:%f'%(idx,val))


lr = LogisticRegression(maxIter=10, regParam=0.001,threshold=0.5)
#Pipeline 连接 Transformer 和 Estimator
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
#训练出模型,模型是 Transformer
model = pipeline.fit(training)

#测试数据(不含标签)
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

#模型对测试数据进行预测,得出预测结果(DataFrame)
prediction = model.transform(test)

selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

实验7.3 :K-Means 算法

【实验名称】 实验7.3 :K-Means 算法

【实验目的】

掌握K-Means的用法

【实验原理】

K-Means

选择K个点作为初始质心

repeat

  • 将每个点指派到最近的质心,形成K个簇
  • 重新计算每个簇的质心

until 簇不发生变化或达到最大迭代次数。

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm或pyspark交互命令

【实验资源】

实验数据下载

https://pan.baidu.com/s/1zQZya1OrIXNfGuh7YinKTQ#提取码jfsy

【实验步骤】

  1. 阅读并运行以下代码,请对代码输出的意义进行说明。
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

#读取数据
dataset = spark.read.format("libsvm").load("file:/opt/spark/data/mllib/sample_kmeans_data.txt")

#算法
kmeans = KMeans().setK(2).setSeed(1)

#训练数据得出模型
model = kmeans.fit(dataset)


# Make predictions
predictions = model.transform(dataset)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

实验7.4 :协同过滤算法

【实验名称】 实验7.4 :协同过滤算法

【实验目的】

掌握基于Spark MLLib的协同过滤。

  • 基于用户(User-Based)的协同过滤
  • 基于物品(Item-Based)的协同过滤
  • 基于ALS协同过滤

【实验原理】

  • (1)基于用户的CF(User CF) 基于用户的 CF 的基本思想相当简单,基于用户对物品的偏好找到相邻邻居用户,然后将邻居用户喜欢的推荐给当前用户。计算上,就是将一个用户对所有物品的偏好作为一个向量来计算用户之间的相似度,找到 K 邻居后,根据邻居的相似度权重以及他们对物品的偏好,预测当前用户没有偏好的未涉及物品,计算得到一个排序的物品列表作为推荐。图 2 给出了一个例子,对于用户 A,根据用户的历史偏好,这里只计算得到一个邻居 - 用户 C,然后将用户 C 喜欢的物品 D 推荐给用户 A。

  • (2)基于物品的CF(Item CF) 基于物品的 CF 的原理和基于用户的 CF 类似,只是在计算邻居时采用物品本身,而不是从用户的角度,即基于用户对物品的偏好找到相似的物品,然后根据用户的历史偏好,推荐相似的物品给他。从计算的角度看,就是将所有用户对某个物品的偏好作为一个向量来计算物品之间的相似度,得到物品的相似物品后,根据用户历史的偏好预测当前用户还没有表示偏好的物品,计算得到一个排序的物品列表作为推荐。图 3 给出了一个例子,对于物品 A,根据所有用户的历史偏好,喜欢物品 A 的用户都喜欢物品 C,得出物品 A 和物品 C 比较相似,而用户 C 喜欢物品 A,那么可以推断出用户 C 可能也喜欢物品 C。

  • (3)基于ALS协同过滤推荐

ALS 是交替最小二乘 (alternating least squares)的简称。 ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。 从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。 用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。 假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵,其中的元素表示第u个User对第i个Item的评分。 在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这么大的数据量已经是很难处理了。 另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。矩阵中所缺失的评分,又叫做missing item。

ALS算法举例说明:

(1) 下面的矩阵R表示:观众对电影的喜好,即:打分的情况。注意:实际情况下,这个矩阵可能非非常庞大,并且是一个稀疏矩阵。 矩阵R

(2) 这时,我们可以把这个大的稀疏矩阵R,拆分成两个小一点的矩阵:U和V。通过U和V来近似表示R,如下图

其中:  U矩阵代表:用户的特征,包括三个维度:性格,文化程度,兴趣爱好

 V矩阵代表:电影的特征,也包括三个维度:性格,文化程度,兴趣爱好

(3) 这样,U和V的乘积,近似表示R。

(4) 但是,这样的表示是存在误差的,因为对于一个U矩阵来说,我们并不可能说(性格,文化程度,兴趣爱好)这三个属性就代表着一个人对一部电影评价全部的属性,比如还有地域等因素。这个误差,我们用RMSE(均方根误差)表示。

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm或pyspark交互命令

【实验资源】

实验数据下载

https://pan.baidu.com/s/1zQZya1OrIXNfGuh7YinKTQ#提取码jfsy

【实验步骤】

  1. 阅读并运行以下用户协同过滤算法和物品协同过滤算法代码,请对代码输出的意义进行说明。
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("UserBasedExample")\
        .getOrCreate()

sc = spark.sparkContext

# 读入数据
data = sc.textFile("file:/home/hadoop/spark-exp/exp7/ratingdata.txt")

# // MatrixEntry代表:矩阵中的一行
# // 使用模式匹配
# MatrixEntry
parseData = data.map(lambda x:x.split(","))

# * 这里的每一项都是一个(i: Long, j: Long, value: Double) 指示行列值的元组tuple。
# * 其中i是行坐标,j是列坐标,value是值。*/
def func(x):
    arr=x.split(",")
    if(len(arr) == 3):
         return MatrixEntry(int(arr[0]), int(arr[1]), float(arr[2]))
parseData = data.map(func)

# CoordinateMatrix是Spark MLLib中专门保存user_item_rating这种数据样本的
ratings = CoordinateMatrix(parseData)

"""
由于CoordinateMatrix没有columnSimilarities方法,所以我们需要将其转换成RowMatrix矩阵,调用他的columnSimilarities计算其相似性
RowMatrix的方法columnSimilarities是计算,列与列的相似度,现在是user_item_rating,需要转置(transpose)成item_user_rating,这样才是用户的相似
"""

matrix = ratings.transpose().toRowMatrix()
# 计算用户的相似性,并输出
similarities = matrix.columnSimilarities()
print("用户相似性矩阵")
li = similarities.entries.sortBy(lambda x:x.i).collect()
for x in li:
    print(str(x.i) + "->" + str(x.j) + "->" + str(x.value))

ratingOfUser1 =ratings.entries.filter(lambda x:x.i ==1).map(lambda x:(x.j,x.value)).sortBy(lambda x:x[0])
print("\n")
for s in ratingOfUser1.collect():
    print(s)

allRatingValOfUser1=ratingOfUser1.map(lambda x:x[1]).collect()
avgRatingOfUser1 = sum(allRatingValOfUser1)/len(allRatingValOfUser1)
print("\n" + str(avgRatingOfUser1))

otherRatingsToItem1=ratings.entries.filter(lambda x:(x.i !=1 and x.j==101)).map(lambda x:(x.i,x.j,x.value)).sortBy(lambda x:x[0])

for s in otherRatingsToItem1.collect():
    print(s)

weights =similarities.entries.filter(lambda x:x.i==1).sortBy(lambda x:x.value).map(lambda x:(x.i,x.j,x.value)).collect()

for s in weights:
    print(s)
  1. 阅读并运行以下ASL过滤算法代码,请对代码输出的意义进行说明。
#设置这个选项打印所有的值,如果不使用Jupytor Notebook,请注释下面2行
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from pyspark.ml.recommendation import ALS
df = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0), (0, 3, 3.0), (1, 3, 4.0)],
    ["user", "item", "rating"])
# rank 特征数量k,默认是10
als = ALS(rank=5, maxIter=5, seed=0)
model = als.fit(df)

print('# 用户特征向量U')
model.userFactors.orderBy("id").collect()

print('# 物品特征向量V')
model.itemFactors.orderBy("id").collect()

test = spark.createDataFrame([(0, 2), (1, 0), (2, 0),(2, 3)], ["user", "item"])
predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
print('# 对于未知的用户物品推荐度预测结果')
for i in predictions:
    print(i)

user_recs = model.recommendForAllUsers(4)


print(user_recs.where(user_recs.user == 0).select("recommendations.item", "recommendations.rating").collect())


item_recs = model.recommendForAllItems(3)
print(item_recs.where(item_recs.item == 2).select("recommendations.user", "recommendations.rating").collect())


user_subset = df.where(df.user == 2)
user_subset_recs = model.recommendForUserSubset(user_subset, 3)
print(user_subset_recs.select("recommendations.item", "recommendations.rating").first())


item_subset = df.where(df.item == 0)
item_subset_recs = model.recommendForItemSubset(item_subset, 3)
print(item_subset_recs.select("recommendations.user", "recommendations.rating").first())