tensorflow: TF1.6/1.7 PS/Worker Distributed Run Failed with "UnavailableError: OS Error" when jobs are not running on current machine

System information

  • Have I written custom code (as opposed to using a stock example script provided in TensorFlow): yes
  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): Ubuntu 16.04
  • TensorFlow installed from (source or binary): official 1.6.0 release binary, or build from master branch (with latest commit: 47407ccb99a61fd5115130020ff8ef5ef9272433)
  • TensorFlow version (use command below): 1.6.0 official release or master
  • Python version: python 3.5 or python 2.7
  • Bazel version (if compiling from source): 0.11.1
  • GCC/Compiler version (if compiling from source): gcc (Ubuntu 5.4.0-6ubuntu1~16.04.9) 5.4.0 20160609
  • CUDA/cuDNN version: 9.0/7.0
  • GPU model and memory: Tesla K80, 12206MiB
  • Exact command to reproduce:

You can collect some of this information using our environment capture script:

https://github.com/tensorflow/tensorflow/tree/master/tools/tf_env_collect.sh

You can obtain the TensorFlow version with

python -c "import tensorflow as tf; print(tf.GIT_VERSION, tf.VERSION)"
('v1.6.0-rc1-1503-g47407cc', '1.6.0')

Describe the problem

The expected behavior

The source code below uses ps/worker mode for distributed training. To use it, we need to run

python mnist_replica.py --data_dir /tmp/tensorflow/mnist/input_data --task_index 0 --ps_hosts '10.0.1.5:14416' --worker_hosts '10.0.1.4:14417' --job_name 'ps'

python mnist_replica.py --data_dir /tmp/tensorflow/mnist/input_data --task_index 0 --ps_hosts '10.0.1.5:14416' --worker_hosts '10.0.1.4:14417' --job_name 'worker'

respectively on “ps job” machine and “worker job” machine.

If we start the script on the ps machine first, it normally waits for the worker machine to become ready before going further. The log looks like this:

2018-03-20 05:49:40.410488: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:14416}
2018-03-20 05:49:40.410614: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> 10.0.1.4:14417}
2018-03-20 05:49:40.418149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:14416
ps 0, create done queue
ps 0, running
2018-03-20 05:49:50.430531: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2018-03-20 05:50:00.430728: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2018-03-20 05:50:10.430943: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2018-03-20 05:50:20.431080: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2018-03-20 05:50:30.431351: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
^C2018-03-20 05:50:40.434895: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2018-03-20 05:50:50.435104: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2018-03-20 05:51:00.435244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0

Then we run the script on the worker machine, and the two machines communicate and coordinate to complete the training.

The problem

This works well on TF 1.5, 1.4, and earlier versions, but on the latest 1.6.0 release (and on a build from the master source, which I also tried) it sometimes fails. I did some investigation and testing; here are the symptoms:

  • If the specified ps/worker hosts have the same IP as the machine running the script (e.g. ps and worker run on different ports of the current machine), everything works fine.

  • If the specified ps/worker hosts share one IP (call it A-IP) that differs from the current machine's, the run fails even though the current machine can ping A-IP successfully. The error log after starting the ps task (with python mnist_replica.py --data_dir /tmp/tensorflow/mnist/input_data --task_index 0 --ps_hosts '10.0.1.5:14416' --worker_hosts '10.0.1.4:14417' --job_name 'ps'):

2018-03-20 05:57:29.228323: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:14416}
2018-03-20 05:57:29.228478: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> 10.0.1.4:14417}
2018-03-20 05:57:29.229155: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:14416
ps 0, create done queue
ps 0, running
I0320 05:57:29.309552659 3803 subchannel.cc:677] Connect failed: {"created":"@1521525449.309441854","description":"Failed to connect to remote host: OS Error","errno":111,"file":"external/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":198,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:10.0.1.4:14417"}
I0320 05:57:29.309786369 3803 subchannel.cc:484] Retry in 998 milliseconds
I0320 05:57:30.307312499 3796 subchannel.cc:437] Failed to connect to channel, retrying
I0320 05:57:30.308555551 3804 subchannel.cc:677] Connect failed: {"created":"@1521525450.308464247","description":"Failed to connect to remote host: OS Error","errno":111,"file":"external/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":198,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:10.0.1.4:14417"}
I0320 05:57:30.308750759 3804 subchannel.cc:484] Retry in 999 milliseconds
I0320 05:57:31.307171978 3796 subchannel.cc:437] Failed to connect to channel, retrying
I0320 05:57:31.308303225 3802 subchannel.cc:677] Connect failed: {"created":"@1521525451.308214021","description":"Failed to connect to remote host: OS Error","errno":111,"file":"external/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":198,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:10.0.1.4:14417"}
I0320 05:57:31.308338927 3802 subchannel.cc:484] Retry in 999 milliseconds
I0320 05:57:32.307163816 3796 subchannel.cc:437] Failed to connect to channel, retrying
I0320 05:57:32.308250261 3801 subchannel.cc:677] Connect failed: {"created":"@1521525452.308164957","description":"Failed to connect to remote host: OS Error","errno":111,"file":"external/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":198,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:10.0.1.4:14417"}
I0320 05:57:32.308284662 3801 subchannel.cc:484] Retry in 999 milliseconds
I0320 05:57:33.307136307 3796 subchannel.cc:437] Failed to connect to channel, retrying
I0320 05:57:33.308314356 3806 subchannel.cc:677] Connect failed: {"created":"@1521525453.308215652","description":"Failed to connect to remote host: OS Error","errno":111,"file":"external/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":198,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:10.0.1.4:14417"}
I0320 05:57:33.308375658 3806 subchannel.cc:484] Retry in 999 milliseconds
I0320 05:57:34.307172752 3796 subchannel.cc:437] Failed to connect to channel, retrying
2018-03-20 05:57:34.308793: E tensorflow/core/distributed_runtime/master.cc:269] Master init: Unavailable: OS Error
Traceback (most recent call last):
  File "mnist_replica.py", line 304, in <module>
    main(args)
  File "mnist_replica.py", line 102, in main
    sess.run(queue.dequeue())
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 905, in run
    run_metadata_ptr)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1137, in _run
    feed_dict_tensor, options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1355, in _do_run
    options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1374, in _do_call
    raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.UnavailableError: OS Error

  • If the specified ps/worker hosts have different IPs (on the same LAN, and able to ping each other successfully), the errors on startup are similar to those in the second situation.

  • The exception happens during MasterSession initialization (I guess some communication over gRPC is needed there).
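A successful ping only shows that ICMP traffic gets through; it does not show that anything is listening on the TCP port a task will dial. The log above reports errno 111 ("Connection refused") for target address 10.0.1.4:14417, so one way to rule out plain network or firewall problems is to try a raw TCP connection to each host:port in the cluster spec. The following is a minimal sketch using only the Python 3 standard library; the helper names `can_connect` and `check_cluster` are hypothetical and not part of the script in this issue.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError (errno 111 on Linux) and timeouts.
        return False


def check_cluster(hosts):
    """Map each "host:port" string to whether its port accepts connections."""
    results = {}
    for entry in hosts:
        host, port = entry.rsplit(":", 1)
        results[entry] = can_connect(host, int(port))
    return results


# Example call, using the addresses from the commands in this issue:
#   check_cluster(["10.0.1.5:14416", "10.0.1.4:14417"])
```

If the worker's port is reported as unreachable while the worker process is running, the problem is likely outside TensorFlow (firewall, bind address). If it is reachable and the ps task still fails, the cause is more likely in the gRPC layer itself.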

My personal thinking

To be honest, I wonder whether the gRPC upgrade (introduced in v1.6rc0) is the cause, but I am pretty new to this component, and I am not sure whether anybody else has hit similar issues (though I think people using TF 1.6 or master will run into this on distributed runs).

That would be great if any experts can share some insights or thoughts. Thanks in advance!!!

Source code / logs

source code:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import math
import sys
import tempfile
import time

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

IMAGE_PIXELS = 28


def create_done_queue(ps_task_index, worker_count):
    """Queue used to signal death for i'th ps shard. Intended to have
    all workers enqueue an item onto it to signal doneness."""

    with tf.device("/job:ps/task:%d/cpu:0" % (ps_task_index)):
        return tf.FIFOQueue(worker_count, tf.int32, shared_name="done_queue" + str(ps_task_index))


def create_done_queues(ps_count, worker_count):
    return [create_done_queue(ps_task_index, worker_count) for ps_task_index in range(ps_count)]


def main(args):
    mnist = input_data.read_data_sets(args.input_training_data_path, one_hot=True)
    if args.download_only:
        sys.exit(0)

    if args.job_name is None or args.job_name == "":
        raise ValueError("Must specify an explicit `job_name`")
    if args.task_index is None or args.task_index == "":
        raise ValueError("Must specify an explicit `task_index`")

    print("job name = %s" % args.job_name)
    print("task index = %d" % args.task_index)

    # Construct the cluster and start the server
    ps_spec = args.ps_hosts.split(",")
    worker_spec = args.worker_hosts.split(",")

    # Get the number of workers.
    num_workers = len(worker_spec)
    num_pss = len(ps_spec)

    cluster = tf.train.ClusterSpec({
        "ps": ps_spec,
        "worker": worker_spec})

    if not args.existing_servers:
        # Not using existing servers. Create an in-process server.
        server = tf.train.Server(
            cluster, job_name=args.job_name, task_index=args.task_index, protocol=args.protocol)
        if args.job_name == "ps":
            config = tf.ConfigProto(log_device_placement=True, allow_soft_placement=True)
            sess = tf.Session(server.target, config=config)

            print("ps %d, create done queue" % args.task_index)
            queue = create_done_queue(args.task_index, num_workers)

            print("ps %d, running" % args.task_index)
            for i in range(num_workers):
                sess.run(queue.dequeue())
                print("ps %d received worker %d done" % (args.task_index, i))

            print("all workers are done, ps %d: exit" % (args.task_index))
            sys.exit()

    is_chief = (args.task_index == 0)
    if args.num_gpus > 0:
        # Avoid gpu allocation conflict: now allocate task_num -> #gpu
        # for each worker in the corresponding machine
        gpu = (args.task_index % args.num_gpus)
        worker_device = "/job:worker/task:%d/gpu:%d" % (args.task_index, gpu)
    elif args.num_gpus == 0:
        # Just allocate the CPU to worker server
        cpu = 0
        worker_device = "/job:worker/task:%d/cpu:%d" % (args.task_index, cpu)

    print("worker %d, worker_device=%s" % (args.task_index, worker_device))
    print("worker %d, create done queue" % args.task_index)
    queues = create_done_queues(num_pss, num_workers)
    print("worker %d, done queue created" % args.task_index)

    # The device setter will automatically place Variables ops on separate
    # parameter servers (ps). The non-Variable ops will be placed on the workers.
    # The ps use CPU and workers use corresponding GPU

    with tf.device(
            tf.train.replica_device_setter(
                worker_device=worker_device,
                ps_device="/job:ps/cpu:0",
                cluster=cluster)):
        global_step = tf.Variable(0, name="global_step", trainable=False)

        # Variables of the hidden layer
        hid_w = tf.Variable(
            tf.truncated_normal(
                [IMAGE_PIXELS * IMAGE_PIXELS, args.hidden_units],
                stddev=1.0 / IMAGE_PIXELS),
            name="hid_w")
        hid_b = tf.Variable(tf.zeros([args.hidden_units]), name="hid_b")

        # Variables of the softmax layer
        sm_w = tf.Variable(
            tf.truncated_normal(
                [args.hidden_units, 10],
                stddev=1.0 / math.sqrt(args.hidden_units)),
            name="sm_w")
        sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

        # Ops: located on the worker specified with args.task_index
        x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
        y_ = tf.placeholder(tf.float32, [None, 10])

        hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
        hid = tf.nn.relu(hid_lin)

        y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
        cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

        opt = tf.train.AdamOptimizer(args.learning_rate)

        if args.sync_replicas:
            if args.replicas_to_aggregate is None:
                replicas_to_aggregate = num_workers
            else:
                replicas_to_aggregate = args.replicas_to_aggregate

            opt = tf.train.SyncReplicasOptimizer(
                opt,
                replicas_to_aggregate=replicas_to_aggregate,
                total_num_replicas=num_workers,
                name="mnist_sync_replicas")

        train_step = opt.minimize(cross_entropy, global_step=global_step)

        if args.sync_replicas:
            local_init_op = opt.local_step_init_op
            if is_chief:
                local_init_op = opt.chief_init_op

            ready_for_local_init_op = opt.ready_for_local_init_op

            # Initial token and chief queue runners required by the sync_replicas mode
            chief_queue_runner = opt.get_chief_queue_runner()
            sync_init_op = opt.get_init_tokens_op()

        init_op = tf.global_variables_initializer()
        train_dir = tempfile.mkdtemp()

        enq_ops = []
        for q in queues:
            qop = q.enqueue(1)
            enq_ops.append(qop)
    if args.sync_replicas:
        sv = tf.train.Supervisor(
            is_chief=is_chief,
            logdir=train_dir,
            init_op=init_op,
            local_init_op=local_init_op,
            ready_for_local_init_op=ready_for_local_init_op,
            recovery_wait_secs=1,
            global_step=global_step)
    else:
        sv = tf.train.Supervisor(
            is_chief=is_chief,
            logdir=train_dir,
            init_op=init_op,
            recovery_wait_secs=1,
            global_step=global_step)

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=False,
        device_filters=["/job:ps", "/job:worker/task:%d" % args.task_index])
    if args.infer_shapes == True:
        sess_config.graph_options.infer_shapes = args.infer_shapes

    # The chief worker (task_index==0) session will prepare the session,
    # while the remaining workers will wait for the preparation to complete.
    if is_chief:
        print("Worker %d: Initializing session..." % args.task_index)
    else:
        print("Worker %d: Waiting for session to be initialized..." %
              args.task_index)

    if args.existing_servers:
        server_grpc_url = "grpc://" + worker_spec[args.task_index]
        print("Using existing server at: %s" % server_grpc_url)

        sess = sv.prepare_or_wait_for_session(server_grpc_url,
                                              config=sess_config)
    else:
        sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

    print("Worker %d: Session initialization complete." % args.task_index)

    if args.sync_replicas and is_chief:
        # Chief worker will start the chief queue runner and call the init op.
        sess.run(sync_init_op)
        sv.start_queue_runners(sess, [chief_queue_runner])

    # Perform training
    time_begin = time.time()
    print("Training begins @ %f" % time_begin)

    local_step = 0
    while True:
        # Training feed
        batch_xs, batch_ys = mnist.train.next_batch(args.batch_size)
        train_feed = {x: batch_xs, y_: batch_ys}

        _, step = sess.run([train_step, global_step], feed_dict=train_feed)
        local_step += 1

        now = time.time()
        print("%f: Worker %d: training step %d done (global step: %d)" %
              (now, args.task_index, local_step, step))

        if step >= args.train_steps:
            break

    time_end = time.time()
    print("Training ends @ %f" % time_end)
    training_time = time_end - time_begin
    print("Training elapsed time: %f s" % training_time)

    # Validation feed
    val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
    val_xent = sess.run(cross_entropy, feed_dict=val_feed)
    print("After %d training step(s), validation cross entropy = %g" %
          (args.train_steps, val_xent))

    for op in enq_ops:
        sess.run(op)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-training-data-path", default="/tmp/mnist-data")
    parser.add_argument("--input_training_data_path", default="/tmp/mnist-data")
    parser.add_argument("--download_only", type=bool, default=False)
    parser.add_argument("--task-index", type=int)
    parser.add_argument("--task_index", type=int)
    parser.add_argument("--num_gpus", type=int, default=1)
    parser.add_argument("--replicas_to_aggregate", type=int)
    parser.add_argument("--hidden_units", type=int, default=100)
    parser.add_argument("--train_steps", type=int, default=200)
    parser.add_argument("--batch_size", type=int, default=100)
    parser.add_argument("--learning_rate", type=float, default=0.01)
    parser.add_argument("--sync_replicas", type=bool, default=False)
    parser.add_argument("--existing_servers", type=bool, default=False)
    parser.add_argument("--ps-hosts", default="localhost:2222")
    parser.add_argument("--ps_hosts", default="localhost:2222")
    parser.add_argument("--worker-hosts", default="localhost:2223,localhost:2224")
    parser.add_argument("--worker_hosts", default="localhost:2223,localhost:2224")
    parser.add_argument("--job-name")
    parser.add_argument("--job_name")
    parser.add_argument("--protocol", default="grpc")
    parser.add_argument("--infer_shapes", type=bool, default=False)

    (args, unknown) = parser.parse_known_args()
    main(args)

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 24 (6 by maintainers)

Most upvoted comments

This has been troubling me for a while. I found that the problem is that gRPC uses the native "epoll" polling engine for communication. Switching to a portable polling engine solved the issue for me: set the environment variable GRPC_POLL_STRATEGY=poll before running the TensorFlow programs. For reference, see https://github.com/grpc/grpc/blob/master/doc/environment_variables.md.

Could you tell me how to set the environment variable? Thanks very much.

import os
os.environ['GRPC_POLL_STRATEGY'] = "poll"
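Where the assignment sits in the script may matter. A conservative placement, assuming the bundled gRPC library reads the variable when it initializes, is to set it before the first `import tensorflow`. The sketch below wraps the assignment in a small helper (the name `use_portable_grpc_polling` is hypothetical) that leaves any value already exported in the shell untouched.

```python
import os


def use_portable_grpc_polling(strategy="poll"):
    """Set GRPC_POLL_STRATEGY unless the caller already exported one.

    Returns the value that is in effect afterwards.
    """
    return os.environ.setdefault("GRPC_POLL_STRATEGY", strategy)


# Call this before the first `import tensorflow`, so the variable is
# already in the process environment when gRPC starts up (assumption:
# gRPC reads it at initialization time).
chosen = use_portable_grpc_polling()
print("GRPC_POLL_STRATEGY =", chosen)

# import tensorflow as tf  # import only after the variable is set
```

The same effect can be had without touching the script by prefixing the command in the shell, e.g. GRPC_POLL_STRATEGY=poll python mnist_replica.py followed by the usual arguments, on both the ps and the worker machine.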