From 82ebe80adb2494cf42e89be117e274c5f1fbe2a1 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Mon, 15 Dec 2025 17:30:57 +0000
Subject: [PATCH 1/4] feat: Add use_exclusive_concurrency flag for serial
 partition processing

This feature allows streams to be marked as exclusive so their partitions
are processed serially instead of concurrently. This is useful for APIs
that have strict rate limits or do not support concurrent requests to the
same endpoint, such as scroll-based pagination APIs that only allow one
active scroll at a time.

Changes:
- Add disable_partition_concurrency field to DeclarativeStream schema
- Add use_exclusive_concurrency property to AbstractStream and DefaultStream
- Update ModelToComponentFactory to pass the new property
- Update ConcurrentReadProcessor to handle exclusive streams serially

Closes: https://github.com/airbytehq/oncall/issues/8346

Co-Authored-By: alfredo.garcia@airbyte.io
---
 .../concurrent_read_processor.py           | 59 +++++++++++++++----
 .../declarative_component_schema.yaml      |  8 +++
 .../models/declarative_component_schema.py |  5 ++
 .../parsers/model_to_component_factory.py  |  1 +
 .../streams/concurrent/abstract_stream.py  | 10 ++++
 .../streams/concurrent/default_stream.py   |  6 ++
 6 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
index 905999a4d..474d9e46f 100644
--- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
+++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
@@ -3,7 +3,8 @@
 #
 import logging
 import os
-from typing import Dict, Iterable, List, Optional, Set
+from collections import deque
+from typing import Deque, Dict, Iterable, List, Optional, Set
 
 from airbyte_cdk.exception_handler import generate_failed_streams_error_message
 from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
@@ -65,6 +66,15 @@ def __init__(
         self._partition_reader = partition_reader
         self._streams_done: Set[str] = set()
         self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
+        self._exclusive_streams: Set[str] = {
+            s.name for s in stream_instances_to_read_from if s.use_exclusive_concurrency
+        }
+        self._pending_partitions_per_exclusive_stream: Dict[str, Deque[Partition]] = {
+            stream_name: deque() for stream_name in self._exclusive_streams
+        }
+        self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
+            stream_name: False for stream_name in self._exclusive_streams
+        }
 
     def on_partition_generation_completed(
         self, sentinel: PartitionGenerationCompletedSentinel
@@ -92,7 +102,8 @@ def on_partition(self, partition: Partition) -> None:
         This method is called when a partition is generated.
         1. Add the partition to the set of partitions for the stream
         2. Log the slice if necessary
-        3. Submit the partition to the thread pool manager
+        3. For exclusive streams, queue the partition and only submit if no partition is in progress
+        4. For non-exclusive streams, submit the partition to the thread pool manager immediately
         """
         stream_name = partition.stream_name()
         self._streams_to_running_partitions[stream_name].add(partition)
@@ -101,9 +112,15 @@ def on_partition(self, partition: Partition) -> None:
             self._message_repository.emit_message(
                 self._slice_logger.create_slice_log_message(partition.to_slice())
             )
-        self._thread_pool_manager.submit(
-            self._partition_reader.process_partition, partition, cursor
-        )
+
+        if stream_name in self._exclusive_streams:
+            self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
+            if not self._exclusive_stream_partition_in_progress[stream_name]:
+                self._submit_next_exclusive_partition(stream_name)
+        else:
+            self._thread_pool_manager.submit(
+                self._partition_reader.process_partition, partition, cursor
+            )
 
     def on_partition_complete_sentinel(
         self, sentinel: PartitionCompleteSentinel
@@ -111,20 +128,27 @@
         """
         This method is called when a partition is completed.
         1. Close the partition
-        2. If the stream is done, mark it as such and return a stream status message
-        3. Emit messages that were added to the message repository
+        2. For exclusive streams, submit the next pending partition if any
+        3. If the stream is done, mark it as such and return a stream status message
+        4. Emit messages that were added to the message repository
         """
         partition = sentinel.partition
+        stream_name = partition.stream_name()
 
-        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
+        partitions_running = self._streams_to_running_partitions[stream_name]
         if partition in partitions_running:
             partitions_running.remove(partition)
+
+        if stream_name in self._exclusive_streams:
+            self._exclusive_stream_partition_in_progress[stream_name] = False
+            self._submit_next_exclusive_partition(stream_name)
+
         # If all partitions were generated and this was the last one, the stream is done
         if (
-            partition.stream_name() not in self._streams_currently_generating_partitions
+            stream_name not in self._streams_currently_generating_partitions
             and len(partitions_running) == 0
         ):
-            yield from self._on_stream_is_done(partition.stream_name())
+            yield from self._on_stream_is_done(stream_name)
         yield from self._message_repository.consume_queue()
 
     def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
@@ -246,3 +270,18 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
             else AirbyteStreamStatus.COMPLETE
         )
         yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
+
+    def _submit_next_exclusive_partition(self, stream_name: str) -> None:
+        """
+        Submit the next pending partition for an exclusive stream.
+        This ensures that only one partition is processed at a time for streams
+        that have use_exclusive_concurrency=True.
+        """
+        pending_partitions = self._pending_partitions_per_exclusive_stream[stream_name]
+        if pending_partitions:
+            partition = pending_partitions.popleft()
+            cursor = self._stream_name_to_instance[stream_name].cursor
+            self._exclusive_stream_partition_in_progress[stream_name] = True
+            self._thread_pool_manager.submit(
+                self._partition_reader.process_partition, partition, cursor
+            )
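Note: in isolation, the queueing logic added to ConcurrentReadProcessor above reduces to the following standalone pattern (a minimal sketch, not CDK code; `submit` stands in for ThreadPoolManager.submit, and thread safety is deliberately deferred here, see patch 4 below):

    from collections import deque
    from typing import Callable, Deque

    class ExclusivePartitionGate:
        """Allow at most one in-flight partition for an exclusive stream."""

        def __init__(self, submit: Callable[[object], None]) -> None:
            self._submit = submit            # stands in for thread_pool_manager.submit
            self._pending: Deque[object] = deque()
            self._in_progress = False

        def on_partition(self, partition: object) -> None:
            # Always queue first; only submit if nothing is currently running.
            self._pending.append(partition)
            if not self._in_progress:
                self._submit_next()

        def on_partition_complete(self) -> None:
            # The running partition finished; hand the slot to the next one.
            self._in_progress = False
            self._submit_next()

        def _submit_next(self) -> None:
            if self._pending:
                self._in_progress = True
                self._submit(self._pending.popleft())

Completion hands the single in-flight slot to the oldest queued partition, so partitions run one at a time in arrival (FIFO) order.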
+ """ + pending_partitions = self._pending_partitions_per_exclusive_stream[stream_name] + if pending_partitions: + partition = pending_partitions.popleft() + cursor = self._stream_name_to_instance[stream_name].cursor + self._exclusive_stream_partition_in_progress[stream_name] = True + self._thread_pool_manager.submit( + self._partition_reader.process_partition, partition, cursor + ) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e04a82c0d..7cf722cfb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1655,6 +1655,14 @@ definitions: $parameters: type: object additionalProperties: true + disable_partition_concurrency: + title: Disable Partition Concurrency + description: > + If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently. + This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint, + such as scroll-based pagination APIs that only allow one active scroll at a time. + type: boolean + default: false $parameters: type: object additional_properties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b78a07021..0c3b00518 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2558,6 +2558,11 @@ class Config: description="(experimental) Describes how to fetch a file", title="File Uploader", ) + disable_partition_concurrency: Optional[bool] = Field( + False, + description="If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently. 
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index b78a07021..0c3b00518 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -2558,6 +2558,11 @@ class Config:
         description="(experimental) Describes how to fetch a file",
         title="File Uploader",
     )
+    disable_partition_concurrency: Optional[bool] = Field(
+        False,
+        description="If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently. This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint, such as scroll-based pagination APIs that only allow one active scroll at a time.",
+        title="Disable Partition Concurrency",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 3a772b691..17d3738b6 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2118,6 +2118,7 @@ def create_default_stream(
             logger=logging.getLogger(f"airbyte.{stream_name}"),
             cursor=concurrent_cursor,
             supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
+            use_exclusive_concurrency=bool(model.disable_partition_concurrency),
         )
 
     def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None:

diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
index 667d088ab..2a5903eb4 100644
--- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
+++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
@@ -90,3 +90,13 @@ def check_availability(self) -> StreamAvailability:
         """
         :return: If the stream is available and if not, why
         """
+
+    @property
+    def use_exclusive_concurrency(self) -> bool:
+        """
+        If True, partitions for this stream will be processed serially (one at a time) instead of concurrently.
+        This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint,
+        such as scroll-based pagination APIs that only allow one active scroll at a time.
+        :return: Whether to disable concurrent partition processing for this stream.
+        """
+        return False

diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py
index f5d4ccf2e..26de9ede4 100644
--- a/airbyte_cdk/sources/streams/concurrent/default_stream.py
+++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py
@@ -26,6 +26,7 @@ def __init__(
         cursor: Cursor,
         namespace: Optional[str] = None,
         supports_file_transfer: bool = False,
+        use_exclusive_concurrency: bool = False,
     ) -> None:
         self._stream_partition_generator = partition_generator
         self._name = name
@@ -36,6 +37,7 @@ def __init__(
         self._cursor = cursor
         self._namespace = namespace
         self._supports_file_transfer = supports_file_transfer
+        self._use_exclusive_concurrency = use_exclusive_concurrency
 
     def generate_partitions(self) -> Iterable[Partition]:
         yield from self._stream_partition_generator.generate()
@@ -94,6 +96,10 @@ def log_stream_sync_configuration(self) -> None:
     def cursor(self) -> Cursor:
         return self._cursor
 
+    @property
+    def use_exclusive_concurrency(self) -> bool:
+        return self._use_exclusive_concurrency
+
     def check_availability(self) -> StreamAvailability:
         """
         Check stream availability by attempting to read the first record of the stream.
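Note: for non-declarative Python sources, the same behavior can be opted into by overriding the new property; a minimal sketch (the stream class is hypothetical, and its remaining abstract members are implemented as usual and elided here):

    from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream

    class ScrollSearchStream(AbstractStream):
        """Hypothetical stream over an API that allows one active scroll at a time."""

        @property
        def use_exclusive_concurrency(self) -> bool:
            # Partitions for this stream are queued and run one at a time.
            return True

        # name, generate_partitions, cursor, check_availability, etc. omitted.

DefaultStream users get the same effect by passing use_exclusive_concurrency=True to its constructor, as the factory change above does.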
From def73300c416aa0148d0a1740a74e704c8160a1b Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Mon, 15 Dec 2025 17:34:49 +0000
Subject: [PATCH 2/4] test: Add unit tests for exclusive concurrency feature

Co-Authored-By: alfredo.garcia@airbyte.io
---
 .../test_concurrent_read_processor.py | 116 ++++++++++++++++++
 1 file changed, 116 insertions(+)

diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
index a681f75eb..1ae043fd5 100644
--- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
+++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
@@ -792,3 +792,119 @@ def test_start_next_partition_generator(self):
         self._thread_pool_manager.submit.assert_called_with(
             self._partition_enqueuer.generate_partitions, self._stream
         )
+
+    def test_exclusive_stream_partitions_are_queued_and_processed_serially(self):
+        """Test that partitions for exclusive streams are queued and only one is processed at a time."""
+        exclusive_stream = Mock(spec=AbstractStream)
+        exclusive_stream.name = _STREAM_NAME
+        exclusive_stream.use_exclusive_concurrency = True
+        exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
+            name=_STREAM_NAME,
+            json_schema={},
+            supported_sync_modes=[SyncMode.full_refresh],
+        )
+
+        stream_instances_to_read_from = [exclusive_stream]
+        handler = ConcurrentReadProcessor(
+            stream_instances_to_read_from,
+            self._partition_enqueuer,
+            self._thread_pool_manager,
+            self._logger,
+            self._slice_logger,
+            self._message_repository,
+            self._partition_reader,
+        )
+
+        partition1 = Mock(spec=Partition)
+        partition1.stream_name.return_value = _STREAM_NAME
+        partition1.to_slice.return_value = self._log_message
+
+        partition2 = Mock(spec=Partition)
+        partition2.stream_name.return_value = _STREAM_NAME
+        partition2.to_slice.return_value = self._log_message
+
+        handler.on_partition(partition1)
+        handler.on_partition(partition2)
+
+        assert self._thread_pool_manager.submit.call_count == 1
+        assert len(handler._pending_partitions_per_exclusive_stream[_STREAM_NAME]) == 1
+        assert handler._exclusive_stream_partition_in_progress[_STREAM_NAME] is True
+
+    def test_exclusive_stream_submits_next_partition_on_completion(self):
+        """Test that the next partition is submitted when the current one completes."""
+        exclusive_stream = Mock(spec=AbstractStream)
+        exclusive_stream.name = _STREAM_NAME
+        exclusive_stream.use_exclusive_concurrency = True
+        exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
+            name=_STREAM_NAME,
+            json_schema={},
+            supported_sync_modes=[SyncMode.full_refresh],
+        )
+
+        stream_instances_to_read_from = [exclusive_stream]
+        handler = ConcurrentReadProcessor(
+            stream_instances_to_read_from,
+            self._partition_enqueuer,
+            self._thread_pool_manager,
+            self._logger,
+            self._slice_logger,
+            self._message_repository,
+            self._partition_reader,
+        )
+        handler.start_next_partition_generator()
+
+        partition1 = Mock(spec=Partition)
+        partition1.stream_name.return_value = _STREAM_NAME
+        partition1.to_slice.return_value = self._log_message
+
+        partition2 = Mock(spec=Partition)
+        partition2.stream_name.return_value = _STREAM_NAME
+        partition2.to_slice.return_value = self._log_message
+
+        handler.on_partition(partition1)
+        handler.on_partition(partition2)
+
+        assert self._thread_pool_manager.submit.call_count == 2
+
+        sentinel = PartitionCompleteSentinel(partition1)
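Note: the scheduling behavior these tests pin down condenses to a few lines (a standalone sketch mirroring the queue-and-flag logic, not a call into the CDK):

    from collections import deque

    pending, submitted, in_progress = deque(), [], False

    def submit_next():
        global in_progress
        if pending:
            in_progress = True
            submitted.append(pending.popleft())

    for p in ("p1", "p2", "p3"):      # partitions arrive while p1 is running
        pending.append(p)
        if not in_progress:
            submit_next()

    assert submitted == ["p1"]        # only one partition ever in flight

    in_progress = False               # completion sentinel for p1
    submit_next()
    assert submitted == ["p1", "p2"]  # next queued partition starts, in order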
+        list(handler.on_partition_complete_sentinel(sentinel))
+
+        assert self._thread_pool_manager.submit.call_count == 3
+        assert len(handler._pending_partitions_per_exclusive_stream[_STREAM_NAME]) == 0
+        assert handler._exclusive_stream_partition_in_progress[_STREAM_NAME] is True
+
+    def test_non_exclusive_stream_partitions_are_submitted_immediately(self):
+        """Test that partitions for non-exclusive streams are submitted immediately."""
+        non_exclusive_stream = Mock(spec=AbstractStream)
+        non_exclusive_stream.name = _STREAM_NAME
+        non_exclusive_stream.use_exclusive_concurrency = False
+        non_exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
+            name=_STREAM_NAME,
+            json_schema={},
+            supported_sync_modes=[SyncMode.full_refresh],
+        )
+
+        stream_instances_to_read_from = [non_exclusive_stream]
+        handler = ConcurrentReadProcessor(
+            stream_instances_to_read_from,
+            self._partition_enqueuer,
+            self._thread_pool_manager,
+            self._logger,
+            self._slice_logger,
+            self._message_repository,
+            self._partition_reader,
+        )
+
+        partition1 = Mock(spec=Partition)
+        partition1.stream_name.return_value = _STREAM_NAME
+        partition1.to_slice.return_value = self._log_message
+
+        partition2 = Mock(spec=Partition)
+        partition2.stream_name.return_value = _STREAM_NAME
+        partition2.to_slice.return_value = self._log_message
+
+        handler.on_partition(partition1)
+        handler.on_partition(partition2)
+
+        assert self._thread_pool_manager.submit.call_count == 2
+        assert _STREAM_NAME not in handler._pending_partitions_per_exclusive_stream
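Note, for context on the patch that follows: the pre-lock code performs a check-then-act across threads, so two partitions arriving close together can both observe the in-progress flag as unset. The interleaving that the new lock (together with setting the flag before popping) rules out, sketched schematically:

    # Two threads deliver partitions for the same exclusive stream:
    #
    #   T1: pending.append(p1); reads in_progress == False
    #   T2: pending.append(p2); reads in_progress == False  (T1 has not set it yet)
    #   T1: _submit_next_exclusive_partition()  -> submits p1
    #   T2: _submit_next_exclusive_partition()  -> submits p2  (two in flight!)
    #
    # Holding _exclusive_partition_lock around append/check/submit forces T2 to
    # wait until T1 has set in_progress = True, so T2 queues p2 instead.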
Co-Authored-By: alfredo.garcia@airbyte.io
---
 .../concurrent_source/concurrent_read_processor.py | 21 +++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
index 474d9e46f..d7ac3d684 100644
--- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
+++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
@@ -3,6 +3,7 @@
 #
 import logging
 import os
+import threading
 from collections import deque
 from typing import Deque, Dict, Iterable, List, Optional, Set
 
@@ -75,6 +76,7 @@ def __init__(
         self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
             stream_name: False for stream_name in self._exclusive_streams
         }
+        self._exclusive_partition_lock = threading.Lock()
 
     def on_partition_generation_completed(
         self, sentinel: PartitionGenerationCompletedSentinel
@@ -114,9 +116,10 @@ def on_partition(self, partition: Partition) -> None:
             )
 
         if stream_name in self._exclusive_streams:
-            self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
-            if not self._exclusive_stream_partition_in_progress[stream_name]:
-                self._submit_next_exclusive_partition(stream_name)
+            with self._exclusive_partition_lock:
+                self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
+                if not self._exclusive_stream_partition_in_progress[stream_name]:
+                    self._submit_next_exclusive_partition(stream_name)
         else:
             self._thread_pool_manager.submit(
                 self._partition_reader.process_partition, partition, cursor
@@ -140,8 +143,9 @@ def on_partition_complete_sentinel(
             partitions_running.remove(partition)
 
         if stream_name in self._exclusive_streams:
-            self._exclusive_stream_partition_in_progress[stream_name] = False
-            self._submit_next_exclusive_partition(stream_name)
+            with self._exclusive_partition_lock:
+                self._exclusive_stream_partition_in_progress[stream_name] = False
+                self._submit_next_exclusive_partition(stream_name)
 
         # If all partitions were generated and this was the last one, the stream is done
         if (
@@ -276,12 +280,17 @@ def _submit_next_exclusive_partition(self, stream_name: str) -> None:
         Submit the next pending partition for an exclusive stream.
         This ensures that only one partition is processed at a time for streams
         that have use_exclusive_concurrency=True.
+
+        Note: This method must be called while holding self._exclusive_partition_lock
+        to ensure thread safety.
         """
         pending_partitions = self._pending_partitions_per_exclusive_stream[stream_name]
         if pending_partitions:
+            # Set the flag BEFORE popping to prevent race conditions where another thread
+            # could see in_progress=False and submit another partition concurrently
+            self._exclusive_stream_partition_in_progress[stream_name] = True
             partition = pending_partitions.popleft()
             cursor = self._stream_name_to_instance[stream_name].cursor
-            self._exclusive_stream_partition_in_progress[stream_name] = True
             self._thread_pool_manager.submit(
                 self._partition_reader.process_partition, partition, cursor
             )
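Note: taken together, the final shape of the exclusive-stream handling is equivalent to this thread-safe variant of the gate pattern (again a standalone illustration under the same assumptions as the earlier sketch, not the CDK class itself):

    import threading
    from collections import deque
    from typing import Callable, Deque

    class LockedExclusivePartitionGate:
        """Thread-safe gate: all state transitions happen under one lock."""

        def __init__(self, submit: Callable[[object], None]) -> None:
            self._submit = submit
            self._pending: Deque[object] = deque()
            self._in_progress = False
            self._lock = threading.Lock()

        def on_partition(self, partition: object) -> None:
            with self._lock:
                self._pending.append(partition)
                if not self._in_progress:
                    self._submit_next_locked()

        def on_partition_complete(self) -> None:
            with self._lock:
                self._in_progress = False
                self._submit_next_locked()

        def _submit_next_locked(self) -> None:
            # Caller must hold self._lock. The flag is set before popping so no
            # thread can observe in_progress == False while a submission is
            # being prepared, matching the ordering patch 4 enforces.
            if self._pending:
                self._in_progress = True
                self._submit(self._pending.popleft())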