Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema fingerprint verifier #292

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,10 @@ type Config struct {
// The ghostferry target user should have SUPER permissions to actually write to the target DB,
// if ghostferry is ran with AllowSuperUserOnReadOnly = true and the target DB is set to read_only.
AllowSuperUserOnReadOnly bool

// The interval at which the periodic schema fingerprint verification occurs, in the
// format of time.ParseDuration. Default: 60s.
PeriodicallyVerifySchemaFingerPrintInterval string
}

func (c *Config) ValidateConfig() error {
Expand Down Expand Up @@ -833,5 +837,14 @@ func (c *Config) ValidateConfig() error {
c.CutoverRetryWaitSeconds = 1
}

if len(c.PeriodicallyVerifySchemaFingerPrintInterval) == 0 {
c.PeriodicallyVerifySchemaFingerPrintInterval = "60s"
} else {
_, err := time.ParseDuration(c.PeriodicallyVerifySchemaFingerPrintInterval)
if err != nil {
return fmt.Errorf("PeriodicallyVerifySchemaFingerPrintInterval invalid")
}
}

return nil
}
4 changes: 4 additions & 0 deletions docs/source/technicaloverview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ Limitations
- For tables with foreign key constraints, the constraints should be removed
before performing the data migration.

- Ghostferry does not support schema changes during the migration or when the migration is
interrupted. Currently, Ghostferry checks in the background the schema of the
databases being migrated every 1 minute.

Algorithm Correctness
---------------------

Expand Down
47 changes: 44 additions & 3 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Ferry struct {
DataIterator *DataIterator
BatchWriter *BatchWriter

SchemaFingerPrintVerifier *SchemaFingerPrintVerifier

StateTracker *StateTracker
ErrorHandler ErrorHandler
Throttler Throttler
Expand Down Expand Up @@ -245,6 +247,29 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier {
}
}

func (f *Ferry) NewSchemaFingerPrintVerifier() (*SchemaFingerPrintVerifier, error) {
fingerPrint := map[string]string{}
if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil {
fingerPrint = f.StateToResumeFrom.SchemaFingerPrint
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this code gives me a bit of pause. There's currently a race condition I think:

  • Ghostferry starts copying rows
  • within 1 second, the table schema changed
  • The schema finger print verifier reads the schema and thinks the schema is unchanged.

Did I miss something and this situation is not possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for clarity currently the following happens when you run the migration with copydb

Also just to avoid confusion by table schema I mean the information we retrieve from information_schema.columns table from mysql and not the TableSchema struct we use internally in ghostferry.

I am not sure I understand your point about after ghostferry starts copying rows why would the table schema would have changed. Copying rows shouldn't alter the schema structure of the table. Here is what an example output is when we try to retrieve the schema of a table.

mysql> SELECT * FROM information_schema.columns WHERE table_schema = 'sakila' ORDER BY table_name, column_name LIMIT 1\G
*************************** 1. row ***************************
           TABLE_CATALOG: def
            TABLE_SCHEMA: sakila
              TABLE_NAME: actor
             COLUMN_NAME: actor_id
        ORDINAL_POSITION: 1
          COLUMN_DEFAULT: NULL
             IS_NULLABLE: NO
               DATA_TYPE: smallint
CHARACTER_MAXIMUM_LENGTH: NULL
  CHARACTER_OCTET_LENGTH: NULL
       NUMERIC_PRECISION: 5
           NUMERIC_SCALE: 0
      DATETIME_PRECISION: NULL
      CHARACTER_SET_NAME: NULL
          COLLATION_NAME: NULL
             COLUMN_TYPE: smallint unsigned
              COLUMN_KEY: PRI
                   EXTRA: auto_increment
              PRIVILEGES: select,insert,update,references
          COLUMN_COMMENT:
   GENERATION_EXPRESSION:
                  SRS_ID: NULL
1 row in set (0.00 sec)

periodicallyVerifyInterval, err := time.ParseDuration(f.Config.PeriodicallyVerifySchemaFingerPrintInterval)
if err != nil {
return nil, fmt.Errorf("invalid PeriodicallyVerifySchemaFingerPrintInterval: %v. this error should have been caught via .Validate()", err)
}

return &SchemaFingerPrintVerifier{
SourceDB: f.SourceDB,
TableRewrites: f.Config.TableRewrites,
TableSchemaCache: f.Tables,
ErrorHandler: f.ErrorHandler,
PeriodicallyVerifyInterval: periodicallyVerifyInterval,

FingerPrints: fingerPrint,

logger: logrus.WithField("tag", "schema_fingerprint_verifier"),
}, nil
}

func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier {
v := f.NewInlineVerifier()
v.StateTracker = nil
Expand Down Expand Up @@ -489,6 +514,11 @@ func (f *Ferry) Initialize() (err error) {
}
}

f.SchemaFingerPrintVerifier, err = f.NewSchemaFingerPrintVerifier()
if err != nil {
return err
}

// The iterative verifier needs the binlog streamer so this has to be first.
// Eventually this can be moved below the verifier initialization.
f.BinlogStreamer = f.NewSourceBinlogStreamer()
Expand Down Expand Up @@ -686,6 +716,13 @@ func (f *Ferry) Run() {
}()
}

schemaFingerVerifierPrintWg := &sync.WaitGroup{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could probably just reuse the supportingServicesWg instead of having another one here.

schemaFingerVerifierPrintWg.Add(1)
go func() {
defer schemaFingerVerifierPrintWg.Done()
f.SchemaFingerPrintVerifier.PeriodicallyVerifySchemaFingerprints(ctx)
}()

inlineVerifierWg := &sync.WaitGroup{}
inlineVerifierContext, stopInlineVerifier := context.WithCancel(ctx)
if f.inlineVerifier != nil {
Expand Down Expand Up @@ -787,6 +824,8 @@ func (f *Ferry) Run() {
f.DoneTime = time.Now()

shutdown()

schemaFingerVerifierPrintWg.Wait()
supportingServicesWg.Wait()

if f.Config.ProgressCallback.URI != "" {
Expand Down Expand Up @@ -908,12 +947,14 @@ func (f *Ferry) SerializeStateToJSON() (string, error) {
err := errors.New("no valid StateTracker")
return "", err
}
var binlogVerifyStore *BinlogVerifyStore = nil
var (
binlogVerifyStore *BinlogVerifyStore = nil
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question for my own knowledge: what prompted this change? A linting error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, seems like it. Will change it back.

if f.inlineVerifier != nil {
binlogVerifyStore = f.inlineVerifier.reverifyStore
}

serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore)
serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, f.SchemaFingerPrintVerifier.FingerPrints)

if f.Config.DoNotIncludeSchemaCacheInStateDump {
serializedState.LastKnownTableSchemaCache = nil
Expand Down Expand Up @@ -945,7 +986,7 @@ func (f *Ferry) Progress() *Progress {
}

// Table Progress
serializedState := f.StateTracker.Serialize(nil, nil)
serializedState := f.StateTracker.Serialize(nil, nil, nil)
// Note the below will not necessarily be synchronized with serializedState.
// This is fine as we don't need to be super precise with performance data.
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable()
Expand Down
Binary file added profiles/cpuprofile.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.