9月11日,蚂蚁金服在Google Developer Day Shanghai 2019上宣布开源了基于TensorFlow 2.0 eager execution的分布式深度学习系统ElasticDL。基于TensorFlow的支持弹性调度的深度学习系统,据我们所知,ElasticDL是第一个。项目负责人王益和我们分享了ElasticDL项目的设计意图和现状,尤其是ElasticDL与TensorFlow 2.0以及Kubernetes的技术关联。

分布式深度学习的技术思路

基于TensorFlow的分布式训练系统大致可以分为以下四类:

其中,ElasticDL位于田字格的右上角。之所以选择这条技术思路,是为了利用Kubernetes实现容错和弹性调度。

高性能计算和云计算

在深度学习技术研发的早期,涉及的人员相对少,共用一个计算集群的人相对少,计算作业之间的协调可以通过口头交流实现。大家更关心缩短运行时间,也就是从作业启动到结束的这段时间。高性能计算技术(HPC)是解决这个问题的有效途径,比如NVIDIA的cuBLAS和cuDNN优化高性能数学计算、NCCL优化GPU之间的通信效率。

随着深度学习技术的大规模使用,很多工程师和研究员共用一个集群,通过商量来协调调度显然不可行了,大家开始使用集群管理系统调度分布式作业。这其中,Kubernetes近年来一枝独秀,已经在各大公有云中广泛使用。

云计算和弹性调度

在Kubernetes上启动分布式TensorFlow作业的常用方式是使用Google Cloud开源的Kubeflow。Kubeflow是Kubernetes的一个”插件“,它询问Kubernetes计划分配哪几台机器来运行一个分布式作业中的各个进程,随后告知每个进程,所有其他进程的IP地址和port。从而保证一个作业里各个进程之间互相知道对方。

为什么需要让所有进程互相知道对方呢?这是TensorFlow ps-based distribution方式(上述表格中的左上)要求的。TensorFlow 1.x原生的分布式训练功能让一个作业中所有进程都执行TensorFlow 1.x runtime程序。这些进程互相通信,互相协调成为一个“分布式runtime“,来解释执行表示深度学习计算过程的计算图(graph)。在开始分布式训练之初,graph被TensorFlow runtime拆解成若干子图;每个进程负责执行一个子图——任何一个进程失败(可能是被更高优先级作业抢占),则整个大图的执行就失败了。所以TensorFlow原生的分布式训练能力不是容错的(fault-tolerant)。不过,它是可以从错误恢复(fault-recoverable)——TensorFlow API提供checkpoint的能力;如果一个作业失败了,可以重启作业,从最近的checkpoint开始继续执行。

Kubeflow可以在Kubernetes上启动基于TensorFlow原生的分布式计算能力的作业。但是因为后者并不能容错,所以Kubeflow并不能无中生有。不能容错,也意味着不能弹性调度。

对弹性调度的诉求

在很多人共用计算集群的情况下,支持弹性调度意味着极大提升团队效率和集群的总体利用率。前者支持快速迭代以保持技术领先;后者决定企业成本和云计算业务的盈利能力。

一个展示弹性调度效果的例子如下。假设一个集群里有N个GPU,一个作业包括一个进程,占用了N/2个GPU。第二个作业需要N/2+1个GPU;但是此时机群里空闲GPU只有N/2个。如果没有弹性调度能力,那么第二个作业被迫等待,直到第一个作业结束释放资源。这个等待时间很可能和第二个作业的运行时间同量级。此时,集群的利用率很低,是50%。如果有弹性调度,那么第二个作业可以马上启动,用N/2个GPU做计算。日后如果有更多空闲资源了,调度系统可以增加其进程数量,充分利用资源。

另一个例子是,假设有一个作业已经在执行了,此时一个新的更高优先级的作业需要资源,所以调度系统杀掉了(preempt)了第一个作业的几个进程来腾出资源启动第二个作业。如果没有弹性调度和容错,那么第一个作业会失败,所有进程都结束。直到有足够资源重启它,并且沿着最近的checkpoint继续。如果有弹性调度,则第一个作业的剩下的进程可以继续执行,只是因为可用的进程(GPU)少了,所以速度慢一些而已。

以上两个例子都展示了弹性调度对集群利用率的提升,以及对团队工作效率的保障。需要注意的是:容错和弹性调度互为因果。容错的意思是,作业不受其中进程数量变化影响。弹性调度时,作业里的进程数量会随集群workload情况增减,所以作业必须是容错的,才能和调度系统配合,实现弹性调度。也因为如此,弹性调度依赖分布式编程框架和调度系统配合。

