MapReduce对手机上网记录的简单分析和Partitioner分区

概览

1.MapReduce处理手机上网记录
2.Partitioner分区

上次说过了关于MapReduce的执行流程和原理,下面来说下分区和简单示例

1.MapReduce处理手机上网记录

首先我们需要先模拟一个通话记录文件

在Windows的桌面建个tel.log的文件,里面模拟一些通话记录信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

这些字段代表的是
在这里插入图片描述

首先我们需要将部分字段提取出来,以便之后进行分析

在主机master上启动hadoop集群,hadoop集群版的搭建可以参照简单的hadoop集群搭建

1
start-all.sh

验证是否启动成功

然后将tel.log文件利用Xftp传输到虚拟机中的/usr/tmp下cd /usr/tmp/

然后上传到hdfs

1
2
3
4
5
6
#上传
[root@master tmp]# hadoop fs -put tel.log /
#查看
[root@master tmp]# hadoop fs -ls /
Found 1 items
-rw-r--r-- 1 root supergroup 2315 2018-10-19 19:33 /tel.log

然后在eclipse上新建java项目,并在项目下建个lib文件夹,然后将jar包放到lib中导入项目
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
然后创建包,创建一个telBean实体类,这次我们分析的是
手机号和其对应的上行流量,下行流量和总流量
所以将其封装成实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.hd.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class TelBean implements Writable{

private String tel;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;



public String getTel() {
return tel;
}

public void setTel(String tel) {
this.tel = tel;
}

public long getUpPayLoad() {
return upPayLoad;
}

public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}

public long getDownPayLoad() {
return downPayLoad;
}

public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}

public long getTotalPayLoad() {
return totalPayLoad;
}

public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}

public TelBean(String tel, long upPayLoad, long downPayLoad, long totalPayLoad) {
super();
this.tel = tel;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = totalPayLoad;
}

public TelBean() {
super();
// TODO Auto-generated constructor stub
}

@Override
public String toString() {
return tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t"
+ totalPayLoad ;
}

//反序列化的过程
@Override
public void readFields(DataInput in) throws IOException {
this.tel = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}

//序列化的过程
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(this.tel);
out.writeLong(this.upPayLoad);
out.writeLong(this.downPayLoad);
out.writeLong(this.totalPayLoad);
}


}

在mr包下创建个TelMapper类继承Mapper
首先分析一下,我们要传入的第一个需要Map处理的<k1,v1>是long类型(电话号码)和String(Text)类型(与之对应的一行记录),而从Map处理过的<k2,v2>是String类型(电话号码)和TelBean对象(将我们需要的字段封装成对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.hd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.zy.hadoop.entity.TelBean;

//k1,v1 long string k2,v2 string TelBean
public class TelMapper extends Mapper<LongWritable, Text, Text, TelBean> {

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TelBean>.Context context)
throws IOException, InterruptedException {
//value 对应 tel.log中的每一行数据,行中的数据以\t隔开的
String line = value.toString();
//对正航读取的数据进行拆分
String[] res = line.split("\t");//0---res.length-1
//取数组中的电话号码
String tel = res[1];
//取上行流量
long upPayLoad = Long.parseLong(res[8]);
//取下行流量
long downPayLoad = Long.parseLong(res[9]);
//创建telBean对象
TelBean telBean = new TelBean(tel, upPayLoad, downPayLoad, 0);
context.write(new Text(tel), telBean);
}

}

然后创建个TelReducer类继承Reducer
分析一下,这里传入的<k2,v2>是String(Text)类型和TelBean类型,而我们处理过输出的<k3,v3>也是相同类型,这里要记得将TelBean的toString方法重写,不然输出的是对象地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.hd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.zy.hadoop.entity.TelBean;

public class TelReducer extends Reducer<Text, TelBean, Text, TelBean> {

@Override
protected void reduce(Text key, Iterable<TelBean> value, Reducer<Text, TelBean, Text, TelBean>.Context context)
throws IOException, InterruptedException {
//声明一个上行流量的变量
long upPayLoad = 0;
//声明一个下行流量
long downPayLoad = 0;
for (TelBean telBean : value) {
//统计相同电话的上行流量的和,下行流量的和
upPayLoad += telBean.getUpPayLoad();
downPayLoad += telBean.getDownPayLoad();
}

//k3 v3
TelBean telBean= new TelBean(key.toString(), upPayLoad, downPayLoad, upPayLoad+downPayLoad);

context.write(key, telBean);
}

}

