Hadoop大数据技术与应用

hadoop课本代码讲解

第2章 Hadoop集群的搭建及配置

任务2.1示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
代码2-1 ifcfg-ens33文件原有的内容
TYPE="Ethernet"
PROXY_METHOD="none"
BROWSER_ONLY="no"
BOOTPROTO="dhcp"
DEFROUTE="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="ens33"
UUID="6c2a466f-a4b0-4f29-aeee-8ea96252aee4"
DEVICE="ens33"
ONBOOT="no"

代码2-2 修改ifcfg-ens33文件后的内容
TYPE="Ethernet"
PROXY_METHOD="none"
BROWSER_ONLY="no"
BOOTPROTO="static"
DEFROUTE="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="ens33"
UUID="6c2a466f-a4b0-4f29-aeee-8ea96252aee4"
DEVICE="ens33"
ONBOOT="yes"
# 添加内容
IPADDR=192.168.128.130
GATEWAY=192.168.128.2
NETMASK=255.255.255.0
DNS1=8.8.8.8

代码2-3 将除yum本地源以外的其他yum源禁用
mv CentOS-Base.repo CentOS-Base.repo.bak
mv CentOS-Debuginfo.repo CentOS-Debuginfo.repo.bak
mv CentOS-fasttrack.repo CentOS-fasttrack.repo.bak
mv CentOS-Vault.repo CentOS-Vault.repo.bak

代码2-4 CentOS-Media.repo修改前的内容
[c7-media]
name=CentOS-$releasever - Media
baseurl=file:///media/CentOS/
file:///media/cdrom/
file:///media/cdrecorder/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7

代码2-5 CentOS-Media.repo修改后的内容
[c7-media]
name=CentOS-$releasever - Media
baseurl=file:///media/
gpgcheck=0
enabled=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7

任务2.2示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
代码2-6 修改core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/var/log/hadoop/tmp</value>
</property>
</configuration>

代码2-7 修改hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_281-amd64

代码2-8 修改yarn-env.sh文件
# export JAVA_HOME=/home/y/libexec/jdk1.6.0/
export JAVA_HOME=/usr/java/jdk1.8.0_281-amd64

代码2-9 mapred-site.xml文件添加的内容
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- jobhistory properties -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
</configuration>

代码2-10 yarn-site.xml文件修改的内容
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>${yarn.resourcemanager.hostname}:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>${yarn.resourcemanager.hostname}:8030</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>${yarn.resourcemanager.hostname}:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.https.address</name>
<value>${yarn.resourcemanager.hostname}:8090</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>${yarn.resourcemanager.hostname}:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>${yarn.resourcemanager.hostname}:8033</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>/data/hadoop/yarn/local</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/data/tmp/logs</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://master:19888/jobhistory/logs/</value>
<description>URL for job history server</description>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>1</value>
</property>
</configuration>

代码2-11 修改workers文件
slave1
slave2
slave3

代码2-12 修改hdfs-site.xml文件
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///data/hadoop/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///data/hadoop/hdfs/data</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>

代码2-13 修改start-dfs.sh和stop-dfs.sh文件
HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root

代码2-14 修改start-yarn.sh和stop-yarn.sh文件
YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

代码2-15 修改/etc/hosts文件
192.168.128.130 master master.centos.com
192.168.128.131 slave1 slave1.centos.com
192.168.128.132 slave2 slave2.centos.com
192.168.128.133 slave3 slave3.centos.com

代码2-16 修改slave1的ifcfg-ens33文件的IPADDR
IPADDR=192.168.128.131

代码2-17 重启网络服务和查看IP
# 重启网络服务
systemctl restart network
# 查看IP
ip addr

代码2-18 修改slave1的主机名称
# 修改slave1的主机名称
hostnamectl set-hostname slave1

代码2-19 将公钥复制到远程机器中的命令
# 依次输入yes,123456(root用户的密码)
ssh-copy-id -i /root/.ssh/id_rsa.pub master
ssh-copy-id -i /root/.ssh/id_rsa.pub slave1
ssh-copy-id -i /root/.ssh/id_rsa.pub slave2
ssh-copy-id -i /root/.ssh/id_rsa.pub slave3

代码2-20 修改master主节点的ntp.conf文件
restrict 192.168.0.0 mask 255.255.255.0 nomodify notrap
server 127.127.1.0
fudge 127.127.1.0 stratum 10

代码2-21 修改子节点的ntp.conf文件
server master

代码2-22 设置环境变量
export HADOOP_HOME=/usr/local/hadoop-3.1.4
export JAVA HOME=/usr/java/jdk1.8.0_281-amd64
export PATH=$HADOOP_HOME/bin:$PATH:$JAVA_HOME/bin

代码2-23 启动集群命令
cd $HADOOP_HOME # 进入Hadoop安装目录
sbin/start-dfs.sh # 启动HDFS相关服务
sbin/start-yarn.sh # 启动YARN相关服务
sbin/mr-jobhistory-daemon.sh start historyserver # 启动日志相关服务

代码2-24 关闭集群命令
cd $HADOOP_HOME # 进入Hadoop安装目录
sbin/stop-yarn.sh # 关闭YARN相关服务
sbin/stop-dfs.sh # 关闭HDFS相关服务
sbin/mr-jobhistory-daemon.sh stop historyserver # 关闭日志相关服务

代码2-25 修改本地host文件
192.168.128.130 master master.centos.com
192.168.128.131 slave1 slave1.centos.com
192.168.128.132 slave2 slave2.centos.com
192.168.128.133 slave3 slave3.centos.com

第3章 Hadoop基础操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
代码3-1 创建目录命令
hdfs dfs -mkdir /user/dfstest

代码3-2 创建多级目录
hdfs dfs -mkdir -p /user/test/example

代码3-3 文件上传命令
hdfs dfs -copyFromLocal a.txt /user/dfstest
hdfs dfs -put a.txt /user/dfstest/c.txt
hdfs dfs -moveFromLocal a.txt /user/dfstest/b.txt

代码3-4 文件下载命令
hdfs dfs -copyToLocal /user/dfstest/a.txt /data/hdfs_test/
hdfs dfs -get /user/dfstest/c.txt /data/hdfs_test/

代码3-5 查看文件内容命令
hdfs dfs -cat /user/dfstest/a.txt
hdfs dfs -tail /user/dfstest/b.txt

代码3-6 删除文件命令
hdfs dfs -mkdir /user/dfstest/rmdir
hdfs dfs -rm /user/dfstest/c.txt
hdfs dfs -rmdir /user/dfstest/rmdir

代码3-7 上传email_log.txt至HDFS的/user/root/目录
hdfs dfs -put /root/hadoop/email_log.txt /user/root/

代码3-8 使用hadoop jar命令提交MapReduce任务命令
hadoop jar \
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
wordcount \
/user/root/email_log.txt \
/user/root/output

代码3-9 执行统计登录次数程序的命令
hadoop jar \
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
wordcount \
/user/root/email_log.txt \
/user/root/output

代码3-10 执行估算PI值的任务命令
hadoop jar \
/usr/local/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
pi \
10 \
100

代码3-11 统计用户登录次数的命令
hadoop jar \
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
wordcount \
/user/root/email_log.txt \
/user/root/output1

代码3-12 执行估算PI值的命令
hadoop jar \
/usr/local/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
pi \
10 \
100

代码3-13 估算PI值
hadoop jar \
/usr/local/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
pi \
30 \
5000

代码3-14 统计用户登录次数
hadoop jar \
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
wordcount \
/user/root/email_log.txt \
/user/root/output2

第4章 MapReduce编程入门

任务实现4.2

WordCount源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
 //代码4.1
package Other;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

// 定义一个词频统计的类
public class WordCount {

// Mapper类,用于处理输入数据并输出键值对
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {

// 定义一个常量值1,用于表示每个单词出现的次数
private final static IntWritable one = new IntWritable(1);
// 定义一个Text对象用于存储每个单词
private Text word = new Text();

// map方法,负责将输入的文本数据按单词进行分割并输出<单词, 1>的键值对
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// 将输入的每一行文本转换为字符串并使用StringTokenizer按空格分割单词
StringTokenizer itr = new StringTokenizer(value.toString());
// 遍历所有单词
while (itr.hasMoreTokens()) {
// 取出下一个单词并设置为当前的word
word.set(itr.nextToken());
// 输出<单词, 1>的键值对
context.write(word, one);
}
}
}

// Reducer类,用于将相同的单词累加词频
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
// 定义一个IntWritable对象用于存储最终的结果
private IntWritable result = new IntWritable();

// reduce方法,负责将相同单词的所有值进行累加
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
// 初始化累加器
int sum = 0;
// 遍历当前单词的所有值,并进行累加
for (IntWritable val : values) {
sum += val.get();
}
// 将累加结果设置到result中
result.set(sum);
// 输出<单词, 累加结果>的键值对
context.write(key, result);
}
}

// 主方法,设置作业配置并启动作业
public static void main(String[] args) throws Exception {
// 创建一个Hadoop配置对象
Configuration conf = new Configuration();
// 解析命令行参数,获取输入和输出路径
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 如果没有指定足够的参数,输出错误信息并退出
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
// 创建一个MapReduce作业,并设置作业名称为"word count"
Job job = Job.getInstance(conf, "word count");
// 设置作业的主类
job.setJarByClass(WordCount.class);
// 设置Mapper类
job.setMapperClass(TokenizerMapper.class);
// 设置Combiner类,将中间结果进行局部归约
job.setCombinerClass(IntSumReducer.class);
// 设置Reducer类
job.setReducerClass(IntSumReducer.class);
// 设置输出键类型为Text
job.setOutputKeyClass(Text.class);
// 设置输出值类型为IntWritable
job.setOutputValueClass(IntWritable.class);
// 为作业添加输入路径,可能有多个
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// 设置作业的输出路径
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
// 提交作业并等待作业完成,完成后根据结果退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

MapReduce任务初始化的通用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setCombinerClass(MyCombiner.class);
job.setMapOutputKeyClass(MyMapKeyWritable.class);
job.setMapOutputValueClass(MyMapValueWritable.class);
job.setOutputKeyClass(MyKeyWritable.class);
job.setOutputValueClass(MyValueWritable.class);
job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(MyOutputFormat.class);
for (int i = 0; i < args.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,new Path(args[args.length - 1]));
job.waitForCompletion(true);

任务实现 4.3

任务4.3 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 代码4-3 Mapper处理逻辑伪代码
Begin
自定义类MyMapper继承自Mapper;
覆写map方法;
定义初始次数为1
读取用户访问日志文件;
以每一行为单位,以逗号为分隔符进行分拆;
将结果存入Array数组;
将数组中的第5个元素的前9个字符与初始次数组合后输出,格式为<访问日期,1>;
End


//代码4-4 Reducer类处理逻辑伪代码
Begin
自定义MyReducer继承自Reducer;
覆写reduce方法;
读取Mapper输出的键值对;
将相同键的值进行累加;
输出<访问日期,总访问次数>;
End


//代码4-5 Mapper模块代码
// Mapper模块
public static class MyMap extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String arr[] = line.split(",");
context.write(new Text(arr[4].substring(0, 9)), new IntWritable(1));
}
}