今天,很多分布式编程框架都可以和Kubernetes配合实现容错和弹性调度。比如用于离线数据处理的Spark、用于在线数据处理的Storm、在线流数据引擎Flink、分布式存储系统Redis和HBase。其中适合深度学习的框架有Paddle EDL。基于TensorFlow的支持弹性调度的深度学习系统,据我们所知,ElasticDL是第一个。

Kubernetes-native的弹性调度

ElasticDL通过实现一个Kubernetes-native的框架,调用TensorFlow 2.0,来实现弹性深度学习。

所谓Kubernetes-native指的是一个程序调用Kubernetes API来起止进程。Google MapReduce是一个Borg-native的分布式计算框架。用户通过运行一个Borg的客户端程度启动一个MapReduce作业。Borg客户端调用Borg API提交作业,并且启动一个master进程。这个master调用Borg API启动其他workers进程。ElasticDL也类似,用户调用ElasticDL的命令行客户端程序启动作业。这个客户端程序调用Kubernetes API,启动master进程。master进程继续调用Kubernetes API启动其他进程。master进程也可以调用Kubernetes API监控其他进程。

如果worker挂了,按照分布式深度学习训练算法的数学特性,可以不用处理,即可确保训练过程继续。如果一个parameter server进程挂了,master会选择一个worker进程,让它转换角色替补上挂掉的parameter server进程。在以上两种情况下,master都会调用Kubernetes API,请它再启动一个额外的worker进程。如果启动成功,master要带它入门,加入到与其他进程的协作中。master进程的状态(主要是三个task queues:todo、doing、done)可以保留在Kubernetes集群的etcd存储系统中。这样,万一master挂了,重启的master进程可以从etcd继承前世的状态。

以上是一个简化的描述。ElasticDL实现了多种分布式计算模式,每种模式实现fault-tolerance的方式略有不同。我们会在后续文章中详细介绍。

Kubernetes-native架构使得master进程有机会与Kubernetes协作实现容错和弹性调度。不过,因为ElasticDL调用Kubernetes API,也就意味着ElasticDL只能运行在Kubernetes上。

TensorFlow原生的分布式计算能力不是Kubernetes-native的。所以TensorFlow不是绑定在Kubernetes这个平台上的。这是大家如果要用现有技术在Kubernetes运行TensorFlow作业的话,需要依赖Kubernetes的扩展Kubeflow的原因。

理论上,不调用Kubernetes API也是可以实现一定程度的容错的。即使没有Kubernetes的通知,master可以通过检查其他继承的心跳(heartbeat)或者检查TCP链接状态,判断其他进程的生死存亡。但是,不调用Kubernetes API(或者其他调度系统的API),master无法通知调度系统重启进程,也无法得知新启动的进程的信息,并且帮助它加入作业。这种“非Kubernetes-native”的容错方式颇为被动,只能接受资源紧张时一些进程被抢占而挂掉的事实,而不能在其他作业释放资源后增加进程充分利用空闲资源。

TensorFlow 2.0

如上文解释,为了保证TensorFlow最核心的runtime是平台无关的,我们没法通过修改runtime实现完备的主动的容错和弹性调度。所以如文首的田字格所示,ElasticDL和Uber Horovod都是在TensorFlow的API上包一层。

Horovod基于TensorFlow 1.x。一个Horovod作业的每个进程调用单机版TensorFlow做本地计算,然后收集gradients,并且通过AllReduce调用汇聚gradients并且更新模型。Horovod也是平台无关的,所以它提供的AllReduce操作不支持容错和弹性调度。这一点和ElasticDL不一样。

和ElasticDL一样的是,Horovod需要从TensorFlow偷偷“截获”gradients,在TensorFlow 1.x中,深度学习计算是表示成一个计算图(graph),并且由TensorFlow runtime解释执行,所以Horovod为了获得每个进程算的gradients并且AllReduce它们,就得hack进入图执行的过程。为此,Horovod要求使用者使用特定的optimizer代替TensorFlow提供的optimizer,从而可以在优化模型阶段透露出gradients。

一个调用Horovod的用户程序的结构如下。其中标记为(*)和(**)的部分是Horovod要求用户写的,帮助Horovod截获TensorFlow计算得到的gradients的代码。如果用户不慎忘记写了,那么程序执行结果就不对了。

ElasticDL没有这些问题,因为它依赖的是TensorFlow 2.0。TensorFlow 2.0主推的eager execution mode采用和解释执行图完全不同的深度学习计算方式。类似PyTorch的做法,前向计算过程把对基本计算单元(operator)的调用记录在一个内存数据结构tape里,随后,反向计算过程(计算gradients的)可以回溯这个tape,以此调用operator对应的gradient operator。这个tape提供一个操作让用户可以获取每个参数的gradient。

