前言

之前已分析清楚 Producer 的消息缓冲机制、网络层协议读写,其发送消息仅涉及 Metadata 和 Produce 两种协议请求,复杂点在缓冲消息的实现。相比之下 Consumer 复杂点在状态管理,会涉及九种协议请求:


一:准备

1.1 Consumer 接口

消费模型不再赘述,可参考《Kafka权威指南》第四章,KafkaConsumer 实现了 20+ 个接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface Consumer<K, V> extends Closeable {
public Set<TopicPartition> assignment(); // 读取当前消费的 tp 集合
public Set<String> subscription(); // 读取当前 subscribe 的 tp 集合

public void subscribe(Collection<String> topics); // subscribe 多个消息,多次调用会覆盖而非取并集
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback); // 指定 rebalance 开始前、结束后要执行的回调
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback); // 正则订阅
public void assign(Collection<TopicPartition> partitions); // 手动指定要消费的 tp
public void unsubscribe(); // 退出消费组,等待重用

public ConsumerRecords<K, V> poll(long timeout); // 拉取消息

public void commitSync(); // 同步提交上次 poll 的消费进度
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); // 同步提交,指定进度
public void commitAsync(); // 异步提交
public void commitAsync(OffsetCommitCallback callback); // 异步提交,回调处理提交结果
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback); //

public void seek(TopicPartition partition, long offset); // 指定 tp 从 offset 处开始消费
public void seekToBeginning(Collection<TopicPartition> partitions); // 从 earliest 处开始消费
public void seekToEnd(Collection<TopicPartition> partitions); // 从 latest 处开始消费
public long position(TopicPartition partition); // 计算下一条 fetch 到的消息 offset
public OffsetAndMetadata committed(TopicPartition partition); // 读取消费组在该 tp 上的消费进度

public Map<MetricName, ? extends Metric> metrics(); // 指标集
public List<PartitionInfo> partitionsFor(String topic); // 读取 tp 元数据
public Map<String, List<PartitionInfo>> listTopics(); // 列出集群所有 topic 元数据

public void pause(Collection<TopicPartition> partitions); // 暂停指定 tp 后 poll 不返回缓存消息,也不 fetch 新消息
public void resume(Collection<TopicPartition> partitions); // 恢复 tp
public Set<TopicPartition> paused(); // 列出被暂停的 tp

public void close(); // 退出消费组,close 各内部组件
public void wakeup(); // 中断被阻塞的 poll 操作
}

1.2 编程模型

1. 线程安全问题

Producer:场景简单,只提供 send 方法将消息写入内存,底层网络 IO 对用户透明,故在设计上应用了只读类如 Metadata、互斥锁如 BufferPool 内存管理等手段,实现了线程安全,能提高写吞吐

Consumer:不能并发使用,原因如下

  • 瓶颈在网络 IO 而非计算:大部分方法都会发起网络请求,比如 subscribe 会触发 rebalance,其底层至少会串行执行 2 次同步网络 IO
  • 方法调用都不是幂等的,副作用代价高:若并发发起两个 JoinGroup 请求,则 generationId 较低的请求会收到ILLEGAL_GENERATION等错误,这类无效请求会浪费大量 coordinator 的网络资源,代价过高

基于以上原因,实现为单线程,不考虑并发问题,为防止用户误用,各公共方法都加了获取“互斥锁”的前置检查,若“加锁”失败则直接向用户抛出异常,此“锁”使用 CAS 实现,和 JDK 中轻量级锁的思想一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final AtomicLong currentThread = new AtomicLong(-1); // -1: consumer 对象未被任何线程持有
private final AtomicInteger refcount = new AtomicInteger(0); // 使用引用计数模拟重入次数

// 加锁:若调用方法的线程 id,不是持有当前 consumer 对象的线程 id,抛出并发修改异常
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}

// 释放锁:若计数归零,说明持有 consumer 对象的线程已结束最后一个方法调用
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquire(); // 标准使用模式
try { /* ... */
} finally {
release();
}
}
}

2. RequestFuture

背景:Consumer 底层有大量串行执行网络 IO 的场景,如 rebalance 底层是 2 个 JoinGroup, SyncGroup 请求,后者强依赖前者返回的 memberId, generationId 等数据