//代码4-6 Reducer模块代码
// Reducer模块
public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count++;
}
context.write(key, new IntWritable(count));
}
}


//代码4-7 Driver模块代码
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("必须输入读取文件路径和输出路径");
System.exit(2);
}
Job job = Job.getInstance(conf, "visits count");
job.setJarByClass(dailyAccessCount.class);
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

dailyAccessCount.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// 代码4.6
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

// 定义一个统计每日访问量的类
public class dailyAccessCount {

// Mapper类,继承自Hadoop的Mapper类,负责将输入数据进行映射
public static class MyMap
extends Mapper<Object, Text, Text, IntWritable> {

// map方法将输入的每一行数据进行处理,输出键值对
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 将输入的行数据转换为字符串
String line = value.toString();
// 按逗号分隔字符串
String arr[] = line.split(",");
// 将第5列(索引为4)的日期部分(只取到天)作为键,输出键值对<日期, 1>
context.write(new Text(arr[4].substring(0, 9)),
new IntWritable(1));
}
}

// Reducer类,继承自Hadoop的Reducer类,负责对映射后的数据进行归约
public static class MyReduce
extends Reducer<Text, IntWritable, Text, IntWritable> {

// reduce方法将同一个键的所有值进行累加,输出最终的访问次数
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
// 初始化计数器
int count = 0;
// 遍历当前键的所有值,并累加计数
for (IntWritable value : values) {
count++;
}
// 输出键值对<日期, 总访问次数>
context.write(key, new IntWritable(count));
}
}

// 主方法,设置作业配置并启动作业
public static void main(String[] args) throws Exception {
// 创建一个Hadoop配置对象
Configuration conf = new Configuration();
// 解析命令行参数,获取输入和输出路径
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 如果没有指定足够的参数,输出错误信息并退出
if (otherArgs.length < 2) {
System.err.println("必须输入读取文件路径和输出路径");
System.exit(2);
}
// 创建一个MapReduce作业,并设置作业名称为"visits count"
Job job = Job.getInstance(conf, "visits count");
// 设置作业的主类
job.setJarByClass(dailyAccessCount.class);
// 设置Mapper类
job.setMapperClass(MyMap.class);
// 设置Reducer类
job.setReducerClass(MyReduce.class);
// 设置输出键类型为Text
job.setOutputKeyClass(Text.class);
// 设置输出值类型为IntWritable
job.setOutputValueClass(IntWritable.class);
// 为作业添加输入路径,可能有多个
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// 设置作业的输出路径
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
// 提交作业并等待作业完成,完成后根据结果退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


输入键类型(Object): 这是输入数据的偏移量或标识符。在默认情况下,输入的键是数据的偏移量(通常是 Long 或 Object),但如果输入格式发生变化,键类型也可以是其他类型。
输入值类型(Text): 这是输入数据的值,通常是一行文本。在此例子中,使用 Text 来表示输入数据的每一行。
输出键类型(Text): 这是 Mapper 的输出键。在这个例子中,输出的键是一个 Text,也就是处理后的文本数据(例如单词或日期等)。
输出值类型(IntWritable): 这是 Mapper 的输出值。在这个例子中,输出的值是 IntWritable,也就是代表该键的计数或某种数值形式。

map(Object key, Text value, Context context)
输入键 (Object key):
这个键是输入数据的标识符。通常,在处理文本文件时,默认的输入键是偏移量(行号),即 Hadoop 提供的 LongWritable 类型。但是你可以根据需要自定义输入键类型。
你可以在 Mapper 的泛型中指定输入键的具体类型。
输入值 (Text value):
这是实际的数据内容,通常是每一行的文本。在处理文本文件时,通常使用 Text 来表示文本行。
输入值的类型可以根据数据格式的不同进行调整。例如,处理二进制文件时,输入值类型可能会是 BytesWritable。
上下文对象 (Context context):

Context 对象是固定的,它提供了与 Hadoop 框架的交互接口,可以通过它输出键值对,以及获取配置信息等。
以hadoop jar提交任务给集群的命令
hadoop jar /opt/jars/Hadoop/dailyAccessCount.jar \
Text.NO4.dailyAccessCount \
/Tipdm/Hadoop/MapReduce/raceData.csv \
/Tipdm/Hadoop/MapReduce/Result/dailyAccessCount

任务实现 4.4

任务4.4 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
代码4-10 输入数据的内容及格式
2020/10/1 18016
2020/10/2 18654
2020/10/3 2669
2020/10/4 1212
2020/10/5 1426

//代码4-11 设置Reduce任务数
job.setNumReduceTasks(num);


//代码4-12 Mapper模块代码
// Mapper模块
public static class MyMap extends Mapper<Object, Text, IntWritable, Text> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
//指定tab为分隔符
String arr[] = line.split("\t");
//key:统计结果, value:日期
context.write(new IntWritable(Integer.parseInt(arr[1])),
new Text(arr[0]));
}
}

//代码4-13 Reducer模块代码
// Reducer模块
public static class MyReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
public void reduce(IntWritable key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}

//代码4-14 Driver模块代码
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("必须输入读取文件路径和输出路径");
System.exit(2);
}
Job job = Job.getInstance(conf, "Visits Sort");
job.setJarByClass(accessTimesSort.class);
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}




accessTimesSort.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 代码4.12 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

// 定义访问次数排序类
public class accessTimesSort {

// Mapper类,输入的键值对为<Object, Text>,输出为<IntWritable, Text>
public static class MyMap extends Mapper<Object, Text, IntWritable, Text> {
// map方法,用于将输入的每一行转换为输出的键值对
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); // 将输入的每一行文本转换为字符串
// 指定tab为分隔符,将行数据分割成数组
String arr[] = line.split("\t");
// 输出的键是统计的访问次数(转换为IntWritable),值是日期(Text类型)
context.write(new IntWritable(Integer.parseInt(arr[1])),
new Text(arr[0]));
}
}

// Reducer类,输入的键值对为<IntWritable, Text>,输出为<Text, IntWritable>
public static class MyReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
// reduce方法,将按访问次数排序的结果输出
public void reduce(IntWritable key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// 遍历同一个访问次数对应的所有日期,输出<日期, 访问次数>
for (Text value : values) {
context.write(value, key);
}
}
}

// 主方法,负责配置并运行MapReduce作业
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); // 创建Hadoop的配置对象
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs(); // 解析命令行参数,获取输入和输出路径
if (otherArgs.length < 2) {
System.err.println("必须输入读取文件路径和输出路径"); // 如果参数不足,输出错误信息并退出
System.exit(2);
}
Job job = Job.getInstance(conf, "Visits Sort"); // 创建一个MapReduce作业,名称为“Visits Sort”
job.setJarByClass(accessTimesSort.class); // 设置作业的主类
job.setMapperClass(MyMap.class); // 设置Mapper类
job.setReducerClass(MyReduce.class); // 设置Reducer类
job.setMapOutputKeyClass(IntWritable.class); // 设置Mapper输出键类型为IntWritable
job.setMapOutputValueClass(Text.class); // 设置Mapper输出值类型为Text
job.setOutputKeyClass(Text.class); // 设置最终输出键类型为Text
job.setOutputValueClass(IntWritable.class); // 设置最终输出值类型为IntWritable
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); // 设置输入路径
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1])); // 设置输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交作业并等待完成,成功则返回0,否则返回1
}
}



以hadoop jar提交任务给集群的命令
hadoop jar /opt/jars/Hadoop/accessTimesSort.jar \
Text.NO4.test2.accessTimesSort \
/Tipdm/Hadoop/MapReduce/Result/dailyAccessCount \
/Tipdm/Hadoop/MapReduce/Result/accessTimesSort

第5章 MapReduce进阶编程

任务实现5.1

任务5.1示例代码

1
2
3
4
5
6
//代码5-1 设置输入格式代码
job.setInputFotmatClass(TextInputFormat.class)

//代码5-2 设定输出格式
job.setOutputFormatClass(SequenceFileOutputFormat.class);

SelectData.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 代码5.3
package No5.Text.test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

// 定义SelectData类,用于筛选数据
public class SelectData {

// 自定义Mapper类,输入的键值对为<Object, Text>,输出为<Text, Text>
public static class MyMap extends Mapper<Object, Text, Text, Text> {
// map方法,用于处理每一行输入数据
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); // 将输入的每一行文本转换为字符串
String arr[] = line.split(","); // 按逗号分割字符串
// 如果第5列包含“2021/1”或“2021/2”字符串
if (arr[4].contains("2021/1") || arr[4].contains("2021/2")) {
// 输出的键是第3列数据,值是日期的前9个字符(格式化后的日期)
context.write(new Text(arr[2]),
new Text(arr[4].substring(0, 9)));
}
}
}

// 主方法,负责配置并运行MapReduce作业
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); // 创建Hadoop的配置对象
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs(); // 解析命令行参数,获取输入和输出路径
if (otherArgs.length < 2) {
System.err.println("必须输入读取文件路径和输出路径"); // 如果参数不足,输出错误信息并退出
System.exit(2);
}
Job job = Job.getInstance(conf, "Select Data"); // 创建一个MapReduce作业,名称为“Select Data”
job.setJarByClass(SelectData.class); // 设置作业的主类
job.setMapperClass(MyMap.class); // 设置Mapper类
job.setOutputKeyClass(Text.class); // 设置Mapper输出键类型为Text
job.setOutputValueClass(Text.class); // 设置Mapper输出值类型为Text
// 设置输入格式为TextInputFormat
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式为SequenceFileOutputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 设置Reducer任务数为0,表示没有Reducer任务
job.setNumReduceTasks(0);
// 添加输入路径
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// 设置输出路径
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
// 提交作业并等待完成,成功则返回0,否则返回1
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

