1. Flume Installation and Deployment

1.1 Download and Installation

Download: https://flume.apache.org/download.html

Official documentation: https://flume.apache.org/documentation.html

  1. Extract the archive: tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/, then rename the extracted directory so it matches the paths used in the rest of this guide: mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
  2. Delete guava-11.0.2.jar from the lib directory for compatibility with Hadoop 3.1.3 (which ships a newer Guava): rm /opt/module/flume/lib/guava-11.0.2.jar
  3. Configure the environment variables and make them take effect: vim /etc/profile.d/my_env.sh
#FLUME_HOME
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
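
After saving the file, reload it and verify the installation (a quick sanity check; flume-ng version is part of the standard Flume CLI):

source /etc/profile.d/my_env.sh
flume-ng version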

Change the log file location: vim conf/log4j.properties

flume.log.dir=/opt/module/flume/logs
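
Optionally create the log directory up front so the first start does not stumble over a missing path (a precaution, not a step from the original write-up):

mkdir -p /opt/module/flume/logs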

Adjust the heap memory settings: vim conf/flume-env.sh

# Uncomment this line; when using a Memory Channel you can raise the maximum heap size here
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
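
If conf/flume-env.sh does not exist yet, it can be created from the template shipped in the Flume binary distribution before adding the line above:

cp /opt/module/flume/conf/flume-env.sh.template /opt/module/flume/conf/flume-env.sh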

1.2 Configuration (netcat example)

Case description: use Flume to listen on a port, collect the data received on that port, and print it to the console.

  1. Install the netcat tool: sudo yum install -y nc
  2. Check whether the chosen port (e.g. 44444) is already in use: sudo netstat -nlp | grep 44444
  3. Create a job folder under the Flume directory and enter it:
    1. mkdir -p job/simpleCase
    2. cd job/simpleCase
  4. Create a Flume Agent configuration file under job/simpleCase and add the following content (keep comments on their own lines; Flume treats anything after a value, including a trailing # comment, as part of that value): vim flume-1-netcat-logger.conf
# Name the components on this agent
# (source r1, sink k1, channel c1; multiple components of the same type are separated by spaces)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# netcat source that listens on localhost:44444
# (localhost only accepts local connections; binding to a hostname such as hadoop102 would also accept remote ones)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# logger sink prints events to the console
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# capacity = maximum events held in the channel; transactionCapacity = maximum events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# (a source can feed several channels, but each sink reads from exactly one channel)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
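
To try it out (a minimal sketch; the full start command is covered in section 2 below): start the a1 agent with this file, then in a second terminal connect to the port with netcat and type a few lines. Each line should appear as an event in the agent's console output.

nc localhost 44444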

1.3 Configuration (taildir example)

Case description: use Flume to monitor files that are appended to in real time across a whole directory, and upload the data to HDFS.

  1. Under the Flume root directory, create datas/tailCase/files and datas/tailCase/logs to hold the data files: mkdir -p datas/tailCase/files datas/tailCase/logs
  2. Create a Flume Agent configuration file under job/simpleCase and add the following content: vim flume-2-taildir-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/module/flume/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/module/flume/datas/tailCase/logs/.*log.*

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/tailDir/%Y%m%d/%H
# Prefix for uploaded file names
a2.sinks.k1.hdfs.filePrefix = tail-
# Whether to round down the timestamp when creating time-based folders
a2.sinks.k1.hdfs.round = true
# Create a new folder every this many time units
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp (needed here because taildir events carry no timestamp header)
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type: DataStream (uncompressed) or CompressedStream (compressed)
a2.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a2.sinks.k1.hdfs.rollInterval = 60
# Roll to a new file once it reaches roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# Do not roll files based on the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
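
A quick way to exercise this configuration (a sketch that assumes HDFS is reachable at hadoop102:8020 as configured above; the file names below are arbitrary examples): start the a2 agent, append data to files matching the two filegroup patterns, then list the target HDFS path.

echo "hello file-1" >> /opt/module/flume/datas/tailCase/files/file1.txt
echo "hello log-1" >> /opt/module/flume/datas/tailCase/logs/log1.txt
hadoop fs -ls /flume/tailDir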

2. Running Flume

  • Form 1: flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
  • Form 2: flume-ng agent -c conf/ -n a1 -f job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console

Common options:
  • --conf / -c: directory where the Flume configuration files (conf/) are stored
  • --name / -n: name of the agent to run
  • --conf-file / -f: path to the agent configuration file
  • -Dflume.root.logger: overrides the flume.root.logger property at run time, setting the log level (debug, info, warn, error) and the output target (e.g. console)
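
The same pattern applies to the taildir example from section 1.3; note that the agent name there is a2:

flume-ng agent -c conf/ -n a2 -f job/simpleCase/flume-2-taildir-hdfs.conf -Dflume.root.logger=INFO,console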

Start/stop script

#!/bin/bash
# Start or stop the upstream Flume agents on hadoop102 and hadoop103 over SSH.

case $1 in
"start"){
    for i in hadoop102 hadoop103
    do
        echo " -------- starting upstream flume on $i -------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
    done
};;
"stop"){
    for i in hadoop102 hadoop103
    do
        echo " -------- stopping upstream flume on $i -------"
        # Identify the agent by its config file name and kill it
        ssh $i "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
    done
};;
esac
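
Assuming the script is saved as, say, f1.sh on a node with passwordless SSH to hadoop102 and hadoop103 (the file name and location are assumptions, not part of the original), it is used like this:

chmod +x f1.sh
./f1.sh start
./f1.sh stop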