2.1 安装 Kafka Broker

在安装 Zookeeper 和 Kafka 之前,需要先安装 Java 环境,因为 Kafka 许多功能使用 Java 语言编写,如果运行源码时还需要安装 Scala。

Zookeeper 用于保存集群的元数据信息和消费者信息。

KafkaAndZookeeper

在 MacOS 环境下安装,使用 Homebrew 只需一条命令即可,在安装 Kafka 时 Homebrew 会自动下载相应的依赖,自然也包括 Zookeeper、OpenJDK。

1
$ brew install kafka

Homebrew 会将 Kafka 安装到 /usr/local/Cellar 目录,不过某些文件会被链接到其他目录。

  • 二进制文件和脚本文件在 /usr/local/bin 目录下。
  • Kafka 配置文件在 /usr/local/etc/kafka 目录下。
  • Zookeeper 配置文件在 /usr/local/etc/zookeeper 目录下,需要配置 zoo.cfg
  • log.dirs (Kafka 的数据目录)被设置为 /usr/local/var/lib/kafka-logs

安装完毕后,就可以启动并测试 Zookeeper 和 Kafka了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
$ brew install telnet
$ telnet localhost 2181
Trying ::1...
Connected to localhost.
Escape character is '^]'.
srvr
Zookeeper version: 3.5.8...
Latency min/avg/max: 0/10/44
Received: 80
Sent: 81
Connections: 2
Outstanding: 0
Zxid: 0x1d
Mode: standalone
Node count: 27
Connection closed by foreign host.
# 创建 Topic
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
# 验证 Topic
$ kafka-topics --zookeeper localhost:2181 --describe --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
# 往测试 Topic 上发送消息,输入任意消息然后 Ctrl+C 退出,使用 Consumer 消费信息即可。
$ kafka-console-producer --broker-list localhost:9092 --topic test
>Test Message 1
>Test Message 2
^C
# 从测试 Topic 上消费消息
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

Zookeeper 集群被称为群组,使用的是一致性协议,因此建议集群中包含奇数个节点,因为只有当群组里的大多数节点处于可用状态,Zookeeper 才能处理外部的请求。

2.2 Broker 配置

更为详细的配置信息可参考 Kafka 官网:Configuration

2.2.1 常规配置

位于 /usr/local/etc/kafka/server.properties

配置项 说明
broker.id Kafka 集群内需唯一,默认值为 0。
port 默认 9092,若设置到 1024 以下需要 root 权限启动。
zookeeper.connect 默认 2181,冒号分割的一组 host:port/path
log.dirs 逗号分隔的本地文件系统路径,broker 会根据“最少使用”原则把同一个分区的日志片段保存到同一个路径下。
num.recovery.threads.per.data.dir 每个数据目录的线程数,所配置的数字对应的是 log.dirs 指定的单个日志目录。
auto.create.topics.enable 是否启动自动创建主题

num.recovery.threads.per.data.dir:Kafka 使用可配置的线程池来处理日志片段的几种情况

  • 服务器正常启动,用于打开每个分区的日志片段;
  • 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
  • 服务器正常关闭,用于关闭日志片段。

auto.create.topics.enable:Kafka 自动创建主题的几种情况

  • 当一个 ProducerTopic 写入消息时;
  • 当一个 ConsumerTopic 读取消息时;
  • 当任意一个 ClientTopic 发送 Metadata 请求时。

2.2.2 主题的默认配置

配置项 说明
num.partitions 新建主题所包含的分区个数,默认值为 1
log.retention.hours 数据被保留的时间,默认 168 小时,即一周,也可将hours换成 minutesms
log.retention.bytes 通过消息字节数判断消息是否过期,作用在每一个分区上。
log.segment.bytes 关闭日志片段的阈值,默认 1GB,超过此值将开启新的日志片段。
log.segment.ms 关闭日志片段的阈值,以时间为单位。与log.segment.bytes 非互斥关系。
message.max.bytes 单条消息被压缩后的最大阈值,默认为 1 000 000,即 1MB,超过则会被拒绝。

Kafka 集群通过 Partition 实现对 Topic 的横向扩展,当有新的 Broker 加入集群时,可以通过 Partition 来实现集群的负载均衡。那么,如何确定分区的数量?

  • Topic 需要多大的吞吐量?
  • 单个分区读取数据的最大吞吐量是多少?
  • 生产者向单个分区写入数据的吞吐量是多少?
  • 每个 broker 包含的分区个数、可用的磁盘空间和网络带宽。
  • 可以使用 主题吞吐量/消费者吞吐量 计算分区个数。

消息何时会被清除?

  • 通过 log.retention.hours/minutes/mslog.retention.bytes 确定,当消息条件满足任意条件之一时都将会被删除。
  • 日志片段被关闭之前消息是不会过期的,即在达到 log.segment.bytes 设置的阈值之后日志片段关闭,然后再达到 log.retention.xxxx 条件之后消息才会真正过期。

在服务端和客户端之间协调消息大小的配置

  • 消费者客户端所设置的 fetch.message.max.bytes 需要与服务器设置的消息大小 message.max.bytes 所协调,如果这个值比 message.max.bytes 小,那么消费者将无法读取较大的消息,导致消费者被阻塞。
  • 集群中的 broker 配置 replica.fetch.max.bytes,也遵循与上类似的原则。

2.3 Kafka 硬件的选择

影响 Kafka 性能的一些硬件因素主要包括:磁盘吞吐量(机械硬盘还是固态硬盘)、磁盘容量(1GB 还是 1TB)、内存大小、网络吞吐量、CPU、Kafka 集群。

一个简单的Kafka集群

2.3.1 需要多少个 broker

首先,考虑需要多少磁盘空间来保留数据,以及单个 broker 有多少空间可用;其次要考虑的因素是集群处理请求的能力,因磁盘吞吐量低和系统内存不足造成的性能问题,可以通过扩展多个 broker 来解决。

2.3.2 broker 的配置

要把一个 broker 加入到集群里,只需要修改两个配置参数。

  • 所有 broker 都必须配置相同的 zookeeper.connect,该参数指定了用于保存元数据的 Zookeeper 群组和路径。
  • 每个 broker 都必须为 broker.id 参数设置唯一的值。

2.3.3 操作系统调优

需要关注的是虚拟内存、磁盘以及网络

2.3.4 生产环境的注意事项

需要注意的是 JVM 垃圾回收器的设置、数据中心的布局以及 Zookeeper 的配置。

评论