当前位置:主页 > 人工智能 > 正文
Kafka集群突破百万partition的技术探索
来源:InfoQ作者:丁俊2020-02-22 09:55:26

前言

对于小业务量的业务,往往多个业务共享kafka集群,随着业务规模的增长需要不停的增加topic或者是在原topic的基础上扩容partition数,另外一些后来大体量的业务在试水阶段也可能不会部署独立的集群,当业务规模爆发时,需要迅速扩容扩容集群节点。在不牺牲稳定性的前提下单集群规模有限,常常会碰到业务体量变大后无法在原集群上直接进行扩容,只能让业务创建新的集群来支撑新增的业务量,这时用户面临系统变更的成本,有时由于业务关联的原因,集群分开后涉及到业务部署方案的改变,很难短时间解决。

为了快速支持业务扩容,就需要我们在不需要业务方做任何改动的前提下对集群进行扩容,大规模的集群,往往意味着更多的partition数,更多的broker节点,下面会描述当集群规模增长后主要面临哪些方面的挑战:

挑战

ZK节点数

Kafka的topic在broker上是以partition为最小单位存放和进行复制的,因此集群需要维护每个partition的Leader信息,单个partition的多个副本都存放在哪些broker节点上,处于复制同步状态的副本都有哪些。为了存放这些元数据,kafka集群会为每一个partition在zk集群上创建一个节点,partition的数量直接决定了zk上的节点数。

假设集群上有1万个topic,每个topic包含100个partition,则ZK上节点数约为200多万个,快照大小约为300MB,ZK节点数据变更,会把数据会写在事务日志中进行持久化存储,当事务日志达到一定的条目会全量写入数据到持久化快照文件中,partition节点数扩大意味着快照文件也大,全量写入快照与事务日志的写入会相互影响,从而影响客户端的响应速度,同时zk节点重启加载快照的时间也会变长。

Partition复制

Kafka的partition复制由独立的复制线程负责,多个partition会共用复制线程,当单个broker上的partition增大以后,单个复制线程负责的partition数也会增多,每个partition对应一个日志文件,当大量的partition同时有写入时,磁盘上文件的写入也会更分散,写入性能变差,可能出现复制跟不上,导致ISR频繁波动,调整复制线程的数量可以减少单个线程负责的partition数量,但是也加剧了磁盘的争用。

Controller切换时长

由于网络或者机器故障等原因,运行中的集群可能存在controller切换的情况,当controller切换时需要从ZK中恢复broker节点信息、topic的partition复制关系、partition当前leader在哪个节点上等,然后会把partition完整的信息同步给每一个broker节点。

在虚拟机上测试,100万partition的元数据从ZK恢复到broker上约需要37s的时间,100万partition生成的元数据序列化后大约80MB(数据大小与副本数、topic名字长度等相关),其他broker接收到元数据后,进行反序列化并更新到本机broker内存中,应答响应时间约需要40s(测试时长与网络环境有关)。

Controller控制了leader切换与元数据的下发给集群中其他broker节点,controller的恢复时间变长增加了集群不可用风险,当controller切换时如果存在partition的Leader需要切换,就可能存在客户端比较长的时间内拿不到新的leader,导致服务中断。

broker上下线恢复时长

日常维护中可能需要对broker进行重启操作,为了不影响用户使用,broker在停止前会通知controller进行Leader切换,同样broker故障时也会进行leader切换,leader切换信息需要更新ZK上的partition状态节点数据,并同步给其他的broker进行metadata信息更新。当partition数量变多,意味着单个broker节点上的partitiion Leader切换时间变长。

通过上述几个影响因素,我们知道当partition数量增加时会直接影响到controller故障恢复时间;单个broker上partition数量增多会影响磁盘性能,复制的稳定性;broker重启Leader切换时间增加等。当然我们完全可以在现有的架构下限制每个broker上的partition数量,来规避单broker上受partition数量的影响,但是这样意味着集群内broker节点数会增加,controller负责的broker节点数增加,同时controller需要管理的partition数并不会减少,如果我们想解决大量partition共用一个集群的场景,那么核心需要解决的问题就是要么提升单个controller的处理性能能力,要么增加controller的数量。

解决方案

单ZK集群

从提升单个controller处理性能方面可以进行下面的优化:

并行拉取zk节点

Controller在拉取zk上的元数据时,虽然采用了异步等待数据响应的方式,请求和应答非串行等待,但是单线程处理消耗了大约37s,我们可以通过多线程并行拉取元数据,每个线程负责一部分partition,从而缩减拉取元数据的时间。 在虚拟机上简单模拟获取100万个节点数据,单线程约花费28s,分散到5个线程上并行处理,每个线程负责20万partition数据的拉取,总时间缩短为14s左右(这个时间受虚拟机本身性能影响,同虚拟机上如果单线程拉取20万partition约只需要6s左右),因此在controller恢复时,并行拉取partition可以明显缩短恢复时间。

变更同步元数据的方式

上文中提到100万partition生成的元数据约80MB,如果我们限制了单broker上partition数量,意味着我们需要增加broker的节点数,controller切换并行同步给大量的broker,会给controller节点带来流量的冲击,同时同步80MB的元数据也会消耗比较长的时间。因此需要改变现在集群同步元数据的方式,比如像存放消费位置一样,通过内置topic来存放元数据,controller把写入到ZK上的数据通过消息的方式发送到内置存放元数据的topic上,broker分别从topic上消费这些数据并更新内存中的元数据,这类的方案虽然可以在controller切换时全量同步元数据,但是需要对现在的kafka架构进行比较大的调整(当然还有其他更多的办法,比如不使用ZK来管理元数据等,不过这不在本篇文章探讨的范围内)。

