1 Map 、Reduce和主类
package com.wzt.mapreduce.secondsort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.wzt.mapreduce.wordcount.WCRunner; public class SecSortMain { public static class SecSortMapper extends Mapper<LongWritable, Text, FirstSortEntity, IntWritable> { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] spilted = line.split(" "); // 为了显示效果而输出Mapper的输出键值对信息 System.out.println("Mapper输出<" + spilted[0] + "," + spilted[1] + ">"+this); context.write(new FirstSortEntity(spilted[0], Integer.parseInt(spilted[1])) , new IntWritable(Integer.parseInt(spilted[1])) ); }; } public static class SecSortReducer extends Reducer<FirstSortEntity, IntWritable , FirstSortEntity, IntWritable> { @Override protected void reduce( FirstSortEntity key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 显示次数表示redcue函数被调用了多少次,表示k2有多少个分组 System.out.println("Reducer输入分组<" + key+ ",N(N>=1)>"+this); StringBuffer sb = new StringBuffer() ; for (IntWritable value : values) { //count += value.get(); // 显示次数表示输入的k2,v2的键值对数量 sb.append( value+" , " ) ; System.out.println("Reducer输入键值对<" + key.toString() + "," + value.get() + "> 组"+sb.toString() ); } // if(sb.length()>0){ // sb.deleteCharAt( -1 ) ; // } context.write(key, key.getSecondkey()); //context.write(key.getFirstkey(), new Text(sb.toString() )); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration() ; Job job = Job.getInstance(conf) ; job.setJarByClass(WCRunner.class ); job.setMapperClass( SecSortMapper.class ); job.setMapOutputKeyClass( FirstSortEntity.class); job.setMapOutputValueClass( IntWritable.class ); //设置分区方法 job.setPartitionerClass( SSPartintioner.class);//不同 //会有几个reduce去执行最后的汇总数据, 有几个分区就要有几个reduce ,最后就会生成几个reduce ,这里设置为1 ,没看到调用但是确实分区了,没弄明白 job.setNumReduceTasks(1);//当任务数为1的时候设置Partitioner是没有用的 //数据做总的排序 job.setSortComparatorClass(MySSSortComparator.class) ; //排序 //总数据 记性分组 job.setGroupingComparatorClass( GroupComparator.class );//分组 job.setReducerClass( SecSortReducer.class ); job.setOutputKeyClass( FirstSortEntity.class ); job.setOutputValueClass(IntWritable.class ); // FileInputFormat.setInputPaths(job, "/wc/input/xiyou.txt"); // FileOutputFormat.setOutputPath(job, new Path("/wc/output6")); FileInputFormat.setInputPaths(job, "/sort/input"); FileOutputFormat.setOutputPath(job, new Path("/sort/output1")); job.waitForCompletion(true) ; } }
2 自定义 组合key
package com.wzt.mapreduce.secondsort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; /** * 自定义组合件 * @author root * */ public class FirstSortEntity implements WritableComparable<FirstSortEntity>{ private Text firstkey ; private IntWritable secondkey ; public FirstSortEntity( ) { } public FirstSortEntity(Text firstkey, IntWritable secondkey) { this.firstkey = firstkey; this.secondkey = secondkey; } public FirstSortEntity(String firstkey, int secondkey) { this.firstkey = new Text(firstkey); this.secondkey = new IntWritable(secondkey); } public Text getFirstkey() { return firstkey; } public void setFirstkey(Text firstkey) { this.firstkey = firstkey; } public IntWritable getSecondkey() { return secondkey; } public void setSecondkey(IntWritable secondkey) { this.secondkey = secondkey; } /** * 对象序列化 */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(firstkey.toString() ); out.writeInt( secondkey.get() ); } //对象反序列化 @Override public void readFields(DataInput in) throws IOException { firstkey = new Text(in.readUTF() ); secondkey = new IntWritable(in.readInt()); } /** * 排序在map执行后数据传出后 会调用这个方法对key进行排序 * 数据map后,如果设置了分区并且reduce>1 的话,会执行分区类方法,进行分区 */ @Override public int compareTo(FirstSortEntity entity) { //利用这个来控制升序或降序 //this本对象写在前面代表是升序 //this本对象写在后面代表是降序 return this.firstkey.compareTo( entity.getFirstkey()); //return this.secondkey.get()>entity.getSecondkey().get()?1:-1; } @Override public String toString() { return this.getFirstkey() +" "+this.getSecondkey()+ " " ; } }
3 自定义分区
package com.wzt.mapreduce.secondsort; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; //自定义 分区 public class SSPartintioner extends Partitioner<FirstSortEntity, IntWritable>{ /** * key map输出的key * value map 输出的value * map后的数据 经过排序后传进这个分区方法,如果返回的值相同的数据,值相同的数据会分配到一组中 ,即 放到一堆 * 到此 数据为N堆,并且数据是经过排序的 */ @Override public int getPartition(FirstSortEntity key, IntWritable value, int numPartitions) { System.out.println("Partitioner key:"+key.getFirstkey()+" value:"+value+" "+ ( ( key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions ) +" "+this); //System.out.println("Partitioner key:"+key.getFirstkey()+" value:"+value+" "+ ((key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions) +" "+this); return (key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions; //return (key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions; } }
个人理解以上都是在Map阶段进行,即本地操作,以下为Map到Reduce这段进行的
4 自定义整体排序
package com.wzt.mapreduce.secondsort; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //组内自定义排序策略 /** * @author root * */ public class MySSSortComparator extends WritableComparator{ public MySSSortComparator() {//注册处理的试题类型 super(FirstSortEntity.class,true); } /** * reduce 处理数据之前 * 对全量数据排序 * 逻辑:分组一样则按照第二个参数排序 ,分组不一样,则按照第一个参数排序 */ @Override public int compare(WritableComparable a, WritableComparable b) { FirstSortEntity e1 = (FirstSortEntity)a; FirstSortEntity e2 = (FirstSortEntity)b; System.out.println( e1.getFirstkey()+"==MySSSortComparator 排序 。。 "+e2.getFirstkey()); //首先要保证是同一个组内,同一个组的标识就是第一个字段相同 if(!e1.getFirstkey().equals( e2.getFirstkey())){ return e1.getFirstkey().compareTo(e2.getFirstkey()); }else{ return e1.getSecondkey().get() - e2.getSecondkey().get() ; } } }
5 自定义分组
package com.wzt.mapreduce.secondsort; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //对象分组策略 //数据放到 reduce前 ,对数据进行分组 public class GroupComparator extends WritableComparator{ public GroupComparator() { //注册处理的试题类型 super(FirstSortEntity.class,true ) ; } /** * 对排序后的数据 分组, * 第一个参数相同的,放到一个key的 迭代器 集合中 */ @Override public int compare(WritableComparable a, WritableComparable b) { FirstSortEntity e1 = (FirstSortEntity)a; FirstSortEntity e2 = (FirstSortEntity)b; System.out.println( e1.getFirstkey()+"==GroupComparator = 分组=="+e2.getFirstkey()); return e1.getFirstkey().toString().compareTo( e2.getFirstkey().toString()); //return e1.getSecondkey().compareTo( e2.getSecondkey()); } }
在以后就是主类中的reduce进行数据处理
下面这个类作为自己的记录,这里没用:
package com.wzt.mapreduce.secondsort; import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; //自定义分组比较器 //这个类 暂时没用, 分组比较器的 实现,但没有测试 public class SSGroupComparator implements RawComparator<FirstSortEntity>{ @Override public int compare(FirstSortEntity o1, FirstSortEntity o2) { return o1.getSecondkey().get()>o2.getSecondkey().get()?1:-1; } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { //对象可以这样反序列化 //IntWritable d ; System.out.println( "SSGroupComparator 自定义分组 =" ); ByteArrayInputStream bis = new ByteArrayInputStream(b1); DataInput in1 = new DataInputStream(bis); FirstSortEntity entity1 = new FirstSortEntity(); ByteArrayInputStream bis2 = new ByteArrayInputStream(b2); DataInput in2 = new DataInputStream(bis2); FirstSortEntity entity2 = new FirstSortEntity(); try { entity1.readFields(in1); entity2.readFields(in2); } catch (IOException e) { e.printStackTrace(); } return entity1.getFirstkey().compareTo( entity2.getFirstkey()); } }
相关推荐
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN 网址:https://blog.csdn.net/chenwewi520feng/article/details/130454036 本文介绍MapReduce常见的基本用法。 前提是hadoop环境可正常运行。 ...
对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
Hadoop 培训课程(4)MapReduce_2 标准和自定义计数器* Combiner和Partitioner编程** 自定义排序和分组编程** 常见的MapReduce算法** ---------------------------加深拓展---------------------- 常见大数据处理方法*
在 MapReduce 中,使用复合映射输出键并自定义对哪些字段进行分区、排序和分组可能很乏味,尤其是在跨多个作业执行此操作时。 这个库的目标是提供一个Tuple类,它可以包含多个元素,并提供一个ShuffleUtils类,为您...
主要包括mapreduce自定义分区,排序,分组的过程,最后以实际在生产中的小demo作为演示,可以让初学者对于mapreduce程序有更深层次的了解。
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi ... 04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi
温度排序,实现自定义分组分区排序,到出来的jar
技术点46 避免reducer 技术点47 过滤和投影技术点48 使用 combiner技术点49 超炫的使用比较器的快速排序6.4.4 减轻倾斜技术点50 收集倾斜数据技术点51 减轻reducer 阶段倾斜6.4.5 在MapReduce 中优化...
04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi 06-shuffle机制.avi 07-mr程序的组件全貌.avi 08-textinputformat对切片规划的源码分析.avi 09-倒排索引的mr实现.avi 10-多个job在同一个...
技术点26 在HDFS、MapReduce、Pig 和Hive 中使用数据压缩 技术点27 在MapReduce、Hive 和Pig 中处理可分割的LZOP 5.3 本章小结 6 诊断和优化性能问题 6.1 衡量MapReduce 和你的环境 6.1.1 提取作业统计...
目录网盘文件永久链接 1-MapReduce.rar 2 MapReducel的源简介和自定义类型rar 3 mapReducel的剩余核环节解rar 4 MapReduce的自定V排序和分组rar 5 hadoop的集群安装和安全模式个绍rar 代码部分rar
第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby...
第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 ...