原因:为避免方法调用嵌套过深导致代码混乱,可使用 Future 搭配 Listeners 来异步处理网络 IO 结果,但在有保证的单线程环境下,使用 juc 的 Future 类不划算,故实现了 RequestFuture,仅用于保存结果或异常

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RequestFuture<T> {
private boolean isDone = false; // future 是否已完成计算,即 future 对象是一次性的
private T value; // 执行成功的执行结果
private RuntimeException exception; // 执行失败时发生的异常

// 使用 onSuccess, onFailure 监听当前 future 的执行情况
private List<RequestFutureListener<T>> listeners = new ArrayList<>();
public void complete(T value) { /*...*/
fireSuccess(); // 执行完毕,触发回调
}
private void fireSuccess() {
for (RequestFutureListener<T> listener : listeners)
listener.onSuccess(value);
}
}

还应用了适配器模式,实现 2 个不同类型的 future 的改造转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class RequestFutureAdapter<F, S> {
public abstract void onSuccess(F value, RequestFuture<S> future); // 恰好同名
public void onFailure(RuntimeException e, RequestFuture<S> future) { future.raise(e); }
}

public class RequestFuture<F> { /*...*/
public <S> RequestFuture<S> compose(final RequestFutureAdapter<F, S> adapter) {
final RequestFuture<S> adapted = new RequestFuture<S>();
// 为 future F 添加匿名监听器,将 F 的执行结果传给 adapter,进一步改造成类型为 S 的结果
addListener(new RequestFutureListener<F>() {
@Override
public void onSuccess(F value) {
adapter.onSuccess(value, adapted); // 关键点:adapted 对象被捕捉
}
@Override
public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); }
});
return adapted;
}
}

示意图:适配器在自己的onSuccess中监听类型为 F 的结果,改造成类型为 S 的结果后,完成新 future

image-20200503211642551

3. 网络 IO 回调

在 Producer 中已分析过,NetworkClient 会从 Selector 统一收集四种网络 IO 的结果并处理,处理逻辑由RequestCompletionHandler接口描述,比如 Produce 响应结果的处理由 Sender 中的匿名类指定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface RequestCompletionHandler {
public void onComplete(ClientResponse response); // 处理各协议解析后的响应
}

public class Sender implements Runnable {/*...*/
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
/*...*/
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
// Produce 请求收到响应后,交由 Sender.handleProduceResponse 处理
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
return new ClientRequest(now, acks != 0, send, callback); // callback 与 request 成对绑定
}
}

而 Consumer 组合了 RequestFuture 来扩展了此接口,当收到响应后通知 future

1
2
3
4
5
6
7
8
9
class RequestFutureCompletionHandler extends RequestFuture<ClientResponse> implements RequestCompletionHandler {
@Override
public void onComplete(ClientResponse response) {
if (response.wasDisconnected()) /*...*/
raise(DisconnectException.INSTANCE);
else
complete(response);
}
}

以上的 Future、FutureAdapter、FutureCompletionHandler 关系如下

  • complete:仅需读取 ClientResponse 结果即可,没有后续步骤
  • compose:读取 ClientResponse 结果后,执行后续处理,或发起新的请求,返回新的结果类型

JoinGroup -> SyncGroup请求为例,将以上各组件串联起来,流程如下:

image-20200503223009586


二:组件分析

各组件大致关系如下

image-20200503225315411

2.1 SubscriptionState

保存 Consumer 原始订阅 topic,维护被分配的 tp 集合,跟踪各 tp 的消费进度、offset 提交进度;注意部分字段是互斥的,比如 subscription 与 userAssignment,子类 tpState 的 position 与 resetStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SubscriptionState {
private Pattern subscribedPattern; // 正则订阅表达式

// 当前要 consumer subscribe(topics/pattern) 的所有 topic,与用户 assign 互斥
private final Set<String> subscription;

// consumer leader 保存整个组订阅的 topic 集合,用于检测是否有 topic 被删除、分区是否变化、正则匹配到的 topic 数发生变化
private final Set<String> groupSubscription;

// 用户 assign 指定消费的 tp 集合,与 subscription 互斥
private final Set<TopicPartition> userAssignment;

// 无论哪种订阅类型,都表示当前 consumer 被分配的 tp 集合,用于跟踪消费进度
private final Map<TopicPartition, TopicPartitionState> assignment;

// 订阅状态发生变化,需重新发起 rebalance
// 1. 订阅的 topic 变化:新增,或新匹配
// 2. fetch/commit-offset 发现 generation 已过期,则尝试 re-join group
// 3. metadata listener 发现 topic 分区数变化
private boolean needsPartitionAssignment;

// 是否需要从 coordinator 处拉取上次 committed offset
// 1. 新加入消费组成功,消费状态为空
// 2. 异步提交 offset,需刷新提交结果
private boolean needsFetchCommittedOffsets;

private final OffsetResetStrategy defaultResetStrategy; // 默认重置策略,当全新 group 上限或 seek 越界时重置 position

private static class TopicPartitionState {
private Long position; // 上次拉取到消费的 offset,将逐步被 commit 掉
private OffsetAndMetadata committed; // 上次提交给 coordinator 的 offset
private boolean paused; // 是否被暂停消费
private OffsetResetStrategy resetStrategy; // 按 auto.offset.reset 策略重置 position,会清空现有 position
}
}