ElasticDL通过调用TensorFlow 2.0 API可以很直接地获取gradients:

而且上面这段代码不是需要用户写的,而是ElasticDL的一部分。ElasticDL用户需要写的代码对应上述Horovod代码范例中的一行——定义模型。

极简的API和使用方式

训练一个模型不只需要上述模型定义,还需要指定数据、优化目标(cost)、和优化算法(optimizer)。用户总是希望能以尽量精简的方式指定这些信息,以尽量少的代码描述训练作业。

ElasticDL和TensorFlow其他的high-level API,例如Keras和TensorFlow Estimator一样,几乎调用一个API函数就可以执行一个分布式训练作业。下面这个程序使用Keras。Keras使用TensorFlow原生分布式训练能力,不支持容错和弹性调度。

ElasticDL的API相对更加精简一些。上述范例程序对应的ElasticDL版本如下:

主要的区别在于:在Keras程序里用户要选择分布式执行策略;而在ElasticDL程序里则不需要。这是因为ElasticDL自动选择分布式训练算法和策略。

简单的说,对于有很大参数(需要model parallelism)的模型,ElasticDL使用asynchrnous SGD。这个方法配合delayed model update能把网络通信量减少一个数量级。很多NLP、搜索、推荐、广告的模型都符合这一类。Asynchronous SGD对于这类模型的表现比较稳定。对于图像识别和语音识别这一类参数不太大的模型,ElasticDL团队在开发一个Kubernetes-native的AllReduce。和Horovod使用的AllReduce一样,ElasticDL AllReduce把进程间通信的拓扑组织成一个环,从而实现高性能的模型更新。与之不同的是,ElasticDL AllReduce是容错的——在有进程失败导致AllReduce调用失败的情况下,master组织剩下的活着的进程构造一个新的环。

ElasticDL项目希望通过这样的分而治之的策略,提供高性能并且易用的深度学习系统。

ElasticDL和SQLFlow的关系

今年早些时候,王益团队开源了SQLFlow。用户可以用扩展后的SQL语法,非常精炼地描述整个数据流和AI流程。

比如,如果我们要为一个电子商务网站构造一个推荐系统,需要开发日志收集、在线数据清洗、特征工程、模型训练,验证和预测等模块。每个模块可能需要投入一个团队数轴甚至数月的时间。

最近几年里,很多互联网服务开始把数据直接上传到通用数据库中,比如蚂蚁金服的很多数据是在ODPS(也就是阿里云上的MaxCompute服务)以及新一代的智能数据系统。这促使我们考虑把数据清洗和预处理放在数据库中做,而特征工程、自动机器学习、和训练过程在ElasticDL这样的AI引擎里做。SQLFlow把扩展语法的SQL程序翻译成一个Python程序,把两部分链接起来。

在这样的场景中,如果AI需要很多参数,则用户也就需要在SQL程序中提供这些参数。比如下面SQL语句从数据库中提取用户的年龄、工作部门、和工作地点,来预测其收入。

其中,TRAIN从句指定要训练的模型;COLUMN从句指定如何把数据映射成特征;LABEL指定要预测的值;WITH指定训练过程中的各种参数,其中dist_strategy是调用Keras/TensorFlow做训练是需要指定的分布式策略,gpus指定需要的资源。而这些,在SQLFlow调用ElasticDL的时候都是不需要的,因为ElasticDL自动选择分布式策略和算法。

从这个例子可以看出,如果要让用户能提供尽量少的参数,人工智能引擎还需要更加智能,提供包括AutoML和自动特征工程的功能。ElasticDL项目任重道远。我们期待把上述SQL程序简化为如下形式:

ElasticDL项目的现状

ElasticDL项目处于早期探索阶段。API还在演化过程中。这次开源的版本,尚不包括自动选择分布策略和算法的代码。相比在TensorFlow runtime中实现分布式计算,基于TensorFlow 2.0 eager mode的Python API实现的分布式训练性能差距还很大。ElasticDL团队在和Google Brain团队合作,开发上述asynchronous SGD + delayed model update能力、以及Kubernetes-native AllReduce。希望在下一个版本中可以提供给大家使用。

目前ElasticDL实现的基于parameter server的分布式SGD训练方法验证了容错和弹性调度。并且在Google Cloud上的Kubernetes 1.12集群和阿里Sigma 3.1(一个Kubernetes的高性能实现)上都可以运行。并且,ElasticDL团队开发了SQLFlow生成ElasticDL程序的code generator。

我们希望尽早开源ElasticDL和尽早分享其设计意图,能汇聚来自不同公司和社区的力量,一起探索Google TensorFlow 2.0和Kubernetes的分布式训练生态,早日实现便捷的端到端的人工智能开发套件。