Kafka安装部署

下载安装

下载地址:https://kafka.apache.org/downloads.html

官方文档:https://kafka.apache.org/30/documentation.html

  1. 解压安装:tar -zxvf /opt/software/kafka_2.12-3.0.0.tgz -C /opt/module/
  2. 配置并生效环境变量:vim /etc/profile.d/my_env.sh
1
2
3
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

配置文件

修改配置文件后,分发Kafka和环境变量

配置文件参考:

org.apache.kafka.clients.consumer.ConsumerConfig

org.apache.kafka.clients.producer.ProducerConfig

kafka.server.KafkaConfig

hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka

server.properties

依赖Zookeeper模式

  • 修改配置文件server.properties:vim ./config/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#broker的全局唯一编号(分发需更改)
broker.id=102
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka会自动创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

Kafka-Kraft模式

  • 修改配置文件server.properties:vim ./config/kraft/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#kafka的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能)
process.roles=broker, controller
#节点ID(分发需更改)
node.id=2
#controller服务协议别名
controller.listener.names=CONTROLLER
#全Controller列表
controller.quorum.voters=2@hadoop102:9093,3@hadoop103:9093,4@hadoop104:9093
#不同服务器绑定的端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#broker服务协议别名
inter.broker.listener.name=PLAINTEXT
#broker对外暴露的地址(分发需更改)
advertised.Listeners=PLAINTEXT://hadoop102:9092
#协议别名到安全协议的映射
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#kafka数据存储目录
log.dirs=/opt/module/kafka2/data

Kafka集群启动/停止

依赖Zookeeper模式

  • 启动集群(每台Kafka节点)
    • 启动Zookeeper集群
    • 启动Kafka集群:kafka-server-start.sh -daemon ./config/server.properties
  • 停止集群(每台Kafka节点)
    • 停止Kafka集群:kafka-server-stop.sh

需要等Kafka所有节点进程全部停止后,停止Zookeeper集群。若Zookeeper集群先停止,Kafka集群无法再获取停止进程的信息,只能手动杀死Kafka进程

Kafka-Kraft模式

  • 初始化集群
    • 生成存储目录唯一ID(一台Kafka节点):kafka-storage.sh random-uuid
    • 格式化存储目录(每台Kafka节点):kafka-storage.sh format -t 唯一ID -c ./config/kraft/server.properties
  • 启动集群(每台Kafka节点):kafka-server-start.sh -daemon ./config/kraft/server.properties
  • 停止集群(每台Kafka节点):kafka-server-stop.sh

Kafka集群操作

Topic命令行操作

  • 查看命令帮助:kafka-topics.sh

举例:kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic first

常用参数 作用
–bootstrap-server 连接kafka Broker主机名称和端口号
–topic 操作的topic名称
–create 创建主题
–delete 删除主题
–alter 修改主题
–list 查看所有主题
–describe 查看主题详细描述
–partitions 设置主题分区数
–replication-factor 设置主题分区副本
–config 更改系统默认配置

Producer命令行操作

  • 查看命令帮助:kafka-console-producer.sh

举例:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

常用参数 作用
–bootstrap-server 连接kafka Broker主机名称和端口号
–topic 操作的topic名称

Consumer命令行操作

  • 查看命令帮助:kafka-console-consumer.sh

举例:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

常用参数 作用
–bootstrap-server 连接kafka Broker主机名称和端口号
–topic 操作的topic名称
–from-beginning 从头开始消费
–group 指定消费者组名称

Partition命令行操作

  • 查看命令帮助:kafka-leader-election.sh

举例:kafka-leader-election.sh --bootstrap-server hadoop102:9092 --topic first --election-type preferred --partition 2

常用参数 作用
–bootstrap-server 连接kafka Broker主机名称和端口号
–topic 操作的topic名称
–election-type 重新选举
–partition 指定分区

Kafka工具类

  • 查看命令帮助:kafka-run-class.sh

举例:kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index

Kafka API

API文档:https://kafka.apache.org/30/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>

Producer API

必须配置序列化

常用类 常用方法
java.util.Properties
创建配置对象
添加配置信息:put()
org.apache.kafka.clients.producer.ProducerConfig
设置配置信息
org.apache.kafka.clients.producer.ProducerRecord
创建信息对象
获取信息:value()
org.apache.kafka.clients.producer.KafkaProducer
创建生产者对象
发送信息:send()初始化事务:initTransactions()
开启事务:beginTransaction()
事务内提交已消费的偏移量:sendOffsetsToTransaction()
提交事务:commitTransaction()
事务回滚:abortTransaction()
关闭资源:close()
org.apache.kafka.clients.producer.RecordMetadata
获取元数据信息
获取主题:topic()
获取分区:partition()
org.apache.kafka.common.Cluster
集群信息
获取分区数:partitionCountForTopic()
常用接口 作用 常用方法
org.apache.kafka.clients.producer.Callback 回调 回调函数:onCompletion()
org.apache.kafka.clients.producer.Partitioner 分区 自定义分区:partition()
java.util.concurrent.Future 并发处理 同步发送:get()

