From a3dd06ea5dbf9d42a59801036df4555efc7ae098 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Sun, 18 May 2025 22:55:25 -0400
Subject: [PATCH 01/11] [_722] fix segfault and hung threads on SIGINT during
 parallel get

---
 irods/parallel.py                             |  65 +++++++---
 irods/test/data_obj_test.py                   |   8 ++
 ...test_signal_handling_in_multithread_get.py | 118 ++++++++++++++++++
 3 files changed, 174 insertions(+), 17 deletions(-)
 create mode 100644 irods/test/modules/test_signal_handling_in_multithread_get.py

diff --git a/irods/parallel.py b/irods/parallel.py
index 2ad03492d..e1d6672cc 100644
--- a/irods/parallel.py
+++ b/irods/parallel.py
@@ -10,6 +10,7 @@
 import threading
 import multiprocessing
 from typing import List, Union
+import weakref
 
 from irods.data_object import iRODSDataObject
 from irods.exception import DataObjectDoesNotExist
@@ -17,6 +18,14 @@
 import irods.keywords as kw
 from queue import Queue, Full, Empty
 
+transfer_managers = weakref.WeakKeyDictionary()
+
+
+def abort_asynchronous_transfers():
+    for mgr in transfer_managers:
+        mgr.quit()
+
+
 logger = logging.getLogger(__name__)
 _nullh = logging.NullHandler()
 logger.addHandler(_nullh)
@@ -91,9 +100,11 @@ def __init__(
             for future in self._futures:
                 future.add_done_callback(self)
         else:
-            self.__invoke_done_callback()
+            self.__invoke_futures_done_logic()
+            return
 
         self.progress = [0, 0]
+
         if (progress_Queue) and (total is not None):
             self.progress[1] = total
 
@@ -112,7 +123,7 @@ def _progress(Q, this):  # - thread to update progress indicator
 
             self._progress_fn = _progress
             self._progress_thread = threading.Thread(
-                target=self._progress_fn, args=(progress_Queue, self)
+                target=self._progress_fn, args=(progress_Queue, self), daemon=True
             )
             self._progress_thread.start()
 
@@ -153,11 +164,13 @@ def __call__(
         with self._lock:
             self._futures_done[future] = future.result()
             if len(self._futures) == len(self._futures_done):
-                self.__invoke_done_callback()
+                self.__invoke_futures_done_logic(
+                    skip_user_callback=(None in self._futures_done.values())
+                )
 
-    def __invoke_done_callback(self):
+    def __invoke_futures_done_logic(self, skip_user_callback=False):
         try:
-            if callable(self.done_callback):
+            if not skip_user_callback and callable(self.done_callback):
                 self.done_callback(self)
         finally:
             self.keep.pop("mgr", None)
@@ -240,6 +253,9 @@ def _copy_part(src, dst, length, queueObject, debug_info, mgr, updatables=()):
     bytecount = 0
     accum = 0
     while True and bytecount < length:
+        if mgr._quit:
+            bytecount = None
+            break
         buf = src.read(min(COPY_BUF_SIZE, length - bytecount))
         buf_len = len(buf)
         if 0 == buf_len:
@@ -275,11 +291,16 @@ class _Multipart_close_manager:
     """
 
     def __init__(self, initial_io_, exit_barrier_):
+        self._quit = False
        self.exit_barrier = exit_barrier_
        self.initial_io = initial_io_
        self.__lock = threading.Lock()
        self.aux = []
 
+    def quit(self):
+        self._quit = True
+        self.exit_barrier.abort()
+
     def __contains__(self, Io):
         with self.__lock:
             return Io is self.initial_io or Io in self.aux
@@ -304,8 +325,12 @@ def remove_io(self, Io):
             Io.close()
             self.aux.remove(Io)
             is_initial = False
-        self.exit_barrier.wait()
-        if is_initial:
+        broken = False
+        try:
+            self.exit_barrier.wait()
+        except threading.BrokenBarrierError:
+            broken = True
+        if is_initial and not (broken or self._quit):
             self.finalize()
 
     def finalize(self):
@@ -440,13 +465,19 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
 
     Io = File = None
     if Operation.isNonBlocking():
-        if queueLength:
-            return futures, queueObject, mgr
-        else:
-            return futures
+        return futures, queueObject, mgr
     else:
-        bytecounts = [f.result() for f in futures]
-        return sum(bytecounts), total_size
+        bytes_transferred = 0
+        try:
+            bytecounts = [f.result() for f in futures]
+            if None not in bytecounts:
+                bytes_transferred = sum(bytecounts)
+        except KeyboardInterrupt:
+            if any(not f.done() for f in futures):
+                # Induce any threads still alive to quit the transfer and exit.
+                mgr.quit()
+            raise
+        return bytes_transferred, total_size
 
 
 def io_main(session, Data, opr_, fname, R="", **kwopt):
@@ -559,10 +590,10 @@ def io_main(session, Data, opr_, fname, R="", **kwopt):
 
 
     if Operation.isNonBlocking():
-        if queueLength > 0:
-            (futures, chunk_notify_queue, mgr) = retval
-        else:
-            futures = retval
+        (futures, chunk_notify_queue, mgr) = retval
+        transfer_managers[mgr] = None
+
+        if queueLength <= 0:
             chunk_notify_queue = total_bytes = None
 
         return AsyncNotify(

diff --git a/irods/test/data_obj_test.py b/irods/test/data_obj_test.py
index 071771717..6dd3663c1 100644
--- a/irods/test/data_obj_test.py
+++ b/irods/test/data_obj_test.py
@@ -3320,6 +3320,14 @@ def test_access_time__issue_700(self):
         # Test that access_time is there, and of the right type.
         self.assertIs(type(data.access_time), datetime)
 
+    def test_handling_of_termination_signals_during_multithread_get__issue_722(self):
+        from irods.test.modules.test_signal_handling_in_multithread_get import (
+            test as test__issue_722,
+        )
+
+        test__issue_722(self)
+
+
 if __name__ == "__main__":
     # let the tests find the parent irods lib
     sys.path.insert(0, os.path.abspath("../.."))

diff --git a/irods/test/modules/test_signal_handling_in_multithread_get.py b/irods/test/modules/test_signal_handling_in_multithread_get.py
new file mode 100644
index 000000000..2665765bf
--- /dev/null
+++ b/irods/test/modules/test_signal_handling_in_multithread_get.py
@@ -0,0 +1,118 @@
+import os
+import re
+import signal
+import subprocess
+import sys
+import tempfile
+import time
+
+import irods
+import irods.helpers
+from irods.test import modules as test_modules
+
+OBJECT_SIZE = 2 * 1024**3
+OBJECT_NAME = "data_get_issue__722"
+LOCAL_TEMPFILE_NAME = "data_object_for_issue_722.dat"
+
+
+_clock_polling_interval = max(0.01, time.clock_getres(time.CLOCK_BOOTTIME))
+
+
+def wait_till_true(function, timeout=None):
+    start_time = time.clock_gettime_ns(time.CLOCK_BOOTTIME)
+    while not (truth_value := function()):
+        if (
+            timeout is not None
+            and (time.clock_gettime_ns(time.CLOCK_BOOTTIME) - start_time) * 1e-9
+            > timeout
+        ):
+            break
+        time.sleep(_clock_polling_interval)
+    return truth_value
+
+
+def test(test_case, signal_names=("SIGTERM", "SIGINT")):
+    """Creates a child process executing a long get() and ensures the process can be
+    terminated using SIGINT or SIGTERM.
+    """
+    program = os.path.join(test_modules.__path__[0], os.path.basename(__file__))
+
+    for signal_name in signal_names:
+        # Call into this same module as a command. This will initiate another Python process that
+        # performs a lengthy data object "get" operation (see the main body of the script, below.)
+        process = subprocess.Popen(
+            [sys.executable, program],
+            stderr=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            text=True,
+        )
+
+        # Wait for download process to reach the point of spawning data transfer threads. In Python 3.9+ versions
+        # of the concurrent.futures module, these are nondaemon threads and will block the exit of the main thread
+        # unless measures are taken (#722).
+        localfile = process.stdout.readline().strip()
+        test_case.assertTrue(
+            wait_till_true(
+                lambda: os.path.exists(localfile)
+                and os.stat(localfile).st_size > OBJECT_SIZE // 2
+            ),
+            "Parallel download from data_objects.get() probably experienced a fatal error before spawning auxiliary data transfer threads.",
+        )
+
+        signal_message_info = f"While testing signal {signal_name}"
+        sig = getattr(signal, signal_name)
+
+        # Interrupt the subprocess with the given signal.
+        process.send_signal(sig)
+        # Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
+        # due to improper or incomplete handling of the signal.
+        try:
+            test_case.assertEqual(
+                process.wait(timeout=15),
+                -sig,
+                f"{signal_message_info}: unexpected subprocess return code.",
+            )
+        except subprocess.TimeoutExpired as timeout_exc:
+            test_case.fail(
+                f"{signal_message_info}: subprocess timed out before terminating. "
+                "Non-daemon thread(s) probably prevented subprocess's main thread from exiting."
+            )
+        # Assert that in the case of SIGINT, the process registered a KeyboardInterrupt.
+        if sig == signal.SIGINT:
+            test_case.assertTrue(
+                re.search("KeyboardInterrupt", process.stderr.read()),
+                f"{signal_message_info}: Expected 'KeyboardInterrupt' in log output.",
+            )
+
+
+if __name__ == "__main__":
+    # These lines are run only if the module is launched as a process.
+    session = irods.helpers.make_session()
+    hc = irods.helpers.home_collection(session)
+    TESTFILE_FILL = b"_" * (1024 * 1024)
+    object_path = f"{hc}/{OBJECT_NAME}"
+
+    # Create the object to be downloaded.
+    with session.data_objects.open(object_path, "w") as f:
+        for y in range(OBJECT_SIZE // len(TESTFILE_FILL)):
+            f.write(TESTFILE_FILL)
+    local_path = None
+    # Establish where (i.e., absolute path) to place the downloaded file, i.e. the get() target.
+    try:
+        with tempfile.NamedTemporaryFile(
+            prefix="local_file_issue_722.dat", delete=True
+        ) as t:
+            local_path = t.name
+
+            # Tell the parent process the name of the local file being "get"ted (got) from iRODS
+            print(local_path)
+            sys.stdout.flush()
+
+            # "get" the object
+            session.data_objects.get(object_path, local_path)
+    finally:
+        # Clean up, whether or not the download succeeded.
+        if local_path is not None and os.path.exists(local_path):
+            os.unlink(local_path)
+        if session.data_objects.exists(object_path):
+            session.data_objects.unlink(object_path, force=True)

From 481952c9a9c10ce880ad92884c3f27cdaa4e22d7 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Fri, 6 Jun 2025 04:05:03 -0400
Subject: [PATCH 02/11] use subtest.

---
 .../modules/test_signal_handling_in_multithread_get.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/irods/test/modules/test_signal_handling_in_multithread_get.py b/irods/test/modules/test_signal_handling_in_multithread_get.py
index 2665765bf..7c01f9433 100644
--- a/irods/test/modules/test_signal_handling_in_multithread_get.py
+++ b/irods/test/modules/test_signal_handling_in_multithread_get.py
@@ -38,6 +38,9 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
     program = os.path.join(test_modules.__path__[0], os.path.basename(__file__))
 
     for signal_name in signal_names:
+
+        test_case.subTest(f"Testing with signal {signal_name}")
+
         # Call into this same module as a command. This will initiate another Python process that
         # performs a lengthy data object "get" operation (see the main body of the script, below.)
         process = subprocess.Popen(
@@ -59,7 +62,6 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
             "Parallel download from data_objects.get() probably experienced a fatal error before spawning auxiliary data transfer threads.",
         )
 
-        signal_message_info = f"While testing signal {signal_name}"
         sig = getattr(signal, signal_name)
 
         # Interrupt the subprocess with the given signal.
@@ -70,18 +72,18 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
         process.send_signal(sig)
         # Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
         # due to improper or incomplete handling of the signal.
         try:
             test_case.assertEqual(
                 process.wait(timeout=15),
                 -sig,
-                f"{signal_message_info}: unexpected subprocess return code.",
+                "Unexpected subprocess return code.",
             )
         except subprocess.TimeoutExpired as timeout_exc:
             test_case.fail(
-                f"{signal_message_info}: subprocess timed out before terminating. "
+                f"Subprocess timed out before terminating. "
                 "Non-daemon thread(s) probably prevented subprocess's main thread from exiting."
             )
         # Assert that in the case of SIGINT, the process registered a KeyboardInterrupt.
         if sig == signal.SIGINT:
             test_case.assertTrue(
                 re.search("KeyboardInterrupt", process.stderr.read()),
-                f"{signal_message_info}: Expected 'KeyboardInterrupt' in log output.",
+                "Did not find expected string 'KeyboardInterrupt' in log output.",
             )

From 1213d2b36ebe19759104bef567de03e4538b0db9 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Wed, 10 Dec 2025 19:52:29 +0000
Subject: [PATCH 03/11] try to preserve latest synchronous parallel put/get for
 orderly shutdown in signal handler

---
 irods/parallel.py | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/irods/parallel.py b/irods/parallel.py
index e1d6672cc..52c5299ab 100644
--- a/irods/parallel.py
+++ b/irods/parallel.py
@@ -20,10 +20,16 @@
 
 transfer_managers = weakref.WeakKeyDictionary()
 
-
-def abort_asynchronous_transfers():
-    for mgr in transfer_managers:
-        mgr.quit()
+# Keep last active synchronous manager(s) here.
+# TODO - Might have to protect with mutex.
+current_mgr = {}
+
+def abort_parallel_transfers(dry_run = False):
+    if not dry_run:
+        for mgr in transfer_managers:
+            mgr.quit()
+    else:
+        return dict(transfer_managers)
 
 
 logger = logging.getLogger(__name__)
@@ -478,11 +484,12 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
         bytes_transferred = 0
         try:
             bytecounts = [f.result() for f in futures]
             if None not in bytecounts:
                 bytes_transferred = sum(bytecounts)
-        except KeyboardInterrupt:
+        except (KeyboardInterrupt, SystemExit):
             if any(not f.done() for f in futures):
-                # Induce any threads still alive to quit the transfer and exit.
-                mgr.quit()
+                transfer_managers.update( current_mgr )
+                current_mgr.clear()
+                current_mgr[mgr] = 1
             raise
         return bytes_transferred, total_size

From 59fd131cf64d1aec8241062acd2428a522f96f92 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Thu, 11 Dec 2025 23:44:22 -0500
Subject: [PATCH 04/11] can now abort parallel transfers with SIGINT/^C or
 SIGTERM

some debug still remains.
---
 irods/parallel.py | 42 +++++++++++++++++++++++++++---------------
 1 file changed, 27 insertions(+), 15 deletions(-)

diff --git a/irods/parallel.py b/irods/parallel.py
index 52c5299ab..bb45ffbf1 100644
--- a/irods/parallel.py
+++ b/irods/parallel.py
@@ -17,19 +17,13 @@
 import irods.keywords as kw
 from queue import Queue, Full, Empty
 
 transfer_managers = weakref.WeakKeyDictionary()
 
-# Keep last active synchronous manager(s) here.
-# TODO - Might have to protect with mutex.
-current_mgr = {}
-
 def abort_parallel_transfers(dry_run = False):
     if not dry_run:
         for mgr in transfer_managers:
             mgr.quit()
-    else:
-        return dict(transfer_managers)
+    return dict(transfer_managers)
 
 
 logger = logging.getLogger(__name__)
@@ -259,6 +253,7 @@ def _copy_part(src, dst, length, queueObject, debug_info, mgr, updatables=()):
     bytecount = 0
     accum = 0
     while True and bytecount < length:
+        print (('T' if mgr._quit else 'F'), end = '', flush=True)
         if mgr._quit:
             bytecount = None
             break
@@ -297,16 +291,30 @@ class _Multipart_close_manager:
     """
 
-    def __init__(self, initial_io_, exit_barrier_):
+    def __init__(self, initial_io_, exit_barrier_, executor = None):
         self._quit = False
         self.exit_barrier = exit_barrier_
         self.initial_io = initial_io_
         self.__lock = threading.Lock()
         self.aux = []
+        self.futures = set()
+        self.executor = executor
+
+    def add_future(self, future): self.futures.add(future)
+
+    @property
+    def active_futures(self):
+        return tuple(_ for _ in self.futures if not _.done())
+
+    def shutdown(self):
+        if self.executor:
+            self.executor.shutdown(cancel_futures = True)
 
     def quit(self):
         self._quit = True
         self.exit_barrier.abort()
+        self.shutdown()
+        return self.active_futures
 
     def __contains__(self, Io):
         with self.__lock:
             return Io is self.initial_io or Io in self.aux
@@ -330,6 +339,7 @@ def add_io(self, Io):
     # synchronizes all of the parallel threads just before exit, so that we know
     # exactly when to perform a finalizing close on the data object
 
+
     def remove_io(self, Io):
         is_initial = True
         with self.__lock:
@@ -434,7 +444,7 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
     futures = []
     executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
     num_threads = min(num_threads, len(ranges))
-    mgr = _Multipart_close_manager(Io, Barrier(num_threads))
+    mgr = _Multipart_close_manager(Io, Barrier(num_threads), executor)
     counter = 1
     gen_file_handle = lambda: open(
         fname, Operation.disk_file_mode(initial_open=(counter == 1))
@@ -466,15 +476,16 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
         if File is None:
             File = gen_file_handle()
         futures.append(
-            executor.submit(
+            f := executor.submit(
                 _io_part,
                 Io,
                 byte_range,
                 queueObject,
                 (i, debug_info),
                 mgr,
                 File,
                 **thread_opts
             )
         )
+        mgr.add_future(f)
         counter += 1
@@ -484,12 +495,13 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
         bytes_transferred = 0
         try:
+            transfer_managers[mgr] = 1
             bytecounts = [f.result() for f in futures]
             if None not in bytecounts:
                 bytes_transferred = sum(bytecounts)
-        except (KeyboardInterrupt, SystemExit):
-            if any(not f.done() for f in futures):
-                transfer_managers.update( current_mgr )
-                current_mgr.clear()
-                current_mgr[mgr] = 1
+        except (KeyboardInterrupt, #SystemExit
+):
+            print ('\nraising KBI\n')
             raise
+        finally:
+            pass
         return bytes_transferred, total_size

From ba0407a05f0cb55c65022364e93c7952ccabd216 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Fri, 12 Dec 2025 15:38:30 +0000
Subject: [PATCH 05/11] [_722] update readme for signals and parallel put/get

---
 README.md | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/README.md b/README.md
index d8ee206de..5a57c32f5 100644
--- a/README.md
+++ b/README.md
@@ -312,6 +312,25 @@ will spawn a number of threads in order to optimize performance for
 iRODS server versions 4.2.9+ and file sizes larger than a default
 threshold value of 32 Megabytes.
 
+Because multithread processes under Unix-type operating systems sometimes
+need special handling, it is recommended that puts and gets of large files
+should be appropriately checked in case a signal aborts the transfer:
+
+```python
+import signal
+from irods.parallel import abort_parallel_transfers
+
+def handler(*arguments):
+    abort_parallel_transfers()
+
+signal.signal(signal.SIGTERM, handler)
+try:
+    # a multi-1247 put or get can leave non-daemon threads running if not treated with care.
+    session.data_objects.put( ...)
+except KeyboardInterrupt:
+    abort_parallel_transfers()
+```
+
 Progress bars
 -------------
 

From dddcc952182ce430f19b02bd8f47af0d31208b40 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Sat, 13 Dec 2025 03:47:10 -0500
Subject: [PATCH 06/11] prevent auto_close

---
 irods/manager/data_object_manager.py | 3 ++-
 irods/parallel.py                    | 9 +++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/irods/manager/data_object_manager.py b/irods/manager/data_object_manager.py
index f2c5ed31b..b97a29d97 100644
--- a/irods/manager/data_object_manager.py
+++ b/irods/manager/data_object_manager.py
@@ -131,12 +131,13 @@ def __init__(self, *a, **kwd):
         self._iRODS_session = kwd.pop("_session", None)
         super(ManagedBufferedRandom, self).__init__(*a, **kwd)
         import irods.session
+        self.no_close = False
 
         with irods.session._fds_lock:
             irods.session._fds[self] = None
 
     def __del__(self):
-        if not self.closed:
+        if not self.no_close and not self.closed:
             self.close()
         call___del__if_exists(super(ManagedBufferedRandom, self))

diff --git a/irods/parallel.py b/irods/parallel.py
index bb45ffbf1..be4b16259 100644
--- a/irods/parallel.py
+++ b/irods/parallel.py
@@ -311,6 +311,15 @@ def shutdown(self):
             self.executor.shutdown(cancel_futures = True)
 
     def quit(self):
+        from irods.manager.data_object_manager import ManagedBufferedRandom
+        # remove all descriptors from consideration for auto_close.
+        import irods.session
+        with irods.session._fds_lock:
+            for fd in self.aux + [self.initial_io]:
+                irods.session._fds.pop(fd, ())
+                if type(fd) is ManagedBufferedRandom:
+                    fd.no_close = True
+        # abort threads.
         self._quit = True
         self.exit_barrier.abort()
         self.shutdown()

From b96f80540ec891d54163235ca4ecbea473f2d8ee Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Sat, 13 Dec 2025 04:31:46 -0500
Subject: [PATCH 07/11] satisfy static typing.

---
 irods/parallel.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/irods/parallel.py b/irods/parallel.py
index be4b16259..af2fa70a6 100644
--- a/irods/parallel.py
+++ b/irods/parallel.py
@@ -9,7 +9,7 @@
 import concurrent.futures
 import threading
 import multiprocessing
-from typing import List, Union
+from typing import List, Union, Any
 import weakref
 
 from irods.data_object import iRODSDataObject
@@ -17,7 +17,8 @@
 import irods.keywords as kw
 from queue import Queue, Full, Empty
 
-transfer_managers = weakref.WeakKeyDictionary()
+
+transfer_managers: weakref.WeakKeyDictionary[_Multipart_close_manager, Any] = weakref.WeakKeyDictionary()
 
 def abort_parallel_transfers(dry_run = False):

From ed25eda9fb58ae11a4c3f1a54eb6d8bf0b65a4dc Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Sat, 13 Dec 2025 04:34:34 -0500
Subject: [PATCH 08/11] revise README

---
 README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 5a57c32f5..c0de14c5f 100644
--- a/README.md
+++ b/README.md
@@ -313,8 +313,9 @@ iRODS server versions 4.2.9+ and file sizes larger than a default
 threshold value of 32 Megabytes.
 
 Because multithread processes under Unix-type operating systems sometimes
-need special handling, it is recommended that puts and gets of large files
-should be appropriately checked in case a signal aborts the transfer:
+need special handling, it is recommended that any put or get of a large file
+be appropriately handled in the case that a terminating signal aborts the
+transfer:
 
 ```python
 import signal

From 9d2ff7af616fe1fd0cdad06a28e57101ff248827 Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Sat, 13 Dec 2025 14:45:04 -0500
Subject: [PATCH 09/11] forward ref needed for mypy?

---
 irods/parallel.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/irods/parallel.py b/irods/parallel.py
index af2fa70a6..cea12874c 100644
--- a/irods/parallel.py
+++ b/irods/parallel.py
@@ -18,7 +18,7 @@
 from queue import Queue, Full, Empty
 
 
-transfer_managers: weakref.WeakKeyDictionary[_Multipart_close_manager, Any] = weakref.WeakKeyDictionary()
+transfer_managers: weakref.WeakKeyDictionary["_Multipart_close_manager", Any] = weakref.WeakKeyDictionary()
 
 def abort_parallel_transfers(dry_run = False):

From 0aaf74782ab181558cc50dd627c192f01eaccfef Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Wed, 17 Dec 2025 19:38:56 -0500
Subject: [PATCH 10/11] patch test

---
 ...test_signal_handling_in_multithread_get.py | 105 ++++++++++--------
 1 file changed, 60 insertions(+), 45 deletions(-)

diff --git a/irods/test/modules/test_signal_handling_in_multithread_get.py b/irods/test/modules/test_signal_handling_in_multithread_get.py
index 7c01f9433..da33e6cbb 100644
--- a/irods/test/modules/test_signal_handling_in_multithread_get.py
+++ b/irods/test/modules/test_signal_handling_in_multithread_get.py
@@ -9,6 +9,7 @@
 import irods
 import irods.helpers
 from irods.test import modules as test_modules
+from irods.parallel import abort_parallel_transfers
 
 OBJECT_SIZE = 2 * 1024**3
 OBJECT_NAME = "data_get_issue__722"
@@ -39,53 +40,56 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
 
     for signal_name in signal_names:
 
-        test_case.subTest(f"Testing with signal {signal_name}")
-
-        # Call into this same module as a command. This will initiate another Python process that
-        # performs a lengthy data object "get" operation (see the main body of the script, below.)
-        process = subprocess.Popen(
-            [sys.executable, program],
-            stderr=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            text=True,
-        )
-
-        # Wait for download process to reach the point of spawning data transfer threads. In Python 3.9+ versions
-        # of the concurrent.futures module, these are nondaemon threads and will block the exit of the main thread
-        # unless measures are taken (#722).
-        localfile = process.stdout.readline().strip()
-        test_case.assertTrue(
-            wait_till_true(
-                lambda: os.path.exists(localfile)
-                and os.stat(localfile).st_size > OBJECT_SIZE // 2
-            ),
-            "Parallel download from data_objects.get() probably experienced a fatal error before spawning auxiliary data transfer threads.",
-        )
-
-        sig = getattr(signal, signal_name)
-
-        # Interrupt the subprocess with the given signal.
-        process.send_signal(sig)
-        # Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
-        # due to improper or incomplete handling of the signal.
-        try:
-            test_case.assertEqual(
-                process.wait(timeout=15),
-                -sig,
-                "Unexpected subprocess return code.",
-            )
-        except subprocess.TimeoutExpired as timeout_exc:
-            test_case.fail(
-                f"Subprocess timed out before terminating. "
-                "Non-daemon thread(s) probably prevented subprocess's main thread from exiting."
-            )
-        # Assert that in the case of SIGINT, the process registered a KeyboardInterrupt.
-        if sig == signal.SIGINT:
-            test_case.assertTrue(
-                re.search("KeyboardInterrupt", process.stderr.read()),
-                "Did not find expected string 'KeyboardInterrupt' in log output.",
-            )
+        with test_case.subTest(f"Testing with signal {signal_name}"):
+
+            # Call into this same module as a command. This will initiate another Python process that
+            # performs a lengthy data object "get" operation (see the main body of the script, below.)
+            process = subprocess.Popen(
+                [sys.executable, program],
+                stderr=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                text=True,
+            )
+
+            # Wait for download process to reach the point of spawning data transfer threads. In Python 3.9+ versions
+            # of the concurrent.futures module, these are nondaemon threads and will block the exit of the main thread
+            # unless measures are taken (#722).
+            localfile = process.stdout.readline().strip()
+            test_case.assertTrue(
+                wait_till_true(
+                    lambda: os.path.exists(localfile)
+                    and os.stat(localfile).st_size > OBJECT_SIZE // 2
+                ),
+                "Parallel download from data_objects.get() probably experienced a fatal error before spawning auxiliary data transfer threads.",
+            )
+
+            sig = getattr(signal, signal_name)
+
+            translate_return_code = lambda s: 128 - s if s < 0 else s
+
+            # Interrupt the subprocess with the given signal.
+            process.send_signal(sig)
+
+            # Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
+            # due to improper or incomplete handling of the signal.
+            try:
+                test_case.assertEqual(
+                    translate_return_code(process.wait(timeout=15)),
+                    128 + sig,
+                    "Unexpected subprocess return code.",
+                )
+            except subprocess.TimeoutExpired as timeout_exc:
+                test_case.fail(
+                    f"Subprocess timed out before terminating. "
+                    "Non-daemon thread(s) probably prevented subprocess's main thread from exiting."
+                )
+            # Assert that in the case of SIGINT, the process registered a KeyboardInterrupt.
+            if sig == signal.SIGINT:
+                test_case.assertTrue(
+                    re.search("KeyboardInterrupt", process.stderr.read()),
+                    "Did not find expected string 'KeyboardInterrupt' in log output.",
+                )
 
 
 if __name__ == "__main__":
@@ -110,8 +114,19 @@
             print(local_path)
             sys.stdout.flush()
 
-            # "get" the object
-            session.data_objects.get(object_path, local_path)
+            def handler(sig,*_):
+                abort_parallel_transfers()
+                exit(128+sig)
+
+            signal.signal(signal.SIGTERM, handler)
+
+            try:
+                # download the object
+                session.data_objects.get(object_path, local_path)
+            except KeyboardInterrupt:
+                abort_parallel_transfers()
+                raise
+
     finally:
         # Clean up, whether or not the download succeeded.
         if local_path is not None and os.path.exists(local_path):
             os.unlink(local_path)
         if session.data_objects.exists(object_path):
             session.data_objects.unlink(object_path, force=True)

From ade42ead80b4fc14b19e23632e2638d171f0649a Mon Sep 17 00:00:00 2001
From: d-w-moore
Date: Fri, 19 Dec 2025 15:11:48 -0500
Subject: [PATCH 11/11] more informative error message when retcodes do not
 match

---
 .../modules/test_signal_handling_in_multithread_get.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/irods/test/modules/test_signal_handling_in_multithread_get.py b/irods/test/modules/test_signal_handling_in_multithread_get.py
index da33e6cbb..73b8bc89d 100644
--- a/irods/test/modules/test_signal_handling_in_multithread_get.py
+++ b/irods/test/modules/test_signal_handling_in_multithread_get.py
@@ -65,7 +65,8 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
 
             sig = getattr(signal, signal_name)
 
-            translate_return_code = lambda s: 128 - s if s < 0 else s
+            signal_offset_return_code = lambda s: 128 - s if s < 0 else s
+            signal_plus_128 = lambda sig: 128 + sig
 
             # Interrupt the subprocess with the given signal.
             process.send_signal(sig)
@@ -73,10 +74,11 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
             # Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
             # due to improper or incomplete handling of the signal.
             try:
+                translated_return_code = signal_offset_return_code(process.wait(timeout=15))
                 test_case.assertEqual(
-                    translate_return_code(process.wait(timeout=15)),
-                    128 + sig,
-                    "Unexpected subprocess return code.",
+                    translated_return_code,
+                    signal_plus_128(sig),
+                    f"Expected subprocess return code of {signal_plus_128(sig) = }; got {translated_return_code = }",
                 )
             except subprocess.TimeoutExpired as timeout_exc:
                 test_case.fail(
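
Taken together, the series leaves client code with one cleanup hook, `irods.parallel.abort_parallel_transfers()`: it asks every registered transfer manager to quit, which aborts the thread barrier and shuts down the executor with `cancel_futures=True`, so no non-daemon transfer thread can keep the interpreter from exiting. The sketch below combines the two patterns the final test module settles on — a SIGTERM handler plus a `KeyboardInterrupt` handler for SIGINT. It is illustrative only, not taken verbatim from the patches: the object and local paths are made up, and `make_session()` is simply the same helper the test module uses.

```python
import signal
import sys

import irods.helpers
from irods.parallel import abort_parallel_transfers

session = irods.helpers.make_session()

def terminate(sig, _frame):
    # Ask all live transfer managers to quit, then exit with the
    # conventional 128+signal status, as the test module's handler does.
    abort_parallel_transfers()
    sys.exit(128 + sig)

signal.signal(signal.SIGTERM, terminate)

try:
    # Hypothetical paths; any transfer above the size threshold spawns worker threads.
    session.data_objects.get("/tempZone/home/rods/big.dat", "/tmp/big.dat")
except KeyboardInterrupt:
    # SIGINT surfaces in the main thread as KeyboardInterrupt.
    abort_parallel_transfers()
    raise
```

Either path must call `abort_parallel_transfers()` before the process unwinds; otherwise the `concurrent.futures` worker threads (non-daemon in Python 3.9+) block the main thread's exit, which is exactly the hang this series fixes.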