任务实现5.2

任务5.2示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// 代码5.4 列举子目录

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class view_folders {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
//获取文件系统
FileSystem fs = FileSystem.get(conf);
// 声明文件路径
Path path = new Path("/user/root");
// 获取文件列表
FileStatus[] fileStatuses = fs.listStatus(path);
// 遍历文件列表
for (FileStatus file : fileStatuses) {
// 判断是否是文件夹
if (file.isDirectory()) {
System.out.println(file.getPath().toString());
}
}
// 关闭文件系统
fs.close();
}
}


// 代码5.5 列举文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class view_files {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
//获取文件系统
FileSystem fs = FileSystem.get(conf);
// 声明文件路径
Path path = new Path("/user/root");
// 获取文件列表
FileStatus[] fileStatuses = fs.listStatus(path);
// 遍历文件列表
for (FileStatus file : fileStatuses) {
// 判断是否是文件夹
if (file.isFile()) {
System.out.println(file.getPath().toString());
}
}
// 关闭文件系统
fs.close();
}
}

//代码5.6 创建目录
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


public class mkdir_folders {
public static void main(String[] args) throws Exception {
//获取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
//获取文件系统
FileSystem fs = FileSystem.get(conf);
//声明创建的目录
Path path = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test1");
//调用mkdirs函数创建目录
fs.mkdirs(path);
//关闭文件系统
fs.close();
}
}

//代码5.7 删除文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class delete_files {
public static void main(String[] args) throws Exception {
//获取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
//获取文件系统
FileSystem fs = FileSystem.get(conf);
//声明文件路径
Path path = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test1");
//删除文件
fs.delete(path, true);
//关闭文件系统
fs.close();
}
}

//代码5.8 上传文件至HDFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class put_files {
public static void main(String[] args) throws Exception {
// 获取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
// 获取文件系统
FileSystem fs = FileSystem.get(conf);
// 声明源文件路径和目标路径
Path fromPath = new Path("D:/hadoop/test.csv");
Path toPath = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test");
// 调用copyFromLocalFile方法上传文件
fs.copyFromLocalFile(fromPath, toPath);
// 关闭文件系统
fs.close();
}
}


//代码5.9 下载HDFS上的文件至本地
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class load_files {
public static void main(String[] args) throws Exception {
// 获取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
// 获取文件系统
FileSystem fs = FileSystem.get(conf);
// 声明源文件路径和目标路径
Path fromPath = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test/test.csv");
Path toPath = new Path("D:/");
// 调用copyToLocalFile方法下载文件到本地
fs.copyToLocalFile(false, fromPath, toPath, true);
// 关闭文件系统
fs.close();
}
}

// 5.10 读取文件内容
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class read_files {
public static void main(String[] args) throws Exception {
//获取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
//获取文件系统
FileSystem fs = FileSystem.get(conf);
//声明查看的路径
Path path = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test/test.csv");
//获取指定文件的数据字节流
FSDataInputStream is = fs.open(path);
//读取文件内容并打印出来
BufferedReader br = new BufferedReader(new InputStreamReader(is,
"utf-8"));
String line = "";
while ((line = br.readLine()) != null) {
System.out.println(line);
}
//关闭数据字节流
br.close();
is.close();
//关闭文件系统
fs.close();
}
}

//代码5.11 写入数据

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;

public class write_files {
public static void main(String[] args) throws Exception {
//获取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "master:8020");
//获取文件系统
FileSystem fs = FileSystem.get(conf);
//声明查看的路径
Path path = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test/test.csv");
//创建新文件
Path newPath = new Path("/Tipdm/Hadoop/MapReduce/mkdir_folders_test/test_new.csv");
fs.delete(newPath, true);
FSDataOutputStream os = fs.create(newPath);
//获取指定文件的数据字节流
FSDataInputStream is = fs.open(path);
//读取文件内容并写入到新文件
BufferedReader br = new BufferedReader(new InputStreamReader(is,
"utf-8"));
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os,
"utf-8"));
String line = "";
while ((line = br.readLine()) != null) {
bw.write(line);
bw.newLine();
}
//关闭数据字节流
bw.close();
os.close();
br.close();
is.close();
//关闭文件系统
fs.close();

}
}

task5_2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//代码 5.12 读取序列化文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;

public class task5_2 {
public static void main(String[] args) throws Exception {
// 创建 Hadoop 配置对象
Configuration conf = new Configuration();
// 设置 Hadoop 文件系统的默认文件系统地址为 master:8020
conf.set("fs.defaultFS", "master:8020");

// 获取文件系统对象
FileSystem fs = FileSystem.get(conf);

// 创建 SequenceFile.Reader 对象,用于读取指定路径的 SequenceFile 文件
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
"/Tipdm/Hadoop/MapReduce/Result/SelectData/part-m-00000"),
conf);

// 创建 Text 类型的 key 和 value 用于存储读取的键值对
Text key = new Text();
Text value = new Text();

// 创建 BufferedWriter 对象,将读取的数据写入到 D 盘的 janfeb.txt 文件中
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream("D:\\janfeb.txt", true)));

// 读取 SequenceFile 文件中的每一个键值对,并写入到 janfeb.txt 文件
while (reader.next(key, value)) {
// 将键值对以“键值\t键值对”格式写入到文件,每行一个键值对
out.write(key.toString() + "\t" + value.toString() + "\r\n");
}

// 关闭 BufferedWriter,释放资源
out.close();

// 关闭 SequenceFile.Reader,释放资源
reader.close();
}
}


