第四章 Spark RDD 实验手册

2020-02-14
10分钟阅读时长

【版本】

当前版本号v20200317

版本修改说明
v20200317实验4.2,修正练习(8),应该是求交集
v20200310初始化版本

【实验名称】实验4.1:RDD 的创建

【实验目的】

  • 掌握RDD的创建的方式

【实验原理】

【实验环境】

  • Ubuntu 16.04
  • Python 3
  • PySpark
  • spark 2.4.4
  • Hadoop 2.7.3

【实验步骤】

  1. 在/home/hadoop路径下创建一个文本文件,并命名为123.txt(注意替换123为你学号的后三位)。并输入以下内容,注意替换其中汉字为你个人学号后三位。
你学号后三位,997,953,932,877,453
  1. 启动 hadoop,并把第一步创建的文本文件上传到 HDFS。

(1)启动 Hadoop

start-hdp.sh

(2)上传文本文件

hdfs dfs -mkdir -p /spark-exp4
hdfs dfs -put /home/hadoop/123.txt /spark-exp4/

(3)查看是否上传成功

hdfs dfs -ls /spark-exp4
  1. 启动 PySpark 或 PyCharm 进行开发。

  2. 使用文件创建RDD。(注意替换123为你学号的后三位)

r4 = sc.textFile("/home/hadoop/123.txt")
r4.flatMap(lambda line:line.split(',')).collect()
  1. 使用 HDFS 文件创建RDD。(注意替换123为你学号的后三位)
r5 = sc.textFile("hdfs://node0:8020/spark-exp4/123.txt")
r5.flatMap(lambda line:line.split(',')).collect()
  1. 基于parallelize创建
distData = sc.parallelize([1, 2, 3, 4, 5])
distData.getNumPartitions()
distData = sc.parallelize([1, 2, 3, 4, 5],4)
distData.getNumPartitions()

【实验名称】实验4.2:常见RDD算子练习

【实验目的】

  • 掌握常用的 RDD

【实验原理】

【实验环境】

  • Ubuntu 16.04
  • Python 3
  • PySpark
  • spark 2.4.4
  • Hadoop 2.7.3

【实验参考资料】

算子说明
collect返回RDD所有元素组成的列表
reduce类似 MapReduce 的 Reducer,执行归并操作
saveAsTextFile输出文本结果到文件目录
map类似 MapReduce的Mapper,处理每个元素,可以返回新的数据结构
flatMap会把所有返回的集合类型如list,dict,tuple全部遍历展平
foreach处理每个元素,无返回值
filter自定义过滤
distinct去重
take(n)取前N个
first取第一个
sortBy返回排序的数据,默认升序
top(n)返回降序的数据前N个元素
join内连接
leftOuterJoin左外连接
rightOuterJoin右外连接
union(RDD)合并
intersection(RDD)交集
subtract(RDD)减去交集
mapPartitions针对每一个分区做Map操作
mapPartitionsWithIndex针对每一个分区做Map操作,带分区参数
glom把每个分区数据列表作为元素,组成新列表
coalesce归并(reduce)分区数量n
reduceByKey(f)将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
groupByKey()在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD,(没有入参)
mapValues(f)对键值对中的每个值(value)应用一个函数,但不改变键(key)
flatMapValues(f)对键值对RDD中每个值应用返回一个迭代器的函数,然后对每个元素生成一个对应的键值对。
combineByKey( createCombiner, mergeValue, mergeCombiners, numPartitions=None)使用不同的返回类型合并具有相同键的值
join(otherDataset, numPartitions=None)按key值相同的记录进行连接
union(otherDataset,numPartitions=None)对源RDD和参数RDD求并集后返回一个新的RDD
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)使用给定的combine(组合)函数和一个中性的“零值”来聚合每个键的值
sortByKey(ascending=True, numPartitions=None)在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
join(other, numPartitions=None)在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
keys()获取所有key
values()获取所有value
countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
collectAsMap与collect相关的算子是将结果返回到driver端。collectAsMap算子是将Kev-Value结构的RDD收集到driver端,并返回成一个字典
countByKey与count类似,但是是以key为单位进行统计。

【实验准备】

在Python开发环境默认只打印最后一行变量,如果需要把所有行的变量都打印,需要运行以下2行代码,每次打开环境只需要运行一次即可。

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

测试

sc.parallelize([1,2]).collect()
sc.parallelize([3,4]).collect()

【实验要求】

以下要求<补充代码>的部分,请各位同学补充相应的代码,并在实验报告内要记录以下题目的完整代码和输出。

(1)要求:使用 map 算子把以下 RDD 列表每个元素转为int类型,并且每个元素都加上1000,并输出结果。(注意替换123为你学号的后三位)

rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.map(<补充代码>).collect()

(2)补充以下代码,使得输出符合要求。

rdd1 = sc.parallelize([1,2,3,4,5,6],3)
rdd2 = <补充代码>
rdd2.collect()

期望输出值

[[1, 2], [3, 4], [5, 6]]

(3)补充以下代码,使得输出每个分区元素累加总和。

rdd = sc.parallelize([1,2,3,4],2)
def f(iterator): yield sum(iterator)
rdd.<补充代码>.collect()

期望输出值

[3,7]

提示:可以查下关于分区的算子

(4)补充以下代码,使得输出每个分区编号和元素累加总和。

rdd = sc.parallelize([1,2,3,4],2)
def f<补充代码>
rdd.mapPartitionsWithIndex(f).collect()

期望输出值

[(0,[3]),(1,[7])]

提示:mapPartitionsWithIndex传进去的函数应该有2个参数。

(5)补充以下代码,输出个位数是7的元素。(注意替换123为你学号的后三位)

rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.<补充代码>.collect()

提示:可以使用int()转换字符串为整形

(6)补充以下代码,使得重复的元素只保留1个进行输出。

sc.parallelize([1, 1, 1, 2, 3, 2, 3]).<补充代码>.collect()

提示:distinct 算子

(7)补充以下代码,求2个rdd的并集。

rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize([5, 3, 4, 6])
<补充代码>

期望输出值

[1,2,3,4,5,6]

提示:union和distinct的组合可以产生并集的效果,要注意输出结果的排序。

(8)补充以下代码,求2个rdd的交集。

rdd1 = sc.parallelize([1, 4, 2, 3])
rdd2 = sc.parallelize([5, 3, 4, 6])
<补充代码>

期望输出值

[3,4]

(9)补充以下代码,对List(列表)进行降序排序。(注意替换123为你学号的后三位)

rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
<补充代码>

(10)补充以下代码,对列表元素key进行降序排序。

rdd = sc.parallelize([(101,'Mike'),(134,'John'),(123,'Mary')])
<补充代码>

期望输出值

[(134,'John'),(123,'Mary'),(101,'Mike')]

(11)补充以下代码,按照元组的第三个元素进行降序排序。

rdd = sc.parallelize([(101,'Mike',23),(134,'John',45),(123,'Mary',18)])
<补充代码>

期望输出值

[(134, 'John', 45), (101, 'Mike', 23), (123, 'Mary', 18)]

(12)补充以下代码,把列表中奇数和偶数分别分为2个区。

rdd1 = sc.parallelize([1, 1, 2, 3, 5, 8])
rdd1.groupBy(<补充代码>).map(<补充代码>).collect()

期望输出值

[(0, [2, 8]), (1, [1, 1, 3, 5])]

(13)以下代码都是把RDD数据重新分为2个区,每句执行多次,对比他们三者的结果是否一样。如果不一致是什么原因?

sc.parallelize([(1, 2), (2, 3), (3,4),(4,5),(5,6),(6,7)],3).partitionBy(2).glom().collect()
sc.parallelize([(1, 2), (2, 3), (3,4),(4,5),(5,6),(6,7)],3).repartition(2).glom().collect()
sc.parallelize([(1, 2), (2, 3), (3,4),(4,5),(5,6),(6,7)],3).coalesce(2).glom().collect()

(14)补充以下代码,返回所有元素的和。

sc.parallelize([1, 2, 3, 4, 5]).reduce(<补充代码>)

(15)补充以下代码,返回所有key相同value的和。

sc.parallelize([(1, 2),(3, 4), (3, 5),(1, 4)]).<补充代码>

期望输出值:

[(1, 6), (3, 9)]

(16)补充以下代码,计算元素的个数。(注意替换123为你学号的后三位)

rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.<补充代码>

(17)补充以下代码,取前3个最大值,并按降序排列。(注意替换123为你学号的后三位)

rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.<补充代码>

(18)补充以下代码,取每个分区的最大值,并计算分区最大值的和。

rdd = sc.parallelize([1, 2, 3, 4, 5,6],2)
rdd.aggregate(<补充代码>)

(19)补充以下代码,把每个分区字符连接起来,分区与分区之间用空格隔开。

rdd = sc.parallelize(["H","e","l","l","o","F","r","a","n","k"],2)
rdd.aggregate(<补充代码>)

期望输出值

' Hello Frank'

(20)补充以下代码,对相同的key进行分组。

rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2), ("b", 4)])
rdd.<补充代码>.map(lambda x:<补充代码>)).collect()

