68 changes: 58 additions & 10 deletions airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
@@ -3,7 +3,9 @@
#
import logging
import os
from typing import Dict, Iterable, List, Optional, Set
import threading
from collections import deque
from typing import Deque, Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
@@ -65,6 +67,16 @@ def __init__(
self._partition_reader = partition_reader
self._streams_done: Set[str] = set()
self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
self._exclusive_streams: Set[str] = {
s.name for s in stream_instances_to_read_from if s.use_exclusive_concurrency
}
self._pending_partitions_per_exclusive_stream: Dict[str, Deque[Partition]] = {
stream_name: deque() for stream_name in self._exclusive_streams
}
self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
stream_name: False for stream_name in self._exclusive_streams
}
Comment on lines +70 to +78

Copilot AI, Dec 15, 2025:

The shared state for exclusive stream partition processing (_pending_partitions_per_exclusive_stream and _exclusive_stream_partition_in_progress) is accessed from multiple methods (on_partition, on_partition_complete_sentinel, and _submit_next_exclusive_partition) that may execute in different threads. Without synchronization mechanisms (locks), this creates potential race conditions where multiple threads could read/write these dictionaries concurrently, leading to incorrect partition submission counts or lost partitions. Consider adding a threading lock to protect access to these shared data structures.
self._exclusive_partition_lock = threading.Lock()

def on_partition_generation_completed(
self, sentinel: PartitionGenerationCompletedSentinel
@@ -92,7 +104,8 @@ def on_partition(self, partition: Partition) -> None:
This method is called when a partition is generated.
1. Add the partition to the set of partitions for the stream
2. Log the slice if necessary
3. Submit the partition to the thread pool manager
3. For exclusive streams, queue the partition and only submit if no partition is in progress
4. For non-exclusive streams, submit the partition to the thread pool manager immediately
"""
stream_name = partition.stream_name()
self._streams_to_running_partitions[stream_name].add(partition)
@@ -101,30 +114,45 @@ def on_partition(self, partition: Partition) -> None:
self._message_repository.emit_message(
self._slice_logger.create_slice_log_message(partition.to_slice())
)
self._thread_pool_manager.submit(
self._partition_reader.process_partition, partition, cursor
)

if stream_name in self._exclusive_streams:
with self._exclusive_partition_lock:
self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
if not self._exclusive_stream_partition_in_progress[stream_name]:
self._submit_next_exclusive_partition(stream_name)
else:
self._thread_pool_manager.submit(
self._partition_reader.process_partition, partition, cursor
)

def on_partition_complete_sentinel(
self, sentinel: PartitionCompleteSentinel
) -> Iterable[AirbyteMessage]:
"""
This method is called when a partition is completed.
1. Close the partition
2. If the stream is done, mark it as such and return a stream status message
3. Emit messages that were added to the message repository
2. For exclusive streams, submit the next pending partition if any
3. If the stream is done, mark it as such and return a stream status message
4. Emit messages that were added to the message repository
"""
partition = sentinel.partition
stream_name = partition.stream_name()

partitions_running = self._streams_to_running_partitions[partition.stream_name()]
partitions_running = self._streams_to_running_partitions[stream_name]
if partition in partitions_running:
partitions_running.remove(partition)

if stream_name in self._exclusive_streams:
with self._exclusive_partition_lock:
self._exclusive_stream_partition_in_progress[stream_name] = False
self._submit_next_exclusive_partition(stream_name)

# If all partitions were generated and this was the last one, the stream is done
if (
partition.stream_name() not in self._streams_currently_generating_partitions
stream_name not in self._streams_currently_generating_partitions
and len(partitions_running) == 0
):
yield from self._on_stream_is_done(partition.stream_name())
yield from self._on_stream_is_done(stream_name)
yield from self._message_repository.consume_queue()

def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
@@ -246,3 +274,23 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
else AirbyteStreamStatus.COMPLETE
)
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

def _submit_next_exclusive_partition(self, stream_name: str) -> None:
"""
Submit the next pending partition for an exclusive stream.
This ensures that only one partition is processed at a time for streams
that have use_exclusive_concurrency=True.

Note: This method must be called while holding self._exclusive_partition_lock
to ensure thread safety.
"""
pending_partitions = self._pending_partitions_per_exclusive_stream[stream_name]
if pending_partitions:
# Set the flag BEFORE popping to prevent race conditions where another thread
# could see in_progress=False and submit another partition concurrently
self._exclusive_stream_partition_in_progress[stream_name] = True
partition = pending_partitions.popleft()
cursor = self._stream_name_to_instance[stream_name].cursor
self._thread_pool_manager.submit(
self._partition_reader.process_partition, partition, cursor
)
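
Taken together, on_partition, on_partition_complete_sentinel, and _submit_next_exclusive_partition form a per-stream gate: partitions queue up, and completing one submission triggers the next. Below is a minimal standalone sketch of the same queue-and-gate pattern, with a plain ThreadPoolExecutor standing in for the thread pool manager; the class and method names are illustrative, not part of the CDK.

import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Deque


class ExclusivePartitionGate:
    """Allow at most one in-flight task at a time; queue the rest."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._pending: Deque[Callable[[], None]] = deque()
        self._in_progress = False
        self._lock = threading.Lock()

    def submit(self, task: Callable[[], None]) -> None:
        with self._lock:
            self._pending.append(task)
            if not self._in_progress:
                self._submit_next_locked()

    def _on_complete(self) -> None:
        with self._lock:
            self._in_progress = False
            if self._pending:
                self._submit_next_locked()

    def _submit_next_locked(self) -> None:
        # Caller holds self._lock. Set the flag BEFORE popping so a concurrent
        # submit() cannot observe in_progress=False and sneak a second task in.
        self._in_progress = True
        task = self._pending.popleft()
        self._executor.submit(self._run, task)

    def _run(self, task: Callable[[], None]) -> None:
        try:
            task()
        finally:
            self._on_complete()


# Usage: tasks run one at a time even though the pool has four workers.
gate = ExclusivePartitionGate(ThreadPoolExecutor(max_workers=4))
for i in range(3):
    gate.submit(lambda i=i: print(f"processing partition {i}"))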
@@ -1655,6 +1655,14 @@ definitions:
$parameters:
type: object
additionalProperties: true
disable_partition_concurrency:
title: Disable Partition Concurrency
description: >
If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently.
This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint,
such as scroll-based pagination APIs that only allow one active scroll at a time.
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
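
For illustration, here is where the new flag would sit in a declarative stream definition — a hypothetical fragment written as the Python dict the manifest YAML parses into; the stream name and retriever contents are invented for the example.

scroll_stream = {
    "type": "DeclarativeStream",
    "name": "search_results",  # hypothetical stream backed by a scroll API
    "disable_partition_concurrency": True,  # partitions processed one at a time
    "retriever": {
        # the usual retriever/requester configuration goes here
    },
}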
@@ -2558,6 +2558,11 @@ class Config:
description="(experimental) Describes how to fetch a file",
title="File Uploader",
)
disable_partition_concurrency: Optional[bool] = Field(
False,
description="If set to true, partitions for this stream will be processed serially (one at a time) instead of concurrently. This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint, such as scroll-based pagination APIs that only allow one active scroll at a time.",
title="Disable Partition Concurrency",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


@@ -2118,6 +2118,7 @@ def create_default_stream(
logger=logging.getLogger(f"airbyte.{stream_name}"),
cursor=concurrent_cursor,
supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
use_exclusive_concurrency=bool(model.disable_partition_concurrency),
)

def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None:
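
One detail in the mapping above: the model field is Optional[bool], so it can arrive as None when the manifest sets it to an explicit null; the bool(...) coercion collapses None and False into the same non-exclusive default.

# How bool() maps the Optional[bool] field onto the constructor flag:
assert bool(None) is False   # explicit null in the manifest -> concurrent
assert bool(False) is False  # default -> concurrent
assert bool(True) is True    # flag set -> serial partition processing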
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
@@ -90,3 +90,13 @@ def check_availability(self) -> StreamAvailability:
"""
:return: If the stream is available and if not, why
"""

@property
def use_exclusive_concurrency(self) -> bool:
"""
If True, partitions for this stream will be processed serially (one at a time) instead of concurrently.
This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint,
such as scroll-based pagination APIs that only allow one active scroll at a time.
:return: Whether to disable concurrent partition processing for this stream.
"""
return False
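
Because the base property defaults to False, opting in is a one-property override in a concrete stream. A minimal sketch, with an invented class name and all other AbstractStream members elided for brevity:

class OneScrollAtATimeStream:
    @property
    def use_exclusive_concurrency(self) -> bool:
        # The upstream API keeps a single server-side scroll cursor, so only
        # one partition may be read at a time.
        return True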
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
@@ -214,6 +214,10 @@ def log_stream_sync_configuration(self) -> None:
def get_underlying_stream(self) -> DefaultStream:
return self._abstract_stream

@property
def use_exclusive_concurrency(self) -> bool:
return self._abstract_stream.use_exclusive_concurrency


class SliceEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
6 changes: 6 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
@@ -26,6 +26,7 @@ def __init__(
cursor: Cursor,
namespace: Optional[str] = None,
supports_file_transfer: bool = False,
use_exclusive_concurrency: bool = False,
) -> None:
self._stream_partition_generator = partition_generator
self._name = name
@@ -36,6 +37,7 @@
self._cursor = cursor
self._namespace = namespace
self._supports_file_transfer = supports_file_transfer
self._use_exclusive_concurrency = use_exclusive_concurrency

def generate_partitions(self) -> Iterable[Partition]:
yield from self._stream_partition_generator.generate()
@@ -94,6 +96,10 @@ def log_stream_sync_configuration(self) -> None:
def cursor(self) -> Cursor:
return self._cursor

@property
def use_exclusive_concurrency(self) -> bool:
return self._use_exclusive_concurrency

def check_availability(self) -> StreamAvailability:
"""
Check stream availability by attempting to read the first record of the stream.
@@ -792,3 +792,119 @@ def test_start_next_partition_generator(self):
self._thread_pool_manager.submit.assert_called_with(
self._partition_enqueuer.generate_partitions, self._stream
)

def test_exclusive_stream_partitions_are_queued_and_processed_serially(self):
"""Test that partitions for exclusive streams are queued and only one is processed at a time."""
exclusive_stream = Mock(spec=AbstractStream)
exclusive_stream.name = _STREAM_NAME
exclusive_stream.use_exclusive_concurrency = True
exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
name=_STREAM_NAME,
json_schema={},
supported_sync_modes=[SyncMode.full_refresh],
)

stream_instances_to_read_from = [exclusive_stream]
handler = ConcurrentReadProcessor(
stream_instances_to_read_from,
self._partition_enqueuer,
self._thread_pool_manager,
self._logger,
self._slice_logger,
self._message_repository,
self._partition_reader,
)

partition1 = Mock(spec=Partition)
partition1.stream_name.return_value = _STREAM_NAME
partition1.to_slice.return_value = self._log_message

partition2 = Mock(spec=Partition)
partition2.stream_name.return_value = _STREAM_NAME
partition2.to_slice.return_value = self._log_message

handler.on_partition(partition1)
handler.on_partition(partition2)

assert self._thread_pool_manager.submit.call_count == 1
assert len(handler._pending_partitions_per_exclusive_stream[_STREAM_NAME]) == 1
assert handler._exclusive_stream_partition_in_progress[_STREAM_NAME] is True

def test_exclusive_stream_submits_next_partition_on_completion(self):
"""Test that the next partition is submitted when the current one completes."""
exclusive_stream = Mock(spec=AbstractStream)
exclusive_stream.name = _STREAM_NAME
exclusive_stream.use_exclusive_concurrency = True
exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
name=_STREAM_NAME,
json_schema={},
supported_sync_modes=[SyncMode.full_refresh],
)

stream_instances_to_read_from = [exclusive_stream]
handler = ConcurrentReadProcessor(
stream_instances_to_read_from,
self._partition_enqueuer,
self._thread_pool_manager,
self._logger,
self._slice_logger,
self._message_repository,
self._partition_reader,
)
handler.start_next_partition_generator()

partition1 = Mock(spec=Partition)
partition1.stream_name.return_value = _STREAM_NAME
partition1.to_slice.return_value = self._log_message

partition2 = Mock(spec=Partition)
partition2.stream_name.return_value = _STREAM_NAME
partition2.to_slice.return_value = self._log_message

handler.on_partition(partition1)
handler.on_partition(partition2)

assert self._thread_pool_manager.submit.call_count == 2

sentinel = PartitionCompleteSentinel(partition1)
list(handler.on_partition_complete_sentinel(sentinel))

assert self._thread_pool_manager.submit.call_count == 3
assert len(handler._pending_partitions_per_exclusive_stream[_STREAM_NAME]) == 0
assert handler._exclusive_stream_partition_in_progress[_STREAM_NAME] is True

def test_non_exclusive_stream_partitions_are_submitted_immediately(self):
"""Test that partitions for non-exclusive streams are submitted immediately."""
non_exclusive_stream = Mock(spec=AbstractStream)
non_exclusive_stream.name = _STREAM_NAME
non_exclusive_stream.use_exclusive_concurrency = False
non_exclusive_stream.as_airbyte_stream.return_value = AirbyteStream(
name=_STREAM_NAME,
json_schema={},
supported_sync_modes=[SyncMode.full_refresh],
)

stream_instances_to_read_from = [non_exclusive_stream]
handler = ConcurrentReadProcessor(
stream_instances_to_read_from,
self._partition_enqueuer,
self._thread_pool_manager,
self._logger,
self._slice_logger,
self._message_repository,
self._partition_reader,
)

partition1 = Mock(spec=Partition)
partition1.stream_name.return_value = _STREAM_NAME
partition1.to_slice.return_value = self._log_message

partition2 = Mock(spec=Partition)
partition2.stream_name.return_value = _STREAM_NAME
partition2.to_slice.return_value = self._log_message

handler.on_partition(partition1)
handler.on_partition(partition2)

assert self._thread_pool_manager.submit.call_count == 2
assert _STREAM_NAME not in handler._pending_partitions_per_exclusive_stream