2.2 ConsumerCoordinator

继承自 AbstractCoordinator,实现与 coordinator broker 的逻辑交互,负责触发 autoCommit 和 heartbeat 定时任务,负责执行 rebalance

1. rebalance

触发条件

  • topic 本身变化:正在订阅的 topic 被删除,或正则消费匹配到了新 topic
  • topic 分区数变化:有 subscribe 的 topic 分区被扩容
  • consumer 上线:新 consumer 加入消费组后主动发起 rebalance
  • consumer 下线:被 coordinator 检测到下线后,coordinator 主动发起 rebalance

流程(直接看代码,不再贴出)

  1. 找到 coordinator:consumer 上线后,向任一 broker 发起 GroupCoordinator 请求,找到负责此 topic 的 coordinator broker

  2. 加入消费组:发起 JoinGroup 请求,coordinator 收集组内各 consumer 各自要订阅的 topic、各自支持的分区 assign 策略,选定其中一个 consumer 作为 leader 返回组员的订阅情况,其余 consumer 作为 follower 只返回 memberId, generationId 等元信息

  3. 分配分区

    • leader consumer:执行大家都支持的最优 assign 策略,计算出分区分布,发起 SyncGroup 请求通知 coordinator,也等待 assign 给自己要消费的分区
    • follower consumer:发起空 SyncGroup 请求,读取响应得知 assign 给自己的分区集合

再之后,leader consumer 持有整个组订阅的 topic 集合,它会负责在 Metadata 周期性过期后拉取集群的所有 topic 进行检查,若 topic 分区数与本地快照不一致,则发起新一轮 rebalance

2. assign 策略

各 consumer 可配置partition.assignment.strategy来指定分配策略,抽象实现如下:

1
2
3
4
5
6
public abstract class AbstractPartitionAssignor implements PartitionAssignor {
// 输入:topic->topic 元数据,consumer-id->List<topic>
// 输出:consumerId->List<tp> // 分配结果,每个 consumer 要消费哪些 tp
public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, List<String>> subscriptions);
}

继承关系:

image-20200509100141587

示例:topic 简写为 T,分区简写为 TP,consumer 简写为 C;假设 groupX 中有 3 个 C,C0 订阅了 T1(3 分区),C1,C2 都订阅了 T1,T2(5 分区)

  • RangeAssignor:分段分配(默认策略)

    • 策略:在 topic 维度,将 tp 分段分配给订阅此 topic 的子集 consumers
    • 算法:假设 topic 有 N 个分区,有 M 个 consumer,则按 consumer-id 排序后,前 N%M 个 consumer 被逐一分配连续的 (N/M+1) 个分区,之后的 consumer 被逐一分配连续的 N/M 个分区
    • 优点:逻辑简单
    • 缺点:要消费大量分区数无法整除的 topic 时,id 靠前的 consumer 将分配到更多分区数,负载不均

    image-20200509105229185

  • RoundRobinAssignor:轮询分配

    • 策略:在 topic 维度,将 tp 轮询分配给订阅此 topic 的子集 consumers
    • 算法:假设 topic 有 N 个分区,有 M 个 consumer,则按 consumer-id 排序后,N 个分区会被逐一分发给 M 个 consumer
    • 优点:逻辑简单
    • 缺点:同样存在负载不均的 case

    image-20200509105327371

  • StickyAssignor:0.11 引入的更合理的分配策略,分区会尽可能分配均匀,同时发生 rebalance 时会尽量保持分配结果与之前一致,具体参考 KIP-54


2.3 ConsumerNetworkClient

