Part 10 - Flume 和 Sqoop 操作实例

2021-06-01
7分钟阅读时长

【版本】

当前版本号v20230414

版本修改说明
v20230414修改nc命令的注释,增加安装命令
v20220418上传commons-lang包,修复sqoop从MariaDB到HBase导入数据的错误
v20210611新增Sqoop的内容
v20210607初始化版本

实验10.1 - 部署 Flume

【实验目的】

  • 部署 Flume

【实验环境】

  • Hadoop 3
  • CentOS 7

【实验资源】

  • Flume 安装包
链接:https://pan.baidu.com/s/1MoQ0iU0Qb1o8_o5JV6X6iw 
提取码:3rno

【实验内容】

  • 部署 Flume

【实验步骤】

  1. 在NodeA节点运行以下语句,注意提升为root权限执行。
  • 创建 Flume 的安装目录。
su
mkdir /opt/flume
chown hadoop:wheel /opt/flume
  • 提升 root 用户权限执行以下语句,加入 ZooKeeper 环境变量。
su
echo "export FLUME_HOME=/opt/flume
export PATH=\$FLUME_HOME/bin:\$PATH" >>/etc/profile
  • 切换回 hadoop 用户
su hadoop
  • 使环境变量生效。
source /etc/profile
  1. 使用 hadoop 登录NodeA节点。
su hadoop
  1. 上传 Flume 安装包apache-flume-1.8.0-bin.tar.gz 到 NodeA 节点 /home/hadoop 目录。

  2. 解压apache-flume-1.8.0-bin.tar.gz/home/hadoop 目录。

tar -xvf apache-flume-1.8.0-bin.tar.gz
  1. 把解压以后的目录移到安装目录
