3.1 Kafka 概览
Kafka 的应用场景中多,如记录用户活动、记录度量指标、保存日志消息、记录智能家电信息、与其他应用程序进行异步通信、缓冲即将写入到数据库的数据等等。多样的应用场景自然也意味着 Kafka 对消息的处理策略会有所不同,如哪些消息允许少量丢失或重复、哪些消息需要有严格的时延以及吞吐量等等。
下图展示了向 Kafka 发送消息的主要步骤,主要包括
创建
ProducerRecord
对象ProducerRecord
对象包含目标主题和要发送的内容,还可以指定键或分区。- 在发送
ProducerRecord
对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器(Partitioner)。
- 如果之前在
ProducerRecord
对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。 - 如果没有指定分区,那么分区器会根据
ProducerRecord
对象的键(Key)来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。 - 紧接着,这条记录被添加到一个记录批次(Batch)里,这个批次里的所有消息会被发送到相同的主题和分区上。
- 之后,一个独立的线程负责把这些记录批次发送到相应的
broker
上。
- 如果之前在
服务器在收到这些消息时会返回响应。
- 如果消息成功写入 Kafka,就返回一个
RecordMetaData
对象,它包含了主题和分区信息,以及记录在分区里的偏移量。 - 如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新(Retry)发送消息,几次重试仍然失败,就返回错误信息。
- 如果消息成功写入 Kafka,就返回一个
3.2 创建 Kafka 生产者
在此选择 Kafka 0.10.1.0
中的示例源码,其中 bootstrap.servers
、key.serializer
、value.serializer
必须设置。
1 | private final String topic; // 主题 |
3.3 发送消息
在实例化 Producer
对象后便可以开始发送消息,主要有以下 $3$ 种方式。
- 发送并忘记(
fire-and-forget
):只管把消息发送出去,而不关心其是否正常到达。 - 同步发送(
synchronous send
):返回Future
对象,调用get()
方法进行等待,就可以知道消息是否发送成功。 - 异步发送(
asynchronous send
):指定回调函数,服务器在返回响应时调用回调函数。
1 | // 一个简单的发送消息的例子 |
回调函数,需要实现 org.apache.kafka.clients.producer.Callback
接口,实现 onCompletion
方法。
1 | class DemoCallBack implements Callback { |
3.4 Kafka 的配置
Kafka 官方有详细的生产者配置信息,可前往 Producer 查看,出去刚才提到的 $3$ 个,还有一些比较重要的。
Kafka 可以保证同一个分区里的消息是有序的。
配置项 | 说明 |
---|---|
acks |
指定必须要有多少个分区副本收到消息,生产者才会认为消息时写入成功的。常用的值有 0、1、all 等。 |
buffer.memory |
设置生产者内存缓冲区的大小,生产者用缓冲区缓存要发送到服务器的消息。 |
compression.type |
消息被发送给 broker 之前的压缩算法,可选值为 snappy 、gzip 、lz4 、zstd 。默认为 none 。 |
retries |
生产者重发消息的次数,达到后生产者会放弃重试并返回错误。 |
batch.size |
一个批次可以使用的内存大小,按照字节数计算,而非消息个数。 |
linger.ms |
生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。 |
client.id |
可以是任意字符串。 |
max.in.flight.requests.per.connection |
生产者在收到服务器相应之前可以发送的消息个数。 |
request.timeout.ms |
生产者在发送数据时等待服务器返回响应的时间 |
metadata.fetch.timeout.ms |
生产者在获取元数据时等待服务器返回响应的时间 |
timeout.ms |
broker 等待同步副本返回消息确认的时间 |
max.request.size |
可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。 |
receiver/send.buffer.bytes |
TCP Socket 接收和发送数据包的缓冲区大小。 |
3.5 序列化器
创建生产者对象必须指定序列化器,Kafka 提供了整型和字节数组序列化器,但也可以利用 Avro
序列化器,或者自定义序列化器。
3.5.1 自定义序列化器
假设有一个类 Customer
,它看起来是
1 |
|
那么,Customer
类的序列化器可以是这样的。
1 | public class CustomerSerializer implements Serializer<Customer> { |
但是,这个自定义的序列化器真的很烂。还是使用 Avro 比较好。
3.5.2 使用 Avro 序列化
Apache Arvo 是一种与编程语言无关的序列化格式,由 Doug Cutting 创建,目的是提供一种共享数据文件的方式。数据可被序列化为二进制文件或 JSON 文件,不过一般会使用二进制文件。
当负责写消息的应用程序使用了新的 schema,负责读消息的应用程序可以继续处理消息而无需做任何改动。不过有两个需要注意的地方:
- 用于写入数据和读取数据的 schema 必须是相互兼容的。
- 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。
3.5.3 在 Kafka 里使用 Avro
写入数据需要用到的 schema 被保存在注册表里,然后在记录里引用 schema 的标识符。负责读取数据的应用程序使用标识符从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。
关于如何把 Avro 生成的 Avro对象 发送到 Kafka 可参考 Apache Avro Documentation,与前文创建生产者类似,可以在创建生产者时指定序列化方式以及 Schema 的存储位置。
1 | props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); |
如果选择使用一般的 Avro 对象而非生成的 Avro 对象,只需提供 schema 即可。
3.6 分区
前文提到,ProducerRecord
对象包含了 目标主题、键 和 值。Kafka 的消息是一个个的键值对,ProducerRecord
对象可以只包含主题和值,键可以设置为默认的 null
。键有两个用途:
- 作为消息的附加信息
- 被用来决定消息该被写到主题的哪个分区,拥有相同键的消息将被写到同一个分区。
如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取,创建包含键值的 ProducerRecord
如下。
1 | ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA"); |
- 如果键值为
null
并且使用了默认的分区器,记录将被随机发送到主体内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分不到各个分区上。 - 如果键不为
null
并且使用了默认的分区器,Kafka 将会对键进行散列,然后根据散列值把消息映射到特定的分区上。同一个键总是被映射到同一个分区上。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。
下面是一个自定义分区器的例子:
1 | public class BananaPartitioner implements Partitioner { |