[FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink #3233

Open
wants to merge 4 commits into
base: master

loserwang1024
Contributor

Currently, when the sink is not an instance of TwoPhaseCommittingSink, sinkTo calls input.transform rather than stream.transform. As a result, the pre-write topology is ignored.

private void sinkTo(
        DataStream<Event> input,
        Sink<Event> sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream<Event> stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
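    } else {
        // Bug: this transforms `input`, not `stream`, so any operators added by
        // addPreWriteTopology above are left out of the path to the sink writer.
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }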
} 

(P.S. The changes to StarRocksUtils only apply spotless formatting.)
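To make the effect of the bug concrete, here is a minimal, Flink-free sketch. Everything in it is a hypothetical stand-in: `Stream` models a DataStream as an ordered list of operator names, the `Sink` and `WithPreWriteTopology` interfaces only mimic the shape of the real ones, and the `useFix` flag exists purely to compare both behaviours side by side. It is not the actual DataSinkTranslator code.

```java
import java.util.ArrayList;
import java.util.List;

public class PreWriteTopologySketch {
    /** Hypothetical stand-in for a DataStream: the ordered names of operators applied so far. */
    static final class Stream {
        final List<String> operators;

        Stream(List<String> operators) {
            this.operators = operators;
        }

        /** Returns a new stream with one more operator; the receiver is left unchanged. */
        Stream transform(String name) {
            List<String> next = new ArrayList<>(operators);
            next.add(name);
            return new Stream(next);
        }
    }

    /** Hypothetical marker interface standing in for a sink. */
    interface Sink {}

    /** Hypothetical stand-in for a sink that adds operators before the writer. */
    interface WithPreWriteTopology extends Sink {
        Stream addPreWriteTopology(Stream input);
    }

    /** Mirrors the non-committing branch of sinkTo; useFix switches input -> stream. */
    static Stream sinkTo(Stream input, Sink sink, boolean useFix) {
        Stream stream = input;
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
        }
        // The buggy version attaches the writer to `input`, bypassing `stream`.
        Stream upstream = useFix ? stream : input;
        return upstream.transform("writer");
    }

    public static void main(String[] args) {
        WithPreWriteTopology sink = in -> in.transform("pre-write");
        Stream source = new Stream(List.of("source"));
        System.out.println(sinkTo(source, sink, false).operators); // [source, writer]
        System.out.println(sinkTo(source, sink, true).operators);  // [source, pre-write, writer]
    }
}
```

In the buggy path the "pre-write" operator never sits between the source and the writer, which is the behaviour this PR describes for sinks that are not TwoPhaseCommittingSink.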

…e-write topology if not TwoPhaseCommittingSink
@loserwang1024
Contributor Author

@PatrickRen, CC

@pvary

pvary commented Apr 18, 2024

Good catch @loserwang1024!

Could you please add a test case to prevent later code changes from reverting this fix?

@loserwang1024
Contributor Author

Could you please add a test case to prevent later code changes from reverting this fix?

I'd like to, but there currently seems to be no pipeline sink that implements WithPreWriteTopology but not TwoPhaseCommittingSink, unless I mock one.

@pvary

pvary commented May 2, 2024

Could you please add a test case to prevent later code changes from reverting this fix?

I'd like to, but there currently seems to be no pipeline sink that implements WithPreWriteTopology but not TwoPhaseCommittingSink, unless I mock one.

Could you define your own in the test itself?
Then you have a free hand in what it does and does not do...

@loserwang1024
Contributor Author

Could you define your own in the test itself? Then you have a free hand in what it does and does not do...

Done.

DataStreamSource<Event> inputStream = env.fromCollection(mockEvents);
DataSinkTranslator translator = new DataSinkTranslator();

String uid = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";

nit: Maybe some more descriptive content, like

String uid = "Uid set by the addPreWriteTopology topology";

Contributor Author

I tried that before, but it fails with:
java.lang.IllegalArgumentException: Node hash must be a 32 character String that describes a hex code.
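One way to reconcile the two comments above, sketched under assumptions: keep a descriptive string in the test and derive the 32-character hex value from it. The helper name `toUidHash` is hypothetical and not part of the PR; the only requirement taken from the source is the one in the exception message (a 32-character hex string). MD5 is used here only because its 16-byte digest happens to encode to exactly 32 hex characters, not for any security property.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class UidHashSketch {
    /** Hypothetical helper: maps a readable description to a 32-character lowercase hex string. */
    static String toUidHash(String description) {
        try {
            // An MD5 digest is 16 bytes, so its hex encoding is always 32 characters.
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(description.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(32);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 unavailable", e);
        }
    }

    public static void main(String[] args) {
        String uid = toUidHash("Uid set by the addPreWriteTopology topology");
        System.out.println(uid.length());                  // 32
        System.out.println(uid.matches("^[0-9a-f]{32}$")); // true
    }
}
```

The result is deterministic for a given description, so a test could assert against the same derived value it set. Whether that is clearer than a literal such as the one in the PR is a judgment call.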

@loserwang1024
Contributor Author

@PatrickRen, CC. Could you help review this PR?

3 participants