Spark 第四章 Spark RDD 实验手册
文章导航
【实验手册版本】
当前版本号v20200317
版本 | 修改说明 |
---|---|
v20200317 | 实验4.2,修正练习(8),应该是求交集 |
v20200310 | 初始化版本 |
实验4.1:RDD 的创建
【实验名称】
实验4.1:RDD 的创建
【实验目的】
- 掌握RDD的创建的方式
【实验原理】
略
【实验环境】
- Ubuntu 16.04
- Python 3
- PySpark
- spark 2.4.4
- Hadoop 2.7.3
【实验步骤】
- 在/home/hadoop路径下创建一个文本文件,并命名为
123.txt
(注意替换123为你学号的后三位)。并输入以下内容,注意替换其中汉字为你个人学号后三位。
你学号后三位,997,953,932,877,453
- 启动 hadoop,并把第一步创建的文本文件上传到 HDFS。
(1)启动 Hadoop
start-hdp.sh
(2)上传文本文件
hdfs dfs -mkdir -p /spark-exp4
hdfs dfs -put /home/hadoop/123.txt /spark-exp4/
(3)查看是否上传成功
hdfs dfs -ls /spark-exp4
启动 PySpark 或 PyCharm 进行开发。
使用文件创建RDD。(注意替换123为你学号的后三位)
r4 = sc.textFile("/home/hadoop/123.txt")
r4.flatMap(lambda line:line.split(',')).collect()
- 使用 HDFS 文件创建RDD。(注意替换123为你学号的后三位)
r5 = sc.textFile("hdfs://node0:8020/spark-exp4/123.txt")
r5.flatMap(lambda line:line.split(',')).collect()
- 基于parallelize创建
distData = sc.parallelize([1, 2, 3, 4, 5])
distData.getNumPartitions()
distData = sc.parallelize([1, 2, 3, 4, 5],4)
distData.getNumPartitions()
实验4.2:常见RDD算子练习
【实验名称】
实验4.2:常见RDD算子练习
【实验目的】
- 掌握常用的 RDD
【实验原理】
略
【实验环境】
- Ubuntu 16.04
- Python 3
- PySpark
- spark 2.4.4
- Hadoop 2.7.3
【实验参考资料】
算子 | 说明 |
---|---|
collect | 返回RDD所有元素组成的列表 |
reduce | 类似 MapReduce 的 Reducer,执行归并操作 |
saveAsTextFile | 输出文本结果到文件目录 |
map | 类似 MapReduce的Mapper,处理每个元素,可以返回新的数据结构 |
flatMap | 会把所有返回的集合类型如list,dict,tuple全部遍历展平 |
foreach | 处理每个元素,无返回值 |
filter | 自定义过滤 |
distinct | 去重 |
take(n) | 取前N个 |
first | 取第一个 |
sortBy | 返回排序的数据,默认升序 |
top(n) | 返回降序的数据前N个元素 |
join | 内连接 |
leftOuterJoin | 左外连接 |
rightOuterJoin | 右外连接 |
union(RDD) | 合并 |
intersection(RDD) | 交集 |
subtract(RDD) | 减去交集 |
mapPartitions | 针对每一个分区做Map操作 |
mapPartitionsWithIndex | 针对每一个分区做Map操作,带分区参数 |
glom | 把每个分区数据列表作为元素,组成新列表 |
coalesce | 归并(reduce)分区数量n |
reduceByKey(f) | 将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
groupByKey() | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD,(没有入参) |
mapValues(f) | 对键值对中的每个值(value)应用一个函数,但不改变键(key) |
flatMapValues(f) | 对键值对RDD中每个值应用返回一个迭代器的函数,然后对每个元素生成一个对应的键值对。 |
combineByKey( createCombiner, mergeValue, mergeCombiners, numPartitions=None) | 使用不同的返回类型合并具有相同键的值 |
join(otherDataset, numPartitions=None) | 按key值相同的记录进行连接 |
union(otherDataset,numPartitions=None) | 对源RDD和参数RDD求并集后返回一个新的RDD |
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None) | 使用给定的combine(组合)函数和一个中性的“零值”来聚合每个键的值 |
sortByKey(ascending=True, numPartitions=None) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
join(other, numPartitions=None) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
keys() | 获取所有key |
values() | 获取所有value |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
collectAsMap | 与collect相关的算子是将结果返回到driver端。collectAsMap算子是将Kev-Value结构的RDD收集到driver端,并返回成一个字典 |
countByKey | 与count类似,但是是以key为单位进行统计。 |
【实验准备】
在Python开发环境默认只打印最后一行变量,如果需要把所有行的变量都打印,需要运行以下2行代码,每次打开环境只需要运行一次即可。
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
测试
sc.parallelize([1,2]).collect()
sc.parallelize([3,4]).collect()
【实验要求】
以下要求<补充代码>的部分,请各位同学补充相应的代码,并在实验报告内要记录以下题目的完整代码和输出。
(1)要求:使用 map 算子把以下 RDD 列表每个元素转为int类型,并且每个元素都加上1000,并输出结果。(注意替换123为你学号的后三位)
rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.map(<补充代码>).collect()
(2)补充以下代码,使得输出符合要求。
rdd1 = sc.parallelize([1,2,3,4,5,6],3)
rdd2 = <补充代码>
rdd2.collect()
期望输出值
[[1, 2], [3, 4], [5, 6]]
(3)补充以下代码,使得输出每个分区元素累加总和。
rdd = sc.parallelize([1,2,3,4],2)
def f(iterator): yield sum(iterator)
rdd.<补充代码>.collect()
期望输出值
[3,7]
提示:可以查下关于分区的算子
(4)补充以下代码,使得输出每个分区编号和元素累加总和。
rdd = sc.parallelize([1,2,3,4],2)
def f<补充代码>
rdd.mapPartitionsWithIndex(f).collect()
期望输出值
[(0,[3]),(1,[7])]
提示:mapPartitionsWithIndex传进去的函数应该有2个参数。
(5)补充以下代码,输出个位数是7的元素。(注意替换123为你学号的后三位)
rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.<补充代码>.collect()
提示:可以使用int()转换字符串为整形
(6)补充以下代码,使得重复的元素只保留1个进行输出。
sc.parallelize([1, 1, 1, 2, 3, 2, 3]).<补充代码>.collect()
提示:distinct 算子
(7)补充以下代码,求2个rdd的并集。
rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize([5, 3, 4, 6])
<补充代码>
期望输出值
[1,2,3,4,5,6]
提示:union和distinct的组合可以产生并集的效果,要注意输出结果的排序。
(8)补充以下代码,求2个rdd的交集。
rdd1 = sc.parallelize([1, 4, 2, 3])
rdd2 = sc.parallelize([5, 3, 4, 6])
<补充代码>
期望输出值
[3,4]
(9)补充以下代码,对List(列表)进行降序排序。(注意替换123为你学号的后三位)
rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
<补充代码>
(10)补充以下代码,对列表元素key进行降序排序。
rdd = sc.parallelize([(101,'Mike'),(134,'John'),(123,'Mary')])
<补充代码>
期望输出值
[(134,'John'),(123,'Mary'),(101,'Mike')]
(11)补充以下代码,按照元组的第三个元素进行降序排序。
rdd = sc.parallelize([(101,'Mike',23),(134,'John',45),(123,'Mary',18)])
<补充代码>
期望输出值
[(134, 'John', 45), (101, 'Mike', 23), (123, 'Mary', 18)]
(12)补充以下代码,把列表中奇数和偶数分别分为2个区。
rdd1 = sc.parallelize([1, 1, 2, 3, 5, 8])
rdd1.groupBy(<补充代码>).map(<补充代码>).collect()
期望输出值
[(0, [2, 8]), (1, [1, 1, 3, 5])]
(13)以下代码都是把RDD数据重新分为2个区,每句执行多次,对比他们三者的结果是否一样。如果不一致是什么原因?
sc.parallelize([(1, 2), (2, 3), (3,4),(4,5),(5,6),(6,7)],3).partitionBy(2).glom().collect()
sc.parallelize([(1, 2), (2, 3), (3,4),(4,5),(5,6),(6,7)],3).repartition(2).glom().collect()
sc.parallelize([(1, 2), (2, 3), (3,4),(4,5),(5,6),(6,7)],3).coalesce(2).glom().collect()
(14)补充以下代码,返回所有元素的和。
sc.parallelize([1, 2, 3, 4, 5]).reduce(<补充代码>)
(15)补充以下代码,返回所有key相同value的和。
sc.parallelize([(1, 2),(3, 4), (3, 5),(1, 4)]).<补充代码>
期望输出值:
[(1, 6), (3, 9)]
(16)补充以下代码,计算元素的个数。(注意替换123为你学号的后三位)
rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.<补充代码>
(17)补充以下代码,取前3个最大值,并按降序排列。(注意替换123为你学号的后三位)
rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.<补充代码>
(18)补充以下代码,取每个分区的最大值,并计算分区最大值的和。
rdd = sc.parallelize([1, 2, 3, 4, 5,6],2)
rdd.aggregate(<补充代码>)
(19)补充以下代码,把每个分区字符连接起来,分区与分区之间用空格隔开。
rdd = sc.parallelize(["H","e","l","l","o","F","r","a","n","k"],2)
rdd.aggregate(<补充代码>)
期望输出值
' Hello Frank'
(20)补充以下代码,对相同的key进行分组。
rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2), ("b", 4)])
rdd.<补充代码>.map(lambda x:<补充代码>)).collect()
期望输出值
[('b', [3, 4]), ('a', [1, 2])]
(21)补充以下代码,对value列表的每个元素的第一个字母转为大写。
rdd = sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])])
def upperWordInList(list):
r=[]
for x in list:
<补充把单词第一个字母转为大写代码>
return r
rdd.<补充代码>.collect()
(22)补充以下代码,满足期望的输出值
rdd = sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])])
rdd.<补充代码>.collect()
期望输出值:
[('fruites', 'apple'),
('fruites', 'banana'),
('fruites', 'lemon'),
('vegetables', 'tomato'),
('vegetables', 'cabbage')]
(23)补充以下代码,输出由key值组成的列表和value值组成的列表
rdd=sc.parallelize([(1, 2), (3, 4)])
rdd.<补充代码>.collect()
rdd.<补充代码>.collect()
期望输出值:
[1, 3]
[2, 4]
(24)补充以下代码,输出由key值对应value元素个数。
sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])]).flatMapValues(lambda x:x).<补充代码>
期望输出值:
defaultdict(int, {'fruites': 3, 'vegetables': 2})
(25)补充以下代码,输出由key值对应value总和
rdd = sc.parallelize([ ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)], 2)
rdd.aggregateByKey(<补充代码>).collect()
期望输出值:
[('mouse', 6), ('cat', 19), ('dog', 12)]
(26)补充以下代码,使用连接算子,使得代码输出符合期望值
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.<补充代码>.collect()
x.<补充代码>.collect()
y.<补充代码>.collect()
[('a', (1, 2)), ('a', (1, 3))]
[('b', (4, None)), ('a', (1, 2)), ('a', (1, 3))]
[('b', (None, 4)), ('a', (2, 1)), ('a', (3, 1))]
(27)补充以下代码,使得输出值为一个字典类型。(注意替换123为你学号的后三位)
rdd = sc.textFile("/home/hadoop/123.txt").flatMap(lambda line:line.split(','))
rdd.map(lambda x:(x,x)).<补充代码>
期望输出值:
{'123': '123',
'453': '453',
'877': '877',
'932': '932',
'953': '953',
'997': '997'}
(28)补充以下代码,使得输出值符合期望值。
rdd = sc.parallelize([("a",1),("b",1),("a", 1)])
rdd.<补充代码>.keys()
rdd.<补充代码>.items()
期望输出值:
dict_keys(['a', 'b'])
dict_items([('a', 2), ('b', 1)])
实验4.3:分析tomcat的访问日志,求访问量最高的两个网页
【实验名称】 实验4.3:分析tomcat的访问日志,求访问量最高的两个网页
【实验目的】
掌握RDD的应用
【实验资源】
https://pan.baidu.com/s/1nIbyAa19D-MlutP3DIh6bA#提取码dzi0
【实验原理】
使用 Spark 相应 API 进行开发实现
【实验环境】
- 操作系统:Ubuntu 16.04
- Spark:Spark2.x
- 开发环境:PyCharm 或 Jupitor Notebook
【实验要求】
Tomcat的访问日志文件为spark-exp04-tomcat-log.txt
,格式如下:
(1)求出访问量最高的两个网页。要求输出以下格式: 网页名称,访问量
(2)请自行开发代码实现(可以参考第二章的 WordCount 的开发例子),并在实验报告附上思考过程,开发步骤,代码和结果。
实验4.4:使用Spark程序进行数据的清洗
【实验名称】 实验4.4:使用Spark程序进行数据的清洗
【实验目的】
掌握RDD的应用
【实验资源】
https://pan.baidu.com/s/1nIbyAa19D-MlutP3DIh6bA#提取码dzi0
【实验原理】
使用 Spark 相应 API 进行开发实现
【实验环境】
- 操作系统:Ubuntu 16.04
- Spark:Spark2.x
- 开发环境:PyCharm 或 Jupitor Notebook
【实验要求】
数据源spark-exp04-user-click-log.txt
文件记录了用户点击的日志记录,但日志中存在不合规范的数据。请用Spark程序进程数据清洗,完成以下操作:
(1)过滤不满足6个字段的数据
(2)过滤URL为空的数据
(3)请自行开发代码实现,并在实验报告附上思考过程,开发步骤,代码和结果。
实验4.5:人口身高数据分析
【实验名称】 实验4.5:人口身高数据分析
【实验目的】
掌握RDD的应用
【实验资源】
https://pan.baidu.com/s/1nIbyAa19D-MlutP3DIh6bA#提取码dzi0
【实验原理】
使用 Spark 相应 API 进行开发实现
【实验环境】
- 操作系统:Ubuntu 16.04
- Spark:Spark2.x
- 开发环境:PyCharm 或 Jupitor Notebook
【实验要求】
文件spark-exp04-sample-people-info-10w.txt
是某个地区的人口性别还有身高数据。三列分别是 ID,性别(M表示男,F表示女),身高 (cm)。要求
(1)统计出男女人数。
(2)男性中的最高,最低身高和平均身高。
(3)女性中的最高,最低身高和平均身高。
(4)请自行开发代码实现,并在实验报告附上思考过程,开发步骤,代码和结果。