第六章 Spark Streaming 实验手册

2020-04-19
5分钟阅读时长

【版本】

当前版本号v20200424

版本修改说明
v20200424新增修改日志级别的步骤
v20200419初始化版本

实验6.1 :运行网络版的WordCount

【实验名称】 实验6.1 :运行网络版的WordCount

【实验目的】

  • 了解NetCat工具的使用。
  • 初步了解Streaming运行的方式。

【实验原理】

运行Spark自带的StreamingWordCount程序,

【实验环境】

  • 操作系统:Ubuntu 16.04 (确保机器cpu核数大于接收器的数量,或Local模式线程数大于接收器数量)
  • Spark:Spark2.x
  • Pyspark

【实验步骤】

一、基于netcat的聊天室

  1. 启动 NetCat 服务端,并在1234端口监听
nc -lk  1234
  1. 使用xshell 打开一个新的选项卡,连接虚拟机。启动NetCat客户端,并连接Netcat服务端
nc localhost  1234

注意:如果客户端和服务端不在同一台机器,localhost 可以换成实际IP。

  1. 在服务端输入以下字符串,并按回车,可以在客户端收到消息,并打印出来。这里注意替换学号为你个人学号。
hello 你的学号
  1. 在客户端输入字符串,并按回车,可以在服务端收到消息,并打印出来。这里注意替换学号为你个人学号。
你好  你的学号
  1. 在 NetCat 客户端的选项卡使用ctrl+c终止客户端进程。

  2. 修改$SPARK_HOME/conf下的 log4j.properties 下的日志级别,把INFO修改为WARN。这一步主要是为了提升日志级别,减少 WARN 级别以下例如 INFO,DEBUG 级别的日志输出,对结果造成干扰。

#修改这一句
log4j.rootCategory=INFO, console

#修改为
log4j.rootCategory=WARN, console
  1. 启动 Spark NetworkWordCount 例子和服务端建立连接,并统计单词数量。
cd $SPARK_HOME
bin/run-example streaming.NetworkWordCount localhost 1234
  1. 在 NetCat 服务端输入以下字符串,并按回车,观察Streaming WordCount的输出,并截图。这里注意替换学号为你个人学号。
You jump I jump 你的学号

实验6.2 :开发自己的StreamingWordCount,且支持统计历史数据

【实验名称】 实验6.2 :开发自己的StreamingWordCount,且支持统计历史数据

【实验目的】

  • 了解NetCat工具的使用。
  • 掌握 Spark Streaming 的开发流程。

【实验原理】

开发自己的StreamingWordCount 程序,用于统计所有流数据的单词数量。

【实验环境】

  • 操作系统:Ubuntu 16.04 (确保机器cpu核数大于接收器的数量,或Local模式线程数大于接收器数量)
  • Spark:Spark2.x
  • Pyspark

【实验要求】

  1. 开发一个 Spark Streaming WordCount 程序,满足以下要求
  • (1)自己编写一个 Spark Streaming 的 WordCount 程序,保存为StreamingWordCount123.py。(请替换123为你学号后3位)
  • (2)程序需要使用到 transform() 和 updateStateByKey() 算子。
  • (3)程序需要对NetCat 服务端发送历史的单词出现的数量一起累计。
  • (4)使用 spark-submit 运行你的程序。
  1. 启动 NetCat 服务端,并在1234端口监听。
nc -lk  1234
  1. 使用 NetCat 服务端分别发送以下内容。这里注意替换学号为你个人学号。
Knock 你的学号
Knock Knock 你的学号
Knock Knock  Knock 你的学号

实验6.3:流式日志过滤与分析

【实验名称】 实验6.3:流式日志过滤与分析

【实验目的】

掌握Spark Steaming的应用

【实验原理】

【实验环境】

  • 操作系统:Ubuntu 16.04
  • Spark:Spark2.x
  • Pyspark

【实验资源】

请从以下链接下载日志文件。该日志包含日志级别函数名日志内容三个字段,字段之间以"\t"拆分。

https://pan.baidu.com/s/1Vh1w-70x_MpkV_Ycu_54PA#提取码38zq

【实验步骤】

  1. 启动 HDFS,创建 HDFS 目录/sst-123。请替换123为你的学号后3位。

  2. 使用 MySQL 的 root 用户创建数据库 spark。并创建表log123。请替换123为你的学号后3位。

用户名:root
密码:Mysql0668
create table `log123`(
lvl varchar(15) comment '等级',
method varchar(50) comment '方法',
content varchar(200) comment '内容'
);
  1. 为了让程序可以调用 MySQL 的JDBC驱动,把驱动文件放入$SPARK_HOME/jars
cp /opt/hive/lib/mysql-connector-java-5.1.48.jar $SPARK_HOME/jars/
  1. 如果配置了Jupytor Notebook,要把这2个环境变量注释掉,并重启虚拟机。
#export PYSPARK_DRIVER_PYTHON=jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=*'
  1. 启动 Spark Standalone 模式。

  2. 编写程序Log2DB.py,完成以下要求。

  • (1)采用StreamingContext.textFileStream()算子监听hdfs://node0:8020/sst-123目录。
  • (2)把 DStream 转换为 RDD。
  • (3)读入日志信息流,将 RDD 转为 DataFrame。
  • (4)DataFrame 注册为临时表。
  • (5)使用 SQL 过滤出级别为 error 或 warn 的日志。
  • (6)将过滤后的数据保存到 MySQL 的 spark.log123表(请替换123为你的学号后3位)。

提示代码(注意修改为你的学号后3位):

from pyspark.streaming import StreamingContext  
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc =SparkContext('spark://node0:7077','ss-123')
spark =SparkSession(sc)
ssc = StreamingContext(sc, 1)
ds1 = ssc.textFileStream(<补充代码>)
def save2DB(fileRDD):
    <补充代码>
    df2.write.jdbc('jdbc:mysql://localhost:3306/spark?useSSL=false&user=root&password=Mysql0668',table='log123',mode='append',properties={'driver':'com.mysql.jdbc.Driver'})

ds1.foreachRDD(lambda fileRDD:save2DB(fileRDD))
ssc.start();ssc.awaitTermination()
  1. 使用 spark-submit 运行Log2DB.py。
spark-submit --master spark://node0:7077 ./Log2DB.py
  1. 把2个日志文件依次上传hdfs://node0:8020/sst-123目录下。

  2. 观察 MySQL 的spark.log123表记录是否有更新。

【常见问题解答FAQ】

问题1:上传文件到 HDFS 遇到org.apache.hadoop.mapreduce.lib.input.InvalidInputException:Input path does not exist: hdfs://node0:8020/xxx.txt.COPYING之类的问题

答:可以先把文件上传到 Spark Streaming 非监听目录,例如 HDFS 根目录。再使用hdfs dfs -mv命令,把该文件移动到监听目录。因为直接上传文件会在 HDFS 上生成一个临时文件,后缀是.COPYING,Spark Streaming 程序监听到该临时文件时,文件可能会因为复制完成被删除。导致文件找不到出错。但是hdfs dfs -mv命令是一个事务性的操作,不会产生临时文件。