Describe the issue:
I'm trying out a simple hello-world-style dask-mpi example; the computation returns the right result, but I get exceptions when the client finishes. I'm running the script below under Slurm as srun -n3 python repro.py, and the error is:
2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
I thought this might be related to #87, but I'm running on Python 3.8 and there's just an exception, no hang.
Am I doing something wrong? It looks to me like the worker is complaining because the scheduler shuts down before the worker does. Is this expected? If I manually force the workers to shut down before the client and scheduler do, with:
def closeall(dask_scheduler):
    for w in dask_scheduler.workers:
        dask_scheduler.close_worker(w)

[...]

client.run_on_scheduler(closeall)

then everything exits with no exceptions. But this feels like a hack... am I missing something?
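For reference, a client-side variant of the same workaround might look like the untested sketch below; it assumes that Client.retire_workers() with no arguments asks the scheduler to retire and close every worker while the client is still connected, before the scheduler itself shuts down:

import dask_mpi
from distributed import Client


def f(a):
    return a + 1


def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        print(f'future returned {future.result()}')
        # Untested assumption: retiring all workers here, while the client
        # connection is still open, lets them shut down before the scheduler
        # closes, similar to the closeall() hack above.
        client.retire_workers()


if __name__ == '__main__':
    main()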
Minimal Complete Verifiable Example:
import dask_mpi
from distributed import Client


def f(a):
    return a + 1


def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        res = future.result()
        print(f'future returned {res}')


if __name__ == '__main__':
    main()

Full log:
(venv8) lgarrison@scclin021:~/scc/daskdistrib$ srun -n3 python ./repro_commclosed.py
2022-10-05 14:05:40,460 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-10-05 14:05:40,527 - distributed.scheduler - INFO - State start
2022-10-05 14:05:40,533 - distributed.scheduler - INFO - Scheduler at: tcp://10.250.145.105:45101
2022-10-05 14:05:40,533 - distributed.scheduler - INFO - dashboard at: :8787
2022-10-05 14:05:40,566 - distributed.worker - INFO - Start worker at: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO - Listening to: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO - Worker name: 2
2022-10-05 14:05:40,566 - distributed.worker - INFO - dashboard at: 10.250.145.105:34979
2022-10-05 14:05:40,566 - distributed.worker - INFO - Waiting to connect to: tcp://10.250.145.105:45101
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:40,566 - distributed.worker - INFO - Threads: 1
2022-10-05 14:05:40,566 - distributed.worker - INFO - Memory: 7.81 GiB
2022-10-05 14:05:40,566 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-az8e_3tm
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,354 - distributed.scheduler - INFO - Receive client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,356 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,385 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: init, memory: 0, processing: 0>
2022-10-05 14:05:41,386 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.250.145.105:38331
2022-10-05 14:05:41,386 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,386 - distributed.worker - INFO - Registered to: tcp://10.250.145.105:45101
2022-10-05 14:05:41,386 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,387 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Close client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
future returned 2
2022-10-05 14:05:41,506 - distributed.scheduler - INFO - Receive client connection: Client-53080c1f-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,507 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,508 - distributed.worker - INFO - Run out-of-band function 'stop'
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing...
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing all comms
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: running, memory: 0, processing: 0>
2022-10-05 14:05:41,509 - distributed.worker - INFO - Stopping worker at tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.core - INFO - Removing comms to tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.scheduler - INFO - Lost all workers
2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
Environment:
- Dask version: 2022.9.2
- Python version: 3.8.12
- Operating System: Rocky 8
- Install method (conda, pip, source): pip