/*
FileOutputStream 是一个用于写入文件的字节输出流,创建后会将数据写入指定的文件。
参数 "D:\\janfeb.txt" 指定了文件路径(在 Windows 系统上使用反斜杠 \ 表示路径),该代码会将输出写入 D: 盘的 janfeb.txt 文件。
第二个参数 true 表示以“追加模式”打开文件。这样,如果文件 janfeb.txt 已存在,数据会追加到文件的末尾,而不会覆盖文件中的现有内容。

OutputStreamWriter 将字节流 FileOutputStream 转换为字符流,以便可以使用字符编码写入文本。
默认使用系统字符编码,可以通过指定编码格式来改变编码方式,例如 new OutputStreamWriter(new FileOutputStream("D:\\janfeb.txt", true), "UTF-8")。

BufferedWriter 为字符输出流提供缓冲功能,减少实际的 IO 操作次数以提升性能。
它会将数据存储在内存缓冲区中,缓冲区填满或调用 out.close() 方法时,数据才会被写入文件。

任务实现5.3

任务5.3示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 代码5.14 自定义键类型
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MemberLogTime implements WritableComparable<MemberLogTime> {

private String member_name;
private String logTime;

public MemberLogTime() {
}

public MemberLogTime(String member_name, String logTime) {
this.member_name = member_name;
this.logTime = logTime;
}

public String getMember_name() {
return member_name;
}

public void setMember_name(String member_name) {
this.member_name = member_name;
}

public String getLogTime() {
return logTime;
}

public void setLogTime(String logTime) {
this.logTime = logTime;
}

public int compareTo(MemberLogTime o) {
return this.getMember_name().compareTo(o.getMember_name());
}

public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(member_name);
dataOutput.writeUTF(logTime);
}

public void readFields(DataInput dataInput) throws IOException {
this.member_name = dataInput.readUTF();
this.logTime = dataInput.readUTF();
}

@Override
public String toString() {
return this.member_name + "," + this.logTime;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 代码5.15 Conbiner代码
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class LogCountCombiner extends
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}

//代码5.16 配置Combiner类
job.setCombinerClass(LogCountCombiner.class);

//代码5.17 Combiner与Reducer的实现逻辑相同时同时配置Combiner
job.setCombinerClass(LogCountReducer.class);

//代码5.18 HashPartitoner源码
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2>{
public void configure(JobConf job){}
public int getPartition(K2 key,V2 value,int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

//代码5.19 自定义Partitioner
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class LogCountPartitioner
extends Partitioner<MemberLogTime, IntWritable> {
public int getPartition(
MemberLogTime key, IntWritable value, int numPartitions) {
String date = key.getLogTime();
if (date.contains("2021/1")) {
return 0 % numPartitions;
} else {
return 1 % numPartitions;
}
}
}

//代码5.20 设置Partitioner类和Reducer个数
job.setPartitionerClass(LogCountPartitioner.class);
job.setNumReduceTasks(2);

//代码5.21 定义枚举类型
enum LogCounter {
January,
February
}

//代码5.22 使用计数器
if (logTime.contains("2021/1")) {
context.getCounter(LogCounter.January).increment(1);
} else if (logTime.contains("2021/2")) {
context.getCounter(LogCounter.February).increment(1);
}

//代码5.23 使用动态计数器
if (key.getLogTime().contains("2021/1")) {
context.getCounter("OutputCounter", "JanuaryResult").increment(1);
} else if (key.getLogTime().contains("2021/2")) {
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}

LogCount.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.hadoop.conf.Configuration;  // 引入Hadoop的配置类,用于配置Hadoop作业的属性
import org.apache.hadoop.fs.FileSystem; // 引入Hadoop文件系统类,用于处理文件系统的操作
import org.apache.hadoop.fs.Path; // 引入Hadoop路径类,用于表示文件的路径
import org.apache.hadoop.io.IntWritable; // 引入Hadoop整数类型的可序列化类
import org.apache.hadoop.mapreduce.Job; // 引入Hadoop MapReduce作业类,用于配置和提交MapReduce作业
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // 引入Hadoop的输入格式类,用于读取输入数据
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat; // 引入Hadoop序列化文件格式的输入格式类
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // 引入Hadoop的输出格式类,用于输出结果到文件
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; // 引入Hadoop文本文件输出格式类

import java.io.IOException; // 引入IOException类,用于捕获输入输出异常

public class LogCount {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {

// 创建Hadoop作业的配置对象
Configuration conf = new Configuration();
// 创建一个MapReduce作业实例,作业名为“Log Count”
Job job = Job.getInstance(conf, "Log Count");
// 设置作业的jar文件来源类,指定MapReduce作业的主类
job.setJarByClass(LogCount.class);
// 设置Map阶段的处理类(Mapper类)
job.setMapperClass(LogCountMapper.class);
// 设置Reduce阶段的处理类(Reducer类)
job.setReducerClass(LogCountReducer.class);
// 设置Combiner阶段的处理类(用于合并Mapper输出结果)
job.setCombinerClass(LogCountCombiner.class);
// 设置Partitioner类,用于控制数据如何分配给各个Reducer
job.setPartitionerClass(LogCountPartitioner.class);
// 设置Reduce任务的数量
job.setNumReduceTasks(2);
// 设置作业的输出键类型为MemberLogTime类
job.setOutputKeyClass(MemberLogTime.class);
// 设置作业的输出值类型为IntWritable类
job.setOutputValueClass(IntWritable.class);
// 设置输入数据的格式为SequenceFileAsTextInputFormat,即从序列化文件中读取文本数据
job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
// 设置输出数据的格式为TextOutputFormat,即将输出写入文本文件
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入路径,将args[0]作为输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 删除指定的输出路径(如果该路径存在)
FileSystem.get(conf).delete(new Path(args[1]), true);
// 设置输出路径,将args[1]作为输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业并等待完成,返回作业执行结果,-1表示成功,1表示失败
System.err.println(job.waitForCompletion(true) ? -1 : 1);
}
}

LogCountMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.hadoop.io.IntWritable;  // 导入Hadoop的IntWritable类,用于表示整型值
import org.apache.hadoop.io.Text; // 导入Hadoop的Text类,用于表示文本类型
import org.apache.hadoop.mapreduce.Mapper; // 导入Hadoop的Mapper类,用于MapReduce的映射操作

import java.io.IOException; // 导入IO异常类

// 定义LogCountMapper类,继承自Mapper类
public class LogCountMapper
extends Mapper<Text, Text, MemberLogTime, IntWritable> { // Mapper的输入类型为<Text, Text>,输出类型为<MemberLogTime, IntWritable>

// 创建一个MemberLogTime类型的对象,用于存储每一条日志记录的相关信息
private MemberLogTime mt = new MemberLogTime();

// 创建一个IntWritable类型的对象,用于计数
private IntWritable one = new IntWritable(1);

// 定义一个枚举类LogCounter,用于统计不同月份的日志数量
enum LogCounter {
January, // 1月
February // 2月
}

@Override
// 重写map方法,执行Map操作
protected void map(
Text key, Text value,
Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException { // 输入参数:key为成员名,value为日志时间;输出参数:context用于输出结果
String member_name = key.toString(); // 将key(成员名)转换为字符串
String logTime = value.toString(); // 将value(日志时间)转换为字符串

// 如果日志时间包含"2021/1",表示1月的日志
if (logTime.contains("2021/1")) {
// 增加January计数器的值
context.getCounter(LogCounter.January).increment(1);
} else if (logTime.contains("2021/2")) { // 如果日志时间包含"2021/2",表示2月的日志
// 增加February计数器的值
context.getCounter(LogCounter.February).increment(1);
}

// 设置MemberLogTime对象的成员名
mt.setMember_name(member_name);
// 设置MemberLogTime对象的日志时间
mt.setLogTime(logTime);

// 将MemberLogTime对象和计数1输出到context中
context.write(mt, one);
}
}

LogCountReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.hadoop.io.IntWritable;  // 导入Hadoop的IntWritable类,用于表示整型值
import org.apache.hadoop.mapreduce.Reducer; // 导入Hadoop的Reducer类,用于MapReduce的归约操作

import java.io.IOException; // 导入IO异常类

// 定义LogCountReducer类,继承自Reducer类
public class LogCountReducer
extends Reducer<MemberLogTime, IntWritable,
MemberLogTime, IntWritable> { // Reducer的输入类型为<MemberLogTime, IntWritable>,输出类型为<MemberLogTime, IntWritable>

@Override
// 重写reduce方法,执行归约操作
protected void reduce(
MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime,
IntWritable>.Context context)
throws IOException, InterruptedException { // 输入参数:key为MemberLogTime,value为多个IntWritable的集合;输出参数:context用于输出结果
// 如果日志时间包含"2021/1",表示1月的日志
if (key.getLogTime().contains("2021/1")) {
// 增加"OutputCounter"计数器中的"JanuaryResult"计数值
context.getCounter("OutputCounter", "JanuaryResult").increment(1);
} else if (key.getLogTime().contains("2021/2")) { // 如果日志时间包含"2021/2",表示2月的日志
// 增加"OutputCounter"计数器中的"FebruaryResult"计数值
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}

int sum = 0; // 初始化sum变量,用于累计该key的所有值
// 遍历value中所有的IntWritable值
for (IntWritable val : value) {
sum += val.get(); // 将每个val的值加到sum中
}

// 将最终的结果(key和累计的sum)输出到context中
context.write(key, new IntWritable(sum));
}
}

任务实现5.4

任务5.4示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//代码5-29 驱动类使用ToolRunner
package No5.Text.test3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogCount extends Configured implements Tool {
public static void main(String[] args) {
String[] myArgs = {
"/Tipdm/Hadoop/MapReduce/Result/Select_Data/part-m-00000",
"/Tipdm/Hadoop/MapReduce/Result/Log_Count"
};
try {
ToolRunner.run(new Configuration(), new LogCount(), myArgs);
} catch (Exception e) {
e.printStackTrace();
}
}

public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Log Count");
job.setJarByClass(LogCount.class);
job.setMapperClass(LogCountMapper.class);
job.setReducerClass(LogCountReducer.class);
job.setCombinerClass(LogCountCombiner.class);
job.setPartitionerClass(LogCountPartitioner.class);
job.setNumReduceTasks(2);
job.setOutputKeyClass(MemberLogTime.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem.get(conf).delete(new Path(args[1]), true);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? -1 : 1;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//代码5-34 工具类实现代码
package No5.Text.test3;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class JarUtil {
public static String jar(Class<?> cls) {
String outputJar = cls.getName() + ".jar";
String input = cls.getClassLoader().getResource("").getFile();
input = input.substring(0, input.length() - 1);
input = input.substring(0, input.lastIndexOf("/") + 1);
jar(input, outputJar);
return outputJar;
}

private static void jar(String inputFileName, String outputFileName) {
JarOutputStream out = null;
try {
out = new JarOutputStream(new FileOutputStream(outputFileName));
File f = new File(inputFileName);
jar(out, f, "");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

private static void jar(JarOutputStream out, File f, String base) throws Exception {
if (f.isDirectory()) {
File[] fl = f.listFiles();
// 注意,这里用左斜杠
base = base.length() == 0 ? "" : base + "/";
for (int i = 0; i < fl.length; i++) {
jar(out, fl[i], base + fl[i].getName());
}
} else {
out.putNextEntry(new JarEntry(base));
FileInputStream in = new FileInputStream(f);
byte[] buffer = new byte[1024];
int n = in.read(buffer);
while (n != -1) {
out.write(buffer, 0, n);
n = in.read(buffer);
}
in.close();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 代码5-27 set方法使用
// 初始化配置,getConf()为自定义获取配置方法
Configuration conf = getConf();
// 设置第1个参数的属性名称为inputPath
conf.set("inputPath",args[0]);
// 设置第2个参数的属性名称为ouputPath
conf.set("outputPath",args[1]);
// 设置第3个参数的属性名称为splitter
conf.set("splitter",args[2]);

//代码5-28 获取参数值
// 通过属性名称inputPath获取第1个参数值
String inputPath=context.getConfiguration().get("inputPath");
// 通过属性名称outputPath获取第2个参数值
String outputPath=context.getConfiguration().get("outputPath");
// 通过属性名称splitter获取第3个参数值
String splitter=context.getConfiguration().get("splitter");

//代码5-30 hadoop jar提交命令
hadoop jar /opt/jars/Hadoop/logcount.jar No5.Text.test3.LogCount

//代码5-31 连接Hadoop集群配置的方法
public static Configuration getMyConfiguration() {
// 声明配置
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.app-submission.cross-platform", true);
// 指定namenode
conf.set("fs.defaultFS", "hdfs://master:8020");
// 指定使用yarn框架
conf.set("mapreduce.framework.name", "yarn");
String resourcenode = "master";
// 指定resourcemanager
conf.set("yarn.resourcemanager.address", resourcenode + ":8032");
// 指定资源分配器
conf.set("yarn.resourcemanager.scheduler.address", resourcenode + ":8030");
conf.set("mapreduce.jobhistory.address", resourcenode + ":10020");
conf.set("mapreduce.job.jar", "E:\\LC.jar");
return conf;
}

//代码5-32 调用自定义的获取集群配置的方法
public static Configuration getMyConfiguration() {
// 声明配置
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.app-submission.cross-platform", true);
// 指定namenode
conf.set("fs.defaultFS", "hdfs://master:8020");
// 指定使用yarn框架
conf.set("mapreduce.framework.name", "yarn");
String resourcenode = "master";
// 指定resourcemanager
conf.set("yarn.resourcemanager.address", resourcenode + ":8032");
// 指定资源分配器
conf.set("yarn.resourcemanager.scheduler.address", resourcenode + ":8030");
conf.set("mapreduce.jobhistory.address", resourcenode + ":10020");
conf.set("mapreduce.job.jar", "E:\\LC.jar");
return conf;
}

//代码5-33 log4j.properties文件代码
log4j.rootLogger = INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss,SSS}- %c{1}: %m%n


//代码5-35 设置自动打包
conf.set("mapreduce.job.jar",JarUtil.jar(LogCount.class))

LogCount.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package No5.Text.test3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogCount extends Configured implements Tool {
public static void main(String[] args) {
String[] myArgs = {
"/Tipdm/Hadoop/MapReduce/Result/Select_Data/part-m-00000",
"/Tipdm/Hadoop/MapReduce/Result/Log_Count3"
};
try {
ToolRunner.run(new Configuration(), new LogCount(), myArgs);
} catch (Exception e) {
e.printStackTrace();
}
}

public int run(String[] args) throws Exception {
Configuration conf = getMyConfiguration();
Job job = Job.getInstance(conf, "Log Count");
job.setJarByClass(LogCount.class);
job.setMapperClass(LogCountMapper.class);
job.setReducerClass(LogCountReducer.class);
job.setCombinerClass(LogCountCombiner.class);
job.setPartitionerClass(LogCountPartitioner.class);
job.setNumReduceTasks(2);
job.setOutputKeyClass(MemberLogTime.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem.get(conf).delete(new Path(args[1]), true);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? -1 : 1;
}

public static Configuration getMyConfiguration() {
// 声明配置
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.app-submission.cross-platform", true);
// 指定namenode
conf.set("fs.defaultFS", "hdfs://master:8020");
// 指定使用yarn框架
conf.set("mapreduce.framework.name", "yarn");
String resourcenode = "master";
// 指定resourcemanager
conf.set("yarn.resourcemanager.address", resourcenode + ":8032");
// 指定资源分配器
conf.set("yarn.resourcemanager.scheduler.address", resourcenode + ":8030");
conf.set("mapreduce.jobhistory.address", resourcenode + ":10020");
conf.set("mapreduce.job.jar",JarUtil.jar(LogCount.class));
return conf;
}
}

第6章 Hive数据仓库

任务6.2 访问Hive的3种方式

6.2.1 设置内嵌Derby模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
代码6-1 创建文件夹
// 由于要将安装包上传到/opt/apps文件夹下,所以先在/opt/下面创建apps文件夹
mkdir -p /opt/apps
// 切换至/opt/apps目录下
cd /opt/apps/
//切换至/opt/apps目录下后,通过Xshell提供的文件传输工具将apache-hive-3.1.2-bin.tar.gz上传至Linux系统中的/opt/apps目录下。

代码6-2 解压压缩包
tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /opt/

代码6-3 启动Hive
# 删除guava-19.0.jar
rm -rf /opt/apache-hive-3.1.2-bin/lib/guava-19.0.jar
# 复制guava-27.0-jre.jar
cp /usr/local/hadoop-3.1.4/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apache-hive-3.1.2-bin/lib/guava-27.0.jar

代码6-4 解决日志jar包冲突
mv log4j-slf4j-impl-2.10.0.jar log4j-slf4j-impl-2.10.0.jar.bak

代码6-5 初始化元数据库
# 切换至Hive的安装目录下的bin目录执行如下命令
cd /usr/local/hive/bin
./schematool -dbType derby -initSchema

6-3 Hive环境变量配置信息
vim /etc/profile
#添加环境变量
export JAVA_HOME=/usr/local/jdk1.8.0_221-amd64
export HADOOP_HOME=/opt/hadoop-3.1.4
export HIVE_HOME=/opt/apache-hive-3.1.3-bin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin
#source配置文件
source /etc/profile

6-46-5 查询Hive数据仓库列表
hive
show databases;

6.2.2 设置直连数据库模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
代码6-6 删除旧版本MySQL的残留信息
rm -rf /usr/lib/mysql
rm -rf /usr/include/mysql
rm -rf /etc/my.cnf
rm -rf /var/lib/mysql
rm -rf /usr/share/mysql

代码6-7 MySQL包的安装
rpm -ivh mysql-community-common-5.7.18-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-5.7.18-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-5.7.18-1.el7.x86_64.rpm
rpm -ivh mysql-community-server-5.7.18-1.el7.x86_64.rpm
rpm -ivh mysql-community-*

6-6 修改my.cnf文件,添加配置信息
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character_set_server=utf8

代码6-8 启动MySQL服务器
// 启动MySQL服务
systemctl start mysqld
service mysqld restart
Redirecting to /bin/systemctl restart mysqld.service

6-76-8 查看初始密码并登录Mysql
cat /var/log/mysqld.log
#登录Mysql,再输入初始密码
mysql -u root -p

代码6-9 修改默认密码
// 设置新密码的验证策略为0,表示最低
mysql> set global validate_password_policy=0;
// 设置新密码的长度最小值为6位
mysql> set global validate_password_length=6;
// 设置新密码为123456
mysql> set password for 'root'@'localhost'=password('123456');

代码6-10 设置远程登录和开机启动
// 给root设置远程登录权限
mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
mysql>FLUSH PRIVILEGES;
// 使用“quit;”命令退出MySQL,在centos中设置MySQL开机启动
$ chkconfig mysqld on

6-9 Hive全局变量配置信息
vim /etc/profile
#添加环境变量
export JAVA_HOME=/usr/java/jdk1.8.0_281-amd64
export HADOOP_HOME=/usr/local/hadoop-3.1.4
export HIVE_HOME=/opt/apache-hive-3.1.2-bin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin
#source配置文件
source /etc/profile

代码6-11 配置hive-site.xml配置信息
cd /opt/apache-hive-3.1.2-bin/conf
vim hive-site.xml
#添加相关内容
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.128.130:3306/hive?createDatabaseIfNotExist=true</value>
<description>Mysql连接协议</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>JDBC连接驱动</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>用户名</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>密码</description>
</property>
</configuration>

代码6-12 复制MySQL连接驱动的jar包至Hive安装包的lib目录
cp /opt/apps/mysql-connector-java-5.1.32-bin.jar /opt/apache-hive-3.1.2-bin/lib/

6-10 初始化数据库
schematool -dbType mysql -initSchema

6.2.3 设置远程服务器模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#前提步骤 将hive安装包发送各节点,并配置环境变量
scp -r /opt/apache-hive-3.1.2-bin slave2:/opt
scp -r /opt/apache-hive-3.1.2-bin slave3:/opt
#将环境变量发送到slave2、slave3,去对应节点source /etc/profile
scp -r /etc/profile slave2:/etc
scp -r /etc/profile slave3:/etc

代码6-13 slave2上hive-site.xml的配置
cd /opt/apache-hive-3.1.2-bin/conf
vim hive-site.xml
#添加相关内容
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_remote/warehouse</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://192.168.128.130:3306/hive_remote?createDatabaseIfNotExist=true</value>
<description>Mysql连接协议</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>JDBC连接驱动</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>用户名</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>密码</description>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>

代码6-14 slave3上hive-site.xml的配置
cd /opt/apache-hive-3.1.2-bin/conf
vim hive-site.xml
#添加相关内容
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_remote/warehouse</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.128.130:9083</value>
</property>
</configuration>

6-13 在slave2节点上启动hive服务,在slave3上开启Hive访问服务器
#在slave2上
hive --service metastore &
#在slave3上
hive

6-15 查看新数据库,在master上
mysql -u root -p 123456
show databases;

任务6.3 实现Hive表的创建与修改

6.3.1 了解Hive数据定义语言基本语法

1
2
3
4
5
6
7
6-16 创建student数据库
#启动hive服务
hive --service metastore &
#开启hive
hive
#创建student数据库
create database student;

6.3.2 创建表基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
代码6-15 创建user_info表
create table user_info(id int,name string);

代码6-16 上传student_info至HDFS的/stu目录下
hdfs dfs -mkdir /stu
hdfs dfs -put student_info.txt /stu

代码6-17 创建外部表student_info
create external table student_info(
stu_no string,
stu_name string,
stu_sex string,
telphone string,
stu_class string)
row format delimited fields terminated by ',' location '/stu';
#查看数据库
show tables;
#查看student_info
select * from student_info;

代码6-18 创建分区表
create table teacher_info(t_no string,t_name string,t_sex string,t_age string) partitioned by(depart string) row format delimited fields terminated by ','stored as textfile;

6.3.3 修改表基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
代码6-19 创建成绩信息表
create table score(
stu_no string,
cla_no string,
grade float
) partitioned by (class_name string);
#查看score表结构
describe extended score ;

代码6-20 将表score重命名成stu_score
alter table score rename to stu_score;

代码6-21 向表stu_score添加列
alter table stu_score add columns (credit int,gpa float);
#查看score表结构
describe extenden stu_score

代码6-22 将表stu_score中列credit重命名为Credits,并将数据类型修改为Float
alter table stu_score change column credit Credits float;.

代码6-23 修改表stu_score中的分区,新增class_name为07111301的分区
alter table stu_score add partition(class_name='07111301');

代码6-24 删除分区,将6-32新增的class_name为07111301的分区删除
alter table stu_score drop if exists partition(class_name = '07111301');

代码6-25 删除表stu_score,可直接使用drop table命令删除表
drop table stu_score;

任务6.4 实现Hive表中数据的增删改查

6.4.2 向表中装载(Load)文件

1
2
3
4
5
6
7
8
9
10
11
代码6-26 创建student、course、sc3个表结构
create table student(Sno int,Sname string,Sex string,Sage int,Sdept string)row format delimited fields terminated by ','stored as textfile;
create table course(Cno int,Cname string) row format delimited fields terminated by ',' stored as textfile;
create table sc(Sno int,Cno int,Grade int)row format delimited fields terminated by ',' stored as textfile;

代码6-27 装载数据
load data local inpath '/root/hivedata/students.txt' overwrite into table student;
load data local inpath '/root/hivedata/sc.txt' overwrite into table sc;
load data local inpath '/root/hivedata/course.txt' overwrite into table course;
#查询Hive中的myhive数据库下的student、course和sc表的数据
select * from student;

6.4.3 查询数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
代码6-28 select查询
// 查询全体学生的学号与姓名
select Sno,Sname from student;

代码6-29 where条件查询
// 查询所有的男生的学号和姓名
select Sno,Sname from student where Sex='男';

代码6-30 join连接查询
// 查询选修了课程的学生姓名
select distinct Sname from student inner join sc on student.Sno=Sc.Sno;
// 查询学生的课程成绩情况
select student.Sname,course.Cname,sc.Grade from student join sc on student.Sno=sc.Sno join course on sc.cno=course.cno;
// 查询选修2号课程且成绩在90分以上的所有学生。
select student.Sname,sc.Grade from student join sc on student.Sno=sc.Sno where sc.Cno=2 and sc.Grade>90;

代码6-31 Group by 分组查询
// 各个课程号及相应的选课人数
select Cno,count(1) from sc group by Cno;

代码6-32 having 条件查询
// 查询选修了3门以上的课程的学生学号
select Sno from sc group by Sno having count(Cno)>3;

代码6-33 order by 排序查询
// 查询学生信息,结果按学号全局升序排序
select Sno from student order by Sno;

代码6-34 sort by 排序查询
//设置reduce个数
set mapred.reduce.tasks=2;
// 查询学生信息,按性别分区,在分区内按年龄有序
insert overwrite local directory '/home/hadoop/out' select * from student distribute by Sex sort by Sage;

6.4.4 插入数据

1
2
3
4
5
6
7
8
9
10
11
代码6-35 insert插入数据
// 向student表中插入一条新的记录
insert into table student values(2018213223,'王小哲','男',18,'IS');
#查询student表的数据进行验证
select * from student;

代码6-36 将查询结果作为新记录插入表中
// 将student中查询的结果再插入到student表的末尾
insert into table student select * from student;
// 将student中查询的结果再插入到student中,并覆盖原有表中的内容
insert overwrite table student select * from student;

6.4.5 删除表中数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
代码6-37 默认情况下使用update和delete操作的报错信息
hive> delete from student where Sno=’2018213026
FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

代码6-38 hive-site.xml配置文件增加的内容
cd /opt/apache-hive-3.1.2-bin/conf
vim hive-site.xml
#添加相关内容
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>

代码6-39 删除表的指定数据
// 删除学号为2018213026的学生信息
delete from student where Sno=’2018213026

代码6-40 清空表数据
// 清空学生表student的信息
truncate table student

任务6.5 掉线率top20基站统计

代码6-41 创建jizhan表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
代码6-41 创建jizhan表结构
hive>use myhive;
hive> create table jizhan(
record_time string,
imei int,
cell string,
ph_num int,
call_num int,
drop_num int,
duration int,
drop_rate double,
net_type string,
erl int)
row format delimited fields terminated by ',';

代码6-42 装载jizhan_information.scv文件至jizhan表

1
2
3
代码6-42 装载jizhan_information.scv文件至jizhan表
load data local inpath '/root/data/jizhan_information.csv' into table jizhan;
select * from jizhan limit 10;

代码6-43 创建jizhan_result表结构

1
2
3
4
5
6
7
代码6-43 创建jizhan_result表结构
hive>create table jizhan_result(
imei string,
drop_num int,
duration int,
drop_rate double
);

代码6-44 基站掉话率统计

1
2
3
4
5
6
代码6-44 基站掉话率统计
hive>from jizhan
insert into jizhan_result
select imei,sum(drop_num) as sdrop,sum(duration) as sdura,sum(drop_num)/sum(duration) as drop_rate
group by imei
order by drop_rate desc;

第7章 HBase分布式数据库

任务7.2 安装部署HBase集群

7.2.1了解并安装Zookeeper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
代码7-1 切换到/opt/apps 目录
cd /opt/apps

代码7-2 解压安装包
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /opt

代码7-3 复制配置文件
cd /opt/apache-zookeeper-3.5.6-bin/conf/
cp zoo_sample.cfg zoo.cfg

代码7-4 配置zoo.cfg文件
vim zoo.cfg
#添加内容为
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# 设置数据文件目录+数据持久化路径
dataDir=/root/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# 配置zookeeper集群的服务器编号以及对应的主机名、选举端口号和通信端口号(心跳端口号)
server.1=slave1:2888:3888
server.2=slave2:2888:3888
server.3=slave3:2888:3888

代码7-5 创建并配置myid文件
mkdir -p /root/data/zookeeper/
cd /root/data/zookeeper/
echo 1 > myid

代码7-6 配置Zookeeper的环境变量
#ZK_HOME
export ZK_HOME=/opt/apache-zookeeper-3.8.4-bin
export PATH=$PATH:$ZK_HOME/bin

代码7-7 分发zookeeper相关文件
分发Zookeeper安装包至slave2和slave3上
$ scp -r /opt/apache-zookeeper-3.5.6-bin/ slave2:/opt/
$ scp -r /opt/apache-zookeeper-3.5.6-bin/ slave3:/opt/
# 分发myid文件至slave2和slave3上
$ scp -r /root/data/zookeeper/myid slave2:`pwd`
$ scp -r /root/data/zookeeper/myid slave2:`pwd`
# 分别切换至slave2和slave3的/root/data/zookeeper目录下
# 修改slave2和slave3上的myid文件的值分别为23
echo 2 > myid
echo 3 > myid
# 发分profile文件至slave2和slave3服务器上
$ scp /etc/profile slave2:/etc/profile
$ scp /etc/profile slave3:/etc/profile

代码7-8 生效环境变量
#分别在slave1、slave2、slave3服务器上刷新profile配置文件,使得环境变量生效。
source /etc/profile

代码7-9 启动Zookeeper服务
#启动Zookeeper服务。首先,依次在slave1、slave2、slave3服务器上启动Zookeeper服务,
#以在slave1节点上启动Zookeeper服务为例,
#并查看该节点Zookeeper服务的状态及角色
zkServer.sh start
zkServer.sh status

代码7-10 关闭Zookeeper服务
$ zkServer.sh stop

7.2.2安装配置HBase集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#从HBase的官网下载HBase安装包,本书使用的HBase版本为HBase 2.2.2
#将下载的HBase安装包上传至Linux系统的/opt/apps目录下。

代码7-11 解压HBase安装包
cd /opt/apps/
tar -zxvf hbase-2.2.2-bin.tar.gz -C /opt

代码7-12 配置hbase-env.sh文件
cd /opt/hbase-2.2.2/conf
vim hbase-env.sh
#添加内容
export JAVA_HOME=/usr/java/jdk1.8.0_281-amd64/
export HBASE_MANAGES_ZK=false

代码7-13 修改hbase-site.xml
vim hbase-site.xml
#添加内容
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:8020/hbase</value>
</property>
<property>
<name>hbase.master</name>
<value>master</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>slave1,slave2,slave3</value>
</property>
<property>
<name>zookeeper.session.timeout</name>
<value>60000000</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<!-- 防止连接16010失败 -->
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
</configuration>

代码7-14 修改regionservers文件
vim regionservers
#添加内容
master
slave1
slave2
slave3

代码7-15 添加HBase的环境变量
vim /etc/profile
#HBASE_HOME
export HBASE_HOME=/opt/hbase-2.2.2
export PATH=$PATH:$HBASE_HOME/bin

代码7-16 分发HBase相关文件
#分发HBase相关配置文件至其他节点。首先,将HBase安装目录分发至slave1、slave2和slave3节点上;
#其次profile文件也分发至slave1、slave2和slave3节点上
# 分发HBase安装包至slave1、slave2和slave3上
scp -r /opt/hbase-2.2.2/ slave1:/opt/
scp -r /opt/hbase-2.2.2/ slave2:/opt/
scp -r /opt/hbase-2.2.2/ slave3:/opt/
#分发profile文件至slave2和slave3节点上
scp /etc/profile slave1:/etc/profile
scp /etc/profile slave2:/etc/profile
scp /etc/profile slave3:/etc/profile
#使环境变量生效。在slave1、slave2、slave3节点上分别
#使用“source /etc/profile”命令刷新profile配置文件

代码7-17 启动HBase服务
start-all.sh
zkServer.sh start
start-hbase.sh
#启动成功后,可通过浏览器输入“http://192.168.65.141:16010”网址,查看Master信息
#打开浏览器输入“http://192.168.65.142:16030”网址,查看Region Server信息
#关闭HBase服务。若关闭HBase服务,则只需在slave1机器上执行hbase-stop.sh命令

任务7.3 掌握HBase常用的Shell命令

7.3.1 修改与删除表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
代码7-18 表的创建
hbase shell
hbase(main):001:0> create 'student','info'

代码7-19 获取表信息
hbase(main):003:0> describe 'student'

代码7-20 查询表student是否存在
hbase(main):009:0> exists 'student'

代码7-21判断表是否enable/disable
hbase(main):011:0> is_enabled 'student'
hbase(main):012:0> is_disabled 'student'

代码7-22 增加表列族relationship
hbase(main):004:0> disable 'student'
hbase(main):005:0> alter 'student',NAME=> 'relationship'

代码7-23 删除列族relationship
hbase(main):007:0> disable 'student'
hbase(main):008:0> alter 'student','delete'=>'relationship'

代码7-24 删除student表
hbase(main):014:0> disable 'student'
hbase(main):015:0> drop 'student'
hbase(main):016:0> exists 'student'

7.3.2 查询表数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
代码7-25 插入数据
hbase(main):018:0> put 'student','07112001','info:name','Ben'
hbase(main):019:0> put 'student','07112001','relationship:father','Bill'
hbase(main):022:0> put 'student','07112001','relationship:mather','Rose'
hbase(main):020:0> put 'student','07112002','info:name','Bobby'
hbase(main):026:0> put 'student','07112002','relationship:father','Bert'
hbase(main):023:0> put 'student','07112002','relationship:mather','Anna'
hbase(main):027:0> put 'student','07112003','info:name','Jerry'
hbase(main):033:0> put 'student','07112003','relationship:father','Jason'
hbase(main):032:0> put 'student','07112003','relationship:mather','Lori'

代码7-26 查询表数据student
hbase(main):039:0> get 'student','07112002'
hbase(main):040:0> get 'student','07112002','relationship'

代码7-27 扫描student表数据
hbase(main):041:0> scan 'student'

代码7-28 统计student表记录数
hbase(main):042:0> count 'student'

代码7-29 删除列
hbase(main):044:0> delete 'student','07112003','relationship:father'
hbase(main):045:0> get 'student','07112003','relationship'

代码7-30 删除RowKey为07112003的所有行
hbase(main):047:0> deleteall 'student','07112003'
hbase(main):048:0> scan 'student'

代码7-31 删除表中所有的数据
hbase(main):049:0> truncate 'student'
1
2


任务7.4 使用HBase Java API 实现表设计

7.4.1 创建Java项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
代码7-32 pom.xml依赖
#Maven项目的相关Jar包会自动下载。pom.xml文件添加的内容
<dependencies>
<!-- 单元测试依赖包 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- hbase客户端依赖 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.2</version>
</dependency>
<!-- hbase核心依赖 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>

代码7-33 创建HBaseTest类
package com.cqyti.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HBaseTest {
public static Configuration configuration; //管理HBase的配置信息
public static Connection connection; //管理HBase的连接
public static Admin admin; //管理HBase数据库表信息
public static void main(String[] args)throws IOException{
init();
createTable("students",new String[]{"score"});
insertData("students","George","score","Bigdata","69");
insertData("students","George","score","Python","86");
insertData("students","George","score","JavaWeb","77");
close();
}
}

代码7-34 建立连接/关闭连接
//建立连接
public static void init(){
configuration = HBaseConfiguration.create();
configuration.set("hbase.rootdir","hdfs://master:9000/hbase");
try{
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
}catch (IOException e){
e.printStackTrace();
}
}
// 关闭连接
public static void close(){
try{
if(admin != null){
admin.close();
}
if(null != connection){
connection.close();
}
}catch (IOException e){
e.printStackTrace();
}

7.4.2 实现表的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 创建HBase表
*
* @param myTableName 表名
* @param colFamily 列族数组
* @throws IOException 如果在创建表过程中出现异常
*/
public static void createTable(String myTableName, String[] colFamily) throws IOException {
// 将表名字符串转换为HBase的TableName对象
TableName tableName = TableName.valueOf(myTableName);
// 检查表是否已经存在
if(admin.tableExists(tableName)){
// 如果表存在,输出提示信息
System.out.println("table is exists!");
}else {
// 如果表不存在,开始创建新表
// 使用TableDescriptorBuilder构建表的描述符
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
// 遍历列族数组,为每个列族创建并添加列族描述符
for(String str : colFamily){
// 使用ColumnFamilyDescriptorBuilder为列族创建描述符,并将列族名转换为字节数组
ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
// 将列族描述符添加到表的描述符中
tableDescriptor.setColumnFamily(family);
}
// 使用管理员对象(admin)创建表
admin.createTable(tableDescriptor.build());
}
}

