Skip to content

Commit

Permalink
changefeedccl: make mock pulsar sink synchronous
Browse files Browse the repository at this point in the history
Although the comments in mock pulsar sink say that emitting messages and
flushing are synchronous, the use of a buffered channel to transmit
messages from the mock producer to the test feed makes it asynchronous.
This leads to problems where the consumer (the test) may not have stored
the data in a "durable" way before the changefeed job completes. As a
result, tests can be flaky as it does not always look like all rows have
been emitted.

This PR makes the mock pulsar sink synchronous by using an unbuffered
channel.

Fixes: #119289
Informs: #118899

Epic: CRDB-9180

Release note: None
  • Loading branch information
rharding6373 committed May 4, 2024
1 parent 515a323 commit 97dcb3d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
14 changes: 12 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5941,6 +5941,9 @@ func TestChangefeedContinuousTelemetry(t *testing.T) {
for i := 0; i < 5; i++ {
beforeCreate := timeutil.Now()
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo VALUES (%d) RETURNING cluster_logical_timestamp()`, i))
// Read the event from the sink.
_, err := foo.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, jobID, beforeCreate.UnixNano(), interval.Nanoseconds(), false)
}
}
Expand Down Expand Up @@ -6007,12 +6010,20 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) {
beforeInsert := timeutil.Now()
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (1)`)
// Read the events from the sinks.
_, err := foo.Next()
require.NoError(t, err)
_, err = foo2.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, job1, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
require.NoError(t, foo.Close())

beforeInsert = timeutil.Now()
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (2)`)
// Read the events from the sinks.
_, err = foo2.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
require.NoError(t, foo2.Close())
}
Expand Down Expand Up @@ -7538,8 +7549,7 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) {
}
}

// TODO(#119289): re-enable pulsar
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestOmitSinks("pulsar"))
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,8 @@ type mockPulsarServer struct {

func makeMockPulsarServer() *mockPulsarServer {
return &mockPulsarServer{
msgCh: make(chan *pulsar.ProducerMessage, 2048),
// TODO(#118899): Make msgCh a buffered channel for async message sending.
msgCh: make(chan *pulsar.ProducerMessage),
}
}

Expand Down Expand Up @@ -2638,12 +2639,14 @@ func (p *mockPulsarProducer) SendAsync(
m *pulsar.ProducerMessage,
f func(pulsar.MessageID, *pulsar.ProducerMessage, error),
) {
log.Infof(ctx, "SendAsync start %d", len(p.pulsarServer.msgCh))
select {
case <-ctx.Done():
f(nil, m, ctx.Err())
case p.pulsarServer.msgCh <- m:
f(nil, m, nil)
}
log.Infof(ctx, "SendAsync end")
}

func (p *mockPulsarProducer) LastSequenceID() int64 {
Expand Down

0 comments on commit 97dcb3d

Please sign in to comment.