Worker raises CommClosedError on client shutdown #94

@lgarrison

Describe the issue:

I'm trying out a simple hello-world-style dask-mpi example. The computation returns the right result, but I get an exception when the client finishes. I'm running the script below under Slurm as srun -n3 python repro.py, and the error is:

2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

I thought this might be related to #87, but I'm running on Python 3.8 and there's just an exception, no hang.

Am I doing something wrong? It looks to me like the worker is complaining because the scheduler shuts down before the worker does. Is this expected? If I manually force the workers to shut down before the client and scheduler do, with:

def closeall(dask_scheduler):
    # runs on the scheduler: close every worker before the scheduler itself shuts down
    for w in list(dask_scheduler.workers):  # iterate over a copy, since workers are removed as they close
        dask_scheduler.close_worker(w)
[...]
client.run_on_scheduler(closeall)

then everything exits with no exceptions. But this feels like a hack... am I missing something? (The full reproducer with this workaround applied is shown after the MCVE below.)
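A possibly cleaner client-side variant might be to retire the workers explicitly before the Client context exits. This is only a sketch; it assumes Client.retire_workers() closes the named workers gracefully before the shutdown sequence starts, and I have not verified that it avoids the error:

# Hypothetical client-side alternative: retire every worker explicitly
# before leaving the Client context. Assumes retiring a worker closes it
# cleanly before the scheduler tears down.
client.retire_workers(
    workers=list(client.scheduler_info()['workers']),
    close_workers=True,
)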

Minimal Complete Verifiable Example:

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        res = future.result()
        print(f'future returned {res}')

if __name__ == '__main__':
    main()
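Combining the workaround above with the MCVE, the full script that exits cleanly for me looks roughly like this:

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def closeall(dask_scheduler):
    # close every worker before the scheduler shuts down
    for w in list(dask_scheduler.workers):
        dask_scheduler.close_worker(w)

def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        res = future.result()
        print(f'future returned {res}')

        # workaround: shut the workers down before the client/scheduler do
        client.run_on_scheduler(closeall)

if __name__ == '__main__':
    main()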

Full log:

(venv8) lgarrison@scclin021:~/scc/daskdistrib$ srun -n3 python ./repro_commclosed.py 
2022-10-05 14:05:40,460 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-10-05 14:05:40,527 - distributed.scheduler - INFO - State start
2022-10-05 14:05:40,533 - distributed.scheduler - INFO -   Scheduler at: tcp://10.250.145.105:45101
2022-10-05 14:05:40,533 - distributed.scheduler - INFO -   dashboard at:                     :8787
2022-10-05 14:05:40,566 - distributed.worker - INFO -       Start worker at: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO -          Listening to: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO -           Worker name:                          2
2022-10-05 14:05:40,566 - distributed.worker - INFO -          dashboard at:       10.250.145.105:34979
2022-10-05 14:05:40,566 - distributed.worker - INFO - Waiting to connect to: tcp://10.250.145.105:45101
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:40,566 - distributed.worker - INFO -               Threads:                          1
2022-10-05 14:05:40,566 - distributed.worker - INFO -                Memory:                   7.81 GiB
2022-10-05 14:05:40,566 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-az8e_3tm
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,354 - distributed.scheduler - INFO - Receive client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,356 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,385 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: init, memory: 0, processing: 0>
2022-10-05 14:05:41,386 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.250.145.105:38331
2022-10-05 14:05:41,386 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,386 - distributed.worker - INFO -         Registered to: tcp://10.250.145.105:45101
2022-10-05 14:05:41,386 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,387 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Close client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
future returned 2
2022-10-05 14:05:41,506 - distributed.scheduler - INFO - Receive client connection: Client-53080c1f-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,507 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,508 - distributed.worker - INFO - Run out-of-band function 'stop'
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing...
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing all comms
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: running, memory: 0, processing: 0>
2022-10-05 14:05:41,509 - distributed.worker - INFO - Stopping worker at tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.core - INFO - Removing comms to tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.scheduler - INFO - Lost all workers
2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

Environment:

  • Dask version: 2022.9.2
  • Python version: 3.8.12
  • Operating System: Rocky 8
  • Install method (conda, pip, source): pip
