将MapReduce分析手机上网记录的结果进行排序操作

上次我们说过了MapReduce对手机上网记录的简单分析和Partitioner分区
这次我们介绍一下如何将手机上网记录根据总流量的多少进行排序

1.编写Java代码,并将其打包成jar包

在eclipse上创建个新的java项目,创建lib文件夹,将上次的jar同样导入进来

然后创建个TelBean类
这里实现了WritableComparable接口,就是序列化的比较,详情查询api文档

public interface Comparator比较功能,对一些对象的集合施加了一个整体排序 。 可以将比较器传递给排序方法(如Collections.sort或Arrays.sort ),以便对排序顺序进行精确控制。 比较器还可以用来控制某些数据结构(如顺序sorted sets或sorted maps ),或对于不具有对象的集合提供的排序natural ordering 。
通过比较c上的一组元素S的确定的顺序对被认为是与equals一致当且仅当c.compare(e1, e2)==0具有用于S每e1和e2相同布尔值e1.equals(e2)。

当使用能够强制排序不一致的比较器时,应注意使用排序集(或排序图)。 假设具有显式比较器c的排序集(或排序映射)与从集合S中绘制的元素(或键)
一起使用 。 如果88446235254451上的c强制的排序与equals不一致,则排序集(或排序映射)将表现为“奇怪”。
特别是排序集(或排序图)将违反用于设置(或映射)的一般合同,其按equals定义。

例如,假设一个将两个元件a和b ,使得(a.equals(b) && c.compare(a, b) != 0)到空TreeSet与比较c
。 因为a和b与树集的角度不相等,所以第二个add操作将返回true(并且树集的大小将增加),即使这与Set.add方法的规范相反。

注意:这通常是一个好主意比较,也能实现java.io.Serializable,因为它们可能被用来作为排序的序列化数据结构的方法(如TreeSet
, TreeMap )。 为了使数据结构成功序列化,比较器(如果提供)必须实现Serializable 。

对于数学上的倾斜,即限定了施加顺序 ,给定的比较器c上一组给定对象的S强加关系式为:

{(x, y) such that c.compare(x, y) <= 0}. 这个总订单的商是: {(x, y) such
that c.compare(x, y) == 0}. 它从合同compare,该商数是S的等价关系紧随其后,而强加的排序是S, 总订单 。
当我们说S上的c所规定的顺序与等于一致时,我们的意思是排序的商是由对象’ equals(Object)方法定义的等价关系: {(x,
y) such that x.equals(y)}. 与Comparable不同,比较器可以可选地允许比较空参数,同时保持对等价关系的要求。

此接口是成员Java Collections Framework 。

从以下版本开始:
1.2 另请参见: Comparable , Serializable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.zy.hadoop.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class TelBean implements WritableComparable<TelBean>{

private String tel;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;



public String getTel() {
return tel;
}

public void setTel(String tel) {
this.tel = tel;
}

public long getUpPayLoad() {
return upPayLoad;
}

public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}

public long getDownPayLoad() {
return downPayLoad;
}

public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}

public long getTotalPayLoad() {
return totalPayLoad;
}

public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}

public TelBean(String tel, long upPayLoad, long downPayLoad, long totalPayLoad) {
super();
this.tel = tel;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = totalPayLoad;
}

public TelBean() {
super();
// TODO Auto-generated constructor stub
}

@Override
public String toString() {
return tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t"
+ totalPayLoad ;
}

//反序列化的过程
@Override
public void readFields(DataInput in) throws IOException {
this.tel = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}

//序列化的过程
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(this.tel);
out.writeLong(this.upPayLoad);
out.writeLong(this.downPayLoad);
out.writeLong(this.totalPayLoad);
}
//compare比较,详情查阅java的api文档
@Override
public int compareTo(TelBean bean) {
// TODO Auto-generated method stub
return (int)(this.totalPayLoad-bean.getTotalPayLoad());
}

}

