Skip to content

Commit

Permalink
changefeedccl: make mock pulsar sink synchronous
Browse files Browse the repository at this point in the history
Although the comments in mock pulsar sink say that emitting messages and
flushing are synchronous, the use of a buffered channel to transmit
messages from the mock producer to the test feed makes it asynchronous.
This leads to problems where the consumer (the test) may not have stored
the data in a "durable" way before the changefeed job completes. As a
result, tests can be flaky as it does not always look like all rows have
been emitted.

This PR makes the mock pulsar sink synchronous by using an unbuffered
channel.

Fixes: #119289
Informs: #118899

Epic: CRDB-9180

Release note: None
  • Loading branch information
rharding6373 committed Apr 29, 2024
1 parent 515a323 commit d01acfd
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7538,8 +7538,7 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) {
}
}

// TODO(#119289): re-enable pulsar
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestOmitSinks("pulsar"))
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,8 @@ type mockPulsarServer struct {

func makeMockPulsarServer() *mockPulsarServer {
return &mockPulsarServer{
msgCh: make(chan *pulsar.ProducerMessage, 2048),
// TODO(#118899): Make msgCh a buffered channel for async message sending.
msgCh: make(chan *pulsar.ProducerMessage),
}
}

Expand Down

0 comments on commit d01acfd

Please sign in to comment.