7.4.3 向表中插入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
代码7-36 insertData()方法插入数据
/**
* 添加数据
* @param tableName 表名
* @param rowKey 行键
* @param colFamily 列族
* @param col 列限定符
* @param val 数据
* @throws IOException
*/
public static void insertData(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {
// 获取HBase连接中的指定表
Table table = connection.getTable(TableName.valueOf(tableName));
// 创建一个Put对象,指定行键
Put put = new Put(rowKey.getBytes());
// 向Put对象中添加列族、列名和对应的值
put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
// 将Put对象提交到表中,插入数据
table.put(put);
// 关闭表连接
table.close();
}

代码7-37 插入数据
insertData("students","George","score","Bigdata","96");
insertData("students","George","score","Python","88");
insertData("students","George","score","JavaWeb","67");

代码7-38 HBase Shell插入数据命令
put 'students','George','score:Bigdata','96'
put 'students','George','score:Python','88'
put 'students','George','score:JavaWeb','67'

7-20 插入数据结果
hbase>scan 'students'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/**工程*/
package com.cqyti.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseTest {
public static Configuration configuration; //初始化HBase的配置信息
public static Connection connection; //初始化HBase的连接
public static Admin admin; //初始化HBase数据库表信息

public static void main(String[] args) throws IOException {
init();
createTable("students", new String[]{"score"});
insertData("students", "George", "score", "Bigdata", "69");
insertData("students", "George", "score", "Python", "86");
insertData("students", "George", "score", "JavaWeb", "77");
close();
}

//建立连接
public static void init() {
configuration = HBaseConfiguration.create();
// configuration.set("hbase.rootdir", "hdfs://master:9000/hbase");
try {
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* @param myTableName 表名
* @param colFamily 列族
* @throws IOException
*/
public static void createTable(String myTableName, String[] colFamily) throws IOException {
TableName tableName = TableName.valueOf(myTableName);
if (admin.tableExists(tableName)) {
System.out.println("talbe is exists!");
} else {
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.
newBuilder(tableName);
for (String str : colFamily) {
ColumnFamilyDescriptor family =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.
toBytes(str)).build();
tableDescriptor.setColumnFamily(family);
}
admin.createTable(tableDescriptor.build());
}
}

/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 行键
* @param colFamily 列族
* @param col 列限定符
* @param val 数据
* @throws IOException
*/
public static void insertData(String tableName, String rowKey, String colFamily, String col, String val) throws
IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(rowKey.getBytes());
put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
table.put(put);
table.close();
}

// 关闭连接
public static void close() {
try {
if (admin != null) {
admin.close();
}
if (null != connection) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
}

}
}

任务7.5 查询分析通话记录数据

7.5.2 任务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
代码7-39  配置pom.xml文件
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>

代码7-40 代码实现框架
import com.sun.deploy.security.ruleset.RuleSetParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Random;
public class PhoneLogDemo {
public static Configuration configuration;
public static Connection connection;
public static Admin admin;
public static Random random;
public static SimpleDateFormat sdf;
public static void main(String[] args) throws IOException, ParseException {
random = new Random();
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 初始化并建立连接
init();
// 创建一个表名为phone_log、列族名为basic的表
createTable("phone_log", new String[]{"basic"});
// 插入随机生成的数据
insert("phone_log","basic");
// 关闭连接
close();
}

代码7-41 定义init()方法
// 初始化并建立连接
public static void init() {
// 创建配置对象
Configuration conf = HBaseConfiguration.create();
try {
// 获取HBase连接对象
connection = ConnectionFactory.createConnection(conf);
// 获取操作对象
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭admin和connection
public static void close() {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

代码7-42 定义createTable()方法
public static void createTable(String myTableName, String[] colFamily) throws IOException {
TableName tableName = TableName.valueOf(myTableName);
// 判断表是否存在
if (admin.tableExists(tableName)) {
// 如果存在那么输出“table exists!”
System.out.println("table exists!");
} else {
// 如果不存在那么添加表描述符信息及列族信息
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.
newBuilder(tableName);
for (String str : colFamily) {
ColumnFamilyDescriptor family=
ColumnFamilyDescriptorBuilder.newBuilder
(Bytes.toBytes(str)).build();
//将列族描述符添加到表描述符上
tableDescriptor.setColumnFamily(family);
}
admin.createTable(tableDescriptor.build());
}
}

代码7-43 定义insert()方法
/**
* 10个用户,每个用户每年产生1000条通话记录
* dnum:对方用户电话号码;
type:通话类型,0代表主叫,1代表被叫;
length:通话时长;
date:通话日期
* rowkey:当前用户手机号码(Long.MAX_VALUE-timestamp)
*/
public static void insert(String tableName, String colFamily) throws IOException, ParseException {
Table table = connection.getTable(TableName.valueOf(tableName));
List<Put> putList = new ArrayList<Put>();
for (int i =1; i <= 10; i++) {
// 当前用户电话号码
String phoneNum = getPhoneNum("158"); // 生成以“158”开头的用户电话号码
System.out.println(phoneNum);
// 清空集合
putList.clear();
// 模拟1000条通话记录
for (int j = 1; j <= 1000; j++) {
// 生成数据
String dnum = getPhoneNum("199"); // 生成以“199”开头的对方用户电话号码
int length = random.nextInt(99) + 1; // 随机生成通话时长
int type = random.nextInt(2); // 随机生成通话类型
String date = getDate(2019); // 随机生成通话日期
// rowkey的设计
String rowkey = phoneNum + "_" + (Long.MAX_VALUE - sdf.parse(date).getTime());
Put put = new Put(rowkey.getBytes());
put.addColumn(colFamily.getBytes(), "dnum".getBytes(), Bytes.
toBytes(dnum));
put.addColumn(colFamily.getBytes(), "length".getBytes(), Bytes. toBytes(length));
put.addColumn(colFamily.getBytes(), "type".getBytes(), Bytes. toBytes(type));
put.addColumn(colFamily.getBytes(), "date".getBytes(), Bytes. toBytes(date));
// 将put添加到集合
putList.add(put);
}
// 执行添加操作,每次添加1000条通话记录
table.put(putList);
}
}
// 2019-01-01 00:00:00 - 2019-12-31 23:59:59
private static String getDate(int year) {
Calendar calendar = Calendar.getInstance();
calendar.set(year, 0, 1);//2019-01-01
calendar.add(Calendar.MONTH, random.nextInt(12));
calendar.add(Calendar.DAY_OF_MONTH, random.nextInt(31));
calendar.add(Calendar.HOUR_OF_DAY, random.nextInt(12));
calendar.add(Calendar.MINUTE, random.nextInt(60));
calendar.add(Calendar.MILLISECOND, random.nextInt(60));
return sdf.format(calendar.getTime());
}
private static String getPhoneNum(String prefixNum) {
return prefixNum + String.format("%08d", random.nextInt(99999999));
}

7-22 查看phone_log表结构
hbase>desc 'phone_log'

代码7-44 Search类的main()方法
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Random;

public class Search {

public static Configuration configuration;
public static Connection connection;
public static Admin admin;
public static Random random;
public static SimpleDateFormat sdf;

public static void main(String[] args) throws IOException, ParseException {
random = new Random();
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 初始化并建立连接
init();
// 查询phone_log表中15894116226用户的某个时间段的通话记录
scan("phone_log","15894116226");
// 关闭连接
close();
}

代码7-45 查询指定用户某个时间段的通话记录
public static void scan(String tableName, String num) throws ParseException, IOException {
String phoneNume = num;
Scan scan = new Scan();
String startRow = phoneNume+"_"+(Long.MAX_VALUE -sdf.parse("2019-01-08 21:38:13").getTime());
scan.setStartRow(startRow.getBytes());
String stopRow = phoneNume+"_"+(Long.MAX_VALUE -sdf.parse("2019-01-02 01:58:13").getTime());
scan.setStopRow(stopRow.getBytes());
// 执行查询并返回结果集
Table table = connection.getTable(TableName.valueOf(tableName));
ResultScanner resultScanner = table.getScanner(scan);
// 遍历输出
for(Result result:resultScanner){
printMsg(result);
}
// 关闭
resultScanner.close();
}

代码7-46 设置输出结果
// 设置输出结果
public static void printMsg(Result result) {
System.out.print(Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell
("basic".getBytes(),"dnum".getBytes())))+"\t");
System.out.print(Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(),"type".getBytes())))+"\t");
System.out.print(Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(),"length".getBytes())))+"\t");
System.out.println(Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(),"date".getBytes()))));
}


