Skip to content

Commit

Permalink
[INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normali…
Browse files Browse the repository at this point in the history
…ze operator (#7870)
  • Loading branch information
e-mhui committed Apr 19, 2023
1 parent 9a85147 commit 9038d9e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
Expand Up @@ -81,6 +81,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String inlongAudit;
private final String rowValidator;
private final boolean sourceMultipleEnable;
private final boolean changelogNormalizeEnabled;

// --------------------------------------------------------------------------------------------
// Mutable attributes
Expand Down Expand Up @@ -113,7 +114,8 @@ public MongoDBTableSource(
String inlongMetric,
String inlongAudit,
String rowFilter,
Boolean sourceMultipleEnable) {
Boolean sourceMultipleEnable,
Boolean changelogNormalizeEnabled) {
this.physicalSchema = physicalSchema;
this.hosts = checkNotNull(hosts);
this.username = username;
Expand All @@ -137,11 +139,12 @@ public MongoDBTableSource(
this.inlongAudit = inlongAudit;
this.rowValidator = rowFilter;
this.sourceMultipleEnable = sourceMultipleEnable;
this.changelogNormalizeEnabled = changelogNormalizeEnabled;
}

@Override
public ChangelogMode getChangelogMode() {
if (this.sourceMultipleEnable) {
if (this.sourceMultipleEnable || !changelogNormalizeEnabled) {
return ChangelogMode.all();
} else {
return ChangelogMode.newBuilder()
Expand Down Expand Up @@ -286,7 +289,8 @@ public DynamicTableSource copy() {
inlongMetric,
inlongAudit,
rowValidator,
sourceMultipleEnable);
sourceMultipleEnable,
changelogNormalizeEnabled);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
Expand Down
Expand Up @@ -73,6 +73,15 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
+ "\"+U\" represents UPDATE_AFTER.\n"
+ "\"-D\" represents DELETE.");

public static final ConfigOption<Boolean> CHANGELOG_NORMALIZE_ENABLED =
ConfigOptions.key("changelog.normalize.enabled")
.booleanType()
.defaultValue(Boolean.TRUE)
.withDescription("MongoDB's Change Stream lacks the -U message, "
+ "so it needs to be converted to Flink UPSERT changelog using "
+ "the Changelog Normalize operator. The default value is true. (For scenarios that do not "
+ "require the -U message, this operator can be disabled.) \n");

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
Expand Down Expand Up @@ -121,6 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
final String rowKindFiltered = config.get(ROW_KINDS_FILTERED).isEmpty()
? ROW_KINDS_FILTERED.defaultValue()
: config.get(ROW_KINDS_FILTERED);
Boolean changelogNormalizeEnabled = config.get(CHANGELOG_NORMALIZE_ENABLED);

return new MongoDBTableSource(
physicalSchema,
Expand All @@ -143,7 +153,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
inlongMetric,
inlongAudit,
rowKindFiltered,
sourceMultipleEnable);
sourceMultipleEnable,
changelogNormalizeEnabled);
}

private void checkPrimaryKey(UniqueConstraint pk, String message) {
Expand Down Expand Up @@ -186,6 +197,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHANGELOG_NORMALIZE_ENABLED);
return options;
}
}

0 comments on commit 9038d9e

Please sign in to comment.