
Conversation

devin-ai-integration bot commented Dec 15, 2025

feat: Add use_exclusive_concurrency flag for serial partition processing

Summary

This PR adds a new disable_partition_concurrency option to declarative streams that allows certain streams to be processed serially instead of concurrently. This is needed for APIs like Intercom's scroll endpoints that only allow one active scroll at a time.

Changes:

  • Added disable_partition_concurrency field to DeclarativeStream in the YAML schema and Pydantic model
  • Added use_exclusive_concurrency property to AbstractStream (defaults to False for backward compatibility; see the sketch after this list)
  • Updated DefaultStream to accept and expose the new property
  • Updated ModelToComponentFactory to pass the property when creating streams
  • Updated ConcurrentReadProcessor to queue partitions for exclusive streams and process them one at a time
  • Added use_exclusive_concurrency property to StreamFacade to delegate to the underlying stream
  • Added thread safety with threading.Lock to protect shared state for exclusive partition processing
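
For reference, a minimal sketch of the new property's shape on the stream interface (simplified and illustrative only; not the actual AbstractStream/DefaultStream code):

from abc import ABC

class AbstractStreamSketch(ABC):
    @property
    def use_exclusive_concurrency(self) -> bool:
        # Default keeps the existing behavior: partitions may be processed concurrently.
        return False

class DefaultStreamSketch(AbstractStreamSketch):
    def __init__(self, name: str, use_exclusive_concurrency: bool = False) -> None:
        self._name = name
        self._use_exclusive_concurrency = use_exclusive_concurrency

    @property
    def use_exclusive_concurrency(self) -> bool:
        return self._use_exclusive_concurrency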

How it works: When a stream has disable_partition_concurrency: true, the ConcurrentReadProcessor queues its partitions instead of submitting them all to the thread pool. It only submits the next partition after the current one completes, ensuring serial processing.
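
Below is a minimal, self-contained sketch of that dispatch logic (illustrative only; names are simplified and this is not the actual ConcurrentReadProcessor code):

from collections import deque
from typing import Callable, Deque, Dict, Set

class SerialDispatchSketch:
    """Queue partitions of exclusive streams and submit them one at a time."""

    def __init__(self, exclusive_stream_names: Set[str], submit: Callable) -> None:
        self._submit = submit  # stands in for the thread pool's submit()
        self._pending: Dict[str, Deque] = {name: deque() for name in exclusive_stream_names}
        self._in_progress: Dict[str, bool] = {name: False for name in exclusive_stream_names}

    def on_partition(self, stream_name: str, partition) -> None:
        if stream_name not in self._pending:
            self._submit(partition)  # non-exclusive streams are submitted immediately
            return
        self._pending[stream_name].append(partition)  # exclusive streams are queued instead
        self._maybe_submit_next(stream_name)

    def on_partition_complete(self, stream_name: str) -> None:
        if stream_name in self._pending:
            self._in_progress[stream_name] = False
            self._maybe_submit_next(stream_name)  # only now is the next partition submitted

    def _maybe_submit_next(self, stream_name: str) -> None:
        if self._in_progress[stream_name] or not self._pending[stream_name]:
            return
        self._in_progress[stream_name] = True  # mark busy before popping (see the review thread below)
        self._submit(self._pending[stream_name].popleft())

With this shape, an exclusive stream never has more than one partition in flight: each completion is what triggers submission of the next queued partition.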

Closes: https://github.com/airbytehq/oncall/issues/8346

Updates since last revision

  • Added thread safety to exclusive partition processing:
    • Added threading.Lock to protect access to _pending_partitions_per_exclusive_stream and _exclusive_stream_partition_in_progress
    • Fixed race condition by setting in_progress flag BEFORE popping from queue
    • Added docstring noting lock requirement for _submit_next_exclusive_partition
  • Previously: Added unit tests for the exclusive concurrency feature
  • Previously: Fixed StreamFacade missing use_exclusive_concurrency property

Review & Testing Checklist for Human

  • Thread safety verification: While a lock was added, verify that the implementation is correct. The lock is held while calling _thread_pool_manager.submit(); confirm this does not cause deadlocks if submit blocks (one possible alternative is sketched after this list).
  • Pydantic model consistency: The Pydantic model was manually updated (not regenerated). Verify the field definition matches the YAML schema.
  • End-to-end test: Test with a connector that needs this feature (e.g., Intercom with companies/company_segments streams) to verify partitions are actually processed serially.
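
One possible shape for the locking discussed in the first checklist item, sketched from the attribute names quoted in the review snippets further down; the _exclusive_lock attribute and the exact submit() call are assumptions, not necessarily how the PR implements it (method shown out of context, assumed to live on ConcurrentReadProcessor):

def _submit_next_exclusive_partition(self, stream_name: str) -> None:
    # Sketch only: mutate the shared queues/flags under the lock, but release it
    # before handing work to the thread pool so a blocking submit() cannot hold it.
    with self._exclusive_lock:  # assumed threading.Lock created in __init__
        pending = self._pending_partitions_per_exclusive_stream[stream_name]
        if self._exclusive_stream_partition_in_progress[stream_name] or not pending:
            return
        self._exclusive_stream_partition_in_progress[stream_name] = True  # set flag before popping
        partition = pending.popleft()
        cursor = self._stream_name_to_instance[stream_name].cursor
    self._thread_pool_manager.submit(self._partition_reader.process_partition, partition, cursor)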

Recommended test plan:

  1. Add disable_partition_concurrency: true to a test stream in a connector manifest
  2. Run a sync and verify via logs that partitions are processed one at a time
  3. Compare behavior with and without the flag to confirm concurrent vs serial processing
  4. (Suggested by @tolik0) Test with preparation streams and parent/child stream relationships to ensure they work correctly together

Notes

  • The property is named use_exclusive_concurrency in Python code but disable_partition_concurrency in the YAML schema, to keep the YAML intuitive for connector developers (see the sketch after this list).
  • Backward compatible: streams without the flag continue to work as before (concurrent processing).
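
As a rough illustration of that mapping (a sketch only, not the actual ModelToComponentFactory code; the helper name is hypothetical):

def resolve_use_exclusive_concurrency(declarative_stream_model) -> bool:
    # disable_partition_concurrency is the YAML/Pydantic field name; a missing or
    # null value falls back to False, preserving the previous concurrent behavior.
    return bool(getattr(declarative_stream_model, "disable_partition_concurrency", None) or False)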

Link to Devin run: https://app.devin.ai/sessions/27947ef21f8f472f89b46fdf712f599d
Requested by: alfredo.garcia@airbyte.io (@agarctfi)

This feature allows streams to be marked as exclusive so their partitions
are processed serially instead of concurrently. This is useful for APIs
that have strict rate limits or do not support concurrent requests to the
same endpoint, such as scroll-based pagination APIs that only allow one
active scroll at a time.

Changes:
- Add disable_partition_concurrency field to DeclarativeStream schema
- Add use_exclusive_concurrency property to AbstractStream and DefaultStream
- Update ModelToComponentFactory to pass the new property
- Update ConcurrentReadProcessor to handle exclusive streams serially

Closes: airbytehq/oncall#8346
Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
@devin-ai-integration

Original prompt from alfredo.garcia@airbyte.io
Received message in Slack channel #ask-devin-ai:

@Devin For this request, can you use the ai-fix template with this OC issue: <https://github.com/airbytehq/oncall/issues/8346>

Can you create a feature in the CDK that will allow us to specify exclusivity of concurrency for streams?
The code would have to live in the concurrent source, where the stream starts in the CDK.
The option should be defined on the stream itself.
Context: For Intercom, we need to be able to exclude streams from concurrency to prevent issues. The companies and company segments streams both use the scroll endpoints, but the API limits the amount of concurrent scrolls to one.
So to fix this properly, we would want to add this flag to these streams so they are processed serially instead of concurrently.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1765819381153309

@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

github-actions bot added the enhancement (New feature or request) label on Dec 15, 2025
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1765819582-stream-concurrency-exclusivity#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1765819582-stream-concurrency-exclusivity

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment



github-actions bot commented Dec 15, 2025

PyTest Results (Fast)

3 823 tests (+3 401): 3 811 passed (+3 399), 12 skipped (+3), 0 failed (-1)
1 suite (±0), 1 file (±0), runtime 6m 28s (+4m 50s)

Results for commit 7099a0a, compared against base commit 452acd1.

♻️ This comment has been updated with latest results.


github-actions bot commented Dec 15, 2025

PyTest Results (Full)

3 826 tests (+3): 3 814 passed (+5), 12 skipped (±0), 0 failed (-2)
1 suite (±0), 1 file (±0), runtime 10m 59s (+34s)

Results for commit 7099a0a, compared against base commit 452acd1.

♻️ This comment has been updated with latest results.

agarctfi marked this pull request as ready for review on December 15, 2025 at 18:55
Copilot AI review requested due to automatic review settings December 15, 2025 18:55
agarctfi changed the title from "feat: Add use_exclusive_concurrency flag for serial partition processing" to "(do not merge) feat: Add use_exclusive_concurrency flag for serial partition processing" on Dec 15, 2025
agarctfi requested review from brianjlai and tolik0 on December 15, 2025 at 18:56

Copilot AI left a comment


Pull request overview

This PR introduces support for serial partition processing in declarative streams, allowing certain streams to be processed one partition at a time instead of concurrently. This is designed for APIs like Intercom that have strict constraints, such as scroll endpoints that only allow one active scroll at a time.

Key changes:

  • Added disable_partition_concurrency configuration option to the declarative stream schema (exposed as use_exclusive_concurrency in Python code)
  • Enhanced ConcurrentReadProcessor to queue partitions for exclusive streams and process them serially
  • Added comprehensive unit tests covering serial processing, partition queueing, and mixed stream scenarios

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Summary per file:

  • declarative_component_schema.yaml: Adds the disable_partition_concurrency field to the stream configuration schema
  • declarative_component_schema.py: Adds the Pydantic model field for the new configuration option
  • abstract_stream.py: Adds the use_exclusive_concurrency property, defaulting to False for backward compatibility
  • default_stream.py: Implements the use_exclusive_concurrency property and accepts it in the constructor
  • adapters.py: Adds the property to StreamFacade to delegate to the underlying stream
  • model_to_component_factory.py: Maps the YAML disable_partition_concurrency to the Python use_exclusive_concurrency when creating streams
  • concurrent_read_processor.py: Implements the serial partition processing logic with queue management
  • test_concurrent_read_processor.py: Adds unit tests for the exclusive concurrency feature


Comment on lines +69 to +77
self._exclusive_streams: Set[str] = {
    s.name for s in stream_instances_to_read_from if s.use_exclusive_concurrency
}
self._pending_partitions_per_exclusive_stream: Dict[str, Deque[Partition]] = {
    stream_name: deque() for stream_name in self._exclusive_streams
}
self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
    stream_name: False for stream_name in self._exclusive_streams
}

Copilot AI Dec 15, 2025


The shared state for exclusive stream partition processing (_pending_partitions_per_exclusive_stream and _exclusive_stream_partition_in_progress) is accessed from multiple methods (on_partition, on_partition_complete_sentinel, and _submit_next_exclusive_partition) that may execute in different threads. Without synchronization mechanisms (locks), this creates potential race conditions where multiple threads could read/write these dictionaries concurrently, leading to incorrect partition submission counts or lost partitions. Consider adding a threading lock to protect access to these shared data structures.

Comment on lines 282 to 284
partition = pending_partitions.popleft()
cursor = self._stream_name_to_instance[stream_name].cursor
self._exclusive_stream_partition_in_progress[stream_name] = True

Copilot AI Dec 15, 2025


Setting _exclusive_stream_partition_in_progress[stream_name] = True after popping the partition creates a race condition window. If on_partition is called between the popleft() and setting the flag to True, it will see in_progress = False and submit another partition concurrently, violating the exclusive processing guarantee. The flag should be set to True before popping from the queue or the entire operation should be atomic.

Suggested change:
- partition = pending_partitions.popleft()
- cursor = self._stream_name_to_instance[stream_name].cursor
- self._exclusive_stream_partition_in_progress[stream_name] = True
+ self._exclusive_stream_partition_in_progress[stream_name] = True
+ partition = pending_partitions.popleft()
+ cursor = self._stream_name_to_instance[stream_name].cursor

@agarctfi

@brianjlai @tolik0 I tested this locally with default_concurrency set to 11 and all streams enabled; I was able to reproduce the issue without this change and confirmed that it no longer occurred with this change to the CDK and manifest.


tolik0 commented Dec 15, 2025

@agarctfi It would be better to test this change using the preparation stream and its parent stream, or multiple streams that have this stream as their parent along with the parent stream itself. This would allow us to ensure that they all run at the same time.

- Add threading.Lock to protect shared state for exclusive streams
- Wrap access to _pending_partitions_per_exclusive_stream and
  _exclusive_stream_partition_in_progress with lock
- Set in_progress flag BEFORE popping from queue to prevent race condition
- Add docstring note about lock requirement for _submit_next_exclusive_partition

Addresses Copilot review comments about thread safety concerns.

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>