feat: Add use_exclusive_concurrency flag for serial partition processing #867
Conversation
This feature allows streams to be marked as exclusive so their partitions are processed serially instead of concurrently. This is useful for APIs that have strict rate limits or do not support concurrent requests to the same endpoint, such as scroll-based pagination APIs that only allow one active scroll at a time.

Changes:

- Add a `disable_partition_concurrency` field to the `DeclarativeStream` schema
- Add a `use_exclusive_concurrency` property to `AbstractStream` and `DefaultStream`
- Update `ModelToComponentFactory` to pass the new property
- Update `ConcurrentReadProcessor` to handle exclusive streams serially

Closes: airbytehq/oncall#8346

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
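For illustration, a minimal sketch of the stream-level property described above. The class bodies, the constructor signature, and the `name` parameter are simplified assumptions for the sketch, not code copied from the diff:

```python
from abc import ABC
from typing import Any


class AbstractStream(ABC):
    """Simplified stand-in for the CDK's AbstractStream."""

    @property
    def use_exclusive_concurrency(self) -> bool:
        # Defaults to False so existing streams keep fully concurrent
        # partition processing (backward compatible).
        return False


class DefaultStream(AbstractStream):
    """Simplified stand-in for the CDK's DefaultStream."""

    def __init__(self, name: str, use_exclusive_concurrency: bool = False, **kwargs: Any) -> None:
        # Passing the flag through the constructor mirrors "DefaultStream to
        # accept and expose the new property"; the real constructor takes
        # many more arguments than shown here.
        self.name = name
        self._use_exclusive_concurrency = use_exclusive_concurrency

    @property
    def use_exclusive_concurrency(self) -> bool:
        return self._use_exclusive_concurrency
```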
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

```bash
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1765819582-stream-concurrency-exclusivity#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1765819582-stream-concurrency-exclusivity
```

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
Pull request overview
This PR introduces support for serial partition processing in declarative streams, allowing certain streams to be processed one partition at a time instead of concurrently. This is designed for APIs like Intercom that have strict constraints, such as scroll endpoints that only allow one active scroll at a time.
Key changes:

- Added a `disable_partition_concurrency` configuration option to the declarative stream schema (exposed as `use_exclusive_concurrency` in Python code)
- Enhanced `ConcurrentReadProcessor` to queue partitions for exclusive streams and process them serially
- Added comprehensive unit tests covering serial processing, partition queueing, and mixed stream scenarios
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| declarative_component_schema.yaml | Adds `disable_partition_concurrency` field to stream configuration schema |
| declarative_component_schema.py | Adds Pydantic model field for the new configuration option |
| abstract_stream.py | Adds `use_exclusive_concurrency` property with default `False` for backward compatibility |
| default_stream.py | Implements `use_exclusive_concurrency` property and accepts it in constructor |
| adapters.py | Adds property to `StreamFacade` to delegate to underlying stream |
| model_to_component_factory.py | Maps YAML `disable_partition_concurrency` to Python `use_exclusive_concurrency` when creating streams |
| concurrent_read_processor.py | Implements serial partition processing logic with queue management |
| test_concurrent_read_processor.py | Adds unit tests for exclusive concurrency feature |
```python
self._exclusive_streams: Set[str] = {
    s.name for s in stream_instances_to_read_from if s.use_exclusive_concurrency
}
self._pending_partitions_per_exclusive_stream: Dict[str, Deque[Partition]] = {
    stream_name: deque() for stream_name in self._exclusive_streams
}
self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
    stream_name: False for stream_name in self._exclusive_streams
}
```
Copilot AI · Dec 15, 2025
The shared state for exclusive stream partition processing (_pending_partitions_per_exclusive_stream and _exclusive_stream_partition_in_progress) is accessed from multiple methods (on_partition, on_partition_complete_sentinel, and _submit_next_exclusive_partition) that may execute in different threads. Without synchronization mechanisms (locks), this creates potential race conditions where multiple threads could read/write these dictionaries concurrently, leading to incorrect partition submission counts or lost partitions. Consider adding a threading lock to protect access to these shared data structures.
```python
partition = pending_partitions.popleft()
cursor = self._stream_name_to_instance[stream_name].cursor
self._exclusive_stream_partition_in_progress[stream_name] = True
```
Copilot AI · Dec 15, 2025
Setting _exclusive_stream_partition_in_progress[stream_name] = True after popping the partition creates a race condition window. If on_partition is called between the popleft() and setting the flag to True, it will see in_progress = False and submit another partition concurrently, violating the exclusive processing guarantee. The flag should be set to True before popping from the queue or the entire operation should be atomic.
Suggested change:

```diff
-partition = pending_partitions.popleft()
-cursor = self._stream_name_to_instance[stream_name].cursor
-self._exclusive_stream_partition_in_progress[stream_name] = True
+self._exclusive_stream_partition_in_progress[stream_name] = True
+partition = pending_partitions.popleft()
+cursor = self._stream_name_to_instance[stream_name].cursor
```
@brianjlai @tolik0 I tested this locally with
@agarctfi It would be better to test this change using the preparation stream and its parent stream, or multiple streams that have this stream as their parent along with the parent stream itself. This would allow us to ensure that they all run at the same time.
- Add threading.Lock to protect shared state for exclusive streams
- Wrap access to _pending_partitions_per_exclusive_stream and _exclusive_stream_partition_in_progress with lock
- Set in_progress flag BEFORE popping from queue to prevent race condition
- Add docstring note about lock requirement for _submit_next_exclusive_partition

Addresses Copilot review comments about thread safety concerns.

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
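As a rough sketch of what that fix amounts to: the class below is a simplified stand-in that reuses the attribute names quoted in the review comments, not the merged implementation, and it only shows the flag-before-pop ordering under a lock.

```python
import threading
from typing import Callable, Deque, Dict


class ExclusiveSubmissionSketch:
    """Illustrates the corrected ordering: acquire the lock, mark the stream
    as in progress, then pop the next partition, so a concurrent on_partition
    call can never slip a second partition in between the two steps."""

    def __init__(self, submit: Callable[[object], None]) -> None:
        self._submit = submit  # stands in for the thread pool manager's submit
        self._lock = threading.Lock()
        self._pending_partitions_per_exclusive_stream: Dict[str, Deque[object]] = {}
        self._exclusive_stream_partition_in_progress: Dict[str, bool] = {}

    def _submit_next_exclusive_partition(self, stream_name: str) -> None:
        with self._lock:
            pending = self._pending_partitions_per_exclusive_stream.get(stream_name)
            if not pending or self._exclusive_stream_partition_in_progress.get(stream_name):
                return
            # Flag first, pop second: the ordering the Copilot suggestion asked for.
            self._exclusive_stream_partition_in_progress[stream_name] = True
            partition = pending.popleft()
        # Submit outside the lock so a blocking submit cannot hold it.
        self._submit(partition)
```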
feat: Add use_exclusive_concurrency flag for serial partition processing
Summary
This PR adds a new `disable_partition_concurrency` option to declarative streams that allows certain streams to be processed serially instead of concurrently. This is needed for APIs like Intercom's scroll endpoints that only allow one active scroll at a time.

Changes:

- Added a `disable_partition_concurrency` field to `DeclarativeStream` in the YAML schema and Pydantic model
- Added a `use_exclusive_concurrency` property to `AbstractStream` (defaults to `False` for backward compatibility)
- Updated `DefaultStream` to accept and expose the new property
- Updated `ModelToComponentFactory` to pass the property when creating streams
- Updated `ConcurrentReadProcessor` to queue partitions for exclusive streams and process them one at a time
- Added a `use_exclusive_concurrency` property to `StreamFacade` to delegate to the underlying stream
- Added a `threading.Lock` to protect shared state for exclusive partition processing

How it works: When a stream has `disable_partition_concurrency: true`, the `ConcurrentReadProcessor` queues its partitions instead of submitting them all to the thread pool. It only submits the next partition after the current one completes, ensuring serial processing. A rough sketch of this flow is shown below.

Closes: https://github.com/airbytehq/oncall/issues/8346
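A self-contained sketch of that queueing behaviour follows. Names loosely follow the snippets quoted in the review; the real `ConcurrentReadProcessor` also closes cursors, emits sentinels, and handles many other responsibilities that are omitted here, so treat this as an illustration rather than the merged implementation.

```python
import threading
from collections import deque
from typing import Callable, Deque, Dict, Set


class SerialPartitionQueueSketch:
    """Partitions of 'exclusive' streams are queued and submitted one at a
    time; all other streams are submitted to the thread pool immediately."""

    def __init__(self, exclusive_streams: Set[str], submit: Callable[[object], None]) -> None:
        self._submit = submit
        self._lock = threading.Lock()
        self._exclusive_streams = exclusive_streams
        self._pending: Dict[str, Deque[object]] = {s: deque() for s in exclusive_streams}
        self._in_progress: Dict[str, bool] = {s: False for s in exclusive_streams}

    def on_partition(self, stream_name: str, partition: object) -> None:
        if stream_name not in self._exclusive_streams:
            self._submit(partition)  # normal concurrent path
            return
        with self._lock:
            self._pending[stream_name].append(partition)
            if self._in_progress[stream_name]:
                return  # a partition is already running; keep this one queued
            self._in_progress[stream_name] = True
            next_partition = self._pending[stream_name].popleft()
        self._submit(next_partition)

    def on_partition_complete(self, stream_name: str) -> None:
        if stream_name not in self._exclusive_streams:
            return
        with self._lock:
            if not self._pending[stream_name]:
                self._in_progress[stream_name] = False
                return
            next_partition = self._pending[stream_name].popleft()
        self._submit(next_partition)
```

Submitting outside the lock keeps a potentially blocking submit call from stalling the queueing of partitions for other streams.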
Updates since last revision
- Added a `threading.Lock` to protect access to `_pending_partitions_per_exclusive_stream` and `_exclusive_stream_partition_in_progress`
- Set the `in_progress` flag BEFORE popping from the queue in `_submit_next_exclusive_partition`
- Added the `StreamFacade`'s missing `use_exclusive_concurrency` property

Review & Testing Checklist for Human

- `_thread_pool_manager.submit()` - confirm this doesn't cause deadlock issues if submit blocks.

Recommended test plan:

- Add `disable_partition_concurrency: true` to a test stream in a connector manifest

Notes

- The option is named `use_exclusive_concurrency` in Python code but `disable_partition_concurrency` in the YAML schema to make the YAML more intuitive for connector developers.

Link to Devin run: https://app.devin.ai/sessions/27947ef21f8f472f89b46fdf712f599d
Requested by: alfredo.garcia@airbyte.io (@agarctfi)