From 9038d9e61d0fc801938165f41b5a463c8703f081 Mon Sep 17 00:00:00 2001 From: emhui <111486498+e-mhui@users.noreply.github.com> Date: Wed, 19 Apr 2023 12:47:34 +0800 Subject: [PATCH] [INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870) --- .../sort/cdc/mongodb/table/MongoDBTableSource.java | 10 +++++++--- .../mongodb/table/MongoDBTableSourceFactory.java | 14 +++++++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java index 1770884c63a..66f1ec1a660 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java @@ -81,6 +81,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final String inlongAudit; private final String rowValidator; private final boolean sourceMultipleEnable; + private final boolean changelogNormalizeEnabled; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -113,7 +114,8 @@ public MongoDBTableSource( String inlongMetric, String inlongAudit, String rowFilter, - Boolean sourceMultipleEnable) { + Boolean sourceMultipleEnable, + Boolean changelogNormalizeEnabled) { this.physicalSchema = physicalSchema; this.hosts = checkNotNull(hosts); this.username = username; @@ -137,11 +139,12 @@ public MongoDBTableSource( this.inlongAudit = inlongAudit; this.rowValidator = rowFilter; this.sourceMultipleEnable = sourceMultipleEnable; + this.changelogNormalizeEnabled = changelogNormalizeEnabled; } @Override public ChangelogMode getChangelogMode() { - if (this.sourceMultipleEnable) { + if (this.sourceMultipleEnable || !changelogNormalizeEnabled) { return ChangelogMode.all(); } else { return ChangelogMode.newBuilder() @@ -286,7 +289,8 @@ public DynamicTableSource copy() { inlongMetric, inlongAudit, rowValidator, - sourceMultipleEnable); + sourceMultipleEnable, + changelogNormalizeEnabled); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java index 1808bc9f73f..53674840a59 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java @@ -73,6 +73,15 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { + "\"+U\" represents UPDATE_AFTER.\n" + "\"-D\" represents DELETE."); + public static final ConfigOption CHANGELOG_NORMALIZE_ENABLED = + ConfigOptions.key("changelog.normalize.enabled") + .booleanType() + .defaultValue(Boolean.TRUE) + .withDescription("MongoDB's Change Stream lacks the -U message, " + + "so it needs to be converted to Flink UPSERT changelog using " + + "the Changelog Normalize operator. The default value is true. (For scenarios that do not " + + "require the -U message, this operator can be disabled.) \n"); + @Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = @@ -121,6 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String rowKindFiltered = config.get(ROW_KINDS_FILTERED).isEmpty() ? ROW_KINDS_FILTERED.defaultValue() : config.get(ROW_KINDS_FILTERED); + Boolean changelogNormalizeEnabled = config.get(CHANGELOG_NORMALIZE_ENABLED); return new MongoDBTableSource( physicalSchema, @@ -143,7 +153,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { inlongMetric, inlongAudit, rowKindFiltered, - sourceMultipleEnable); + sourceMultipleEnable, + changelogNormalizeEnabled); } private void checkPrimaryKey(UniqueConstraint pk, String message) { @@ -186,6 +197,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); options.add(CHUNK_META_GROUP_SIZE); + options.add(CHANGELOG_NORMALIZE_ENABLED); return options; } }