diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
index 905999a4d..d7ac3d684 100644
--- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
+++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
@@ -3,7 +3,9 @@
 #
 import logging
 import os
-from typing import Dict, Iterable, List, Optional, Set
+import threading
+from collections import deque
+from typing import Deque, Dict, Iterable, List, Optional, Set
 
 from airbyte_cdk.exception_handler import generate_failed_streams_error_message
 from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
@@ -65,6 +67,16 @@ def __init__(
         self._partition_reader = partition_reader
         self._streams_done: Set[str] = set()
         self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
+        self._exclusive_streams: Set[str] = {
+            s.name for s in stream_instances_to_read_from if s.use_exclusive_concurrency
+        }
+        self._pending_partitions_per_exclusive_stream: Dict[str, Deque[Partition]] = {
+            stream_name: deque() for stream_name in self._exclusive_streams
+        }
+        self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
+            stream_name: False for stream_name in self._exclusive_streams
+        }
+        self._exclusive_partition_lock = threading.Lock()
 
     def on_partition_generation_completed(
         self, sentinel: PartitionGenerationCompletedSentinel
@@ -92,7 +104,8 @@ def on_partition(self, partition: Partition) -> None:
         This method is called when a partition is generated.
         1. Add the partition to the set of partitions for the stream
         2. Log the slice if necessary
-        3. Submit the partition to the thread pool manager
+        3. For exclusive streams, queue the partition and only submit if no partition is in progress
+        4. For non-exclusive streams, submit the partition to the thread pool manager immediately
         """
         stream_name = partition.stream_name()
         self._streams_to_running_partitions[stream_name].add(partition)
@@ -101,9 +114,16 @@ def on_partition(self, partition: Partition) -> None:
             self._message_repository.emit_message(
                 self._slice_logger.create_slice_log_message(partition.to_slice())
             )
-        self._thread_pool_manager.submit(
-            self._partition_reader.process_partition, partition, cursor
-        )
+
+        if stream_name in self._exclusive_streams:
+            with self._exclusive_partition_lock:
+                self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
+                if not self._exclusive_stream_partition_in_progress[stream_name]:
+                    self._submit_next_exclusive_partition(stream_name)
+        else:
+            self._thread_pool_manager.submit(
+                self._partition_reader.process_partition, partition, cursor
+            )
 
     def on_partition_complete_sentinel(
         self, sentinel: PartitionCompleteSentinel
@@ -111,20 +131,28 @@ def on_partition_complete_sentinel(
         """
         This method is called when a partition is completed.
         1. Close the partition
-        2. If the stream is done, mark it as such and return a stream status message
-        3. Emit messages that were added to the message repository
+        2. For exclusive streams, submit the next pending partition if any
+        3. If the stream is done, mark it as such and return a stream status message
+        4. Emit messages that were added to the message repository
         """
         partition = sentinel.partition
+        stream_name = partition.stream_name()
 
-        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
+        partitions_running = self._streams_to_running_partitions[stream_name]
         if partition in partitions_running:
             partitions_running.remove(partition)
+
+            if stream_name in self._exclusive_streams:
+                with self._exclusive_partition_lock:
+                    self._exclusive_stream_partition_in_progress[stream_name] = False
+                    self._submit_next_exclusive_partition(stream_name)
+
             # If all partitions were generated and this was the last one, the stream is done
             if (
-                partition.stream_name() not in self._streams_currently_generating_partitions
+                stream_name not in self._streams_currently_generating_partitions
                 and len(partitions_running) == 0
             ):
-                yield from self._on_stream_is_done(partition.stream_name())
+                yield from self._on_stream_is_done(stream_name)
         yield from self._message_repository.consume_queue()
 
     def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
@@ -246,3 +274,23 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
             else AirbyteStreamStatus.COMPLETE
         )
         yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
+
+    def _submit_next_exclusive_partition(self, stream_name: str) -> None:
+        """
+        Submit the next pending partition for an exclusive stream.
+        This ensures that only one partition is processed at a time for streams
+        that have use_exclusive_concurrency=True.
+
+        Note: This method must be called while holding self._exclusive_partition_lock
+        to ensure thread safety.
+        """
+        pending_partitions = self._pending_partitions_per_exclusive_stream[stream_name]
+        if pending_partitions:
+            # Set the flag BEFORE popping to prevent race conditions where another thread
+            # could see in_progress=False and submit another partition concurrently
+            self._exclusive_stream_partition_in_progress[stream_name] = True
+            partition = pending_partitions.popleft()
+            cursor = self._stream_name_to_instance[stream_name].cursor
+            self._thread_pool_manager.submit(
+                self._partition_reader.process_partition, partition, cursor
+            )
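Viewed in isolation, the processor change above is a per-stream "single-flight" queue: at most one partition per exclusive stream is ever inside the thread pool, and the completion of one submission triggers the next. The sketch below is a hypothetical, self-contained rendering of that pattern, not CDK code: `SerialDispatcher` and its method names are invented, a plain `ThreadPoolExecutor` stands in for the CDK's `ThreadPoolManager`, and a `try/finally` plays the role of the `PartitionCompleteSentinel` callback.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Deque, Dict


class SerialDispatcher:
    """Queue work per key and keep at most one item per key in flight."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._pending: Dict[str, Deque[Callable[[], None]]] = {}
        self._in_progress: Dict[str, bool] = {}
        self._lock = threading.Lock()

    def submit(self, key: str, work: Callable[[], None]) -> None:
        # Mirrors on_partition: enqueue, then submit only if the key is idle.
        with self._lock:
            self._pending.setdefault(key, deque()).append(work)
            if not self._in_progress.get(key, False):
                self._submit_next(key)

    def _submit_next(self, key: str) -> None:
        # Must be called while holding self._lock, as in the diff's helper.
        pending = self._pending[key]
        if pending:
            # Flag is set BEFORE popping so no other thread can double-submit.
            self._in_progress[key] = True
            self._executor.submit(self._run, key, pending.popleft())

    def _run(self, key: str, work: Callable[[], None]) -> None:
        try:
            work()
        finally:
            # Stands in for on_partition_complete_sentinel: release the slot
            # and pull the next queued item, if any.
            with self._lock:
                self._in_progress[key] = False
                self._submit_next(key)


if __name__ == "__main__":
    pool = ThreadPoolExecutor(max_workers=4)
    dispatcher = SerialDispatcher(pool)
    finished = threading.Semaphore(0)

    def job(i: int) -> None:
        print(f"slice {i} processed serially")
        finished.release()

    for i in range(5):
        dispatcher.submit("scroll_stream", lambda i=i: job(i))
    for _ in range(5):
        finished.acquire()  # wait for all slices before shutting the pool down
    pool.shutdown()
```

As in the diff, the in-flight flag is flipped before the queue is popped and only while the lock is held, so two threads can never both observe an idle stream and submit concurrently.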
diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index e04a82c0d..7cf722cfb 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1655,6 +1655,14 @@ definitions:
       $parameters:
         type: object
         additionalProperties: true
+      disable_partition_concurrency:
+        title: Disable Partition Concurrency
+        description: >
+          If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently.
+          This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint,
+          such as scroll-based pagination APIs that only allow one active scroll at a time.
+        type: boolean
+        default: false
   $parameters:
     type: object
     additionalProperties: true
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index b78a07021..0c3b00518 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -2558,6 +2558,11 @@ class Config:
         description="(experimental) Describes how to fetch a file",
         title="File Uploader",
     )
+    disable_partition_concurrency: Optional[bool] = Field(
+        False,
+        description="If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently. This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint, such as scroll-based pagination APIs that only allow one active scroll at a time.",
+        title="Disable Partition Concurrency",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
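Because the generated field is `Optional[bool]`, a manifest that omits `disable_partition_concurrency` can surface as `None` rather than `False`. The factory hunk below coerces with `bool(...)` so both spellings select normal concurrent processing. Here is a minimal illustration; `DeclarativeStreamModelStub` is a hypothetical stand-in for the generated pydantic model.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeclarativeStreamModelStub:
    """Hypothetical stand-in for the generated model class."""

    disable_partition_concurrency: Optional[bool] = None


for raw in (None, False, True):
    model = DeclarativeStreamModelStub(disable_partition_concurrency=raw)
    # Mirrors the factory wiring:
    # use_exclusive_concurrency=bool(model.disable_partition_concurrency)
    use_exclusive_concurrency = bool(model.disable_partition_concurrency)
    print(f"{raw!r:5} -> use_exclusive_concurrency={use_exclusive_concurrency}")
```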
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 3a772b691..17d3738b6 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2118,6 +2118,7 @@ def create_default_stream(
             logger=logging.getLogger(f"airbyte.{stream_name}"),
             cursor=concurrent_cursor,
             supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
+            use_exclusive_concurrency=bool(model.disable_partition_concurrency),
         )
 
     def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None:
diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
index 667d088ab..2a5903eb4 100644
--- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
+++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
@@ -90,3 +90,13 @@ def check_availability(self) -> StreamAvailability:
         """
         :return: If the stream is available and if not, why
         """
+
+    @property
+    def use_exclusive_concurrency(self) -> bool:
+        """
+        If True, partitions for this stream will be processed serially (one at a time) instead of concurrently.
+        This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint,
+        such as scroll-based pagination APIs that only allow one active scroll at a time.
+        :return: Whether to disable concurrent partition processing for this stream.
+        """
+        return False
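Giving the property a `False` body on `AbstractStream` makes the feature strictly opt-in: every existing implementation inherits fully concurrent partition processing and must deliberately override the property to serialize. A toy sketch of that override pattern follows; both class names are invented for illustration.

```python
class BaseStream:
    """Toy stand-in for AbstractStream: concurrent by default."""

    @property
    def use_exclusive_concurrency(self) -> bool:
        return False


class ScrollApiStream(BaseStream):
    """A stream whose API tolerates only one active request at a time."""

    @property
    def use_exclusive_concurrency(self) -> bool:
        return True


assert BaseStream().use_exclusive_concurrency is False
assert ScrollApiStream().use_exclusive_concurrency is True
```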
+ """ + return False diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 41674bdae..c625b7713 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -214,6 +214,10 @@ def log_stream_sync_configuration(self) -> None: def get_underlying_stream(self) -> DefaultStream: return self._abstract_stream + @property + def use_exclusive_concurrency(self) -> bool: + return self._abstract_stream.use_exclusive_concurrency + class SliceEncoder(json.JSONEncoder): def default(self, obj: Any) -> Any: diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index f5d4ccf2e..26de9ede4 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -26,6 +26,7 @@ def __init__( cursor: Cursor, namespace: Optional[str] = None, supports_file_transfer: bool = False, + use_exclusive_concurrency: bool = False, ) -> None: self._stream_partition_generator = partition_generator self._name = name @@ -36,6 +37,7 @@ def __init__( self._cursor = cursor self._namespace = namespace self._supports_file_transfer = supports_file_transfer + self._use_exclusive_concurrency = use_exclusive_concurrency def generate_partitions(self) -> Iterable[Partition]: yield from self._stream_partition_generator.generate() @@ -94,6 +96,10 @@ def log_stream_sync_configuration(self) -> None: def cursor(self) -> Cursor: return self._cursor + @property + def use_exclusive_concurrency(self) -> bool: + return self._use_exclusive_concurrency + def check_availability(self) -> StreamAvailability: """ Check stream availability by attempting to read the first record of the stream. 
diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
index a681f75eb..1ae043fd5 100644
--- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
+++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
@@ -792,3 +792,119 @@ def test_start_next_partition_generator(self):
         self._thread_pool_manager.submit.assert_called_with(
             self._partition_enqueuer.generate_partitions, self._stream
         )
+
+    def test_exclusive_stream_partitions_are_queued_and_processed_serially(self):
+        """Test that partitions for exclusive streams are queued and only one is processed at a time."""
+        exclusive_stream = Mock(spec=AbstractStream)
+        exclusive_stream.name = _STREAM_NAME
+        exclusive_stream.use_exclusive_concurrency = True
+        exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
+            name=_STREAM_NAME,
+            json_schema={},
+            supported_sync_modes=[SyncMode.full_refresh],
+        )
+
+        stream_instances_to_read_from = [exclusive_stream]
+        handler = ConcurrentReadProcessor(
+            stream_instances_to_read_from,
+            self._partition_enqueuer,
+            self._thread_pool_manager,
+            self._logger,
+            self._slice_logger,
+            self._message_repository,
+            self._partition_reader,
+        )
+
+        partition1 = Mock(spec=Partition)
+        partition1.stream_name.return_value = _STREAM_NAME
+        partition1.to_slice.return_value = self._log_message
+
+        partition2 = Mock(spec=Partition)
+        partition2.stream_name.return_value = _STREAM_NAME
+        partition2.to_slice.return_value = self._log_message
+
+        handler.on_partition(partition1)
+        handler.on_partition(partition2)
+
+        assert self._thread_pool_manager.submit.call_count == 1
+        assert len(handler._pending_partitions_per_exclusive_stream[_STREAM_NAME]) == 1
+        assert handler._exclusive_stream_partition_in_progress[_STREAM_NAME] is True
+
+    def test_exclusive_stream_submits_next_partition_on_completion(self):
+        """Test that the next partition is submitted when the current one completes."""
+        exclusive_stream = Mock(spec=AbstractStream)
+        exclusive_stream.name = _STREAM_NAME
+        exclusive_stream.use_exclusive_concurrency = True
+        exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
+            name=_STREAM_NAME,
+            json_schema={},
+            supported_sync_modes=[SyncMode.full_refresh],
+        )
+
+        stream_instances_to_read_from = [exclusive_stream]
+        handler = ConcurrentReadProcessor(
+            stream_instances_to_read_from,
+            self._partition_enqueuer,
+            self._thread_pool_manager,
+            self._logger,
+            self._slice_logger,
+            self._message_repository,
+            self._partition_reader,
+        )
+        handler.start_next_partition_generator()
+
+        partition1 = Mock(spec=Partition)
+        partition1.stream_name.return_value = _STREAM_NAME
+        partition1.to_slice.return_value = self._log_message
+
+        partition2 = Mock(spec=Partition)
+        partition2.stream_name.return_value = _STREAM_NAME
+        partition2.to_slice.return_value = self._log_message
+
+        handler.on_partition(partition1)
+        handler.on_partition(partition2)
+
+        assert self._thread_pool_manager.submit.call_count == 2
+
+        sentinel = PartitionCompleteSentinel(partition1)
+        list(handler.on_partition_complete_sentinel(sentinel))
+
+        assert self._thread_pool_manager.submit.call_count == 3
+        assert len(handler._pending_partitions_per_exclusive_stream[_STREAM_NAME]) == 0
+        assert handler._exclusive_stream_partition_in_progress[_STREAM_NAME] is True
+
+    def test_non_exclusive_stream_partitions_are_submitted_immediately(self):
+        """Test that partitions for non-exclusive streams are submitted immediately."""
+        non_exclusive_stream = Mock(spec=AbstractStream)
+        non_exclusive_stream.name = _STREAM_NAME
+        non_exclusive_stream.use_exclusive_concurrency = False
+        non_exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
+            name=_STREAM_NAME,
+            json_schema={},
+            supported_sync_modes=[SyncMode.full_refresh],
+        )
+
+        stream_instances_to_read_from = [non_exclusive_stream]
+        handler = ConcurrentReadProcessor(
+            stream_instances_to_read_from,
+            self._partition_enqueuer,
+            self._thread_pool_manager,
+            self._logger,
+            self._slice_logger,
+            self._message_repository,
+            self._partition_reader,
+        )
+
+        partition1 = Mock(spec=Partition)
+        partition1.stream_name.return_value = _STREAM_NAME
+        partition1.to_slice.return_value = self._log_message
+
+        partition2 = Mock(spec=Partition)
+        partition2.stream_name.return_value = _STREAM_NAME
+        partition2.to_slice.return_value = self._log_message
+
+        handler.on_partition(partition1)
+        handler.on_partition(partition2)
+
+        assert self._thread_pool_manager.submit.call_count == 2
+        assert _STREAM_NAME not in handler._pending_partitions_per_exclusive_stream