PhoneLogDemo.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import com.sun.deploy.security.ruleset.RuleSetParser;  // 引入RuleSetParser类(未使用)
import org.apache.hadoop.conf.Configuration; // 引入Hadoop配置类
import org.apache.hadoop.hbase.*; // 引入HBase相关类
import org.apache.hadoop.hbase.client.*; // 引入HBase客户端操作类
import org.apache.hadoop.hbase.util.Bytes; // 引入字节数组操作工具类
import org.junit.Before; // 引入JUnit注解(未使用)

import java.io.IOException; // 引入IOException异常类
import java.text.ParseException; // 引入ParseException异常类
import java.text.SimpleDateFormat; // 引入日期格式化类
import java.util.ArrayList; // 引入ArrayList类
import java.util.Calendar; // 引入Calendar类,用于日期和时间的操作
import java.util.List; // 引入List接口
import java.util.Random; // 引入Random类,用于生成随机数据

public class PhoneLogDemo {
// 声明静态变量
public static Configuration configuration; // HBase配置对象
public static Connection connection; // HBase连接对象
public static Admin admin; // HBase管理员对象
public static Random random; // 用于生成随机数据的对象
public static SimpleDateFormat sdf; // 日期格式化工具类

public static void main(String[] args) throws IOException, ParseException {
random = new Random(); // 初始化随机数生成器
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 创建SimpleDateFormat对象,日期格式
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 将SimpleDateFormat实例赋值给sdf

// 初始化并建立连接
init();

// 创建名为phone_log的表,列族为basic
createTable("phone_log", new String[]{"basic"});

// 插入随机生成的数据
insert("phone_log", "basic");

// 关闭连接
close();
}

// 初始化并建立HBase连接
public static void init() {
// 创建HBase配置对象
Configuration conf = HBaseConfiguration.create();
try {
// 获取HBase连接对象
connection = ConnectionFactory.createConnection(conf);
// 获取HBase管理员对象,用于表的管理操作
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace(); // 捕获并打印IOException异常
}
}

// 创建HBase表的方法
public static void createTable(String myTableName, String[] colFamily) throws IOException {
TableName tableName = TableName.valueOf(myTableName); // 将表名字符串转换为TableName对象
// 判断表是否已经存在
if (admin.tableExists(tableName)) {
System.out.println("table exists!"); // 如果表存在,输出提示信息
} else {
// 如果表不存在,创建表描述符对象
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
// 遍历列族数组,添加列族信息
for (String str : colFamily) {
// 创建列族描述符
ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
// 将列族描述符添加到表描述符
tableDescriptor.setColumnFamily(family);
}
// 创建表
admin.createTable(tableDescriptor.build());
}
}

// 插入数据的方法
/**
* 模拟10个用户,每个用户每年生成1000条通话记录
* dnum:对方用户电话号码;
* type:通话类型,0代表主叫,1代表被叫;
* length:通话时长;
* date:通话日期;
* rowkey:当前用户手机号码(Long.MAX_VALUE - timestamp)
*/
public static void insert(String tableName, String colFamily) throws IOException, ParseException {
// 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
List<Put> putList = new ArrayList<Put>(); // 存储Put对象的集合
// 模拟10个用户
for (int i = 1; i <= 10; i++) {
// 生成当前用户电话号码
String phoneNum = getPhoneNum("158"); // 生成以“158”开头的手机号码
System.out.println(phoneNum); // 输出当前用户电话号码
putList.clear(); // 清空Put集合
// 模拟每个用户1000条通话记录
for (int j = 1; j <= 1000; j++) {
// 生成对方用户电话号码
String dnum = getPhoneNum("199"); // 生成以“199”开头的对方电话号码
// 随机生成通话时长(1到99秒之间)
int length = random.nextInt(99) + 1;
// 随机生成通话类型(0为主叫,1为被叫)
int type = random.nextInt(2);
// 随机生成通话日期
String date = getDate(2019); // 生成2019年的随机日期
// rowkey的设计:手机号码 + 通话日期的倒序时间戳
String rowkey = phoneNum + "_" + (Long.MAX_VALUE - sdf.parse(date).getTime());
// 创建Put对象,指定rowkey
Put put = new Put(rowkey.getBytes());
// 添加列族中的各列数据
put.addColumn(colFamily.getBytes(), "dnum".getBytes(), Bytes.toBytes(dnum));
put.addColumn(colFamily.getBytes(), "length".getBytes(), Bytes.toBytes(length));
put.addColumn(colFamily.getBytes(), "type".getBytes(), Bytes.toBytes(type));
put.addColumn(colFamily.getBytes(), "date".getBytes(), Bytes.toBytes(date));
// 将Put对象添加到集合中
putList.add(put);
}
// 执行批量插入操作,每次插入1000条通话记录
table.put(putList);
}
}

// 随机生成通话日期(2019年范围内)
private static String getDate(int year) {
Calendar calendar = Calendar.getInstance(); // 获取当前日期
calendar.set(year, 0, 1); // 设置为2019年1月1日
// 随机生成日期(随机月份、日期、小时、分钟等)
calendar.add(Calendar.MONTH, random.nextInt(12));
calendar.add(Calendar.DAY_OF_MONTH, random.nextInt(31));
calendar.add(Calendar.HOUR_OF_DAY, random.nextInt(12));
calendar.add(Calendar.MINUTE, random.nextInt(60));
calendar.add(Calendar.MILLISECOND, random.nextInt(60));
return sdf.format(calendar.getTime()); // 返回格式化后的日期字符串
}

// 随机生成手机号码
private static String getPhoneNum(String prefixNum) {
// 返回以指定前缀开头的手机号码(8位随机数)
return prefixNum + String.format("%08d", random.nextInt(99999999));
}

// 关闭HBase连接
public static void close() {
if (admin != null) {
try {
admin.close(); // 关闭管理员对象
} catch (IOException e) {
e.printStackTrace(); // 捕获并打印IOException异常
}
}
if (null != connection) {
try {
connection.close(); // 关闭HBase连接
} catch (IOException e) {
e.printStackTrace(); // 捕获并打印IOException异常
}
}
}
}

