RocksDB 是目前最流行的轻量级嵌入式 KV 数据库,Flink 也基于 RocksDB 开发了 RocksDBStateBackend 来作大数据量下的状态存储。
背景
RocksDB 经过很多的实践证明,其本身很不错,但是作为一个标准的 KV 数据库,用在 Flink 做状态存储中,很多场景都捉襟见肘。当然,目前 Flink 也并没有提供出更好的替代方案,但是通过阅读本文可以帮助开发者更好地理解在 RocksDBStateBackend 下遇到的很多问题。
RocksDB
先简单了解一下 RocksDB,
RocksDB In Flink
RocksDB 在性能上的目标:
- 点查和范围查找都有不错的性能。
- 快速的随机读。
- 快速的更新性能。
- 能够通过调整参数来适应不同的硬件。
基本数据结构:
- memtable: 内存中的存储结构,新写入的数据在完成 WAL 后会插入 memtable。
- sstfile: 磁盘上的存储文件,memtable 中的数据会 flush 到磁盘上。
- logfile: 当 memtable 数据丢失时可以通过 logfile 恢复数据。
当新数据写入时,数据会先进入 logfile,然后同步写入到 memtable 中,等待一定条件触发后(如 memtable 达到容量限制),持久化到磁盘上。在不同的 sstfiles 之间,会定期出现 compaction 来减少 sstfile 文件的个数以及合并对同一 key 的操作。如果你了解 HBase 中的 Memstore, HFile 和 WAL 机制,那么这个机制对你来说应该很熟悉。他们都是利用了 LSM-Tree 的思想。
Compaction
RocksDB 提供了三种 Compaction:
- Level Style Compaction(default): 将不同的 sstfile 分级,越老的文件层级越高,定期将小文件向上一个层级的文件合并。这样,会有更好的查询性能,减少了读放大,提供了多线程的实现。
- Universal Style Compaction: 合并同一大小的文件,减少了写放大。
- FIFO Style Compaction: 这种 Compaction 实现的比较粗糙,会出现 evict 数据的情况。
Compaction 部分对于有实时场景的数据存储来说,并不是新鲜事儿。不管是在 Cassandra 还是 HBase 我们都能看到相似的理论。
Index Sstfile
当 RocksDB 使用默认的 Level Style Compaction 时,由于是多层有序数组的结构,索引 sstfile 文件的过程有些巧妙。假设 RocksDB 需要在 Level=N 的 sstfile 中做一个 key 的点查,如何快速找到对应的 sstfile 文件?
Fractional Cascading
这里使用了 Fractional Cascading 算法。本质上它还是使用了二分查找的方法,但是在一层一层往下做二分查找时缩小了二分查找的范围。
举例说明:
L1 = [0 - 0], [1 - 5], [6 - 10], [11 - 15]
L2 = [0 - 1], [2 - 3], [4 - 12], [13 - 18]
L3 = [0 - 7], [8 - 14], [15 - 19], [20 - 30]
如图所示,每个 sstfile 都有其自己的范围,我们不太可能直接在每个 Level 去进行二分查找,这样效率太低。 Fractional Cascading 通过在每一层的每个 sstfile 在下一层的 index 来不断减少二分查找的范围。加上 index 后,数据变成这样:
L1 = [[0 - 0], 0], [[1 - 5], 2], [[6 - 10], 2], [[11 - 15], 3]
L2 = [[0 - 1], 0], [[2 - 3], 0], [[4 - 12], 1], [[13 - 18], 3]
L3 = [[0 - 7], 0], [[8 - 14], 0], [[15 - 19], 0], [[20 - 30], 0]
假设我们要对 key='8' 的数据做点查。从 L1 开始,我们找到 [6 - 10] 的 sstfile 是我们在 L1 需要查找的 sstfile,[6 - 10] 对应在 L2 的 index=2,所以我们再 L2 找文件时,可以直接选择在 index ~ [0, 2] 的区间内做二分查找,而不是对整个 L2 数据集做查找。
Transaction
在 RocksDB 中,它将事务分为3个阶段,依次为: Put,Prepare 和 Commit。如果每次等待事务操作全部 Prepare 后,在 Commit 再持久化到磁盘,这样如果多个大型事务同时出现,势必会降低吞吐速率。
对于事务,有两个改进的方案:
- WritePrepared: 在 Prepare 阶段就持久化到磁盘。
- WriteUnprepared: 在 Put 阶段就持久化到磁盘。
WritePrepared
RocksDB 在每条记录上加了一个 prepare_seq,在事务被提交后,在 CommitCache 中会存储一个 prepare_seq 和 commit_seq 之间的 mapping。CommitCache 会根据最大大小来 evict 数据,max_evict_seq 是用来表示最近 evict 的 prepare_seq。
在查询中,找到对应记录后,你可以找到对应的 prepare_seq,如果
prepare_seq>max_evict_seq并且不在 CommitCache 中:表示记录还在 prepare 阶段,没有 commit,不可读取。prepare_seq<=max_evict_seq并且prepare_seq不在 CommitCache 中:记录已经被持久化,直接读取即可。prepare_seqin CommitCache orprepare_seq<max_evict_seq: 记录已经被 committed,可以直接读取。
当然,这样的操作存在一些特殊情况,如果一个事务持续时间过长(Prepare 到 Commit),max_evict_seq 超过了事务中记录的 prepare_seq,那么prepare_seq会被放入delayed_prepared_的集合里,RocksDB 每次查询会检查这个集合来判断记录是否 commit。
WriteUnprepared
这一部分,单纯的从文档上看,并没有很好的理解其中的原理,并且这个功能还没有在生产上使用...
在阅读完 RocksDB 相关的文档之后,我相信没有太多的开发者愿意依赖 RocksDB 的事务,首先这个事务的算法机制本身有些复杂,难以理解,一旦出现问题,线上排查问题的过程必定会相当复杂;其次 RocksDB 对于这种事务机制,并不能接受一些极端情况,如长事务和阅读过于久远的历史 snapshot 等。虽然它对 Put -> Prepare -> Commit 的流程做了很大的优化,提升了吞吐,但是这样的确对使用 RocksDB 的业务,有很大的限制。
Summary
这里是我 Database 系列的第2篇笔记,写 RocksDB 的原因是不少企业已经有了很多成功的案例,并且在我司也曾经有过对 RocksDB 的选型考虑(不过还是因为吞吐并没有我们想象的那么好)。文中的信息大部分是通过 RocksDB Wiki 得到,从架构设计上并没有太多惊艳的地方,事务处理也有些不尽人意。但是在使用上看,RocksDB 确实足够轻量,不需要额外的部署,一个 jar 包就可以拥有一个功能完备的数据库,api 的使用体验也不错,并且能灵活对接 HDFS。
关于 RocksDB 写放大读放大的问题,学术界也提出过相关的理论,我也简单研读过,的确是一个不一样的视角,但是不知是否有成功的实践呢?