Consumer API

必须配置序列化消费者组ID

常用类 作用 常用方法
java.util.Properties 创建配置对象 添加配置信息:put()
org.apache.kafka.clients.consumer.ConsumerConfig 设置配置信息
org.apache.kafka.clients.consumer.ConsumerRecord 创建信息对象 获取信息:value()
org.apache.kafka.clients.consumer.ConsumerRecords 创建多个信息对象
org.apache.kafka.clients.consumer.KafkaConsumer 创建消费者对象 订阅主题:subscribe()拉取数据:poll()同步提交:commitSync()异步提交:commitAsync()
org.apache.kafka.clients.producer.RecordMetadata 获取元数据信息 获取主题:topic()获取分区:partition()
org.apache.kafka.common.Cluster 集群信息 获取分区数:partitionCountForTopic()
java.time.Duration 时间间隔 单位为秒:ofSeconds()

Kafka常见配置

Producer

Producer API 进行配置

常用配置参数 描述
bootstrap.servers 生产者连接集群所需的broker地址清单(可以设置1个或者多个,逗号隔开)
key.serializer、 value.serializer 指定发送消息的key和value的序列化类型(全类名)
buffer.memory RecordAccumulator缓冲区总大小(默认32m)
batch.size 缓冲区一批数据最大值(默认16k)
linger.ms 缓冲区数据发送等待时间(默认0ms,表示没有延迟)
acks acks的值(默认-1)
max.in.flight.requests.per.connection 允许ack未返回的最大次数(默认5)
Retries 消息发送出现错误时的重试次数(默认int最大值)
retry.backoff.ms 两次重试之间的时间间隔(默认100ms)
enable.idempotence 是否开启幂等性(默认true)
compression.type 发送数据的压缩方式(默认none)

Broker

常用配置参数 描述
replica.lag.time.max.ms ISR中的Follower超过该事件阈值(默认30s)未向Leader发送同步数据,则该Follower将被踢出ISR。
auto.leader.rebalance.enable 默认是true。 自动Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
leader.imbalance.check.interval.seconds 默认值300秒。检查leader负载是否平衡的间隔时间。
log.segment.bytes Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。
log.index.interval.bytes 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。
log.retention.hours Kafka中数据保存的时间,默认7天。
log.retention.minutes Kafka中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka中数据保存的时间,毫秒级别,默认关闭。(优先级最高)
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是5分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。
log.cleanup.policy 默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。
num.io.threads 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的50%的1/3
num.network.threads 默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是Max(long) (9223372036854775807)。一般交给系统管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

Consumer

Consumer API 进行配置

参数名称 描述
bootstrap.servers 向Kafka集群建立初始连接用到的host/port列表。
key.deserializer、value.deserializer 指定接收消息的key和value的反序列化类型。要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 若enable.auto.commit=true, 表示消费者提交偏移量频率,默认5s。
auto.offset.reset 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets的分区数,默认是50个分区。
heartbeat.interval.ms Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
session.timeout.ms Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次poll拉取数据返回消息的最大条数,默认是500条。
partition.assignment.strategy 消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

附:Zookeeper的存储信息

img

附:集群脚本

Kafka群起脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/bin/bash

if [ $# != 1 ];
then
echo -e "请输入参数: \nstart 启动kafka集群; \nstop 停止kafka集群;" && exit
fi

case $1 in
"start")
for host in hadoop103 hadoop102 hadoop104
do
echo "$host 启动kafka集群"
ssh $host "kafka-server-start.sh -daemon /opt/module/kafka-3.0.0/config/server.properties"
done
;;
"stop")
for host in hadoop103 hadoop102 hadoop104
do
echo "$host 停止kafka集群"
ssh $host "kafka-server-stop.sh"
done
;;
*)
echo -e "请输入参数: \nstart 启动kafka集群; \nstop 停止kafka集群"
;;
esac

Kafka-Kraft群起脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#! /bin/bash
case $1 in
"start"){
for host in hadoop102 hadoop103 hadoop104
do
echo "$host 启动Kafka-Kraft集群"
ssh $host "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"
done
};;
"stop"){
for host in hadoop102 hadoop103 hadoop104
do
echo "$host 停止Kafka-Kraft集群"
ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "
done
};;
esac