4.1 KafkaConsumer 概念

4.1.1 消费者和消费者群组

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。这样可以协调消费者与生产者之间收发消息的不同速率以及 Broker 存储消息的能力。

一个消费者可以接收多个主题分区的消息,但是如下图所示,如果消费者的数量超过主题中分区的数量,那么多余的消费者就会被闲置,不会接收到任何消息。

![5个消费者受到4个分区的消息](/Users/raymond/Library/Application Support/typora-user-images/image-20210425102753887.png)

往群组里增加消费者是横向伸缩消费能力的主要方式,有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者,但是消费者的数量不要超过主题分区的数量。

除横向收缩外,也可以使用多个应用程序从同一个主题读取数据,只要保证每个应用程序都有自己的消费者群组,就可以读取到主题内所有的消息。如下图所示,如果增加 Consumer Group 2,它也不会影响 Consumer Group 1 接收消息主题全部分区的消息。

两个消费者群组对应一个主题

4.1.2 消费者群组和分区再均衡

一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃 时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡(rebalance),它提供了高可用性和伸缩性

再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

消费者通过向被指派为群组协调器的 broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才 会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

kafka 0.10.1 在版本引入了一个独立的心跳进程,可以在轮询消息的空档发送心跳,使得发送心跳的频率与消息轮询的频率相互独立。在新版本的 Kafka 里,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livelock)

分配分区是怎样的一个过程?

  • 消费者要加入群组时,先向群组协调器(group coordinator)发送 JoinGroup 请求。第一个加入群组的消费者将成为群主(leader)
  • 群主协调器获得群组成员列表(包含所有最近发送过心跳的消费者),并为每一个消费者分配分区。群主实现了 PartitionAssignor 接口来决定哪些分区应该被分配给哪个消费者。
  • 分配完毕后,群主分配情况列表发送给群组协调器,协调器再把这些消息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息。
  • (再均衡时自然也会发生再分区)

Kafka 的分区分配策略:

  • org.apache.kafka.clients.consumer.RangeAssignor(默认)
  • org.apache.kafka.clients.consumer.RoundRobinAssignor
  • org.apache.kafka.clients.consumer.StickyAssignor
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor

也可以通过实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口自定义分区策略。

4.2 创建 Kafka 消费者

创建 KafkaConsumer 与创建 KafkaProducer 过程极为相似,需要提供 3 个必要的属性:bootstrap.serverskey.deserializervalue.deserializer。下面是 Kafka 0.10.1 中创建消费者的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private final KafkaConsumer<Integer, String> consumer;
private final String topic;

public Consumer(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

consumer = new KafkaConsumer<>(props);
this.topic = topic;
}

4.3 订阅主题

消费者的 subscribe() 方法接受一个主题列表作为参数。

1
consumeer.subscribe(Collections.singletonList("customerCountries"));

也可以在调用 subscribe 方法时传入正则表达式,用以匹配多个主题。如果创建了新主题,并且主题名与正则表达式相匹配,将会立即触发一次再均衡。要订阅所有与 test 相关的主题,可以

1
consumer.subscribe("test.*");

4.4 轮询(Poll Loop)

消息轮询是消费者的核心 API,一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。消费者代码主要部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
try {
while (true) { // 消费者长期运行,持续轮询。
// poll() 的参数是超时时间,它会在指定的毫秒数内一直等待 broker 返回数据。
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = %s, partition = %s, offset = %d, consumer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
int updateCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updateCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updateCount);

JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
// 关闭后,网络连接和 Socket 也会随之关闭,并立即触发一次 再均衡,
// 而不是等待群组协调器发现它不再发送发送心跳并认定它已死亡,节省了时间。
consumer.close();
}

轮询不只是获取数据那么简单,在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,要确保在轮询期间所做的任何处理工作都应该尽快完成。

线程安全

在同一个群组里,无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。一个消费者应该单独使用一个线程,如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用 Java 的 ExecutorService 启动多个线程,使每个消费者运行在自己的线程上。

4.5 消费者的配置

关于消费者的详细配置可参考 Apache Kafka Consumer Configuration,不再赘述。

4.6 提交和偏移量

每次调用 poll() 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录, 因此可以追踪到哪些记录是被群组里的哪个消费者读取的。Kafka 不会像其他 JMS 队列那样需要得到消费者的确认,相反,消费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。

更新分区当前位置的操作被叫做提交。

消费者往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。当消费者发生崩溃或者有新的消费者加入群组时就会触发再均衡,再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的分区。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

  • 如果提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处理两个偏移量之间的消息会被重复处理。如下图所示。

提交的偏移量小于客户端处理的最后一个消息的偏移量

  • 如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。如下图所示。

提交的偏移量大于客户端处理的最后一个消息的偏移量

4.6.1 自动提交

如果 enable.auto.commit 被设为 true,那 么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔 由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了。

4.6.2 提交当前偏移量

开发者既可以通过控制偏移量提交时间,也可以通过提交偏移量来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。

auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。消费者的 commitSync() API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

要记住:commitSync() 将会提交由 poll() 返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理

4.6.3 异步提交

手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这会影响吞吐量,可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。此时可以使用异步提交 API commitAsync(),只管发送提交请求,而无需等待 broker 的响应。

在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,如果在服务器出现更大偏移量提交成功的情况下,出现了再均衡,就会出现重复消息。