背景:NetworkClient 的 inFlightRequests 队列缓存的是已发出、但未收到响应的 ClientRequest,其并不缓存任何还未发出的请求,只负责将 ClientRequest 下发给 Selector 执行网络 IO,多请求的协调由调用方自行调用 ready 来检测实现

  • Producer:Sender 线程调用 ready 筛选出可写节点,才为其生成 Produce 请求
  • Consumer:常见的 rebalance 操作会发出多个请求,使用请求缓冲队列暂存未未发送的请求,再定时出队下发给 NetworkClient 执行发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ConsumerNetworkClient implements Closeable {
/*...*/
private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue(); // 存放心跳任务、自动提交 offset 等任务的优先级队列
private final KafkaClient client;
private final Map<Node, List<ClientRequest>> unsent = new HashMap<>(); // 等待发往各节点的请求队列

public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api);
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());

// 放入请求队列,返回 future 异步等待结果,并不会立刻执行网络请求
put(node, new ClientRequest(time.milliseconds(), true, send, future));
return future;
}
}

另外还实现了 5 个带超时重载的 poll 操作,来下发请求给 NetworkClient,核心重载为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void poll(long timeout, long now, boolean executeDelayedTasks) {
trySend(now); // 将各 node 请求队列中的请求下发给 NetworkClient 准备发送

timeout = Math.min(timeout, delayedTasks.nextTimeout(now)); // 最快到期的定时任务超时时间
clientPoll(timeout, now); // 执行网络 IO

// 若上边的 clientPoll 检测到有断开的连接,必须及时清空连接断开的节点的 unsent 请求(complete 超时异常)
// 否则队列里还有请求,会导致第二个 trySend 触发很大概率会失败的重连
now = time.milliseconds();
checkDisconnects(now);

if (executeDelayedTasks)
delayedTasks.poll(now); // 定时任务出队执行

// 可能上方 clientPoll 新建了连接,尝试重新下发给 selector,下一轮 poll 将尽快发出
trySend(now);

// 重要:若有请求超过 request.timeout.ms 还得不到执行,则及时抛出超时异常
failExpiredRequests(now);
}

其中定时任务实现为 DelayedTaskQueue 队列,底层是一个到期时间为权重的最小堆(优先级队列)

1
2
3
4
5
6
7
8
9
10
11
public class DelayedTaskQueue {
private PriorityQueue<Entry> tasks;
private static class Entry implements Comparable<Entry> {
DelayedTask task; // 带时间参数的 Runnable
long timeout;
@Override
public int compareTo(Entry entry) {
return Long.compare(timeout, entry.timeout); // 超时时间戳越小,越紧急,越靠近堆顶
}
}
}

有 2 个实现 DelayedTask 的定时任务,会发起 OffsetCommit 或 Heartbeat 请求,并根据请求结果重置任务的下次超时时间

  • AutoCommitTask:定时auto.commit.interval.ms提交当前 consumer 消费的所有 tp 的进度
  • HeartbeatTask:定时heartbeat.interval.ms提交心跳请求

2.4 Fetcher

由用户触发,负责发起 Fetch 请求,拉取各 tp 的消息并缓存在本地,返回给用户后更新各 tp 的消费进度,核心方法是 sendFetches 并发地向各 broker 发起 Fetch 请求,解析响应并放入本地队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Fetcher<K, V> {
private final List<PartitionRecords<K, V>> records; // poll 到的消息缓冲
public void sendFetches() {
for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
final FetchRequest fetch = fetchEntry.getValue();
client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse response) {
handleFetchResponse(response, fetch); // 解析消息并放入 records 缓冲
}/*...*/
});
}
}

// 用户调用 poll 会先读取 fetcher 的缓冲区,有消息就返回
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
/*...*/
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>(); /*...*/
int maxRecords = maxPollRecords; // max.poll.records 配置只会与 fetcher 交互
Iterator<PartitionRecords<K, V>> iterator = records.iterator();
while (iterator.hasNext() && maxRecords > 0) {
PartitionRecords<K, V> part = iterator.next(); // 读取某个 tp 在之前 fetch 到的多条消息
maxRecords -= append(drained, part, maxRecords); // 更新 consumer 在该 tp 上的消费进度
if (part.isConsumed())
iterator.remove();
}
return drained;
}
}

总结

Consumer 为单线程,复杂之处在于各种请求如何通过更新 SubscriptionState 状态来相互配合,比如完整的 rebalance 流程中 SyncGroup 和 OffsetFetch 都会更新该状态,之后的 Fetch 则基于此状态拉取消息,再之后的 OffsetCommit 请求则基于此状态提交各 tp 的消费进度等等

本文梳理清楚了 Consumer 的编程模型、四个关键组件及其主要操作、各请求的大致流程,但并未梳理面向用户的方法实现、各种请求的 error code 如何处理等问题,参考源码及其注释更为直观