如何自定义 Flink State 使用的 Serializer

May 12, 2021

自定义 Flink State 的序列化方式是一种高阶的使用技巧,在很多复杂场景下,通过自定义 Serializer 可以在兼容性、性能等方面获得一定的收益。

State Serializer 作用

通过下面两行代码,我们创建了一个 Integer 类型的 ValueState:

// 创建 Integer 类型的 ValueState
ValueStateDescriptor<Integer> valueState = new ValueStateDescriptor<>("value", Types.INT);
ValueState<Integer> state = getRuntimeContext().getState(valueState);

我们传入的 Types.INT 让 Flink 知道了这个 State 的类型是 Integer 类型,并为这个 Integer 类型创建了对应的 IntSerializer。IntSerializer 有两个作用:

  • snapshot 过程中将 State 的值,也就是 Integer 对象,序列化成二进制数据,写入持久化存储中
  • restore 过程中将持久化存储中的二进制数据,反序列化成 Integer 对象

那么,State Serializer 其实是定义了 State 的二进制格式,通过自定义的方式,我们可以解决一些日常开发中常见的问题,比如:

  1. 复杂对象的序列化开销过大,比如 Kryo 需要写入 className 等冗余信息
  2. State 对象增减字段造成状态无法恢复

基本概念

一个完整的 State Serializer 由 Serializer 和 SerializerSnapshot 组成。其中:

  • Serializer 定义指定类型的序列化和反序列化方法
  • SerializerSnapshot 定义 Serializer 元信息的序列化方式(是快照中的一部分)
  • TypeSerializerSchemaCompatibility 表示作业重启后用户定义的 TypeSerializer 和快照中 TypeSerializer 兼容性

在官方文档 Custom Serialization for Managed State 中对自定义 State Serializer 有不少的介绍,大家可以通过这篇文章了解更细节的相关概念。

实现一个 Serializer

我们以 DecimalDataSerializer(用于序列化 DecimalData 类型数据)为例,来研究 Serializer 具体的实现。

我们先看 DecimalData 的数据结构:

public final class DecimalData implements Comparable<DecimalData> {
	final int precision;   // 精度(字段长度)
	final int scale;  // 范围(小数的位数)
	final long longVal;  // 精度较小时,选择用 long 值表示
	BigDecimal decimalVal;   // 精度较大时,选择用 java.math.BigDecimal 表示
	
	// 判断是否超出最大精度(即是否可以使用 long 值表示)
	public boolean isCompact() {
		return precision <= MAX_COMPACT_PRECISION;
	}
	
	// 转换成 long 值
	public long toUnscaledLong() {
		if (isCompact()) {
			return longVal;
		} else {
			return toBigDecimal().unscaledValue().longValueExact();
		}
	}	
	
	// 转换成 bytes
	public byte[] toUnscaledBytes() {
		return toBigDecimal().unscaledValue().toByteArray();
	}	
}

对于这个数据结构,我们来看一下他对应的 Serializer:

public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
	private final int precision;
	private final int scale;
	
	// 构造函数,初始化 precision 和 scale
	public DecimalDataSerializer(int precision, int scale) {
		this.precision = precision;
		this.scale = scale;
	}

	@Override
	public void serialize(DecimalData record, DataOutputView target) throws IOException {
		if (DecimalData.isCompact(precision)) {
			// 当前精度小,使用 long 值表示,写出 long 值
			assert record.isCompact();
			target.writeLong(record.toUnscaledLong());
		} else {
			// 当前精度大,使用 BigDecimal 表示
			byte[] bytes = record.toUnscaledBytes();
			target.writeInt(bytes.length);
			target.write(bytes);
		}
	}

	@Override
	public DecimalData deserialize(DataInputView source) throws IOException {
		if (DecimalData.isCompact(precision)) {
			// 当前精度小,读取 long 值,初始化 DecimalData
			long longVal = source.readLong();
			return DecimalData.fromUnscaledLong(longVal, precision, scale);
		} else {
			// 当前精度大,读取 bytes,初始化 DecimalData
			int length = source.readInt();
			byte[] bytes = new byte[length];
			source.readFully(bytes);
			return DecimalData.fromUnscaledBytes(bytes, precision, scale);
		}
	}
}

上面的代码中展示了核心的两个方法:

  • serialize:将 DecimalData 对象转化为二进制数据写入到 DataOutputView
  • deserialize:将 DataInputView 中的二进制数据按序读取,得到 DecimalData 对象

至于这两个方法在什么时候需要调用,是读者无需关心的,因为不管是写入到 Rpc 传输还是写入到具体的 Local/Remote 文件系统,都会将对应的 OutputStream 和 InputStream 封装到 DataOutputView 和 DataInputView 中。

支持多版本兼容的例子

通过上面的例子可以发现,作业经过修改后,如果 Serializer 的 precision 和 scale 发生了变化,那么由于 DecimalData.isCompact(precision) 的结果与之前不同,所以已经序列化的数据可能无法被反序列化回来。假如我们需要设计一个可以兼容一些改动的 Serializer,需要改动哪些地方?

序列化中加入版本信息

如果我们期望可以灵活改动 precision 和 scale,那么以上面的 serialize 和 deserialize 方法为例:

	int currentVersion = 2;
	
	@Override
	public void serialize(DecimalData record, DataOutputView target) throws IOException {
		target.writeInt(currentVersion); // 新增版本信息
		// ... 和之前一致
	}
	
	@Override
	public DecimalData deserialize(DataInputView source) throws IOException {
		int version = source.readInt();
		if (version == currentVersion) {
			return deserializeCurrentVersion(source);
		} else {
			return deserializeVersion1(source);
		}
	}	
	
	// 解析当前版本的数据结构
	private DecimalData deserializeCurrentVersion(DataInputView source) {
		// 使用当前版本的 precision 和 scale 进行解析
	}
	
	// 解析 Version 1 版本的数据结构
	private DecimalData deserializeVersion1(DataInputView source) {
		// 使用 Version 1 中的 precision 和 scale 进行解析并转化成当前结构的数据
	}	

调整兼容性判断

在修改了 DecimalData 的 precision 和 scale 之后,我们会发现快照中的 DecimalDataSerializer 和新创建的 DecimalDataSerializer 的属性已经对不上了,在 Flink 的实现中,会对 precision 和 scale 进行判断,如果不一致就抛出 TypeSerializerSchemaCompatibility.incompatible() 的异常。而目前我们加入了版本信息,则可以直接返回 TypeSerializerSchemaCompatibility.compatibleAsIs()

兼容性判断分为四种,分别是:

  • COMPATIBLE_AS_IS:表示兼容,并且今后使用用户新定义的 Serializer
  • COMPATIBLE_AFTER_MIGRATION:表示兼容,但是需要重新刷一遍数据,使用快照中的 Serializer 反序列化数据,并使用新的 Serializer 序列化数据
  • COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:表示兼容,用户需要返回一个新的 Serializer 作为今后使用的 Serializer
  • INCOMPATIBLE:表示不兼容,作业抛异常退出

引用

  1. DecimalDataSerializer.java
  2. Custom Serialization for Managed State

State Serializer 在平台型应用中使用的会比较多,尤其是写 DataStream 框架的同学,因为用户往往在 State 中存储的是业务数据的状态,里面的字段的增减是非常普遍的操作。对于期望自定义 State Serializer 的同学,参考一下很多 Flink 内部实现的 Serializer,比如上面提到的 IntSerializer 和 DecimalDataSerializer。