在分布式张量流中形成障碍的正确方法是什么?

问题描述:

在分布式培训期间,我希望在每个时期后同步,对主要工作人员进行一些计算,并根据这些计算结果进行或停止培训。我需要一个障碍来这样做。在分布式张量流中形成障碍的正确方法是什么?

我没有看到文件了类似的话,所以我实现了基于队列(类似于梯度是如何存储和分布式训练应用)解决方案:

def build_barrier(tasks, task_index, barrier_name): 
    queues = [] 
    for i, task in enumerate(tasks): 
     with tf.device('%s/cpu:0' % task): 
      with tf.name_scope(barrier_name): 
       queues.append(
        tf.FIFOQueue(
         len(tasks), 
         (tf.float32), 
         shapes=(()), 
         name=str(i), 
         shared_name=str(i))) 

    with tf.control_dependencies([queue.enqueue(1.) for queue in queues]): 
     return queues[task_index].dequeue_many(len(tasks)) 

的想法是创建一个每个队列工人。对于'信号',我在每个队列中推送一个令牌,并为'加入'我从相应的队列中出列很多令牌,我想要同步多少个任务。

问题是:这是正确的做法还是有更好的方法?

+1

有一个可靠地实现了在SyncReplicas优化像这样https://github.com/tensorflow/tensorflow/blob/e8e06c5b66d7833ea8715c32c7ab9739714225fc/tensorflow/python/training/sync_replicas_optimizer(即当某些进程死亡/重新启动功能偶数)。 PY#L34 –

您的解决方案与SyncReplicasOptimizer非常相似。在SyncReplicasOptimizer中,它使用一个同步令牌队列来模拟屏障,并为每个变量累加累加器并累积和平均更新。这是一个非常典型的批量同步并行机制,但它在Tensorflow中实现了Stale同步并行机制。

此外,Tensorflow提供了最新版本的Barrier,您可以查看更多信息。