在大数据的情况下,基于单机的建模已经不能满足现有的需求了,所以需要在集群上进行建模,这个时候需要使用分布式的开发方式。单机和分布式的开发代码是有所区别的,这里就简单介绍下tensoflow的分布式开发的两种方式

基于tf原生的分布式开发

1、分布式的话就会涉及到更新梯度的方式,有同步和异步的两个方案,同步更新的方式在模型的表现上能更快的进行收敛,异步更新迭代的速度会更加快。两种更新方式入下图:

image.png

image.png

2、 tf是基于ps, work 两种服务器进行分布式的开发。ps服务器可以只用与参数的汇总更新,让各个work进行梯度的计算。

3、具体开发流程:

首先指定ps 服务器启动参数 –job_name=ps

1
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0

接着指定work服务器参数(启动两个work 节点) –job_name=worker

1
2
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1

上面指定的参数 worker_hosts ps_hosts job_name task_index 都会在py文件中接受使用

参数接收使用

1
tf.app.flags.DEFINE_string("worker_hosts", "默认值", "描述说明")

接收了参数后分别注册ps work,让他们各司其职。

1
2
3
4
5
6
7
8
9
10
11
12
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)

issync = FLAGS.issync
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):

接下来更新梯度:
同步

1
2
3
4
5
6
7
8
rep_op = tf.train.SyncReplicasOptimizer(optimizer,
replicas_to_aggregate=len(worker_hosts),
replica_id=FLAGS.task_index,
total_num_replicas=len(worker_hosts),
use_locking=True)
train_op = rep_op.apply_gradients(grads_and_vars,global_step=global_step)
init_token_op = rep_op.get_init_tokens_op()
chief_queue_runner = rep_op.get_chief_queue_runner()

异步更新梯度:

1
train_op = optimizer.apply_gradients(grads_and_vars,global_step=global_step)

使用tf.train.Supervisor 进行真的迭代
注意:如果是同步更新梯度要加入

1
2
sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)

这个异步的方式虽然能够使用,但是有点麻烦,要自己指定集群集群ip和端口,但是我们往往自己不知道这些信息,所以这个时候可以借助 TensorFlowOnSpark来解决,使用yarn来管理。

基于TensorFlowOnSpark的分布式开发

使用spark-submit 来提交任务同时指定spark需要运行的参数(–num-executors 6等),模型代码,模型超参等
这边同样需要接受外部参数

1
2
3
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--tracks", help="数据集路径")
args = parser.parse_args()

参数准备好,训练数据(DataFrame)准备好,调用模型的api进行启动

1
2
3
4
5
6
7
8
9
10
estimator = TFEstimator(soft_dist.map_fun, args) \
.setInputMapping({'tracks': 'tracks', 'label': 'label'}) \
.setModelDir(args.model) \
.setExportDir(args.serving) \
.setClusterSize(args.cluster_size) \
.setNumPS(num_ps) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setSteps(args.max_steps)
model = estimator.fit(df)

soft_dist.map_fun 要调起的方法,后面都是模型训练的参数

soft_dist定义一个 map_fun(args, ctx)的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def map_fun(args, ctx):
...
worker_num = ctx.worker_num # worker数量
job_name = ctx.job_name # job名
task_index = ctx.task_index # 任务索引
if job_name == "ps": # ps节点(主节点)
time.sleep((worker_num + 1) * 5)
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
num_workers = len(cluster.as_dict()['worker'])
if job_name == "ps":
server.join()
elif job_name == "worker":
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
...

后面使用tf.train.MonitoredTrainingSession高级api 进行模型训练和预测。
这里可以有点小技巧,在模型结束的时候,可以发送邮件通知自己。这样可以及时的知道明星运行的情况。

注意,如果是用SessionRunHook来保存最后输出的模型,要了解框架代码是有个bug 的,他只能在规定的时间内保存完,没有运行结束也会被kill,如果你的版本不是bug修复后的,要自己处理,放宽运行时间。

tf分布式的开发大致是这两种情况,后边的方式可以用于实际的生产环境,还是比较稳定的。这边提供一个大体的思路和注意点,希望对大家有所帮助。最好的开发指南还是在官方的文档。