2.1 安装 Kafka Broker
在安装 Zookeeper 和 Kafka 之前,需要先安装 Java 环境,因为 Kafka 许多功能使用 Java 语言编写,如果运行源码时还需要安装 Scala。
Zookeeper 用于保存集群的元数据信息和消费者信息。
在 MacOS 环境下安装,使用 Homebrew
只需一条命令即可,在安装 Kafka 时 Homebrew 会自动下载相应的依赖,自然也包括 Zookeeper、OpenJDK。
1 | brew install kafka |
Homebrew 会将 Kafka 安装到 /usr/local/Cellar
目录,不过某些文件会被链接到其他目录。
- 二进制文件和脚本文件在
/usr/local/bin
目录下。 - Kafka 配置文件在
/usr/local/etc/kafka
目录下。 - Zookeeper 配置文件在
/usr/local/etc/zookeeper
目录下,需要配置zoo.cfg
。 log.dirs
(Kafka 的数据目录)被设置为/usr/local/var/lib/kafka-logs
。
安装完毕后,就可以启动并测试 Zookeeper 和 Kafka了。
1 | zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties |
Zookeeper 集群被称为群组,使用的是一致性协议,因此建议集群中包含奇数个节点,因为只有当群组里的大多数节点处于可用状态,Zookeeper 才能处理外部的请求。
2.2 Broker 配置
更为详细的配置信息可参考 Kafka 官网:Configuration
2.2.1 常规配置
位于 /usr/local/etc/kafka/server.properties
。
配置项 | 说明 |
---|---|
broker.id |
Kafka 集群内需唯一,默认值为 0。 |
port |
默认 9092 ,若设置到 1024 以下需要 root 权限启动。 |
zookeeper.connect |
默认 2181 ,冒号分割的一组 host:port/path |
log.dirs |
逗号分隔的本地文件系统路径,broker 会根据“最少使用”原则把同一个分区的日志片段保存到同一个路径下。 |
num.recovery.threads.per.data.dir |
每个数据目录的线程数,所配置的数字对应的是 log.dirs 指定的单个日志目录。 |
auto.create.topics.enable |
是否启动自动创建主题 |
num.recovery.threads.per.data.dir
:Kafka 使用可配置的线程池来处理日志片段的几种情况
- 服务器正常启动,用于打开每个分区的日志片段;
- 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
- 服务器正常关闭,用于关闭日志片段。
auto.create.topics.enable
:Kafka 自动创建主题的几种情况
- 当一个
Producer
向Topic
写入消息时; - 当一个
Consumer
从Topic
读取消息时; - 当任意一个
Client
向Topic
发送Metadata
请求时。
2.2.2 主题的默认配置
配置项 | 说明 |
---|---|
num.partitions |
新建主题所包含的分区个数,默认值为 1 。 |
log.retention.hours |
数据被保留的时间,默认 168 小时,即一周,也可将hours 换成 minutes 与ms 。 |
log.retention.bytes |
通过消息字节数判断消息是否过期,作用在每一个分区上。 |
log.segment.bytes |
关闭日志片段的阈值,默认 1GB ,超过此值将开启新的日志片段。 |
log.segment.ms |
关闭日志片段的阈值,以时间为单位。与log.segment.bytes 非互斥关系。 |
message.max.bytes |
单条消息被压缩后的最大阈值,默认为 1 000 000 ,即 1MB ,超过则会被拒绝。 |
Kafka 集群通过 Partition 实现对 Topic 的横向扩展,当有新的 Broker 加入集群时,可以通过 Partition 来实现集群的负载均衡。那么,如何确定分区的数量?
- Topic 需要多大的吞吐量?
- 单个分区读取数据的最大吞吐量是多少?
- 生产者向单个分区写入数据的吞吐量是多少?
- 每个
broker
包含的分区个数、可用的磁盘空间和网络带宽。 - 可以使用 主题吞吐量/消费者吞吐量 计算分区个数。
消息何时会被清除?
- 通过
log.retention.hours/minutes/ms
与log.retention.bytes
确定,当消息条件满足任意条件之一时都将会被删除。 - 日志片段被关闭之前消息是不会过期的,即在达到
log.segment.bytes
设置的阈值之后日志片段关闭,然后再达到log.retention.xxxx
条件之后消息才会真正过期。
在服务端和客户端之间协调消息大小的配置
- 消费者客户端所设置的
fetch.message.max.bytes
需要与服务器设置的消息大小message.max.bytes
所协调,如果这个值比message.max.bytes
小,那么消费者将无法读取较大的消息,导致消费者被阻塞。 - 集群中的
broker
配置replica.fetch.max.bytes
,也遵循与上类似的原则。
2.3 Kafka 硬件的选择
影响 Kafka 性能的一些硬件因素主要包括:磁盘吞吐量(机械硬盘还是固态硬盘)、磁盘容量(1GB 还是 1TB)、内存大小、网络吞吐量、CPU、Kafka 集群。
2.3.1 需要多少个 broker
首先,考虑需要多少磁盘空间来保留数据,以及单个 broker
有多少空间可用;其次要考虑的因素是集群处理请求的能力,因磁盘吞吐量低和系统内存不足造成的性能问题,可以通过扩展多个 broker
来解决。
2.3.2 broker
的配置
要把一个 broker
加入到集群里,只需要修改两个配置参数。
- 所有
broker
都必须配置相同的zookeeper.connect
,该参数指定了用于保存元数据的 Zookeeper 群组和路径。 - 每个
broker
都必须为broker.id
参数设置唯一的值。
2.3.3 操作系统调优
需要关注的是虚拟内存、磁盘以及网络。
2.3.4 生产环境的注意事项
需要注意的是 JVM 垃圾回收器的设置、数据中心的布局以及 Zookeeper 的配置。