Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: replicate manual split points #123225

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 75 additions & 0 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
// SpanConfigEvent indicates that the SpanConfig field of an event holds an updated
// SpanConfigRecord.
SpanConfigEvent
// SplitEvent indicates that the SplitKey field of an event holds a split key.
SplitEvent
)

// Event describes an event emitted by a cluster to cluster stream. Its Type
Expand All @@ -59,6 +61,9 @@ type Event interface {

// GetSpanConfigEvent returns a SpanConfig event if the EventType is SpanConfigEvent
GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry

// GetSplitEvent returns the split event if the EventType is a SplitEvent
GetSplitEvent() *roachpb.Key
}

// kvEvent is a key value pair that needs to be ingested.
Expand Down Expand Up @@ -98,6 +103,11 @@ func (kve kvEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry {
return nil
}

// GetSplitEvent implements the Event interface.
func (kve kvEvent) GetSplitEvent() *roachpb.Key {
return nil
}

// sstableEvent is a sstable that needs to be ingested.
type sstableEvent struct {
sst kvpb.RangeFeedSSTable
Expand Down Expand Up @@ -133,6 +143,11 @@ func (sste sstableEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface.
func (sste sstableEvent) GetSplitEvent() *roachpb.Key {
return nil
}

var _ Event = sstableEvent{}

// delRangeEvent is a DeleteRange event that needs to be ingested.
Expand Down Expand Up @@ -170,6 +185,11 @@ func (dre delRangeEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface.
func (dre delRangeEvent) GetSplitEvent() *roachpb.Key {
return nil
}

var _ Event = delRangeEvent{}

// checkpointEvent indicates that the stream has emitted every change for all
Expand Down Expand Up @@ -210,6 +230,11 @@ func (ce checkpointEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface.
func (ce checkpointEvent) GetSplitEvent() *roachpb.Key {
return nil
}

type spanConfigEvent struct {
spanConfig streampb.StreamedSpanConfigEntry
}
Expand Down Expand Up @@ -246,6 +271,52 @@ func (spe spanConfigEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntr
return &spe.spanConfig
}

// GetSplitEvent implements the Event interface.
func (spe spanConfigEvent) GetSplitEvent() *roachpb.Key {
return nil
}

type splitEvent struct {
splitKey roachpb.Key
}

var _ Event = splitEvent{}

// Type implements the Event interface.
func (se splitEvent) Type() EventType {
return SplitEvent
}

// GetKV implements the Event interface.
func (se splitEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetSSTable implements the Event interface.
func (se splitEvent) GetSSTable() *kvpb.RangeFeedSSTable {
return nil
}

// GetDeleteRange implements the Event interface.
func (se splitEvent) GetDeleteRange() *kvpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface.
func (se splitEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface.
func (se splitEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry {
return nil
}

// GetSplitEvent implements the Event interface.
func (se splitEvent) GetSplitEvent() *roachpb.Key {
return &se.splitKey
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
Expand All @@ -269,3 +340,7 @@ func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event {
func MakeSpanConfigEvent(streamedSpanConfig streampb.StreamedSpanConfigEntry) Event {
return spanConfigEvent{spanConfig: streamedSpanConfig}
}

func MakeSplitEvent(splitKey roachpb.Key) Event {
return splitEvent{splitKey: splitKey}
}
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,16 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {
case len(streamEvent.Batch.SpanConfigs) > 0:
event = streamingccl.MakeSpanConfigEvent(streamEvent.Batch.SpanConfigs[0])
streamEvent.Batch.SpanConfigs = streamEvent.Batch.SpanConfigs[1:]
case len(streamEvent.Batch.SplitPoints) > 0:
event = streamingccl.MakeSplitEvent(streamEvent.Batch.SplitPoints[0])
streamEvent.Batch.SplitPoints = streamEvent.Batch.SplitPoints[1:]
}

if len(streamEvent.Batch.KeyValues) == 0 &&
len(streamEvent.Batch.Ssts) == 0 &&
len(streamEvent.Batch.DelRanges) == 0 &&
len(streamEvent.Batch.SpanConfigs) == 0 {
len(streamEvent.Batch.SpanConfigs) == 0 &&
len(streamEvent.Batch.SplitPoints) == 0 {
streamEvent.Batch = nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,39 @@ INSERT INTO d.t_for_import (i) VALUES (1);
cutoverTime := c.SrcSysServer.Clock().Now()
c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
}

func TestReplicateManualSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, replicationtestutils.DefaultTenantStreamingClustersArgs)
defer cleanup()
c.DestSysSQL.Exec(t, "SET CLUSTER SETTING physical_replication.consumer.ingest_split_event.enabled = true")

c.SrcTenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 100)")
var fooTableID int
c.SrcTenantSQL.QueryRow(t, "SELECT id FROM system.namespace WHERE name = 'foo'").Scan(&fooTableID)

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcSysServer.Clock().Now(), jobspb.JobID(ingestionJobID))

c.SrcTenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (50)")
expectedSplitKey := fmt.Sprintf("Table/%d/1/50", fooTableID)
splitKeyQuery := fmt.Sprintf("SELECT count(*) FROM crdb_internal.ranges WHERE start_pretty ~ '%s'", expectedSplitKey)

testutils.SucceedsSoon(t, func() error {
var splitKeyPresent int
c.DestSysSQL.QueryRow(t, splitKeyQuery).Scan(&splitKeyPresent)
if splitKeyPresent == 0 {
return errors.Newf("split key not present")
}
return nil
})
}

func TestTenantStreamingDeleteRange(t *testing.T) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ var quantize = settings.RegisterDurationSettingWithExplicitUnit(
5*time.Second,
)

var ingestSplitEvent = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.consumer.ingest_split_event.enabled",
"whether to ingest split events",
false,
)

var streamIngestionResultTypes = []*types.T{
types.Bytes, // jobspb.ResolvedSpans
}
Expand Down Expand Up @@ -742,6 +749,10 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error {
return err
}
return nil
case streamingccl.SplitEvent:
if err := sip.handleSplitEvent(event.GetSplitEvent()); err != nil {
return err
}
default:
return errors.Newf("unknown streaming event type %v", event.Type())
}
Expand Down Expand Up @@ -839,6 +850,26 @@ func (sip *streamIngestionProcessor) bufferRangeKeyVal(
return nil
}

func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error {
ctx, sp := tracing.ChildSpan(sip.Ctx(), "replicated-split")
defer sp.Finish()
if !ingestSplitEvent.Get(&sip.EvalCtx.Settings.SV) {
return nil
}

kvDB := sip.FlowCtx.Cfg.DB.KV()
rekey, ok, err := sip.rekey(*key)
if err != nil {
return err
}
if !ok {
return nil
}
log.Infof(ctx, "replicating split at %s", rekey)
expiration := kvDB.Clock().Now().AddDuration(time.Hour)
return kvDB.AdminSplit(ctx, rekey, expiration)
msbutler marked this conversation as resolved.
Show resolved Hide resolved
}

func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
// TODO: In addition to flushing when receiving a checkpoint event, we
// should also flush when we've buffered sufficient KVs. A buffering adder
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ var quantize = settings.RegisterDurationSettingWithExplicitUnit(
5*time.Second,
)

var emitMetadata = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.producer.emit_metadata.enabled",
"whether to emit metadata events",
true,
)

var _ eval.ValueGenerator = (*eventStream)(nil)

var eventStreamReturnType = types.MakeLabeledTuple(
Expand Down Expand Up @@ -136,6 +143,9 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
rangefeed.WithOnValues(s.onValues),
}
if emitMetadata.Get(&s.execCfg.Settings.SV) {
opts = append(opts, rangefeed.WithOnMetadata(s.onMetadata))
}

initialTimestamp := s.spec.InitialScanTimestamp
s.frontier, err = span.MakeFrontier(s.spec.Spans...)
Expand Down Expand Up @@ -291,6 +301,14 @@ func (s *eventStream) onDeleteRange(ctx context.Context, delRange *kvpb.RangeFee
s.seb.addDelRange(*delRange)
s.setErr(s.maybeFlushBatch(ctx))
}
func (s *eventStream) onMetadata(ctx context.Context, metadata *kvpb.RangeFeedMetadata) {
log.VInfof(ctx, 2, "received metadata event: %s, fromManualSplit: %t, parent start key %s", metadata.Span, metadata.FromManualSplit, metadata.ParentStartKey)
if metadata.FromManualSplit && !metadata.Span.Key.Equal(metadata.ParentStartKey) {
// Only send new manual split keys (i.e. a child rangefeed start key that
// differs from the parent start key)
s.seb.addSplitPoint(metadata.Span.Key)
}
}

func (s *eventStream) maybeCheckpoint(
ctx context.Context, advanced bool, frontier rangefeed.VisitableFrontier,
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (seb *streamEventBatcher) reset() {
seb.batch.Ssts = seb.batch.Ssts[:0]
seb.batch.DelRanges = seb.batch.DelRanges[:0]
seb.batch.SpanConfigs = seb.batch.SpanConfigs[:0]
seb.batch.SplitPoints = seb.batch.SplitPoints[:0]
}

func (seb *streamEventBatcher) addSST(sst kvpb.RangeFeedSSTable) {
Expand All @@ -52,6 +53,11 @@ func (seb *streamEventBatcher) addDelRange(d kvpb.RangeFeedDeleteRange) {
seb.size += d.Size()
}

func (seb *streamEventBatcher) addSplitPoint(k roachpb.Key) {
seb.batch.SplitPoints = append(seb.batch.SplitPoints, k)
seb.size += len(k)
}

// addSpanConfigs adds a slice of spanConfig entries that were recently flushed
// by the rangefeed cache. The function elides duplicate updates by checking
// that an update is newer than the tracked frontier and by checking for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func TestStreamEventBatcher(t *testing.T) {
require.Equal(t, 1, len(seb.batch.Ssts))
require.Equal(t, runningSize, seb.getSize())

splitKey := roachpb.Key("1")
runningSize += len(splitKey)
seb.addSplitPoint(splitKey)
require.Equal(t, 1, len(seb.batch.SplitPoints))
require.Equal(t, runningSize, seb.getSize())

// Reset should clear the batch.
seb.reset()
require.Equal(t, 0, seb.getSize())
require.Equal(t, 0, len(seb.batch.KeyValues))
Expand Down
1 change: 1 addition & 0 deletions pkg/repstream/streampb/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ message StreamEvent {
repeated roachpb.RangeFeedSSTable ssts = 2 [(gogoproto.nullable) = false];
repeated roachpb.RangeFeedDeleteRange del_ranges = 3 [(gogoproto.nullable) = false];
repeated StreamedSpanConfigEntry span_configs = 4 [(gogoproto.nullable) = false];
repeated bytes split_points = 5 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
}

// Checkpoint represents stream checkpoint.
Expand Down