sudo mv ~/apache-flume-1.8.0-bin/* /opt/flume
  1. 输入命令查看 Flume 版本
flume-ng version

实验10.2 Flume 的配置与使用 Avro(Source) -> Memory(Channel) -> Logger(Sink)

【实验目的】

    1. 理解 Flume 的基本原理,掌握各组件的作用及关系。
    1. 熟悉 Flume 的常用配置。

【实验原理】

Avro Source 会启动一个 RPC 的 Netty 服务器,此实验配置监听端口为 4141 。Avro Source 通过监听发送过来的文件,通过 Flume 的 Channel,输出到 Logger Sink。Logger Sink 可以打印文件的内容到控制台。

【实验环境】

  • Hadoop 3
  • CentOS 7
  • Flume 1.8.0 (下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz)

【实验内容】

  • 配置 Flume 与使用 Avro(Source)、Memory(Channel)、Logger(Sink)组合的Agent

【实验步骤】

  1. 创建 Agent 配置文件/opt/flume/conf/avro.conf,并按如下配置。
vim /opt/flume/conf/avro.conf
agent1.sources = srch2
agent1.sinks = ssink2
agent1.channels = ch1

# 配置 Source 监听端口为4141的 avro 服务
agent1.sources.srch2.type = avro
agent1.sources.srch2.bind = 0.0.0.0
agent1.sources.srch2.port = 4141
agent1.sources.srch2.channels = ch1

# 配置 Sink
agent1.sinks.ssink2.type = logger
agent1.sinks.ssink2.channel = ch1

# 配置 Channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 100

  1. 启动 Agent agent1,命令如下。
flume-ng agent --conf /opt/flume/conf/ --conf-file /opt/flume/conf/avro.conf --name agent1 -Dflume.root.logger=INFO,console
  1. 用 XShell 重新打开一个新终端窗口,创建 avro-input1.txt 输入一些信息。
echo "hello flume" > ~/avro-input1.txt
  1. 在新终端窗口使用 avro-client 向 agent1 监听的 avro 服务发送文件。
flume-ng avro-client -c /opt/flume/conf/ -H 0.0.0.0 -p 4141 -F ~/avro-input1.txt
  1. 在第一个终端窗口输出信息的最后一行可看到发送文件的内容。
2021-06-07 17:00:36,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65                hello flume }

实验 10.3 Flume 的配置与使用 Syslog(Source) -> Memory(Channel) -> HDFS(Sink)

【实验目的】

1.理解 Flume 的基本原理,掌握各组件的作用及关系。 2.熟悉 Flume 的常用配置。

【实验原理】

Syslog Source 读取 syslog 数据,产生 Event,syslog 支持 UDP 和 TCP 协议。通过 Memory Channel,把 Event 写入 HDFS。

【实验环境】

  • Hadoop 3
  • CentOS 7
  • Flume 1.8.0 (下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz)

【实验内容】

  • 配置 Flume 与使用 Syslog(Source)、Memory(Channel) 、HDFS(Sink)组合的Agent

【实验步骤】

  1. 创建 Agent 配置文件/opt/flume/conf/syslogtcp.conf,并按如下配置。注意替换为你的学号。
vim /opt/flume/conf/syslogtcp.conf
agent2.sources = src2
agent2.sinks = sink2
agent2.channels = ch2
# 配置 Source
agent2.sources.src2.type = syslogtcp
agent2.sources.src2.port = 5140
agent2.sources.src2.host = localhost
agent2.sources.src2.channels = ch2
# 配置 Sink
agent2.sinks.sink2.type = hdfs
#注意这里的 HDFS 的IP地址替换为你的真实IP地址
agent2.sinks.sink2.hdfs.path = hdfs://nodea你的学号后4位:8020/user/hadoop/flume/syslogtcp输入你的学号后4位
agent2.sinks.sink2.hdfs.filePrefix = Syslog
agent2.sinks.sink2.hdfs.round = true
agent2.sinks.sink2.hdfs.roundValue = 10
agent2.sinks.sink2.hdfs.roundUnit = minute
agent2.sinks.sink2.channel = ch2

# 配置 Channel
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 1000
agent2.channels.ch2.transactionCapacity = 100
# 绑定 Source 和 Sink 到 Channel
  1. 启动 HDFS。
start-dfs.sh
  1. 启动 Agent agent2,命令如下。
flume-ng agent -c /opt/flume/conf/ -f /opt/flume/conf/syslogtcp.conf -n agent2 -Dflume.root.logger=INFO,console
  1. XShell 启动一个新的终端窗口,输入以下命令,测试产生 syslog。请注意以下命令需要替换为你的个人学号。
echo "hello 替换为你的学号" | nc localhost 5140

注1:nc命令找不到,可以使用 yum install nc -y 命令安装。关于nc(netcat)命令,大家可以搜索“Linux netcat 命令”或参考 http://www.linuxso.com/command/nc.html

注2:关于 | (管道)命令,大家可以搜索“Linux 管道 命令”或参考 https://blog.csdn.net/hellojoy/article/details/77337854

  1. 在新的终端窗口查看 HDFS 相应配置的路径上是否生成了 syslogtcp 文件,并查看文件内容,正确能看到“hello 你的学号”。(请注意以下命令需要替换为你的个人学号。)
hdfs dfs -ls /user/hadoop/flume/syslogtcp你的学号后4位
hdfs dfs -cat /user/hadoop/flume/syslogtcp你的学号后4位/Syslog.xxxxxx

实验 10.4 Sqoop 的安装

【实验目的】

  • 熟悉 Sqoop 的安装

【实验环境】

  • Hadoop 3
  • CentOS 7
  • Sqoop 1.4.7

【实验资源】

  • Sqoop 安装包
链接:https://pan.baidu.com/s/1MoQ0iU0Qb1o8_o5JV6X6iw 
提取码:3rno

【实验内容】

  • 安装 Sqoop

【实验步骤】

  1. 在NodeA节点运行以下语句,注意提升为root权限执行。
  • 创建 Sqoop 的安装目录。
su
mkdir /opt/sqoop
chown hadoop:wheel /opt/flume
  • 提升 root 用户权限执行以下语句,加入 ZooKeeper 环境变量。
su
echo "export SQOOP_HOME=/opt/sqoop
export PATH=\$SQOOP_HOME/bin:\$PATH" >>/etc/profile
  • 切换 hadoop 用户
su hadoop
  • 使环境变量生效。
source /etc/profile
  1. 使用 hadoop 登录NodeA节点。
su hadoop
  1. 上传 Sqoop 安装包sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz 到 NodeA 节点 /home/hadoop 目录。

  2. 解压sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz/home/hadoop 目录。

tar -xvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
  1. 把解压以后的目录移到安装目录
sudo mv ~/sqoop-1.4.7.bin__hadoop-2.6.0/* /opt/sqoop
  1. 查看 Sqoop 版本,验证 Sqoop 是否正确安装
sqoop version
  1. 上传 MySQL 的驱动包 mysql-connector-java-5.1.48.jar/home/hadoop 目录。

  2. 拷贝驱动包到 Sqoop 的 lib 目录下。

mv ~/mysql-connector-java-5.1.48.jar $SQOOP_HOME/lib
  1. 拷贝 Hive 的依赖包到 Sqoop 的 lib 目录下。
cp $HIVE_HOME/lib/hive-common-2.3.8.jar $SQOOP_HOME/lib/

实验 10.5 使用 Sqoop 进行数据转换

【实验目的】

  • 掌握使用 Sqoop 在 MariaDB、Hive、HDFS、HBase 之间进行数据转换

【实验环境】

  • Hadoop 3
  • CentOS 7
  • Sqoop 1.4.7 (下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz)

【实验资源】

  • commons-lang-2.6.jar

【实验内容】

  • 使用 Sqoop 在 MariaDB、Hive、HDFS、HBase 之间进行数据转换

【实验步骤】

  1. 使用 hadoop 登录NodeA节点。
su hadoop
  1. 上传员工表SQL脚本文件 EMP.sql/home/hadoop

  2. 使用 root 用户登录 MariaDB。

mysql -u root -p
  1. 在 MariaDB 中创建 sqoop 库和 sqoop 用户。
create database sqoopdb;
use sqoopdb;
create user 'sqoop'@'localhost' identified by 'sqoop123';
create user 'sqoop'@'%' identified by 'sqoop123';
grant all on sqoopdb.* to 'sqoop'@'localhost';
grant all on sqoopdb.* to 'sqoop'@'%';
  1. 执行 EMP.sql 文件 SQL 语句。
use sqoopdb;
source /home/hadoop/EMP.sql
  1. 在 MariaDB 查询员工表内容。
select * from EMP;
  1. 退出 MariaDB 的命令终端
exit
  1. 使用 Sqoop 对 MariaDB 的数据库进行查询。是否存在库 sqoopdb
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root -password 123456
  1. 使用 Sqoop 对 MariaDB 的数据库 sqoopdb 进行查询。是否存在表EMP
sqoop list-tables --connect jdbc:mysql://localhost:3306/sqoopdb --username sqoop -password sqoop123

导入数据从 MariaDB 到 HDFS

  1. 启动 Hadoop。
start-hdp.sh
  1. 把 MariaDB 中的 EMP表导出到 HDFS 的 /sqoop目录。其中参数 -m 1 表示使用1个 Mapper。注意替换为你的学号后4位。
hdfs dfs -rm -r /sqoop
sqoop import --connect jdbc:mysql://nodea你的学号后4位:3306/sqoopdb --username sqoop -password sqoop123 --table EMP -target-dir /sqoop -m 1
  1. 输出导入的结果进行查看
hdfs dfs -cat /sqoop/part-m-00000

导入数据从 MariaDB 到 Hive

  1. 把 MariaDB 的EMP表导入到 Hive。注意替换为你的学号。
sqoop import --connect jdbc:mysql://nodea你的学号后4位:3306/sqoopdb --username sqoop -password sqoop123 --table EMP --fields-terminated-by '\t' --target-dir /user/hadoop/db2hive --num-mappers 1 --hive-database default --hive-import --hive-table emp
  1. 进入 Hive,并查询 emp 表的内容。
hive
hive (default)> use default;
hive (default)> select * from emp;

导入数据从 MariaDB 到 HBase

  1. 启动 HBase,并登录 HBase。
start-dfs.sh
start-hbase.sh
hbase shell
  1. 创建一个 HBase 表 EMP。
create 'EMP', { NAME => 'EMPINFO', VERSIONS => 5}

注:上面命令在 HBase 中创建了一个 EMP 表,这个表中有一个列族 EMPINFO,历史版本保留数量为 5。

  1. 创建完成,通过命令 list 可看到 HBase 中有表 EMP。
list
  1. 退出HBase Shell。
quit
  1. commons-lang-2.6.jar上传到 /opt/sqoop/lib目录。
  2. 修改commons-lang-2.6.jar 权限,让hadoop用户可以读取。
chown hadoop:wheel /opt/sqoop/lib/commons-lang-2.6.jar
chmod 544 /opt/sqoop/lib/commons-lang-2.6.jar
  1. 执行以下命令把 MariaDB 中的 EMP 表导入到 HBase 中的 EMP 表,并使用 EMPNO 作为 Row key。注意替换为你的学号。
sqoop import --connect jdbc:mysql://nodea你的学号后4位:3306/sqoopdb --username sqoop --password sqoop123 --table EMP --hbase-table EMP --column-family EMPINFO --hbase-row-key EMPNO
  1. 再次进入 Hbase Shell
hbase shell
  1. 查看 EMP 表的数据
scan 'EMP'

导出数据从 HDFS 到 MariaDB

  1. 在导出前需要在 MariaDB 中创建接收 HDFS 数据的空表EMP2。登录 sqp。
mysql sqoopdb -u sqoop -psqoop123
  1. 创建接收数据的空表 EMP2
create table `EMP2` like `EMP`;
  1. 对比EMPEMP2的表结构。
describe `EMP2`;
describe `EMP`;
  1. 通过以下命令把之前导入到 HDFS 的数据,再次导出到 MariaDB 的 EMP2 表。注意替换为你的学号。
sqoop export --connect jdbc:mysql://nodea你的学号后4位:3306/sqoopdb --table EMP2 --export-dir /sqoop/part-m-00000 --username sqoop --password sqoop123 -m 1
  1. 导出成功后可进入 MariaDB 查看导出的内容。
select * from  `EMP2`;

扫码或长按识别访问