Skip to content

Commit

Permalink
Merge #123791
Browse files Browse the repository at this point in the history
123791: Revert "changefeedccl: make mock pulsar sink synchronous" r=rharding6373 a=rharding6373

Reverts #123228

This is causing flaky pulsar tests (e.g., #123720)

Epic: None

Release note: none

Co-authored-by: Rachael Harding <rharding6373@users.noreply.github.com>
  • Loading branch information
craig[bot] and rharding6373 committed May 7, 2024
2 parents 09ac089 + 7ba2ab8 commit 1fb89af
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 16 deletions.
14 changes: 2 additions & 12 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5941,9 +5941,6 @@ func TestChangefeedContinuousTelemetry(t *testing.T) {
for i := 0; i < 5; i++ {
beforeCreate := timeutil.Now()
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo VALUES (%d) RETURNING cluster_logical_timestamp()`, i))
// Read the event from the sink.
_, err := foo.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, jobID, beforeCreate.UnixNano(), interval.Nanoseconds(), false)
}
}
Expand Down Expand Up @@ -6010,20 +6007,12 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) {
beforeInsert := timeutil.Now()
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (1)`)
// Read the events from the sinks.
_, err := foo.Next()
require.NoError(t, err)
_, err = foo2.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, job1, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
require.NoError(t, foo.Close())

beforeInsert = timeutil.Now()
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (2)`)
// Read the events from the sinks.
_, err = foo2.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
require.NoError(t, foo2.Close())
}
Expand Down Expand Up @@ -7550,7 +7539,8 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) {
}
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
// TODO(#119289): re-enable pulsar
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestOmitSinks("pulsar"))
}

func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) {
Expand Down
5 changes: 1 addition & 4 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,8 +2584,7 @@ type mockPulsarServer struct {

func makeMockPulsarServer() *mockPulsarServer {
return &mockPulsarServer{
// TODO(#118899): Make msgCh a buffered channel for async message sending.
msgCh: make(chan *pulsar.ProducerMessage),
msgCh: make(chan *pulsar.ProducerMessage, 2048),
}
}

Expand Down Expand Up @@ -2639,14 +2638,12 @@ func (p *mockPulsarProducer) SendAsync(
m *pulsar.ProducerMessage,
f func(pulsar.MessageID, *pulsar.ProducerMessage, error),
) {
log.Infof(ctx, "SendAsync start %d", len(p.pulsarServer.msgCh))
select {
case <-ctx.Done():
f(nil, m, ctx.Err())
case p.pulsarServer.msgCh <- m:
f(nil, m, nil)
}
log.Infof(ctx, "SendAsync end")
}

func (p *mockPulsarProducer) LastSequenceID() int64 {
Expand Down

0 comments on commit 1fb89af

Please sign in to comment.