博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop学习之路(十九)MapReduce框架排序
阅读量:6473 次
发布时间:2019-06-23

本文共 12358 字,大约阅读时间需要 41 分钟。

流量统计项目案例

样本示例

需求

1、 统计每一个用户(手机号)所耗费的总上行流量、总下行流量、总流量

2、 在上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序

3、 将流量汇总统计结果按照手机归属地不同省份输出到不同文件中

第一题

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** *    第一题:统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量 */public class FlowSumMR {    public static void main(String[] args) throws Exception {                        Configuration conf = new Configuration();        Job job = Job.getInstance(conf, "FlowSumMR");        job.setJarByClass(FlowSumMR.class);                job.setMapperClass(FlowSumMRMapper.class);        job.setReducerClass(FlowSumMRReducer.class);                        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);                        FileInputFormat.setInputPaths(job, new Path("E:/bigdata/flow/input/"));        FileOutputFormat.setOutputPath(job, new Path("E:/bigdata/flow/output_sum"));                        boolean isDone = job.waitForCompletion(true);        System.exit(isDone ? 0 : 1);    }        public static class FlowSumMRMapper extends Mapper
{ /** * value = 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 * iface.qiyi.com 视频网站 15 12 1527 2106 200 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); String outkey = split[1]; String outValue = split[8] + "\t" + split[9]; context.write(new Text(outkey), new Text(outValue)); } } public static class FlowSumMRReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { int upFlow = 0; int downFlow = 0; int sumFlow = 0; for(Text t : values){ String[] split = t.toString().split("\t"); int upTempFlow = Integer.parseInt(split[0]); int downTempFlow = Integer.parseInt(split[1]); upFlow+=upTempFlow; downFlow += downTempFlow; } sumFlow = upFlow + downFlow; context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + sumFlow)); } }}

第二题

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import comg.ghgj.mr.pojo.FlowBean;/** * 需求: 第二个题目,就是对第一个题目的结果数据,进行按照总流量倒叙排序 *  *  */public class FlowSortMR {    public static void main(String[] args) throws Exception {                Configuration conf = new Configuration();        Job job = Job.getInstance(conf, "FlowSumMR");        job.setJarByClass(FlowSortMR.class);                job.setMapperClass(FlowSortMRMapper.class);        job.setReducerClass(FlowSortMRReducer.class);                job.setOutputKeyClass(FlowBean.class);        job.setOutputValueClass(NullWritable.class);                        FileInputFormat.setInputPaths(job, new Path("E:/bigdata/flow/output_sum"));        FileOutputFormat.setOutputPath(job, new Path("E:/bigdata/flow/output_sort_777"));                        boolean isDone = job.waitForCompletion(true);        System.exit(isDone ? 0 : 1);            }        public static class FlowSortMRMapper extends Mapper
{ /** * value = 13602846565 26860680 40332600 67193280 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); FlowBean fb = new FlowBean(split[0], Long.parseLong(split[1]), Long.parseLong(split[2])); context.write(fb, NullWritable.get()); } } public static class FlowSortMRReducer extends Reducer
{ @Override protected void reduce(FlowBean key, Iterable
values, Context context) throws IOException, InterruptedException { for(NullWritable nvl : values){ context.write(key, nvl); } } }}

FlowBean.java

1 import java.io.DataInput;  2 import java.io.DataOutput;  3 import java.io.IOException;  4   5 import org.apache.hadoop.io.WritableComparable;  6   7 /**  8  * 第一,定义好属性  9  * 第二,定义好属性的getter 和 setter方法 10  * 第三,定义好构造方法(有参,无参) 11  * 第四:定义好toString(); 12  *  13  *  14  * 详细解释: 15  *  16  * 如果一个自定义对象要作为key 必须要实现 WritableComparable 接口, 而不能实现 Writable, Comparable 17  *  18  * 如果一个自定义对象要作为value,那么只需要实现Writable接口即可 19  */ 20 public class FlowBean implements WritableComparable
{ 21 //public class FlowBean implements Comparable
{
22 23 private String phone; 24 private long upFlow; 25 private long downFlow; 26 private long sumFlow; 27 public String getPhone() { 28 return phone; 29 } 30 public void setPhone(String phone) { 31 this.phone = phone; 32 } 33 public long getUpFlow() { 34 return upFlow; 35 } 36 public void setUpFlow(long upFlow) { 37 this.upFlow = upFlow; 38 } 39 public long getDownFlow() { 40 return downFlow; 41 } 42 public void setDownFlow(long downFlow) { 43 this.downFlow = downFlow; 44 } 45 public long getSumFlow() { 46 return sumFlow; 47 } 48 public void setSumFlow(long sumFlow) { 49 this.sumFlow = sumFlow; 50 } 51 public FlowBean(String phone, long upFlow, long downFlow, long sumFlow) { 52 super(); 53 this.phone = phone; 54 this.upFlow = upFlow; 55 this.downFlow = downFlow; 56 this.sumFlow = sumFlow; 57 } 58 public FlowBean(String phone, long upFlow, long downFlow) { 59 super(); 60 this.phone = phone; 61 this.upFlow = upFlow; 62 this.downFlow = downFlow; 63 this.sumFlow = upFlow + downFlow; 64 } 65 public FlowBean() { 66 super(); 67 // TODO Auto-generated constructor stub 68 } 69 @Override 70 public String toString() { 71 return phone + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow; 72 } 73 74 75 76 77 /** 78 * 把当前这个对象 --- 谁掉用这个write方法,谁就是当前对象 79 * 80 * FlowBean bean = new FlowBean(); 81 * 82 * bean.write(out) 把bean这个对象的四个属性序列化出去 83 * 84 * this = bean 85 */ 86 @Override 87 public void write(DataOutput out) throws IOException { 88 // TODO Auto-generated method stub 89 90 out.writeUTF(phone); 91 out.writeLong(upFlow); 92 out.writeLong(downFlow); 93 out.writeLong(sumFlow); 94 95 } 96 97 98 // 序列化方法中的写出的字段顺序, 一定一定一定要和 反序列化中的 接收顺序一致。 类型也一定要一致 99 100 101 /**102 * bean.readField();103 * 104 * upFlow = 105 */106 @Override107 public void readFields(DataInput in) throws IOException {108 // TODO Auto-generated method stub109 110 phone = in.readUTF();111 upFlow = in.readLong();112 downFlow = in.readLong();113 sumFlow = in.readLong();114 115 }116 117 118 119 /**120 * Hadoop的序列化机制为什么不用 
java自带的实现 Serializable这种方式?121 * 122 * 本身Hadoop就是用来解决大数据问题的。123 * 124 * 那么实现Serializable接口这种方式,在进行序列化的时候。除了会序列化属性值之外,还会携带很多跟当前这个对象的类相关的各种信息125 * 126 * Hadoop采取了一种全新的序列化机制;只需要序列化 每个对象的属性值即可。127 */128 129 130 131 /*@Override132 public void readFields(DataInput in) throws IOException {133 value = in.readLong();134 }135 136 @Override137 public void write(DataOutput out) throws IOException {138 out.writeLong(value);139 }*/140 141 142 /**143 * 用来指定排序规则144 */145 @Override146 public int compareTo(FlowBean fb) {147 148 long diff = this.getSumFlow() - fb.getSumFlow();149 150 if(diff == 0){151 return 0;152 }else{153 return diff > 0 ? -1 : 1;154 }155 156 }157 }
View Code

第三题

package comg.ghgj.mr.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// NOTE(review): ProvincePartitioner's source is not shown alongside this class and
// this is a Hadoop package path - confirm the custom partitioner really lives here.
import org.apache.hadoop.mapreduce.lib.partition.ProvincePartitioner;

/**
 * Task 3: same per-phone traffic summary as task 1, with the reduce output
 * split across files by a custom partitioner.
 */
public class FlowPartitionerMR {

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     * An existing output directory is deleted first so the job can be re-run.
     *
     * @param args unused; input and output paths are hard-coded below
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Job name fixed: it was copy-pasted as "FlowSumMR" from the first job.
        Job job = Job.getInstance(conf, "FlowPartitionerMR");
        job.setJarByClass(FlowPartitionerMR.class);

        job.setMapperClass(FlowPartitionerMRMapper.class);
        job.setReducerClass(FlowPartitionerMRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The two settings that make this job differ from task 1: the custom
        // partitioner and the number of reduce tasks.
        // NOTE(review): 10 presumably has to cover every partition index that
        // ProvincePartitioner can return - verify against its implementation.
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(10);

        FileInputFormat.setInputPaths(job, new Path("E:\\bigdata\\flow\\input"));
        Path outputPath = new Path("E:\\bigdata\\flow\\output_ptn2");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    /**
     * Emits {@code (phone, "up \t down")} for every raw log line.
     */
    public static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * Reads the raw log from the input directory, not the 4-column output of
         * task 1: the code uses field indices 1, 8 and 9, so every line needs at
         * least 10 tab-separated fields.
         *
         * <p>Field 1 becomes the key; fields 8 and 9 are forwarded as the value and
         * are treated by the reducer as upstream and downstream traffic.
         *
         * @throws ArrayIndexOutOfBoundsException if the line has fewer than 10 fields
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");

            String outkey = split[1];
            String outValue = split[8] + "\t" + split[9];

            context.write(new Text(outkey), new Text(outValue));
        }
    }

    /**
     * Sums the upstream and downstream traffic of one phone number.
     */
    public static class FlowPartitionerMRReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * @param key    the phone number
         * @param values {@code "up \t down"} pairs emitted by the mapper
         * @throws NumberFormatException if a traffic field is not an integer
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // long, not int: per-user totals can exceed Integer.MAX_VALUE.
            long upFlow = 0;
            long downFlow = 0;

            for (Text t : values) {
                String[] split = t.toString().split("\t");
                upFlow += Long.parseLong(split[0]);
                downFlow += Long.parseLong(split[1]);
            }

            long sumFlow = upFlow + downFlow;
            context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + sumFlow));
        }
    }
}

 

转载地址:http://hkvko.baihongyu.com/

你可能感兴趣的文章
【转】性能测试步骤
查看>>
OSI与TCP/IP各层的结构与功能,都有哪些协议
查看>>
Android实例-程序切换到后台及从后台切换到前台
查看>>
spring boot启动定时任务
查看>>
值类型和引用类型
查看>>
查看外键属性
查看>>
[转]html5 Canvas画图教程(6)—canvas里画曲线之arcTo方法
查看>>
maven 常用插件
查看>>
算法 (二分查找算法)
查看>>
java Date 当天时间戳处理
查看>>
Python~迭代
查看>>
linux常用命令-关机、重启
查看>>
css布局 - 九宫格布局的方法汇总(更新中...)
查看>>
画图函数——点,线,矩形等等
查看>>
ejabberd_local
查看>>
BZOJ5020 [THUWC 2017]在美妙的数学王国中畅游LCT
查看>>
hdu 6030 矩阵快速幂
查看>>
tomcat类加载机制
查看>>
ado.net2.0中的缓存使用SqlDependency类
查看>>
Java基础学习总结(94)——Java线程再学习
查看>>