Skip to content

Commit

Permalink
streamingccl: replicate manual split points
Browse files Browse the repository at this point in the history
This patch teaches PCR to replicate the source cluster's manual split points
to the destination cluster. Specifically, it configures the event_stream
rangefeeds to emit rangefeed metadata events that indicate whether a rangefeed
was spawned by a manual split. The event stream then sends the manual split
key over to the destination cluster.

Informs #122846

Release note: none
  • Loading branch information
msbutler committed Apr 29, 2024
1 parent 207178c commit bd1a600
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 1 deletion.
75 changes: 75 additions & 0 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
// SpanConfigEvent indicates that the SpanConfig field of an event holds an updated
// SpanConfigRecord.
SpanConfigEvent
// SplitEvent indicates that the SplitKey field of an event holds a split key.
SplitEvent
)

// Event describes an event emitted by a cluster to cluster stream. Its Type
Expand All @@ -59,6 +61,9 @@ type Event interface {

// GetSpanConfigEvent returns a SpanConfig event if the EventType is SpanConfigEvent
GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry

// GetSplitEvent returns the split key if the EventType is SplitEvent.
GetSplitEvent() *roachpb.Key
}

// kvEvent is a key value pair that needs to be ingested.
Expand Down Expand Up @@ -98,6 +103,11 @@ func (kve kvEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry {
return nil
}

// GetSplitEvent implements the Event interface. A kvEvent never carries a
// split key, so the result is always nil.
func (kve kvEvent) GetSplitEvent() *roachpb.Key { return nil }

// sstableEvent is a sstable that needs to be ingested.
type sstableEvent struct {
sst kvpb.RangeFeedSSTable
Expand Down Expand Up @@ -133,6 +143,11 @@ func (sste sstableEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface. An sstableEvent never carries
// a split key, so the result is always nil.
func (sste sstableEvent) GetSplitEvent() *roachpb.Key { return nil }

var _ Event = sstableEvent{}

// delRangeEvent is a DeleteRange event that needs to be ingested.
Expand Down Expand Up @@ -170,6 +185,11 @@ func (dre delRangeEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface. A delRangeEvent never carries
// a split key, so the result is always nil.
func (dre delRangeEvent) GetSplitEvent() *roachpb.Key { return nil }

var _ Event = delRangeEvent{}

// checkpointEvent indicates that the stream has emitted every change for all
Expand Down Expand Up @@ -210,6 +230,11 @@ func (ce checkpointEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface. A checkpointEvent never
// carries a split key, so the result is always nil.
func (ce checkpointEvent) GetSplitEvent() *roachpb.Key { return nil }

// spanConfigEvent is an Event that wraps a single streamed span config entry.
type spanConfigEvent struct {
	// spanConfig is the entry handed back by GetSpanConfigEvent.
	spanConfig streampb.StreamedSpanConfigEntry
}
Expand Down Expand Up @@ -246,6 +271,52 @@ func (spe spanConfigEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntr
return &spe.spanConfig
}

// GetSplitEvent implements the Event interface. A spanConfigEvent never
// carries a split key, so the result is always nil.
func (spe spanConfigEvent) GetSplitEvent() *roachpb.Key { return nil }

// splitEvent is an Event that carries a single split key. Every accessor
// other than Type and GetSplitEvent returns nil.
type splitEvent struct {
	// splitKey is the key exposed through GetSplitEvent.
	splitKey roachpb.Key
}

// Compile-time check that splitEvent satisfies Event.
var _ Event = splitEvent{}

// Type implements the Event interface; it always reports SplitEvent.
func (se splitEvent) Type() EventType { return SplitEvent }

// GetKV implements the Event interface; a split event holds no KV.
func (se splitEvent) GetKV() *roachpb.KeyValue { return nil }

// GetSSTable implements the Event interface; a split event holds no SSTable.
func (se splitEvent) GetSSTable() *kvpb.RangeFeedSSTable { return nil }

// GetDeleteRange implements the Event interface; a split event holds no
// DeleteRange.
func (se splitEvent) GetDeleteRange() *kvpb.RangeFeedDeleteRange { return nil }

// GetResolvedSpans implements the Event interface; a split event holds no
// resolved spans.
func (se splitEvent) GetResolvedSpans() []jobspb.ResolvedSpan { return nil }

// GetSpanConfigEvent implements the Event interface; a split event holds no
// span config entry.
func (se splitEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry { return nil }

// GetSplitEvent implements the Event interface. It returns a pointer to the
// split key held by this event; because the receiver is a value, the pointer
// refers to the receiver copy's field.
func (se splitEvent) GetSplitEvent() *roachpb.Key {
	key := &se.splitKey
	return key
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
Expand All @@ -269,3 +340,7 @@ func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event {
func MakeSpanConfigEvent(streamedSpanConfig streampb.StreamedSpanConfigEntry) Event {
	// Wrap the entry by value; GetSpanConfigEvent later hands out a pointer to it.
	ev := spanConfigEvent{spanConfig: streamedSpanConfig}
	return ev
}

// MakeSplitEvent creates an Event from a split key.
func MakeSplitEvent(splitKey roachpb.Key) Event {
	ev := splitEvent{splitKey: splitKey}
	return ev
}
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,16 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {
case len(streamEvent.Batch.SpanConfigs) > 0:
event = streamingccl.MakeSpanConfigEvent(streamEvent.Batch.SpanConfigs[0])
streamEvent.Batch.SpanConfigs = streamEvent.Batch.SpanConfigs[1:]
case len(streamEvent.Batch.SplitPoints) > 0:
event = streamingccl.MakeSplitEvent(streamEvent.Batch.SplitPoints[0])
streamEvent.Batch.SplitPoints = streamEvent.Batch.SplitPoints[1:]
}

if len(streamEvent.Batch.KeyValues) == 0 &&
len(streamEvent.Batch.Ssts) == 0 &&
len(streamEvent.Batch.DelRanges) == 0 &&
len(streamEvent.Batch.SpanConfigs) == 0 {
len(streamEvent.Batch.SpanConfigs) == 0 &&
len(streamEvent.Batch.SplitPoints) == 0 {
streamEvent.Batch = nil
}
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,38 @@ INSERT INTO d.t_for_import (i) VALUES (1);
cutoverTime := c.SrcSysServer.Clock().Now()
c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
}

// TestReplicateManualSplit issues a manual split on a source tenant table
// while replication is running and waits until a range whose pretty-printed
// start key matches that split point is visible on the destination cluster.
func TestReplicateManualSplit(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(
		ctx, t, replicationtestutils.DefaultTenantStreamingClustersArgs,
	)
	defer cleanup()

	// Create the table and look up its ID; the ID is part of the expected
	// split key below.
	c.SrcTenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 100)")
	var tableID int
	c.SrcTenantSQL.QueryRow(t, "SELECT id FROM system.namespace WHERE name = 'foo'").Scan(&tableID)

	producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
	producer := jobspb.JobID(producerJobID)
	ingestion := jobspb.JobID(ingestionJobID)

	// Wait for both jobs to run and for the destination to catch up to "now"
	// before splitting on the source.
	jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, producer)
	jobutils.WaitForJobToRun(c.T, c.DestSysSQL, ingestion)
	c.WaitUntilReplicatedTime(c.SrcSysServer.Clock().Now(), ingestion)

	c.SrcTenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (50)")
	wantKey := fmt.Sprintf("Table/%d/1/50", tableID)
	countQuery := fmt.Sprintf("SELECT count(*) FROM crdb_internal.ranges WHERE start_pretty ~ '%s'", wantKey)

	// Retry until at least one destination range starts at the split key.
	testutils.SucceedsSoon(t, func() error {
		var matches int
		c.DestSysSQL.QueryRow(t, countQuery).Scan(&matches)
		if matches != 0 {
			return nil
		}
		return errors.Newf("split key not present")
	})
}

func TestTenantStreamingDeleteRange(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error {
return err
}
return nil
case streamingccl.SplitEvent:
if err := sip.handleSplitEvent(event.GetSplitEvent()); err != nil {
return err
}
default:
return errors.Newf("unknown streaming event type %v", event.Type())
}
Expand Down Expand Up @@ -839,6 +843,22 @@ func (sip *streamIngestionProcessor) bufferRangeKeyVal(
return nil
}

// handleSplitEvent replicates a split received from the source cluster by
// rekeying the split key and issuing an AdminSplit at the rekeyed key.
//
// A nil key yields an error rather than a nil-pointer panic. If rekeying
// reports !ok, the split is skipped and nil is returned. The split is issued
// with an expiration one hour after the KV clock's current time. Errors from
// rekeying and from AdminSplit are returned wrapped with context.
func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error {
	if key == nil {
		return errors.New("split event is missing its split key")
	}

	ctx, sp := tracing.ChildSpan(sip.Ctx(), "replicated-split")
	defer sp.Finish()
	kvDB := sip.FlowCtx.Cfg.DB.KV()

	rekey, ok, err := sip.rekey(*key)
	if err != nil {
		return errors.Wrapf(err, "rekeying split key %s", *key)
	}
	if !ok {
		// NOTE(review): presumably the key has no counterpart in the
		// destination keyspace, so there is nothing to split — confirm against
		// sip.rekey.
		return nil
	}

	log.VInfof(ctx, 2, "replicating split at %s", rekey)
	expiration := kvDB.Clock().Now().AddDuration(time.Hour)
	if err := kvDB.AdminSplit(ctx, rekey, expiration); err != nil {
		return errors.Wrapf(err, "replicating split at %s", rekey)
	}
	return nil
}

func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
// TODO: In addition to flushing when receiving a checkpoint event, we
// should also flush when we've buffered sufficient KVs. A buffering adder
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
rangefeed.WithOnDeleteRange(s.onDeleteRange),
rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
rangefeed.WithOnValues(s.onValues),
rangefeed.WithOnMetadata(s.onMetadata),
}

initialTimestamp := s.spec.InitialScanTimestamp
Expand Down Expand Up @@ -291,6 +292,14 @@ func (s *eventStream) onDeleteRange(ctx context.Context, delRange *kvpb.RangeFee
s.seb.addDelRange(*delRange)
s.setErr(s.maybeFlushBatch(ctx))
}
// onMetadata handles a rangefeed metadata event. When the event reports that
// the rangefeed came from a manual split and its span start key differs from
// the parent start key, the start key is added to the current batch as a
// split point, and the batch is then given a chance to flush.
func (s *eventStream) onMetadata(ctx context.Context, metadata *kvpb.RangeFeedMetadata) {
	log.VInfof(ctx, 2, "received metadata event: %s, fromManualSplit: %t, parent start key %s",
		metadata.Span, metadata.FromManualSplit, metadata.ParentStartKey)
	if !metadata.FromManualSplit || metadata.Span.Key.Equal(metadata.ParentStartKey) {
		return
	}
	// Only send new manual split keys (i.e. a child rangefeed start key that
	// differs from the parent start key).
	s.seb.addSplitPoint(metadata.Span.Key)
	// Adding a split point grows the batch, so check whether it should be
	// flushed, mirroring what onDeleteRange does after addDelRange.
	s.setErr(s.maybeFlushBatch(ctx))
}

func (s *eventStream) maybeCheckpoint(
ctx context.Context, advanced bool, frontier rangefeed.VisitableFrontier,
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (seb *streamEventBatcher) reset() {
seb.batch.Ssts = seb.batch.Ssts[:0]
seb.batch.DelRanges = seb.batch.DelRanges[:0]
seb.batch.SpanConfigs = seb.batch.SpanConfigs[:0]
seb.batch.SplitPoints = seb.batch.SplitPoints[:0]
}

func (seb *streamEventBatcher) addSST(sst kvpb.RangeFeedSSTable) {
Expand All @@ -52,6 +53,11 @@ func (seb *streamEventBatcher) addDelRange(d kvpb.RangeFeedDeleteRange) {
seb.size += d.Size()
}

// addSplitPoint appends the split key k to the current batch and increases the
// tracked batch size by the length of the key.
func (seb *streamEventBatcher) addSplitPoint(k roachpb.Key) {
	seb.size += len(k)
	seb.batch.SplitPoints = append(seb.batch.SplitPoints, k)
}

// addSpanConfigs adds a slice of spanConfig entries that were recently flushed
// by the rangefeed cache. The function elides duplicate updates by checking
// that an update is newer than the tracked frontier and by checking for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func TestStreamEventBatcher(t *testing.T) {
require.Equal(t, 1, len(seb.batch.Ssts))
require.Equal(t, runningSize, seb.getSize())

splitKey := roachpb.Key("1")
runningSize += len(splitKey)
seb.addSplitPoint(splitKey)
require.Equal(t, 1, len(seb.batch.SplitPoints))
require.Equal(t, runningSize, seb.getSize())

// Reset should clear the batch.
seb.reset()
require.Equal(t, 0, seb.getSize())
require.Equal(t, 0, len(seb.batch.KeyValues))
Expand Down
1 change: 1 addition & 0 deletions pkg/repstream/streampb/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ message StreamEvent {
repeated roachpb.RangeFeedSSTable ssts = 2 [(gogoproto.nullable) = false];
repeated roachpb.RangeFeedDeleteRange del_ranges = 3 [(gogoproto.nullable) = false];
repeated StreamedSpanConfigEntry span_configs = 4 [(gogoproto.nullable) = false];
repeated bytes split_points = 5 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key", (gogoproto.nullable) = false];
}

// Checkpoint represents stream checkpoint.
Expand Down

0 comments on commit bd1a600

Please sign in to comment.