Hadoop Part 6 - 编写 MapReduce 程序

文章导航

«返回课程汇总页面

【版本】

当前版本号v20210411

版本修改说明
v20210411初始化版本

实验6.1 - 编写 MapReduce wordcount 程序

【实验名称】

实验6.1 - 编写 MapReduce wordcount 程序

【实验目的】

【实验环境】

【实验资源】

下载链接:https://pan.baidu.com/s/1ghde86wcK6pwg1fdSSWg0w
提取码:v3wv

【实验步骤】

  1. 打开 Part 4 的 hadoopexp 项目。

  2. 在项目hadoop-exp\src\main\java下创建一个名为hadoop+你学号后4位.mapreduce.wc的包。注意替换为你的学号后4位。

  3. 在包下面新建一个 WordCountMapper 的类。代码如下,注意替换为你的学号后4位。

package hadoop你的学号后4位.mapreduce.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key1,Text v1,Context context)
            throws IOException,InterruptedException{
            String data=v1.toString();
            // 分词
            String[] words=data.split(" ");
            // 输出 k2 v2
            for(String w:words){
                context.write(new Text(w),new IntWritable(1));
            }
    }
}
  1. 在包下面新建一个 WordCountReducer 的类。代码如下,注意替换为你的学号后4位。
package hadoop你的学号后4位.mapreduce.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
		//context 是 reduce 的上下文		
        // 对 v3 求和
		int total = 0;
		for(IntWritable v:v3){
		    total += v.get();
		}
		// 输出: k4 单词 v4 频率
		context.write(k3, new IntWritable(total));
	}
}
  1. 在包下面新建一个 WordCountMain 的类。代码如下,注意替换为你的学号后4位。
package hadoop你的学号后4位.mapreduce.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class WordCountMain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int argLength=otherArgs.length;
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        //获取一个作业实例,名称为Word Count
        Job job=Job.getInstance(conf,"Word Count");
        //设置运行的jar包里的类
        job.setJarByClass(WordCountMain.class);
        //设置Mapper
        job.setMapperClass(WordCountMapper.class);
        //设置Reducer
        job.setReducerClass(WordCountReducer.class);
        //设置Combiner
        job.setCombinerClass(WordCountReducer.class);
        //设置k4类型
        job.setOutputKeyClass(Text.class);
        //设置v4类型
        job.setOutputValueClass(IntWritable.class);
        //可以接受多个参数作为输入文件路径
        for(int i=0;i<argLength-1;i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        //最后的一个参数是结果输出路径
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[argLength-1]));
        //如果作业运行成功就成功退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  1. 修改项目的pom.xml。其中<mainClass>标签内的类修改为WordCountMain类的包内路径。参考代码如下,注意替换为你的学号后4位。
<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <archive>
                        <!--<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>-->
                        <manifest>
                            <!-- main()所在的类,注意修改 -->
                            <mainClass>hadoop你的学号后4位.mapreduce.wc.WordCountMain</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
  1. 在 IDEA 的左下角找到 Terminal,输入以下命令,使用 Maven 对代码进行打包。
mvn compile package -Dmaven.test.skip=true
  1. 生成的 jar 包会在项目的 target 目录下。名称为hadoop-exp-1.0.jar

  2. hadoop-exp-1.0.jarcountryroad.txt都上传到NodeA/home/hadoop目录下。

  3. NodeA启动Hadoop.

start-hdp.sh
  1. NodeA本地的countryroad.txt文件上传到 HDFS 的根目录/
hdfs dfs -put /home/hadoop/countryroad.txt /
  1. NodeA上执行以下命令,开始运行我们编写的 Wordcount 程序。
hadoop jar /home/hadoop/hadoop-exp-1.0.jar /countryroad.txt /output6
  1. 运行成功以后,查看结果。
hdfs dfs -cat /output6/part-r-00000

实验6.2 - 编写 MapReduce 程序统计数据

【实验名称】

实验6.2 - 编写 MapReduce 程序统计数据

【实验目的】

【实验环境】

【实验资源】

下载链接:https://pan.baidu.com/s/1ghde86wcK6pwg1fdSSWg0w
提取码:v3wv

【实验要求】

【实验步骤】

  1. 打开 Part 4 的 hadoopexp 项目。

  2. 在项目hadoop-exp\src\main\java下创建一个名为hadoop+你学号后4位.mapreduce.sales的包。注意替换为你的学号后4位。

  3. 在包下面新建一个 SalesMapper 的类。下面是部分参考代码,请完成关键的编码部分,注意替换为你的学号后4位。

package hadoop你的学号后4位.mapreduce.sales;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SalesMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{
    @Override
    protected void map(LongWritable key1,Text v1,Context context)
            throws IOException,InterruptedException{
            String data=v1.toString();
            // 分词
            String[] words=data.split(",");
            // 输出 k2 v2
            for(String w:words){
                此处关键代码请自己完成
            }
    }
}
  1. 在包下面新建一个 SalesReducer 的类。代码如下,注意替换为你的学号后4位。
package hadoop你的学号后4位.mapreduce.sales;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SalesReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text k3, Iterable<DoubleWritable> v3,Context context) throws IOException, InterruptedException {
		此处关键代码请自己完成
	}
}
  1. 在包下面新建一个 SalesMain 的类。代码如下,注意替换为你的学号后4位。
package hadoop你的学号后4位.mapreduce.sales;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class SalesMain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int argLength=otherArgs.length;
        if (otherArgs.length < 2) {
            System.err.println("Usage: xxx.jar <in> [<in>...] <out>");
            System.exit(2);
        }
        //获取一个作业实例
        Job job=Job.getInstance(conf,"Sales Statistic");
        //设置运行的jar包里的类
        job.setJarByClass(SalesMain.class);
        //设置Mapper
        job.setMapperClass(SalesMapper.class);
        //设置Reducer
        job.setReducerClass(SalesReducer.class);
        //设置Combiner
        job.setCombinerClass(SalesReducer.class);
        //设置k4类型
        job.setOutputKeyClass(Text.class);
        //设置v4类型
        job.setOutputValueClass(DoubleWritable.class);
        //可以接受多个参数作为输入文件路径
        for(int i=0;i<argLength-1;i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        //最后的一个参数是结果输出路径
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[argLength-1]));
        //如果作业运行成功就成功退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  1. 修改项目的pom.xml。其中<mainClass>标签内的类修改为SalesMain类的包内路径。参考代码如下,注意替换为你的学号后4位。
<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <archive>
                        <!--<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>-->
                        <manifest>
                            <!-- main()所在的类,注意修改 -->
                            <mainClass>hadoop你的学号后4位.mapreduce.sales.SalesMain</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
  1. 参考实验6.1 打包、上传包、运行和查看结果的步骤。参考结果如下:
1998	1.6858740465058884E8
1999	1.555396336197083E8
2000	1.6635854633980626E8
2001	1.969552338600405E8

##【常见问题】