dask-kubernetes: StreamClosedError when creating KubeCluster on Kubeflow

Hi there,

I hit this StreamClosedError when I created a cluster with cluster = KubeCluster.from_yaml('worker-template.yaml') in a Kubeflow Jupyter notebook:

---------------------------------------------------------------------------
StreamClosedError                         Traceback (most recent call last)
/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    183         try:
--> 184             n_frames = await stream.read_bytes(8)
    185             n_frames = struct.unpack("Q", n_frames)[0]

StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

CommClosedError                           Traceback (most recent call last)
<ipython-input-4-1d49f9431640> in <module>
----> 1 cluster = KubeCluster.from_yaml('worker-template.yaml')

/opt/conda/lib/python3.7/site-packages/dask_kubernetes/core.py in from_yaml(cls, yaml_path, **kwargs)
    309             d = yaml.safe_load(f)
    310             d = dask.config.expand_environment_variables(d)
--> 311             return cls.from_dict(d, **kwargs)
    312 
    313     @property
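
(For reference, worker-template.yaml is a pod template along the lines of the example in the dask-kubernetes docs; mine looks roughly like the sketch below, where the image, resources, and worker args are illustrative rather than the exact file.)

# Hypothetical worker pod template, modeled on the dask-kubernetes docs example
kind: Pod
metadata:
  labels:
    app: dask-worker
spec:
  restartPolicy: Never
  containers:
    - name: dask
      image: daskdev/dask:latest
      args: [dask-worker, --nthreads, '2', --memory-limit, 4GB, --death-timeout, '60']
      resources:
        limits:
          cpu: "2"
          memory: 4G
        requests:
          cpu: "2"
          memory: 4G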

Assuming the pod didn't have enough permissions to create new pods, I created a new Role jupyter-notebook and configured a new RoleBinding as below (the pod uses the default ServiceAccount default-editor):

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  creationTimestamp: "2019-10-16T13:19:58Z"
  name: default-editor
  namespace: kubeflow-hong-zhu
  ownerReferences:
  - apiVersion: kubeflow.org/v1alpha1
    blockOwnerDeletion: true
    controller: true
    kind: Profile
    name: kubeflow-hong-zhu
    uid: 8022a9fa-f017-11e9-ad0d-42010a840259
  resourceVersion: "4025"
  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/kubeflow-hong-zhu/rolebindings/default-editor
  uid: a6f95f59-f017-11e9-ad0d-42010a840259
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: jupyter-notebook
subjects:
- kind: ServiceAccount
  name: default-editor
  namespace: kubeflow-hong-zhu
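
The Role it binds follows (roughly) the RBAC example from the dask-kubernetes docs; the rules below are my reading of that example rather than the exact object from my cluster:

# Sketch of the Role referenced above, per the dask-kubernetes docs' RBAC example
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: jupyter-notebook
  namespace: kubeflow-hong-zhu
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get", "list"]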

This, however, doesn't solve the problem. Any thoughts?

Thanks 😃

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 22 (8 by maintainers)

Most upvoted comments

I came across (I think) the same issue. It seems to depend on the Istio version, at least to some extent: on one cluster with Istio 1.2.6 I get the error (trace below), but on a similar cluster with Istio 1.3.8 and the same resources it works fine!

Relevant versions:

Python 3.7
Dask-Kubernetes 0.10.1
Kubernetes (EKS) 1.14.9

I created a ServiceAccount/Role/RoleBinding exactly as specified in the docs. I am using my own image instead of the daskdev one, and it all worked fine on my local microk8s cluster (without Istio enabled).

Code:

from dask.distributed import Client
from dask_kubernetes import KubeCluster

# DASK_MIN_WORKERS / DASK_MAX_WORKERS come from our own settings module
with KubeCluster.from_yaml('config/worker-spec.yml') as cluster:
    cluster.adapt(minimum=DASK_MIN_WORKERS, maximum=DASK_MAX_WORKERS)
    with Client(cluster) as client:
        print(client.get_versions(check=True))

Error message:

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://192.168.21.190:40467
distributed.scheduler - INFO -   dashboard at:                     :8787
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 240, in write
    stream.write(b)
  File "/usr/local/lib/python3.7/site-packages/tornado/iostream.py", line 546, in write
    self._check_closed()
  File "/usr/local/lib/python3.7/site-packages/tornado/iostream.py", line 1035, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/opt/job.py", line 66, in <module>
    with KubeCluster.from_yaml('config/worker-spec.yml') as cluster:
  File "/usr/local/lib/python3.7/site-packages/dask_kubernetes/core.py", line 640, in from_yaml
    return cls.from_dict(d, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask_kubernetes/core.py", line 602, in from_dict
    return cls(make_pod_from_dict(pod_spec), **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask_kubernetes/core.py", line 416, in __init__
    super().__init__(**self.kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 256, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 161, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/dask_kubernetes/core.py", line 574, in _start
    await super()._start()
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 289, in _start
    await super()._start()
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 59, in _start
    await comm.write({"op": "subscribe_worker_status"})
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 255, in write
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 123, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

Hope this is useful for the dask-kubernetes contributors (#197) or others with the same issue.

Just wanted to confirm that running a notebook without Istio, and then starting a Dask cluster (also without Istio), works.

So it looks like we/I need to find a way to start using Services.
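
For illustration, a first step could be a plain ClusterIP Service in front of the scheduler pod, so Istio's sidecar routes traffic to a named service port instead of a raw pod IP. Everything below, including the names and label selector, is a hypothetical sketch rather than something tested in this thread:

# Hypothetical Service for the Dask scheduler; names and labels are illustrative
apiVersion: v1
kind: Service
metadata:
  name: dask-scheduler
spec:
  selector:
    app: dask-scheduler    # must match the scheduler pod's labels
  ports:
  - name: tcp-comm         # Istio can infer the protocol from the port-name prefix
    port: 8786
    targetPort: 8786
  - name: http-dashboard
    port: 8787
    targetPort: 8787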

Sorry about the mess here, pressed the wrong button 😂