数据开发 - Kafka实操
Kafka安装部署
下载安装
下载地址:https://kafka.apache.org/downloads.html
官方文档:https://kafka.apache.org/30/documentation.html
- 解压安装:
tar -zxvf /opt/software/kafka_2.12-3.0.0.tgz -C /opt/module/
- 配置并生效环境变量:
vim /etc/profile.d/my_env.sh
1 | #KAFKA_HOME |
配置文件
修改配置文件后,分发Kafka和环境变量
配置文件参考:
org.apache.kafka.clients.consumer.ConsumerConfig
org.apache.kafka.clients.producer.ProducerConfig
kafka.server.KafkaConfig
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
server.properties
依赖Zookeeper模式
- 修改配置文件server.properties:
vim ./config/server.properties
1 | #broker的全局唯一编号(分发需更改) |
Kafka-Kraft模式
- 修改配置文件server.properties:
vim ./config/kraft/server.properties
1 | #kafka的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能) |
Kafka集群启动/停止
依赖Zookeeper模式
- 启动集群(每台Kafka节点)
- 启动Zookeeper集群
- 启动Kafka集群:
kafka-server-start.sh -daemon ./config/server.properties
- 停止集群(每台Kafka节点)
- 停止Kafka集群:
kafka-server-stop.sh
- 停止Kafka集群:
需要等Kafka所有节点进程全部停止后,停止Zookeeper集群。若Zookeeper集群先停止,Kafka集群无法再获取停止进程的信息,只能手动杀死Kafka进程
Kafka-Kraft模式
- 初始化集群
- 生成存储目录唯一ID(一台Kafka节点):
kafka-storage.sh random-uuid
- 格式化存储目录(每台Kafka节点):
kafka-storage.sh format -t 唯一ID -c ./config/kraft/server.properties
- 生成存储目录唯一ID(一台Kafka节点):
- 启动集群(每台Kafka节点):
kafka-server-start.sh -daemon ./config/kraft/server.properties
- 停止集群(每台Kafka节点):
kafka-server-stop.sh
Kafka集群操作
Topic命令行操作
- 查看命令帮助:
kafka-topics.sh
举例:
kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic first
常用参数 | 作用 |
---|---|
–bootstrap-server | 连接kafka Broker主机名称和端口号 |
–topic | 操作的topic名称 |
–create | 创建主题 |
–delete | 删除主题 |
–alter | 修改主题 |
–list | 查看所有主题 |
–describe | 查看主题详细描述 |
–partitions | 设置主题分区数 |
–replication-factor | 设置主题分区副本 |
–config | 更改系统默认配置 |
Producer命令行操作
- 查看命令帮助:
kafka-console-producer.sh
举例:
kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
常用参数 | 作用 |
---|---|
–bootstrap-server | 连接kafka Broker主机名称和端口号 |
–topic | 操作的topic名称 |
Consumer命令行操作
- 查看命令帮助:
kafka-console-consumer.sh
举例:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
常用参数 | 作用 |
---|---|
–bootstrap-server | 连接kafka Broker主机名称和端口号 |
–topic | 操作的topic名称 |
–from-beginning | 从头开始消费 |
–group | 指定消费者组名称 |
Partition命令行操作
- 查看命令帮助:
kafka-leader-election.sh
举例:
kafka-leader-election.sh --bootstrap-server hadoop102:9092 --topic first --election-type preferred --partition 2
常用参数 | 作用 |
---|---|
–bootstrap-server | 连接kafka Broker主机名称和端口号 |
–topic | 操作的topic名称 |
–election-type | 重新选举 |
–partition | 指定分区 |
Kafka工具类
- 查看命令帮助:
kafka-run-class.sh
举例:
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Kafka API
API文档:https://kafka.apache.org/30/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
1 | <dependencies> |
Producer API
必须配置序列化
常用类 | 常用方法 |
---|---|
java.util.Properties 创建配置对象 |
添加配置信息:put() |
org.apache.kafka.clients.producer.ProducerConfig 设置配置信息 |
|
org.apache.kafka.clients.producer.ProducerRecord 创建信息对象 |
获取信息:value() |
org.apache.kafka.clients.producer.KafkaProducer 创建生产者对象 |
发送信息:send()初始化事务:initTransactions() 开启事务:beginTransaction() 事务内提交已消费的偏移量:sendOffsetsToTransaction() 提交事务:commitTransaction() 事务回滚:abortTransaction() 关闭资源:close() |
org.apache.kafka.clients.producer.RecordMetadata 获取元数据信息 |
获取主题:topic() 获取分区:partition() |
org.apache.kafka.common.Cluster 集群信息 |
获取分区数:partitionCountForTopic() |
常用接口 | 作用 | 常用方法 |
---|---|---|
org.apache.kafka.clients.producer.Callback | 回调 | 回调函数:onCompletion() |
org.apache.kafka.clients.producer.Partitioner | 分区 | 自定义分区:partition() |
java.util.concurrent.Future | 并发处理 | 同步发送:get() |
Consumer API
必须配置序列化和消费者组ID
常用类 | 作用 | 常用方法 |
---|---|---|
java.util.Properties | 创建配置对象 | 添加配置信息:put() |
org.apache.kafka.clients.consumer.ConsumerConfig | 设置配置信息 | |
org.apache.kafka.clients.consumer.ConsumerRecord | 创建信息对象 | 获取信息:value() |
org.apache.kafka.clients.consumer.ConsumerRecords | 创建多个信息对象 | |
org.apache.kafka.clients.consumer.KafkaConsumer | 创建消费者对象 | 订阅主题:subscribe()拉取数据:poll()同步提交:commitSync()异步提交:commitAsync() |
org.apache.kafka.clients.producer.RecordMetadata | 获取元数据信息 | 获取主题:topic()获取分区:partition() |
org.apache.kafka.common.Cluster | 集群信息 | 获取分区数:partitionCountForTopic() |
java.time.Duration | 时间间隔 | 单位为秒:ofSeconds() |
Kafka常见配置
Producer
Producer API 进行配置
常用配置参数 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单(可以设置1个或者多个,逗号隔开) |
key.serializer、 value.serializer | 指定发送消息的key和value的序列化类型(全类名) |
buffer.memory | RecordAccumulator缓冲区总大小(默认32m) |
batch.size | 缓冲区一批数据最大值(默认16k) |
linger.ms | 缓冲区数据发送等待时间(默认0ms,表示没有延迟) |
acks | acks的值(默认-1) |
max.in.flight.requests.per.connection | 允许ack未返回的最大次数(默认5) |
Retries | 消息发送出现错误时的重试次数(默认int最大值) |
retry.backoff.ms | 两次重试之间的时间间隔(默认100ms) |
enable.idempotence | 是否开启幂等性(默认true) |
compression.type | 发送数据的压缩方式(默认none) |
Broker
常用配置参数 | 描述 |
---|---|
replica.lag.time.max.ms | ISR中的Follower超过该事件阈值(默认30s)未向Leader发送同步数据,则该Follower将被踢出ISR。 |
auto.leader.rebalance.enable | 默认是true。 自动Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 |
leader.imbalance.check.interval.seconds | 默认值300秒。检查leader负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。 |
log.index.interval.bytes | 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 |
log.retention.hours | Kafka中数据保存的时间,默认7天。 |
log.retention.minutes | Kafka中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka中数据保存的时间,毫秒级别,默认关闭。(优先级最高) |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是5分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。 |
log.cleanup.policy | 默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的50%的1/3 |
num.network.threads | 默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是Max(long) (9223372036854775807)。一般交给系统管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。 |
Consumer
Consumer API 进行配置
参数名称 | 描述 |
---|---|
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
key.deserializer、value.deserializer | 指定接收消息的key和value的反序列化类型。要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 若enable.auto.commit=true, 表示消费者提交偏移量频率,默认5s。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认1个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条。 |
partition.assignment.strategy | 消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky |
附:Zookeeper的存储信息
附:集群脚本
Kafka群起脚本
1 |
|
Kafka-Kraft群起脚本
1 |
|