-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-7713] Enforce ordering of fields during schema reconciliation #11154
base: master
Are you sure you want to change the base?
[HUDI-7713] Enforce ordering of fields during schema reconciliation #11154
Conversation
Schema expected = createRecord("reorderNestedFields", | ||
createPrimitiveField("field1", Schema.Type.INT), | ||
createPrimitiveField("field2", Schema.Type.INT), | ||
createArrayField("field3", createRecord("reorderNestedFields.field3", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonvex can you confirm this is the expected naming after reconcile is run?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, that doesn't look right
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What should it look like?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should be "nestedRecord" I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java#L341 - this logic was not created/updated by me. Do you want me to change it as part of this PR?
InternalSchema targetInternalSchema = convert(targetSchema); | ||
// Use existing fieldIds for consistent field ordering between commits when shouldReorderColumns is true | ||
InternalSchema sourceInternalSchema = convert(sourceSchema, shouldReorderColumns ? targetInternalSchema.getNameToPosition() : Collections.emptyMap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why only source schema? wny not reorder target schema too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The target schema is the source for the ordering. In this code, the target schema is the existing table and the source is the incoming dataset
val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) { | ||
canonicalizeSchema(sourceSchema, latestTableSchema, opts) | ||
canonicalizeSchema(sourceSchema, latestTableSchema, opts, !shouldReconcileSchema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why shouldn't we reorder columns when reconcile schema is true? Can you please add a note in the comment regarding this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonvex advised to do this. I think it is because reconcile is schema on read?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reconcile is not necessarily dependent on schema on read. I think the reason might have been to not conflict schema reconciliation rules incase that is enabled. @jonvex to clarify. Whatever be the reason, let's add a comment for reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reconcile has been deprecated, so we shouldn't modify it's behavior
hudi-common/src/main/java/org/apache/hudi/internal/schema/visitor/NameToPositionVisitor.java
Outdated
Show resolved
Hide resolved
import static org.apache.hudi.internal.schema.utils.InternalSchemaUtils.createFullName; | ||
|
||
/** | ||
* Schema visitor to produce name -> id map for internalSchema. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Schema visitor to produce name -> id map for internalSchema. | |
* Schema visitor to produce name -> position map for internalSchema, where position indicates position of the field in the schema. |
@@ -67,6 +68,10 @@ public Map<String, Integer> buildNameToId(Type type) { | |||
return visit(type, new NameToIDVisitor()); | |||
} | |||
|
|||
Map<String, Integer> buildNameToPosition(Type type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High level question: Do we use the InternalSchemaBuilder even when schema on read is disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The internal schema seems to provide some nice utilities but I was not familiar with it before this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use internal schema even when schema on read is disabled. We use it to add null for missing columns, promote incoming batch if it can be promoted to the table schema, and also to fix the ordering of unions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@jonvex can you also review?
@@ -169,20 +169,35 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser | |||
// 2. Write 2d batch with another schema (added column `age`) | |||
// | |||
|
|||
val secondSchema = StructType( | |||
val secondInputSchema = StructType( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't the behavior be the same when reconcile is enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but reconcile is not always enabled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments
Change Logs
Impact
Risk level (write none, low medium or high below)
None, just reorders fields
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist