Skip to content

Commit

Permalink
Add RowsVerified to InlineVerifier and EventsVerified to TargetVerifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Lincoln Lo committed Feb 24, 2021
1 parent 7b03a45 commit 34bc990
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 11 deletions.
5 changes: 4 additions & 1 deletion copydb/copydb.go
Expand Up @@ -109,7 +109,10 @@ func (this *CopydbFerry) Run() {
// should be identical.
copyWG.Wait()

this.Ferry.StopTargetVerifier()
err := this.Ferry.StopTargetVerifier()
if err != nil {
this.Ferry.ErrorHandler.Fatal("target_verifier", err)
}

// This is where you cutover from using the source database to
// using the target database.
Expand Down
8 changes: 7 additions & 1 deletion ferry.go
Expand Up @@ -844,11 +844,17 @@ func (f *Ferry) FlushBinlogAndStopStreaming() {
f.BinlogStreamer.FlushAndStop()
}

func (f *Ferry) StopTargetVerifier() {
func (f *Ferry) StopTargetVerifier() error {
if !f.Config.SkipTargetVerification {
f.TargetVerifier.BinlogStreamer.FlushAndStop()
f.targetVerifierWg.Wait()
if f.TargetVerifier.EventsChecked == 0 {
err := fmt.Errorf("no events verified")
f.logger.WithField("error", err).Errorf("target verifier did not verify any events")
return err
}
}
return nil
}

func (f *Ferry) SerializeStateToJSON() (string, error) {
Expand Down
12 changes: 12 additions & 0 deletions inline_verifier.go
Expand Up @@ -248,6 +248,7 @@ type InlineVerifier struct {
reverifyStore *BinlogVerifyStore
verifyDuringCutoverStarted AtomicBoolean

rowsChecked int
sourceStmtCache *StmtCache
targetStmtCache *StmtCache
logger *logrus.Entry
Expand Down Expand Up @@ -435,6 +436,13 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
return VerificationResult{}, err
}

if v.rowsChecked == 0 {
return VerificationResult{
DataCorrect: false,
Message: "cutover verification failed, verifier did not compare any events",
}, nil
}

if !mismatchFound {
return VerificationResult{
DataCorrect: true,
Expand Down Expand Up @@ -563,13 +571,15 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin
if !bytes.Equal(sourceHash, targetHash) || !exists {
mismatchSet[paginationKey] = struct{}{}
}
v.rowsChecked += 1
}

for paginationKey, sourceHash := range source {
targetHash, exists := target[paginationKey]
if !bytes.Equal(sourceHash, targetHash) || !exists {
mismatchSet[paginationKey] = struct{}{}
}
v.rowsChecked += 1
}

return mismatchSet
Expand All @@ -587,6 +597,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s

for colName, targetData := range targetDecompressedColumns {
sourceData, exists := sourceDecompressedColumns[colName]
v.rowsChecked += 1
if !exists || !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = struct{}{}
break // no need to compare other columns
Expand All @@ -603,6 +614,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s

for colName, sourceData := range sourceDecompressedColumns {
targetData, exists := targetDecompressedColumns[colName]
v.rowsChecked += 1
if !exists || !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = struct{}{}
break
Expand Down
8 changes: 5 additions & 3 deletions sharding/sharding.go
Expand Up @@ -157,11 +157,10 @@ func (r *ShardingFerry) Run() {
r.logger.WithField("error", err).Errorf("verification encountered an error, aborting run")
r.Ferry.ErrorHandler.Fatal("inline_verifier", err)
} else if !verificationResult.DataCorrect {
err = fmt.Errorf("verifier detected data discrepancy: %s", verificationResult.Message)
err = fmt.Errorf("verifier detected error: %s", verificationResult.Message)
r.logger.WithField("error", err).Errorf("verification failed, aborting run")
r.Ferry.ErrorHandler.Fatal("inline_verifier", err)
}

metrics.Measure("CopyPrimaryKeyTables", nil, 1.0, func() {
err = r.copyPrimaryKeyTables()
})
Expand All @@ -172,7 +171,10 @@ func (r *ShardingFerry) Run() {

r.Ferry.Throttler.SetDisabled(false)

r.Ferry.StopTargetVerifier()
err = r.Ferry.StopTargetVerifier()
if err != nil {
r.Ferry.ErrorHandler.Fatal("target_verifier", err)
}

metrics.Measure("CutoverUnlock", nil, 1.0, func() {
err = r.config.CutoverUnlock.Post(&client)
Expand Down
4 changes: 3 additions & 1 deletion target_verifier.go
Expand Up @@ -12,6 +12,7 @@ type TargetVerifier struct {
DB *sql.DB
BinlogStreamer *BinlogStreamer
StateTracker *StateTracker
EventsChecked int
}

func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error) {
Expand All @@ -36,7 +37,8 @@ func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error {
return err
}

// Ghostferry's annotation will alwaays be the first, if available
t.EventsChecked += 1
// Ghostferry's annotation will always be the first, if available
if annotation == "" || annotation != t.DB.Marginalia {
paginationKey, err := ev.PaginationKey()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions test/integration/inline_verifier_test.rb
Expand Up @@ -68,7 +68,7 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

def test_same_decompressed_data_different_compressed_test_passes_inline_verification
Expand Down Expand Up @@ -430,7 +430,7 @@ def test_positive_negative_zero

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]

# Now we run the real test case.
target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
Expand Down Expand Up @@ -484,7 +484,7 @@ def test_null_vs_empty_string

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

def test_null_vs_null_string
Expand All @@ -507,7 +507,7 @@ def test_null_vs_null_string

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

def test_null_in_different_order
Expand All @@ -533,7 +533,7 @@ def test_null_in_different_order

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

###################
Expand Down
11 changes: 11 additions & 0 deletions test/integration/trivial_test.rb
Expand Up @@ -39,4 +39,15 @@ def test_logged_query_omits_columns
end
end
end

def test_no_events_verified_will_log_error
seed_random_data(source_db, number_of_rows: 0)
seed_random_data(target_db, number_of_rows: 0)

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

ghostferry.run

assert_equal "no events verified", ghostferry.error_lines.last["error"]
end
end

0 comments on commit 34bc990

Please sign in to comment.