commitAsync() 支持回调,在 broker 作出响应时会执行回调。

1
2
3
4
5
6
7
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
log.error("Commit failed for offsets {}", offsets, e);
}
}
});

4.6.4 同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

  • 如果一切正常,使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。
  • 如果直接关闭消费者,就没有所谓的”下一次提交“了。使用 commitSync() 方法会一 直重试,直到提交成功或发生无法恢复的错误。

4.6.5 提交特定的偏移量

我们都知道,提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交,或者在批次中间提交偏移量可以使用消费者 API,它允许在调用 commitSync()commitAsync() 方法时传进去希望提交的分区和偏移量的 map

1
2
3
4
5
6
7
8
9
10
11
12
13
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
// ...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Topic, Partition, Offset, customer, country");
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
if (count % 1000 == 0) // 每处理 1000 条记录就提交一次偏移量
consumer.commitAsync(currentOffsets, null);
count++;
}
}

4.7 再均衡监听器

上一节提到过,消费者在退出和进行分区再均衡之前,会做一些清理工作。

应该在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量,还可能需要处理缓冲区累积下来的记录、关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时, 可以通过消费者 API 执行程序代码,在调用 subscribe() 方法时传进 ConsumerRebalanceListener 实例,这个接口有两个需要实现的方法。

  • public void onPartitionsRevoked(Collection<TopicPartition> partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用,如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
  • public void onPartitionAssigned(Collection<TopicPartition> partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {

}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance, Committing current offsets: " + currentOffsets);
consumer.commitSync(currentOffsets);
}
}

public void someMethod() {
try {
consumer.subscribe(Collections.singleton(topic), new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String > record : records) {
System.out.println("record: " + record);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
log.error("msg");
} catch (Exception e) {
log.error("Unexpected error ", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
}

4.8 从特定偏移量处开始处理记录

poll() 方法从各个分区的最新偏移量开始处理消息,如果想从分区的起始位置,或者直接跳到分区的末尾开始读取消息,可以使用 seekToBeginning(Collection<TopicPartition> tp)seekToEnd(Collection<TopicPartition>) tp 这两个方法。

Kafka 也提供了用于查找特定偏移量的 API,它可以向后回退或向前跳过几个消息。在消费者启动或分配到新分区时,可以使用 seek() 方法查找保存在数据库里的偏移量。使用 ConsumerRebalanceListenerseek() 方法可以确保是从数据库里保存的偏移量所指定的位置开始处理消息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SaveOffsetsOnRebalance  implements ConsumerRebalanceListener {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// commitDBTransactions(); 虚构的数据库事务,保存记录和偏移量。
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
// 虚构方法从数据库获取偏移量
consumer.seek(partition, getOffsetFromDB(partition));
}
}
}

public void someMethod1() {
consumer.subscribe(Collections.singleton(topic), new SaveOffsetsOnRebalance(consumer));
for (TopicPartition partition : consumer.assignment()) {
consumer.seek(partition, getOffsetFromDB(partition));
}

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
for (ConsumerRecord<String, String> record : records) {
// processRecord(record);
// storeRecordInDB(record);
// storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
// commitDBTransaction();
}
}

4.9 如何退出

如果确定要退出 while(true) 循环,需要通过另一个线程调用 consumer.wakeup() 方法。如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。要记住,consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup() 可以退出 poll(), 并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。不过,在退出线程之前调用 consumer.close() 是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。

优雅退出的完整代码: CloseConsumer

4.10 反序列化器

前文提到,生产者需要用序列化器把对象转换成字节数组再发送给 Kafka。 类似地,消费者需要用反序列化器把从 Kafka 接收到的字节数组转换成 Java 对象。 很显然,生成消息使用的序列化器与读取消息使用的反序列化器应该是一一对应的。

使用 Avro 和 schema 注册表进行序列化和反序列化的优势在于:

  • AvroSerializer 可以保证写入主题的数据与主题的 schema 是兼容的,也就是说,可以使用相应的反序列化器和 schema 来反序列化数据。
  • 另外,在生产者或消费者里出现的任何一个与兼容性有关的错误都会被捕捉到,它们都带有消息描述,也就是说,在出现序列化错误时,就没必要再去调试字节数组了。

不推荐自定义序列化器与反序列化器,序列化与反序列化的逻辑相反,这里也不再赘述了。

4.11 无群组的独立消费者

此前提到的消费者均为属于某个群组的消费者,消费者在加入消费者群组后被自动分配分区,在群组里新增或移除消费者时自动触发再均衡。如果只需要单个消费者,而且不需要它加入消费者群组,那么就不需要让它订阅主题,取而代之的是它为自己分配分区。

一个消费者可以订阅主题并加入消费者群组,或者为自己分配分区,但不能同时做这两件事情。下面代码展示了一个消费者如何为自己分配分区并从分区里读取消息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<PartitionInfo> partitionInfos = null;
// 1. 向集群请求主题可用的分区。如果只打算读取特定分区,可以跳过。
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
// 2. 知道需要哪些分区后,调用 assign() 方法。
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("record: " + record);
}
consumer.commitSync();
}
}

除了不会发生再均衡,也不需要手动查找分区。不过,如果主题增加了新的分区,消费者不会收到通知。所以,要么周期性地调用 consumer.partitionsFor() 方法来检查是否有分区加入,要么在添加新分区后重启应用程序。

评论