期望输出值

[('b', [3, 4]), ('a', [1, 2])]

(21)补充以下代码,对value列表的每个元素的第一个字母转为大写。

rdd = sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])])
def upperWordInList(list):
    r=[]
    for x in list:
        <补充把单词第一个字母转为大写代码>
    return r

rdd.<补充代码>.collect()

(22)补充以下代码,满足期望的输出值

rdd = sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])])
rdd.<补充代码>.collect()

期望输出值:

[('fruites', 'apple'),
 ('fruites', 'banana'),
 ('fruites', 'lemon'),
 ('vegetables', 'tomato'),
 ('vegetables', 'cabbage')]

(23)补充以下代码,输出由key值组成的列表和value值组成的列表

rdd=sc.parallelize([(1, 2), (3, 4)])
rdd.<补充代码>.collect()
rdd.<补充代码>.collect()

期望输出值:

[1, 3]
[2, 4]

(24)补充以下代码,输出由key值对应value元素个数。

sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])]).flatMapValues(lambda x:x).<补充代码>

期望输出值:

defaultdict(int, {'fruites': 3, 'vegetables': 2})

(25)补充以下代码,输出由key值对应value总和

rdd = sc.parallelize([ ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)], 2)
rdd.aggregateByKey(<补充代码>).collect()

期望输出值:

[('mouse', 6), ('cat', 19), ('dog', 12)]

(26)补充以下代码,使用连接算子,使得代码输出符合期望值

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.<补充代码>.collect()
x.<补充代码>.collect()
y.<补充代码>.collect()
[('a', (1, 2)), ('a', (1, 3))]
[('b', (4, None)), ('a', (1, 2)), ('a', (1, 3))]
[('b', (None, 4)), ('a', (2, 1)), ('a', (3, 1))]

(27)补充以下代码,使得输出值为一个字典类型。(注意替换123为你学号的后三位)

rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.map(lambda x:(x,x)).<补充代码>

期望输出值:

{'123': '123',
 '453': '453',
 '877': '877',
 '932': '932',
 '953': '953',
 '997': '997'}

(28)补充以下代码,使得输出值符合期望值。

rdd = sc.parallelize([("a",1),("b",1),("a", 1)])
rdd.<补充代码>.keys()
rdd.<补充代码>.items()

期望输出值:

dict_keys(['a', 'b'])
dict_items([('a', 2), ('b', 1)])

【实验名称】 实验4.3:分析tomcat的访问日志,求访问量最高的两个网页

【实验目的】

掌握RDD的应用

【实验资源】

https://pan.baidu.com/s/1nIbyAa19D-MlutP3DIh6bA#提取码dzi0

【实验原理】

使用 Spark 相应 API 进行开发实现

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm 或 Jupitor Notebook

【实验要求】

Tomcat的访问日志文件为spark-exp04-tomcat-log.txt,格式如下:

(1)求出访问量最高的两个网页。要求输出以下格式: 网页名称,访问量

(2)请自行开发代码实现(可以参考第二章的 WordCount 的开发例子),并在实验报告附上思考过程,开发步骤,代码和结果。


【实验名称】 实验4.4:使用Spark程序进行数据的清洗

【实验目的】

掌握RDD的应用

【实验资源】

https://pan.baidu.com/s/1nIbyAa19D-MlutP3DIh6bA#提取码dzi0

【实验原理】

使用 Spark 相应 API 进行开发实现

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm 或 Jupitor Notebook

【实验要求】

数据源spark-exp04-user-click-log.txt文件记录了用户点击的日志记录,但日志中存在不合规范的数据。请用Spark程序进程数据清洗,完成以下操作:

(1)过滤不满足6个字段的数据

(2)过滤URL为空的数据

(3)请自行开发代码实现,并在实验报告附上思考过程,开发步骤,代码和结果。


【实验名称】 实验4.5:人口身高数据分析

【实验目的】

掌握RDD的应用

【实验资源】

https://pan.baidu.com/s/1nIbyAa19D-MlutP3DIh6bA#提取码dzi0

【实验原理】

使用 Spark 相应 API 进行开发实现

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • 开发环境:PyCharm 或 Jupitor Notebook

【实验要求】

文件spark-exp04-sample-people-info-10w.txt是某个地区的人口性别还有身高数据。三列分别是 ID,性别(M表示男,F表示女),身高 (cm)。要求

(1)统计出男女人数。

(2)男性中的最高,最低身高和平均身高。

(3)女性中的最高,最低身高和平均身高。

(4)请自行开发代码实现,并在实验报告附上思考过程,开发步骤,代码和结果。