一、Flume Installation and Deployment
1、Download and Install
Download URL: https://flume.apache.org/download.html
Official documentation: https://flume.apache.org/documentation.html
Extract the archive: tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
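The commands that follow refer to the Flume home as /opt/module/flume, so it is assumed here that the extracted directory has been renamed along these lines (the rename itself is not shown in the original steps):
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume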
Delete guava-11.0.2.jar from the lib directory (for compatibility with Hadoop 3.1.3): rm /opt/module/flume/lib/guava-11.0.2.jar
Configure the environment variables and make them take effect: vim /etc/profile.d/my_env.sh
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
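To apply the new variables to the current shell, reload the profile, for example:
source /etc/profile.d/my_env.sh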
Change the log file location: vim conf/log4j.properties
flume.log.dir=/opt/module/flume/logs
Adjust the heap memory settings: vim conf/flume-env.sh
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
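If conf/flume-env.sh does not yet exist in a fresh installation, it can typically be created from the template shipped in the conf directory (this assumes the default distribution layout):
cp conf/flume-env.sh.template conf/flume-env.sh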
2、Environment Configuration (netcat case)
Case description: use Flume to listen on a port, collect the data arriving on that port, and print it to the console.
Install the netcat tool: sudo yum install -y nc
Check whether a given port (e.g. 44444) is already in use: sudo netstat -nlp | grep 44444
Create a job folder under the flume directory and change into it:
mkdir -p job/simpleCase
cd job/simpleCase
Create the Flume agent configuration file under job/simpleCase and add the following content: vim flume-1-netcat-logger.conf
(Comments are kept on their own lines below: Flume parses the file as Java properties, so a # placed after a value would become part of that value.)
# Name the components on this agent
# Name the source of agent a1 "r1"; separate multiple components with spaces
a1.sources = r1
# Name the sink of agent a1 "k1"; separate multiple components with spaces
a1.sinks = k1
# Name the channel of agent a1 "c1"; separate multiple components with spaces
a1.channels = c1

# Describe/configure the source
# Type of r1
a1.sources.r1.type = netcat
# Bind address of r1 (note the difference between localhost and hadoop102)
a1.sources.r1.bind = localhost
# Listening port of r1
a1.sources.r1.port = 44444

# Describe the sink
# Type of k1: logger, i.e. print events to the console
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# Type of c1: memory
a1.channels.c1.type = memory
# Capacity of c1: 1000 events
a1.channels.c1.capacity = 1000
# Transaction capacity of c1: 100 events
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# The channel property of r1: which channel r1 is connected to
a1.sources.r1.channels = c1
# The channel property of k1: which channel k1 is connected to
a1.sinks.k1.channel = c1
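A quick smoke test for this agent (the input line is just an example): after starting the agent as shown in section 二 below, send data to the listening port from another terminal.
nc localhost 44444
hello flume
# the agent's console should then log the event, e.g. Event: { headers:{} body: 68 65 6C 6C 6F ... hello flume }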
3、Environment Configuration (taildir case)
Case description: use Flume to monitor real-time appends to the files in a whole directory and upload them to HDFS.
Under the Flume root directory, create datas/tailCase/files and datas/tailCase/logs to hold the data files: mkdir -p datas/tailCase/files datas/tailCase/logs
Create the Flume agent configuration file under job/simpleCase and add the following content: vim flume-2-taildir-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/module/flume/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/module/flume/datas/tailCase/logs/.*log.*

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/tailDir/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = tail-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type (CompressedStream supports compression, DataStream does not)
a2.sinks.k1.hdfs.fileType = DataStream
# How many seconds before rolling to a new file
a2.sinks.k1.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
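One way to exercise this agent once it has been started under the name a2 (see the run example in section 二): HDFS must be up, and the file names below are only illustrative, they just need to match the .*file.* and .*log.* patterns.
echo hello >> /opt/module/flume/datas/tailCase/files/file1.txt
echo world >> /opt/module/flume/datas/tailCase/logs/log1.txt
# check the result on HDFS
hadoop fs -ls /flume/tailDir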
二、Running Flume
Syntax 1: flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
Syntax 2: flume-ng agent -c conf/ -n a1 -f job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
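The value passed to --name/-n must match the agent name defined in the configuration file; for the taildir case the agent is named a2, so the command would look like this:
flume-ng agent -c conf/ -n a2 -f job/simpleCase/flume-2-taildir-hdfs.conf -Dflume.root.logger=INFO,console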
Common parameters:
Parameter             Description
--conf / -c           Directory where the Flume configuration files are stored
--name / -n           Name of the agent
--conf-file / -f      Path to the agent configuration file
-Dflume.root.logger   Overrides the flume.root.logger property at runtime, setting the log destination and level (DEBUG, INFO, WARN, ERROR), e.g. INFO,console
Start/stop script:
#!/bin/bash
case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- starting upstream flume on $i --------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};;
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- stopping upstream flume on $i --------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
        done
};;
esac
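Assuming the script is saved as, say, flume_cluster.sh (the name is just an example) and made executable, it would be used like this:
chmod +x flume_cluster.sh
./flume_cluster.sh start    # starts the upstream agents on hadoop102 and hadoop103
./flume_cluster.sh stop     # kills them by matching the file_to_kafka config name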