最近做了一个 debezium 流水线升级的工作,遇到了挺多问题,记录一下。
事情的起因是最近几个礼拜我们的多个 postgres 实例遇到了磁盘空间不足的问题,看下来并不是我们数据量很大,而是 WAL 日志占据了很大的空间,有些实例 WAL 占用空间是数据的十倍以上,这显然是不正常的情况。即使凭借我浅显的 postgres 知识,我也很快联想到这大概和我们的 debezium 流水线有关。我们的服务的数据写入量并不大,WAL 正常情况下不会占据很大空间,但开启了主从同步,并且延迟很大的话,WAL 就会积攒起来,毕竟总得存着等从库消费完才能删掉,这应该就是我们的问题。
第一个 postgres 实例出现问题的时候,是个很简单的情况:这是个测试实例,之前有 debezium 从这个实例捕捉变动,后来迁走了。但是,我们忘记了删除 replication slot,于是 WAL 从 debezium 停止开始就一直不停积累,终于磁盘不足了。这很好解决,既然迁走不用了,就删掉 replication slot,然后 WAL 就会被自动清理掉,磁盘占用率也就恢复了。
第二个 postgres 实例出现问题的情况就复杂了一些,这正是前面 debezium 流水线后读的数据库,它显然是在不停消费的,那就没道理 WAL 会不停的积累。因为这是测试库,为了不让数据库因为磁盘满而完全不可用,我先停掉了 debezium 服务,然后删除了 replication slot,就和之前的实例一样,磁盘空间被顺利释放出来。之后我重新开启 debezium 观察,可以发现,WAL 确实会以比较快的速度积攒,估计一两天就会撑满磁盘。然而看 debezium 服务,它看起来是正常工作的,输出的 kafka topic 里可以看到有数据。那这就和之前的问题不一样了,并不是不消费才导致的 WAL 积累。
重新搜索了一下,看了 debezium 的文档,原来这个问题是有详细解释的。简单来说就是,并不是不消费,而是需要同步的库流量太少,而 WAL 是针对整个实例而言的,如果其他库写入量很大,产生的 WAL 就很大,但是需要同步的库很久没有变动,debezium 也就没法记录下来,这样就会导致 WAL 积累。解决的办法文档里也提了,需要用心跳机制来保证 WAL 有变动,这样就不会积累了。那就需要配置 heartbeat.interval.ms
和 heartbeat.action.query
两个参数,前一个是心跳包发送间隔,每次心跳会往 sink 里写一条消息。后一个可以配置一条 SQL,内容是随便的一些写入操作即可,比如更新任意表的一条记录,只要保证他能触发 replication 点的更新即可。于是我就配了这俩,但是……看起来不生效。
这时候需要再回顾一下背景了,我们的 debezium 流水线是一年多前搭建的,用的是 debezium 1.6,server 模式。也就是在一个 pod 中启动 debezium server,它从一个 postgres 上捕捉变动,写到 kafka 中。现在的 debezium 最新版本是 2.0,我也不清楚 heartbeat 不生效的原因是啥,也许是版本太老了?或者说,有问题的话,我们总是应该升级到新版再看,毕竟有 bug 也不会修在旧版上。于是,我就想这次干脆把 debezium 升级到 2.0 吧。
然而升级也不是那么容易,因为我们并不是用的完全原版的 debezium。我们搭建这套流水线时,debezium 支持的 offset storage 并不多,没有我们想要的 redis 实现,为此我还自己写了一个。并且这个 debezium server 也是我重新构建的,就为了把我写的 redis offset storage 放进去。然而现在 debezium 有自带的 redis offset storage 实现了,所以直接用官方的 debezium/server 镜像就可。可是……已经记录的 offset 怎么办呢?官方的 redis offset storage 肯定跟我的实现是不兼容的,这个需要迁移。还好,看了下 io.debezium.server.redis.RedisOffsetBackingStore
的代码,很好理解,可以照着它写入 redis 的方式手动写入,我试了试,确实可以把 offset 迁移过去。
重新启动新版 debezium 的时候,先遇到了第一个问题:wal2json
插件不支持了,现在必须用 pgoutput
或者 decoderbufs
。这个回想了一下,应该是因为我们之前 postgres 版本过低,不支持 pgoutput
,所以不得不用 wal2json
。现在这个 postgres 已经是 10+ 了,所以那就换成 pgoutput
。再次重启,发现还是启动不了,原来是 replication slot 里记着插件类型,只修改 debezium 配置是不行的,所以需要手动删除 replication slot,然后再重启。这次终于启动成功了。
之后遇到了一个新的问题,写到 kafka 的时候一直报 CORRUPT_MESSAGE
。这个问题我搜了一圈,一般的说法是,topic 的 cleanup.policy=compact
时,如果写入的消息没有 key,就会出现此错误。然而我看了下,我们并没有设置 cleanup.policy
,我们写入的消息也一定有 key……另外这个错误看起来是 kafka broker 抛给 debezium 的,日志里没有什么别的有用的内容了,而我们用的 kafka 是阿里云的,看不了它自己的日志,这就很尴尬,无从查起了。最后还是搜了好久,找到阿里云的一篇文档,要设置 enable.idempotence=false
。看起来是 debezium 从 1.6 到 2.0 的过程中,升级了 kafka 客户端的版本,它默认用了阿里云不支持的幂等功能。debezium 的 kafka sink 是支持配置参数透传的(kafka 的参数实在是太多了),所以在 applicaiton.properties
里配置 debezium.sink.kafka.producer.enable.idempotence=false
就可以了。
最后我们的 debezium 终于正常工作了,观察了一下 WAL 占用空间的情况,基本上一直保持在一个很低的水平,符合预期。至此升级工作顺利结束。