【版本】
当前版本号v20200424
版本 | 修改说明 |
---|---|
v20200424 | 新增修改日志级别的步骤 |
v20200419 | 初始化版本 |
实验6.1 :运行网络版的WordCount
【实验名称】 实验6.1 :运行网络版的WordCount
【实验目的】
- 了解NetCat工具的使用。
- 初步了解Streaming运行的方式。
【实验原理】
运行Spark自带的StreamingWordCount程序,
【实验环境】
- 操作系统:Ubuntu 16.04 (确保机器cpu核数大于接收器的数量,或Local模式线程数大于接收器数量)
- Spark:Spark2.x
- Pyspark
【实验步骤】
一、基于netcat的聊天室
- 启动 NetCat 服务端,并在1234端口监听
nc -lk 1234
- 使用xshell 打开一个新的选项卡,连接虚拟机。启动NetCat客户端,并连接Netcat服务端
nc localhost 1234
注意:如果客户端和服务端不在同一台机器,localhost 可以换成实际IP。
- 在服务端输入以下字符串,并按回车,可以在客户端收到消息,并打印出来。这里注意替换学号为你个人学号。
hello 你的学号
- 在客户端输入字符串,并按回车,可以在服务端收到消息,并打印出来。这里注意替换学号为你个人学号。
你好 你的学号
在 NetCat 客户端的选项卡使用
ctrl+c
终止客户端进程。修改
$SPARK_HOME/conf
下的 log4j.properties 下的日志级别,把INFO
修改为WARN
。这一步主要是为了提升日志级别,减少 WARN 级别以下例如 INFO,DEBUG 级别的日志输出,对结果造成干扰。
#修改这一句
log4j.rootCategory=INFO, console
#修改为
log4j.rootCategory=WARN, console
- 启动 Spark NetworkWordCount 例子和服务端建立连接,并统计单词数量。
cd $SPARK_HOME
bin/run-example streaming.NetworkWordCount localhost 1234
- 在 NetCat 服务端输入以下字符串,并按回车,观察Streaming WordCount的输出,并截图。这里注意替换学号为你个人学号。
You jump I jump 你的学号
实验6.2 :开发自己的StreamingWordCount,且支持统计历史数据
【实验名称】 实验6.2 :开发自己的StreamingWordCount,且支持统计历史数据
【实验目的】
- 了解NetCat工具的使用。
- 掌握 Spark Streaming 的开发流程。
【实验原理】
开发自己的StreamingWordCount 程序,用于统计所有流数据的单词数量。
【实验环境】
- 操作系统:Ubuntu 16.04 (确保机器cpu核数大于接收器的数量,或Local模式线程数大于接收器数量)
- Spark:Spark2.x
- Pyspark
【实验要求】
- 开发一个 Spark Streaming WordCount 程序,满足以下要求
- (1)自己编写一个 Spark Streaming 的 WordCount 程序,保存为
StreamingWordCount123.py
。(请替换123为你学号后3位) - (2)程序需要使用到 transform() 和 updateStateByKey() 算子。
- (3)程序需要对NetCat 服务端发送历史的单词出现的数量一起累计。
- (4)使用
spark-submit
运行你的程序。
- 启动 NetCat 服务端,并在1234端口监听。
nc -lk 1234
- 使用 NetCat 服务端分别发送以下内容。这里注意替换学号为你个人学号。
Knock 你的学号
Knock Knock 你的学号
Knock Knock Knock 你的学号
实验6.3:流式日志过滤与分析
【实验名称】 实验6.3:流式日志过滤与分析
【实验目的】
掌握Spark Steaming的应用
【实验原理】
【实验环境】
- 操作系统:Ubuntu 16.04
- Spark:Spark2.x
- Pyspark
【实验资源】
请从以下链接下载日志文件。该日志包含日志级别
、函数名
、日志内容
三个字段,字段之间以"\t"拆分。
https://pan.baidu.com/s/1Vh1w-70x_MpkV_Ycu_54PA#提取码38zq
【实验步骤】
启动 HDFS,创建 HDFS 目录
/sst-123
。请替换123为你的学号后3位。使用 MySQL 的 root 用户创建数据库 spark。并创建表
log123
。请替换123为你的学号后3位。
用户名:root
密码:Mysql0668
create table `log123`(
lvl varchar(15) comment '等级',
method varchar(50) comment '方法',
content varchar(200) comment '内容'
);
- 为了让程序可以调用 MySQL 的JDBC驱动,把驱动文件放入
$SPARK_HOME/jars
。
cp /opt/hive/lib/mysql-connector-java-5.1.48.jar $SPARK_HOME/jars/
- 如果配置了Jupytor Notebook,要把这2个环境变量注释掉,并重启虚拟机。
#export PYSPARK_DRIVER_PYTHON=jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=*'
启动 Spark Standalone 模式。
编写程序Log2DB.py,完成以下要求。
- (1)采用StreamingContext.textFileStream()算子监听
hdfs://node0:8020/sst-123
目录。 - (2)把 DStream 转换为 RDD。
- (3)读入日志信息流,将 RDD 转为 DataFrame。
- (4)DataFrame 注册为临时表。
- (5)使用 SQL 过滤出级别为 error 或 warn 的日志。
- (6)将过滤后的数据保存到 MySQL 的 spark.log123表(请替换123为你的学号后3位)。
提示代码(注意修改为你的学号后3位):
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc =SparkContext('spark://node0:7077','ss-123')
spark =SparkSession(sc)
ssc = StreamingContext(sc, 1)
ds1 = ssc.textFileStream(<补充代码>)
def save2DB(fileRDD):
<补充代码>
df2.write.jdbc('jdbc:mysql://localhost:3306/spark?useSSL=false&user=root&password=Mysql0668',table='log123',mode='append',properties={'driver':'com.mysql.jdbc.Driver'})
ds1.foreachRDD(lambda fileRDD:save2DB(fileRDD))
ssc.start();ssc.awaitTermination()
- 使用 spark-submit 运行Log2DB.py。
spark-submit --master spark://node0:7077 ./Log2DB.py
把2个日志文件依次上传
hdfs://node0:8020/sst-123
目录下。观察 MySQL 的spark.log123表记录是否有更新。
【常见问题解答FAQ】
问题1:上传文件到 HDFS 遇到org.apache.hadoop.mapreduce.lib.input.InvalidInputException:Input path does not exist: hdfs://node0:8020/xxx.txt.COPYING
之类的问题
答:可以先把文件上传到 Spark Streaming 非监听目录,例如 HDFS 根目录。再使用hdfs dfs -mv
命令,把该文件移动到监听目录。因为直接上传文件会在 HDFS 上生成一个临时文件,后缀是.COPYING,Spark Streaming 程序监听到该临时文件时,文件可能会因为复制完成被删除。导致文件找不到出错。但是hdfs dfs -mv
命令是一个事务性的操作,不会产生临时文件。