-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Flink: Dynamic Sink: Add support for dropping columns #14728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Before I go into the details of the PR, could you please help me understand what to expect in the interim period, when the sink receives records with and without the dropped column? |
New records will write with the new schema with any columns not part of the input schema dropped. Old records will continue to write with the old schema, which still exists. If there are any previously unseen schemas which include removed columns, those columns will be re-added as new columns. This is a catch which users will have to accept. That's why the feature is opt-in and disabled by default. |
|
Essentially, there’s a race condition between adding and dropping columns. For example, if a user does the following:
If these actions occur within a short time frame and the streams are skewed, the table could end up with either:
Afterward, querying the table with the “old” schema becomes difficult. Additionally, users cannot revert the table to any previously created schema using DynamicSink. This behavior is consistent with the current implementation, but with column-dropping support, users might expect this capability. @Guosmilesmile: Would these restrictions impact your use cases? |
|
IMHO this is fine if the user opts in. We deliberately chose not to allow dropping columns because of this race condition; it's important that this remains the default setting. I agree that we should add more documentation around the semantics of removing columns. |
|
|
Cool! Seems like an ok feature. |
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
Outdated
Show resolved
Hide resolved
|
Rebased to fix merge conflicts in CI after #14406. |
This commit adds support for strict 1:1 schema synchronization by allowing columns to be dropped from the table when they are not present in the input schema. This is controlled via a new dropUnusedColumns parameter in DynamicIcebergSink. The default behavior (dropUnusedColumns=false) remains unchanged.
.../v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
Show resolved
Hide resolved
| * We don't support: | ||
| * | ||
| * <ul> | ||
| * <li>Dropping columns | ||
| * <li>Renaming columns | ||
| * </ul> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove the list, as it is not needed anymore 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind, I'll keep this list with one element 😄
|
Please update the documentation, and highlight the caveats, like what happens when the column is added back with the same name, and what happens when multiple schema changes happen simultaneously |
This commit adds support for strict 1:1 schema synchronization by allowing columns to be dropped from the table when they are not present in the input schema. This is controlled via a new dropUnusedColumns parameter in DynamicIcebergSink.
The default behavior (dropUnusedColumns=false) remains unchanged.