3.1 Kafka 概览

Kafka 的应用场景中多,如记录用户活动、记录度量指标、保存日志消息、记录智能家电信息、与其他应用程序进行异步通信、缓冲即将写入到数据库的数据等等。多样的应用场景自然也意味着 Kafka 对消息的处理策略会有所不同,如哪些消息允许少量丢失或重复、哪些消息需要有严格的时延以及吞吐量等等。

下图展示了向 Kafka 发送消息的主要步骤,主要包括

  • 创建 ProducerRecord 对象

    • ProducerRecord 对象包含目标主题和要发送的内容,还可以指定键或分区。
    • 在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
  • 接下来,数据被传给分区器(Partitioner)

    • 如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。
    • 如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键(Key)来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。
    • 紧接着,这条记录被添加到一个记录批次(Batch)里,这个批次里的所有消息会被发送到相同的主题和分区上。
    • 之后,一个独立的线程负责把这些记录批次发送到相应的 broker 上。
  • 服务器在收到这些消息时会返回响应。

    • 如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。
    • 如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新(Retry)发送消息,几次重试仍然失败,就返回错误信息。

Kafka生产者组件图

3.2 创建 Kafka 生产者

在此选择 Kafka 0.10.1.0 中的示例源码,其中 bootstrap.serverskey.serializervalue.serializer 必须设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final String topic; // 主题
private final Boolean isAsync; // 发送消息方式:同步、异步。

public Producer(String topic, Boolean isAsync) {
Properties props = new Properties(); // 1. 新建 Peoperties 对象
props.put("bootstrap.servers", "localhost:9092"); // 2. 指定 broker 地址清单,可多个。
props.put("client.id", "DemoProducer");
// 3. 指定 Key 与 Value 的序列化方式。
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}

3.3 发送消息

在实例化 Producer 对象后便可以开始发送消息,主要有以下 $3$ 种方式。

  • 发送并忘记(fire-and-forget):只管把消息发送出去,而不关心其是否正常到达。
  • 同步发送(synchronous send):返回 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。
  • 异步发送(asynchronous send):指定回调函数,服务器在返回响应时调用回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 一个简单的发送消息的例子
public void run() {
int messageNo = 1; // 消息编号
while (true) {
String messageStr = "消息-" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // 异步发送:发送后无序等待直接返回,所以需要传入回调函数 Callback 待消息成功后回调提醒。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // 同步发送,返回的是 Future 对象,使用 get() 方法获取返回内容。
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("发送消息: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo; // 每发送完一条信息后计数器 +1。
}
}

回调函数,需要实现 org.apache.kafka.clients.producer.Callback 接口,实现 onCompletion 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class DemoCallBack implements Callback {

private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}

public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}

3.4 Kafka 的配置

Kafka 官方有详细的生产者配置信息,可前往 Producer 查看,出去刚才提到的 $3$ 个,还有一些比较重要的。

Kafka 可以保证同一个分区里的消息是有序的。

配置项 说明
acks 指定必须要有多少个分区副本收到消息,生产者才会认为消息时写入成功的。常用的值有 0、1、all 等。
buffer.memory 设置生产者内存缓冲区的大小,生产者用缓冲区缓存要发送到服务器的消息。
compression.type 消息被发送给 broker 之前的压缩算法,可选值为 snappygziplz4zstd。默认为 none
retries 生产者重发消息的次数,达到后生产者会放弃重试并返回错误。
batch.size 一个批次可以使用的内存大小,按照字节数计算,而非消息个数。
linger.ms 生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。
client.id 可以是任意字符串。
max.in.flight.requests.per.connection 生产者在收到服务器相应之前可以发送的消息个数。
request.timeout.ms 生产者在发送数据时等待服务器返回响应的时间
metadata.fetch.timeout.ms 生产者在获取元数据时等待服务器返回响应的时间
timeout.ms broker 等待同步副本返回消息确认的时间
max.request.size 可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。
receiver/send.buffer.bytes TCP Socket 接收和发送数据包的缓冲区大小。

3.5 序列化器

创建生产者对象必须指定序列化器,Kafka 提供了整型和字节数组序列化器,但也可以利用 Avro 序列化器,或者自定义序列化器。

3.5.1 自定义序列化器

假设有一个类 Customer,它看起来是

1
2
3
4
5
@Data
public class Customer {
private int customerID;
private String customerName;
}

那么,Customer 类的序列化器可以是这样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// Nothing to configure.
}

@Override
/**
* Customer 对象被序列化成:
* 表示 customerID 的 4 字节整数
* 表示 customerName 长度的 4 字节整数(为空时为 0)
* 表示 customerName 的 N 个字节
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null) {
return null;
} else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);

return buffer.array();
} catch (Exception e) {
throw new SerializationException("序列化失败:" + e);
}
}

@Override
public void close() {
// Nothing to close.
}
}

但是,这个自定义的序列化器真的很烂。还是使用 Avro 比较好。

3.5.2 使用 Avro 序列化

Apache Arvo 是一种与编程语言无关的序列化格式,由 Doug Cutting 创建,目的是提供一种共享数据文件的方式。数据可被序列化为二进制文件或 JSON 文件,不过一般会使用二进制文件。

当负责写消息的应用程序使用了新的 schema,负责读消息的应用程序可以继续处理消息而无需做任何改动。不过有两个需要注意的地方:

  • 用于写入数据和读取数据的 schema 必须是相互兼容的。
  • 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。

3.5.3 在 Kafka 里使用 Avro

写入数据需要用到的 schema 被保存在注册表里,然后在记录里引用 schema 的标识符。负责读取数据的应用程序使用标识符从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。

Avro记录的序列化与反序列化流程图

关于如何把 Avro 生成的 Avro对象 发送到 Kafka 可参考 Apache Avro Documentation,与前文创建生产者类似,可以在创建生产者时指定序列化方式以及 Schema 的存储位置。

1
2
3
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);

如果选择使用一般的 Avro 对象而非生成的 Avro 对象,只需提供 schema 即可。

3.6 分区

前文提到,ProducerRecord 对象包含了 目标主题。Kafka 的消息是一个个的键值对ProducerRecord 对象可以只包含主题和值可以设置为默认的 null有两个用途:

  • 作为消息的附加信息
  • 被用来决定消息该被写到主题的哪个分区,拥有相同键的消息将被写到同一个分区。

如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取,创建包含键值的 ProducerRecord 如下。

1
2
3
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
// 如果要创建键为 null 的消息,不指定键即可,第一个参数为主题 Topic。
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");
  • 如果键值为 null 并且使用了默认的分区器,记录将被随机发送到主体内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分不到各个分区上。
  • 如果键不为 null 并且使用了默认的分区器,Kafka 将会对键进行散列,然后根据散列值把消息映射到特定的分区上。同一个键总是被映射到同一个分区上。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。

下面是一个自定义分区器的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {
// Nothing to configure.
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceof String)))
throw new InvalidRecordException("Some shit.");
if (((String) key).equals("Banana"))
return numPartitions; // Banana 总是被分配到最后一个分区。

return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}

public void close() {
// Nothing to close.
}
}

评论