一、需求分析
1、最初的解决方案
如何将这一批数据按组划分?
实际上需要使用到map去进行分组。那么怎么去划分key呢?我们的需求是需要知道气温最高的两天,那能否取日期的“年-月”充当key,而把具体某一天去掉? 答案是不行的,因为这样算出的数据可能会出现最高气温的两天是同一天的记录,因为没有“日”,也就是没有精确到某一天作为key,故此数据是不精确的。
所以我们需要将"年-月"作为一个key,然后利用map按key的这种格式进行分组,与此同时,我们的”日“也是不可以丢掉的,我们可以将日拼接到value前面,用逗号隔开。
map无疑是只做了一个按key分组的动作,然后使用分区器partitioner确定每一个key-value的分区号,放进环形缓冲区中,等待(溢写线程)按key和分区号进行排序生成内部有序外部无序的小文件,然后reduce再拉取对应分区号已排好序的分组记录。
reduce拉取到对应的分组记录后,它需要对拉取过来的分组记录,每一组内的所有气温数据进行遍历比较,使用两个变量,遍历气温,对比日期是否为同一天,如果是,还需要做去重过滤等复杂的逻辑。因此,reduce端的计算压力就比较大。
2、更好的key设计方案
能否对上面这个逻辑进行调优呢?
首先我们要知道,一个map是否合理的对key划分并且排序,对reduce端的计算性能影响是十分重要的。我们的切入点应该在map端key的设计。你想下,能否将气温也做成key中的一员,然后“日”充当value呢?也就是下面的构造:
答案是ok的,除此之外,我们需要对key做一个排序处理,以及分区计算的处理。因为默认的情况下,map端的分区器(partitioner)是 “使用整个key的hash值取模reducer数量” 用来确定分区号的。
因此在分区器上,我们需要自定义,按照以下的逻辑对key进行分区号计算:
也就是刻意不加上温度,而是将""年-月"的hash做分区处理。这样相同"年-月"就肯定会被份到同一个区号,也就是被同一reudcer处理。
除此之外,对key的排序处理也是需要自定义的:
我们将key按照其中的气温值做一个倒序比较。
最终经过mapTask处理后,会得到同一个月的气温倒序的key-value对,例如:
那么reducer0进行拉取的时候,取得下面的分组数据:
在reduce端会将这三条记录视为3组,因为reduce会默认按照map拉取回来的整个key做比较,判断是否为同一组,故此会将不同温度但同月的key视作不同组。因此我们需要在reducer端自定义实现一个分组比较器,因为我们最终目的是找到某个月的最高两天气温。
由于map端已经按"年-月-气温"这个key其中的气温倒序来排列key-value了,故此分组比较器拉过来同一个月的气温数据时,这些数据已经是按照气温倒叙来排好顺序了,这样这些数据的第一条就是当月气温的最高值,我们仅仅需要使用两个变量,一个用来记录气温最高值,一个用来记录气温第二高的值。而且,reduce分组比较器并不需要遍历这个月份的所有气温,只需要拿到最高气温中value记录的”日期“信息,然后取下一条数据做日期对比,只要和最高气温的”日“不一样,那就是次高温!
3、另一种key的设计方案
我们还可以将key设置成如下格式:
实际上原理和上面将的一样,也是需要我们自定义key的排序比较器、分区器、以及reduce端的分组比较器这些组件,来按照温度的倒序进行排序、获取。 这样设计的key相当于一串字符串,字符串在jvm来说是最最复杂的东西;在分析的时候,我们需要对这个key的字符串进行切割,拿出年-月、温度、日,这几个维度信息,然后做比较等操作。
当然我们也可以自定义一种key的类型,也就是不用hadoop为我们提供的远程key类型,比如TextForamat等。但是自定义Key类型的时候,我们也要定义好 key的序列化/反序列化方法、key的排序比较器。因为我们的key是通过map进行分组确认的,最终需要溢写到磁盘存放,这就涉及到了序列化,然后reduce端会拉取这部分文件再将key加载到内存中,这就涉及到反序列化。我们的key也是需要实现按哪些维度去排序,也就涉及到了key的排序器。
注意:当用户在map端指定了sorter排序器的时候,就会覆盖掉key本身的排序器!
二、功能实现
代码实现上面的需求,我们采用这种格式处理。首先我们需要编写下Job任务的入口启动类
1、Job启动类
根据上面的代码,下面梳理以下这个Job所需要用到的一些类:
1、 MapTask所需要定制的类
- 自定义Key类型,实现序列化、反序列化、默认的key排序器,实际上只需要实现WritableComparable接口即可。
- Mapper类,定义map方法进行分组输出,map方法实际上做的工作就是将原始的数据格式,转换成用户自定义的Key的格式,比如将样例数据定制成”年-月-日-温度“这样格式的key,就可以在map方法中实现。
- 自定义分区器Partitioner,因为我们需要按照”年-月“为标准进行分区(需求是求每个月份的最高温的两天)。
- 自定义排序器SortComparator, 因为我们需要对key按照温度的倒叙排,故此需要自己手写排序的标准。
2、ReduceTask所需要定制的类
- 自定义分组比较器GroupingComparator,主要的目的在于,key是需要按照"年-月"这两个维度来算作一组的,而我们map输出的格式的key是以”年-月-日-温度“作为key,故此为了防止reduce使用默认的比较规则(判断key是否相等则为一组),我们需要自己实现分组比较器。
- Reducer类, 定义reduce方法,方法的处理逻辑就是计算同一组key("年-月"为基准划分),迭代这一组key-value值,然后获取这一组key的前两个不同日期的记录,即为当前组(年-月)的最高两天气温,作为reduce的输出。
下面我们就来展示上面提到的这几个定制类。
2、MapTask
2.1、自定义Key类型
这里使用TKey表示TopN的格式化类。也就是map处理后将原始数据封装成Tkey对象,再序列化到磁盘中。
2.2、自定义Partitioner分区器
2.3、自定义key的排序比较器
WritableComparator实现了RawComparator接口,当我们希望自定义比较器里面的排序比较规则时,比如使用自定义的SortComparator而不是使用Key类型自身的Comparator,那么我们就可以去继承WritableComparator,改写它的比较方法。不过WritableComparator的比较方法有好几个,这里列出它的核心两个比较方法如下:
上面的注解很详细了,其中第一个compare方法间接调用的是第二个compare方法,发生真实的key比较行为的方法是第二个compare方法!
那么我们继承WritableComparator的时候,也是重写第二个compare方法,并使用我们自定义的排序比较器来进行key的比较排序!
还有一个特别需要注意的地方:当我们自定义比较器的时候,必须要调用父类WritableComparator的构造器方法(指定key的类型,并且要求创建key的实例)! 否则在上面第一个compare方法调用的时候,key1、key2就是null,会出现空指针异常!
2.4、Mapper类
3、ReduceTask
3.1、自定义分组比较器
3.2 Reducer
其中这里会出现一个现象,就是当前这组数据只是读到前面两个气温最高值的日期就跳出循环了,此时这组数据实际上有可能没有被调用完全,那么当退出reduce方法之后,又会进行context.nextKey()的获取。
那么这里的nextKey()方法实际上并不会还是拿上次没有遍历完的key再调用一次reduce方法,当遇到上一组数据没完全迭代完的时候,就会继续调用nextKeyValue()方法迭代所有剩下的这一组数据:
直到遇到nextKeyIsSame判断不等的时候才会去继续调用nextKeyValue获取下一组的第一条数据,最终返回true或false。然后再调用reduce方法计算下一组数据。