Ray Datasets - 模型训练前的"最后一公里"

Ray 1.8+ 版本中提出使用 Datasets 来解决模型训练的”最后一公里”问题。由于并未在 Ray 上做过多的实践,所以本文更多是将 Datasets 做了系统化的梳理。关于 Ray 的介绍可以参考 Ray - A Distributed Framework for Emerging AI Applications

现状

Ray Docs 中可以看到:

Datasets is not intended as a replacement for more general data processing systems. Its utility is as the last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

Datasets 的出现并不是为了替代当前已有的 ETL 计算引擎(如 Spark/Dask/Mars),而是作为 ETL Pipeline 和后续分布式训练的的“连接器”存在。

训练 Pipeline

常见的训练数据流形式如下图:

ray-datasets-pipeline
当前训练流中有三个阶段:

  • ETL/数据处理:对于当前类型模型的通用数据处理,包括特征计算、特征值处理、无效数据过滤等,数据从 DFS 读取,写入到 DFS
  • 预处理:更多是面向模型的数据处理,比如将数据以 pk 形式进行分区;数据进行打散等,不同的模型会有不同的要求,预处理会读取 ETL 步骤中的数据输出,并进行进一步的处理,形成模型训练所需的 sample,并持久化到 DFS
  • 模型训练:从 DFS 上读取 sample 进行模型训练

这里注意,通常一次完整的模型训练,训练器会多次读取样本数据,每次读取称作 epoch,在每个 epoch 中,我们会把样本数据切分成若干个 batch,每个 batch 包含若干条 sample。以伪代码形式可以这样理解(相关概念可以参考 epoch-vs-iterations-vs-batch):

for epoch->epoches:
    for batch->epoch.batches:
        for sample->batch.samples:
            model.feed(sample)

问题

性能问题:

  • 序列化开销:从上面示意图中的箭头数量可以看出来,涉及到 5 次序列化/反序列化(实际上是可以做一些优化,这里描述的是一般情况)
  • 存储成本:这里指的是引入 DFS 作为中间数据存储,不仅产生了一定的存储开销,并且在数据读取和写入时引入了额外的 IO 和带宽

功能缺失:

  • 缺乏 per-epoch shuffling 的能力:关于为什么需要做 per-epoch shuffling 可以参考 why-should-the-data-be-shuffled-for-machine-learning-tasks,在这里更多指的是 global shuffle,因为 global shuffle 涉及到大量的资源和时间,所以在实际训练场景中很少会进行这样的操作

易用性:

  • 语言和框架的切换:在面向模型的数据处理中,需要使用 Spark 和 Tensorflow 进行开发
  • 异构资源支持:数据处理和模型训练可能会使用不同的资源类型 CPU/GPU
  • 调度能力:使用 airflow 或 kubeflow 进行各个任务之间的调度,调度过程中会产生资源闲置

Datasets 能力

Datasets 本质上是 Ray cluster 中对分布式数据集的抽象。 在此基础上,Datasets 能够充分借助 Ray 本身的能力,包括 The Plasma In-Memory Object Store,Scheduling 能力,资源异构能力、容错能力等。在此基础上,Ray Datasets 提供了 DataLoader、Pre-Processing、Pipelining 的能力:

Data Loader:

  • 上面提到,频繁的落盘导致了数据的序列化开销问题,Datasets 基于 Apache Arrow ,将不同格式的数据读取后,转换成 Arrow 格式并存储在 Plasma 中,这些数据可以快速地在 Ray task 之间传输而无需进行多余的序列化和反序列化

Pre-Processing:

  • 提供 map、batch map、filter,由于框架较新,所以在实现时也站在了”巨人的肩膀”上,比如 map_batch() 方法直接实现了向量化的读取等
  • 提供 global shuffle 能力(sort、random shuffle、groupBy),实际原理很简单,直接基于分布式存储,进行 Zero-Copy 的数据拉取

Pipelining CPU/GPU:

下图是从 PyData Global 2021 - Unifying Large Scale Data Preprocessing and ML Pipelines with Ray Datasets 中找到。由于一次训练可能会涉及到多次的 loading/preprocessing/inference,而各个阶段所用的资源类型不同,导致了在使用 CPU 进行数据预处理的同时,需要保留 GPU 资源(此时不能被释放,否则可能会被其他任务抢占,导致训练过程中断),所以产生了 GPU idle 资源浪费的情况。

而 Datasets 借助 Ray 的调度能力,以及对异构资源的支持,可以实现下图中「With Pipelining」的效果,并且提供了便捷的 DataPipeline API 来完成 Pipelining 语义的表达。

ray-datasets-pipelining

总结

实际应用中,训练和特征工程的人才、技术栈割裂,会导致很多问题,比如:

  • 训练一份模型需要掌握多个系统,而模型本身可能很简单
  • 数据流中数据的频繁交换,不同语言和不同系统的切换,可能成为了训模型的最大瓶颈

从训练数据流的图中可以看到,Training 其实是数据流的最后一步,而 Ray 目前已经可以和 Tensorflow/PyTorch 进行结合。数据处理层面上,Ray 在过去提供给用户的大多数都是 low-level API,需要用户进行大量的编码进行拓展才能放到生产环境使用,而 Datasets 属于 Ray Train 在数据流上的一个拓展,也给了 Ray 的潜在用户更多的动力来使用吧。

Ray Datasets 作为模型训练前的计算引擎存在,一定程度上替代了 Spark 的部分场景,而且 Datasets 的 API 抽象和 Spark API 也非常类似,所以开始了解时,最常见的问题就是 Datasets vs Spark 对比的问题。但从上面对 Datasets 的介绍来看,显然 Ray Datasets 具有很多 Spark 缺乏的能力,不过这也是各个框架最初的场景定位决定的,倒也无可厚非。

本文主要是在阅读 Ray Datasets 相关资料中,做了一些小小的汇总,如果读者期望了解更多详细的内容,可以直接从下方的链接来获取。

参考

[1]. https://docs.ray.io/en/latest/data/dataset.html#datasets-distributed-data-loading-and-compute
[2]. https://docs.ray.io/en/latest/train/train.html#train-docs
[3]. https://www.anyscale.com/blog/why-third-generation-ml-platforms-are-more-performant
[4]. https://www.youtube.com/watch?v=wl4tvru9_Cg
[5]. https://www.anyscale.com/blog/the-third-generation-of-production-ml-architectures