然后在mr包下依次建立SortMapper,SortReducer,SortCount

关于分析可以查看MapReduce对手机上网记录的简单分析和Partitioner分区
SortMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.zy.hadoop.mr2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.zy.hadoop.entity.TelBean;

public class SortMapper extends Mapper<LongWritable, Text, TelBean, NullWritable>{

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TelBean, NullWritable>.Context context)
throws IOException, InterruptedException {
//value ,第一mr出来的结果中的每一行
String line = value.toString();
//拆分字符串"\t"
String[] strs = line.split("\t");
//直接通过下标取值
//电话号码

String tel = strs[0];
//上行流量
long upPayLoad=Long.parseLong(strs[2]);
//下行流量
long downPayLoad=Long.parseLong(strs[3]);
//总流量
long totalPayLoad=Long.parseLong(strs[4]);
//把去除的值封装到对象中
TelBean telBean = new TelBean(tel, upPayLoad, downPayLoad, totalPayLoad);
//输出k2,v2
context.write(telBean, NullWritable.get());
}

}

SortReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.zy.hadoop.mr2;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import com.zy.hadoop.entity.TelBean;

public class SortReducer extends Reducer<TelBean, NullWritable, TelBean, NullWritable>{

@Override
protected void reduce(TelBean arg0, Iterable<NullWritable> arg1,
Reducer<TelBean, NullWritable, TelBean, NullWritable>.Context arg2)
throws IOException, InterruptedException {
arg2.write(arg0, NullWritable.get());
}


}

SortCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.zy.hadoop.mr2;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zy.hadoop.entity.TelBean;

public class SortCount {

public static void main(String[] args) throws Exception {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.指定job使用的类
job.setJarByClass(SortCount.class);

// 3.设置Mapper的属性
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(TelBean.class);
job.setMapOutputValueClass(NullWritable.class);

// 4.设置输入文件
FileInputFormat.setInputPaths(job, new Path(args[0]));

// 5.设置reducer的属性
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(TelBean.class);
job.setMapOutputValueClass(NullWritable.class);

// 6.设置输出文件夹,查看结果保存到hdfs文件夹中的位置
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7.提交 true 提交的时候打印日志信息
job.waitForCompletion(true);
}

}

接下来将项目打包成jar包,上传到虚拟机/usr/tmp下

2.虚拟机上运行jar包,查看结果

启动hadoop集群服务

1
start-all.sh

查看是否成功

我们将之前处理过一次的文件/tel1/part-r-00000(/tel2下的进行过分区了,所以不进行处理)作为源文件进行分析排序

1
hadoop jar tel_3.jar /tel/part-r-00000 /tel3

等待执行完毕查看结果

结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[root@master tmp]# hadoop fs -ls /
hadoopFound 5 items
-rw-r--r-- 1 root supergroup 2315 2018-10-19 19:33 /tel.log
drwxr-xr-x - root supergroup 0 2018-10-19 19:59 /tel1
drwxr-xr-x - root supergroup 0 2018-10-19 20:10 /tel2
drwxr-xr-x - root supergroup 0 2018-10-19 20:47 /tel3
drwx------ - root supergroup 0 2018-10-19 19:58 /tmp
[root@master tmp]# hadoop fs -ls /tel3
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-10-19 20:47 /tel3/_SUCCESS
-rw-r--r-- 1 root supergroup 477 2018-10-19 20:47 /tel3/part-r-00000
[root@master tmp]# hadoop fs -cat /tel3/part-r-00000
13926251106 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13926435656 132 1512 1644
15989002119 1938 180 2118
18211575961 1527 2106 3633
13560436666 2232 1908 4140
13602846565 1938 2910 4848
84138413 4116 1432 5548
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726238888 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684

这就是MapReduce进行简单的数据分析

不过hadoop集群的namenode如果只有一个,namenode机器宕机整个集群都会不可用 , 接下来会介绍zookeeper的高可用hadoop集群如何搭建