那有没有其他的办法,在对kafka架构改动较小的前提下来支持大规模partition的场景呢?我们知道kafka客户端与broker交互时,会先通过指定的地址拉取topic元数据,然后再根据元数据连接partition相应的Leader进行生产和消费,我们通过控制元数据,可以控制客户端生产消费连接的机器,这些机器在客户端并不要求一定在同一个集群中,只需要客户端能够拿到这些partition的状态信息,因此我们可以让不同的topic分布到不同的集群上,然后再想办法把不同集群上的topic信息组合在一起返回给客户端,就能达到客户端同时连接不同集群的效果,从客户端视角来看就就是一个大的集群。这样不需要单个物理集群支撑非常大的规模,可以通过组合多个物理集群的方式来达到支撑更大的规模,通过这种方式,扩容时不需要用户停机修改业务,下面我们就来描述一下怎么实现这种方案:

小集群组建逻辑集群

当我们需要组建逻辑集群时,有几个核心问题需要解决:

1、当客户端需要拉取元数据时,怎么把多个小的物理集群上的元数据组装在一起返回给客户端;

2、不同集群上的元数据变更时怎么及时地通知变更;

3、多个集群上保存消费位置和事务状态的topic怎么分布。 下面针对这些问题一一进行讲解:

metadata服务

针对metadata组装问题,我们可以在逻辑集群里的多个物理集群中选一个为主集群,其他集群为扩展集群,由主集群负责对外提供metadata、消费位置、事务相关的服务,当然主集群也可以同时提供消息的生产消费服务,扩展集群只能用于业务消息的生产和消费。我们知道当partition的Leader切换时需要通过集群中的controller把新的metadata数据同步给集群中的broker。当逻辑集群是由多个相互独立的物理集群组成时,controller无法感知到其他集群中的Broker节点。

我们可以对主集群中的metada接口进行简单的改造,当客户端拉取metadata时,我们可以跳转到其他的集群上拉取metadata,然后在主集群上进行融合组装再返回给客户端。

虽然跳转拉取metadata的方式有一些性能上的消耗,但是正常情况下并不在消息生产和消费的路径上,对客户端影响不大。通过客户端拉取时再组装metadata,可以规避跨物理集群更新metadata的问题,同时也能够保证实时性。

消费分组与事务协调

当消费分组之间的成员需要协调拉取数据的partition时,服务端会根据保存消费位置topic的partition信息返回对应的协调节点,因此我们在一个逻辑集群中需要确定消费位置topic分布的集群,避免访问不同物理集群的节点返回的协调者不一样,从不同集群上拉取到的消费位置不一样等问题。我们可以选主集群的broker节点提供消费和事务协调的服务,消费位置也只保存在主集群上。

通过上述的一些改造,我们就可以支持更大的业务规模,用户在使用时只需要知道主集群的地址就可以了。

组建逻辑集群除了上述的核心问题外,我们也需要关注topic的分配,由于腾讯云的ckafka本身就会把broker上创建topic的请求转发给管控模块创建,因此可以很方便的解决topic在多个物理集群的分布,也可以规避同一逻辑集群上,不同物理集群内可能出现同名topic的问题。

单物理集群分裂

前面讲述了多个物理集群怎么组建成单个逻辑集群,有时可能面临一个问题,就是单个物理集群由于一些原因需要在现有的topic上不断的扩充partition,如果多个topic同时需要扩容可能出现单个物理集群过大的情况,因此需要对现有的集群进行分裂,一个物理集群拆分成两个物理集群。

进行集群的分裂涉及到ZK集群的分裂和对broker节点进行分组拆分,首先对集群中的broker节点分成两组,每组连接不同的ZK节点,比如我们可以在原来的zk集群中增加observer节点,新增的broker为一组,原来集群中的broker为一组,我们让新broker只填写observer的地址。ZK集群分裂前,通过KAFKA内置迁移工具可以很方便地把不同的topic迁移到各自的broker分组上,同一个topic的partition只会分布在同一个分组的broker节点上,后续把observer节点从现有的ZK集群中移除出去,然后让observer与别的ZK节点组成新的ZK集群,从而实现kafka集群的分裂。

结束语

通过提升controller的性能,和通过把多个物理集群组装成一个逻辑集群的做法都可以提升单集群承载partition的规模。但是相比而言,通过组建多个物理集群的方式对kafka现有的架构改动更小一些,故障恢复的时间上更有保障一些,服务更稳定。

当然业务在使用kafka服务时,如果业务允许保持一个partition数量适度的集群规模,通过业务拆分的方式连接不同的集群也是一种很好的实践方式。

本篇文章主要从元数据,controller逻辑等方面介绍了如何解决支撑百万partition的问题,运营大规模集群其实还涉及到磁盘故障,冷读,数据均衡等数据方面的问题,监控和报警服务同样非常的重要。腾讯云的CKAFKA团队一直在不断探索,致力于为用户提供可靠的消息服务。

作者简介

丁俊,主要从事消息、缓存、NOSQL等基础设施的研发,目前就职于腾讯云。


[责任编辑:陆熹]