最后我们创建个主方法TelCount类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.zy.hadoop.mr1;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zy.hadoop.entity.TelBean;


public class TelCount {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.指定job使用的类
job.setJarByClass(TelCount.class);

// 3.设置Mapper的属性
job.setMapperClass(TelMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TelBean.class);

// 4.设置输入文件 args[0]手动输入输入文件的位置
FileInputFormat.setInputPaths(job, new Path(args[0]));

// 5.设置reducer的属性
job.setReducerClass(TelReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(TelBean.class);

// 6.设置输出文件夹,查看结果保存到hdfs文件夹中的位置
//args[1]手动输入输出文件的位置
FileOutputFormat.setOutputPath(job, new Path(args[1]));


// 7.提交 true 提交的时候打印日志信息
job.waitForCompletion(true);

}

}

最后将其打成Jar包,主方法选择TelCount,然后上传到虚拟机/usr/tmp下

然后执行jar包

1
hadoop jar tel_1.jar /tel.log /tel1

等待执行成功后查看结果文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#结果如下
[root@master tmp]# hadoop fs -cat /tel1/part-r-00000
13480253104 13480253104 180 180 360
13502468823 13502468823 7335 110349 117684
13560436666 13560436666 2232 1908 4140
13560439658 13560439658 2034 5892 7926
13602846565 13602846565 1938 2910 4848
13660577991 13660577991 6960 690 7650
13719199419 13719199419 240 0 240
13726230503 13726230503 2481 24681 27162
13726238888 13726238888 2481 24681 27162
13760778710 13760778710 120 120 240
13826544101 13826544101 264 0 264
13922314466 13922314466 3008 3720 6728
13925057413 13925057413 11058 48243 59301
13926251106 13926251106 240 0 240
13926435656 13926435656 132 1512 1644
15013685858 15013685858 3659 3538 7197
15920133257 15920133257 3156 2936 6092
15989002119 15989002119 1938 180 2118
18211575961 18211575961 1527 2106 3633
18320173382 18320173382 9531 2412 11943
84138413 84138413 4116 1432 5548

这说明执行成功了

2.Partitioner分区

什么是Partitioner?

在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。

我们还是处理手机的上网记录
在之前的mr包中见一个TCPartitioner类
我们将135和136开头的号码视为移动用户,处理结果放到一起(part-r-00000),另外的号码处理结果放到一起(part-r-00001)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.hd.hadoop.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import com.zy.hadoop.entity.TelBean;

public class TCPartitioner extends Partitioner<Text, TelBean> {

@Override
public int getPartition(Text text, TelBean telBean, int arg2) {
// TODO Auto-generated method stub

String tel = text.toString();
String sub_tel = tel.substring(0, 3);//取手机号前三位进行分区

//假设135 136的为移动的 放一个分区,其他的放一个分区
if(sub_tel.equals("135")||sub_tel.equals("136")){
//return的数对应着计算结果文件 part-r-00001
return 1;
}

return 0;
}

}

然后在TelCount添加几行代码

在这里插入图片描述

再将项目打成jar包放入到虚拟机/usr/tmp下,然后执行

1
hadoop jar tel_2.jar /tel.log /tel2

等待执行完毕后查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
#查看生成几个结果文件
[root@master tmp]# hadoop fs -ls /tel2
Found 3 items
-rw-r--r-- 1 root supergroup 0 2018-10-19 20:10 /tel2/_SUCCESS
-rw-r--r-- 1 root supergroup 603 2018-10-19 20:10 /tel2/part-r-00000
-rw-r--r-- 1 root supergroup 198 2018-10-19 20:10 /tel2/part-r-00001
#查看135和136开头的手机号的结果
[root@master tmp]# hadoop fs -cat /tel2/part-r-00001
13502468823 13502468823 7335 110349 117684
13560436666 13560436666 2232 1908 4140
13560439658 13560439658 2034 5892 7926
13602846565 13602846565 1938 2910 4848
13660577991 13660577991 6960 690 7650

这就是简单的分区操作

接下来还有如何将分析的结果进行排序操作