Search.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import org.apache.hadoop.conf.Configuration;  // 引入Hadoop配置类
import org.apache.hadoop.hbase.CellUtil; // 引入HBase单元格工具类,用于处理HBase中的单元格数据
import org.apache.hadoop.hbase.HBaseConfiguration; // 引入HBase配置工具类
import org.apache.hadoop.hbase.TableName; // 引入HBase表名类
import org.apache.hadoop.hbase.client.*; // 引入HBase客户端操作类(如Connection、Admin、Table、Scan等)
import org.apache.hadoop.hbase.util.Bytes; // 引入字节数组工具类,HBase的很多操作是基于字节数组的

import java.io.IOException; // 引入IOException异常类
import java.text.ParseException; // 引入ParseException异常类
import java.text.SimpleDateFormat; // 引入日期格式化类
import java.util.Calendar; // 引入Calendar类,用于日期和时间的操作
import java.util.Random; // 引入Random类,用于生成随机数

public class Search {

// 声明静态变量
public static Configuration configuration; // HBase配置对象
public static Connection connection; // HBase连接对象
public static Admin admin; // HBase管理员对象
public static Random random; // 用于生成随机数的对象
public static SimpleDateFormat sdf; // 日期格式化工具类,用于处理时间格式

public static void main(String[] args) throws IOException, ParseException {
random = new Random(); // 初始化随机数生成器
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 创建SimpleDateFormat实例,用于日期格式化

// 初始化并建立连接
init();

// 查询指定用户在某个时间段内的通话记录
scan("phone_log", "15894116226");

// 关闭连接
close();
}

// 初始化并建立HBase连接
public static void init() {
// 创建HBase配置对象
Configuration conf = HBaseConfiguration.create();
try {
// 获取HBase连接对象
connection = ConnectionFactory.createConnection(conf);
// 获取HBase管理员对象,用于表的管理操作
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace(); // 捕获并打印IOException异常
}
}

// 执行HBase表的扫描操作,根据指定的用户和时间范围查询记录
public static void scan(String tableName, String num) throws ParseException, IOException {
String phoneNume = num; // 保存用户电话号码

// 创建Scan对象,表示一次查询
Scan scan = new Scan();

// 设置查询的起始行(以用户手机号和时间戳构造)
String startRow = phoneNume + "_" + (Long.MAX_VALUE - sdf.parse("2019-1-08 21:38:13").getTime());
scan.setStartRow(startRow.getBytes()); // 设置起始行键

// 设置查询的结束行(以用户手机号和时间戳构造)
String stopRow = phoneNume + "_" + (Long.MAX_VALUE - sdf.parse("2019-11-02 01:58:13").getTime());
scan.setStopRow(stopRow.getBytes()); // 设置结束行键

// 获取指定表的Table对象
Table table = connection.getTable(TableName.valueOf(tableName));

// 执行查询并获取结果扫描器
ResultScanner resultScanner = table.getScanner(scan);

// 遍历扫描结果并输出
for (Result result : resultScanner) {
printMsg(result); // 打印每一条查询结果
}

// 关闭扫描器
resultScanner.close();
}

// 设置输出查询结果的格式
public static void printMsg(Result result) {
// 获取并输出列族basic中的dnum列的值(对方电话号码)
System.out.print(Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(), "dnum".getBytes()))) + "\t");

// 获取并输出列族basic中的type列的值(通话类型:主叫或被叫)
System.out.print(Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(), "type".getBytes()))) + "\t");

// 获取并输出列族basic中的length列的值(通话时长)
System.out.print(Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(), "length".getBytes()))) + "\t");

// 获取并输出列族basic中的date列的值(通话日期)
System.out.println(Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell("basic".getBytes(), "date".getBytes()))));
}

// 关闭HBase连接和管理员对象
public static void close() {
if (admin != null) {
try {
admin.close(); // 关闭管理员对象
} catch (IOException e) {
e.printStackTrace(); // 捕获并打印IOException异常
}
}
if (null != connection) {
try {
connection.close(); // 关闭HBase连接
} catch (IOException e) {
e.printStackTrace(); // 捕获并打印IOException异常
}
}
}
}


Hadoop大数据技术与应用
http://example.com/2024/09/02/Hadoop大数据技术与应用/
作者
Tingfeng
发布于
2024年9月2日
许可协议