Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sanity check to InlineVerifier and TargetVerifier #261

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion copydb/copydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (this *CopydbFerry) Run() {
// should be identical.
copyWG.Wait()

this.Ferry.StopTargetVerifier()
err := this.Ferry.StopTargetVerifier()
if err != nil {
this.Ferry.ErrorHandler.Fatal("target_verifier", err)
}

// This is where you cutover from using the source database to
// using the target database.
Expand Down
8 changes: 7 additions & 1 deletion ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,11 +844,17 @@ func (f *Ferry) FlushBinlogAndStopStreaming() {
f.BinlogStreamer.FlushAndStop()
}

func (f *Ferry) StopTargetVerifier() {
func (f *Ferry) StopTargetVerifier() error {
if !f.Config.SkipTargetVerification {
f.TargetVerifier.BinlogStreamer.FlushAndStop()
f.targetVerifierWg.Wait()
if f.TargetVerifier.EventsChecked == 0 {
err := fmt.Errorf("no events checked")
f.logger.WithField("error", err).Errorf("target verifier did not check any events")
return err
}
}
return nil
}

func (f *Ferry) SerializeStateToJSON() (string, error) {
Expand Down
12 changes: 12 additions & 0 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ type InlineVerifier struct {
reverifyStore *BinlogVerifyStore
verifyDuringCutoverStarted AtomicBoolean

rowsChecked int
sourceStmtCache *StmtCache
targetStmtCache *StmtCache
logger *logrus.Entry
Expand Down Expand Up @@ -435,6 +436,13 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
return VerificationResult{}, err
}

if v.rowsChecked == 0 {
return VerificationResult{
DataCorrect: false,
Message: "cutover verification failed, no rows were compared",
}, nil
}
Comment on lines +439 to +444
Copy link
Author

@Lincoln23 Lincoln23 Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure how to test this in the ruby integration tests since in integrationferry here we only pass the incorrect tables. perhaps we pass the message as well?


if !mismatchFound {
return VerificationResult{
DataCorrect: true,
Expand Down Expand Up @@ -563,13 +571,15 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin
if !bytes.Equal(sourceHash, targetHash) || !exists {
mismatchSet[paginationKey] = struct{}{}
}
v.rowsChecked += 1
}

for paginationKey, sourceHash := range source {
targetHash, exists := target[paginationKey]
if !bytes.Equal(sourceHash, targetHash) || !exists {
mismatchSet[paginationKey] = struct{}{}
}
v.rowsChecked += 1
}

return mismatchSet
Expand All @@ -587,6 +597,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s

for colName, targetData := range targetDecompressedColumns {
sourceData, exists := sourceDecompressedColumns[colName]
v.rowsChecked += 1
if !exists || !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = struct{}{}
break // no need to compare other columns
Expand All @@ -603,6 +614,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s

for colName, sourceData := range sourceDecompressedColumns {
targetData, exists := targetDecompressedColumns[colName]
v.rowsChecked += 1
if !exists || !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = struct{}{}
break
Expand Down
8 changes: 5 additions & 3 deletions sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,10 @@ func (r *ShardingFerry) Run() {
r.logger.WithField("error", err).Errorf("verification encountered an error, aborting run")
r.Ferry.ErrorHandler.Fatal("inline_verifier", err)
} else if !verificationResult.DataCorrect {
err = fmt.Errorf("verifier detected data discrepancy: %s", verificationResult.Message)
err = fmt.Errorf("verifier detected error: %s", verificationResult.Message)
r.logger.WithField("error", err).Errorf("verification failed, aborting run")
r.Ferry.ErrorHandler.Fatal("inline_verifier", err)
}

metrics.Measure("CopyPrimaryKeyTables", nil, 1.0, func() {
err = r.copyPrimaryKeyTables()
})
Expand All @@ -172,7 +171,10 @@ func (r *ShardingFerry) Run() {

r.Ferry.Throttler.SetDisabled(false)

r.Ferry.StopTargetVerifier()
err = r.Ferry.StopTargetVerifier()
if err != nil {
r.Ferry.ErrorHandler.Fatal("target_verifier", err)
}

metrics.Measure("CutoverUnlock", nil, 1.0, func() {
err = r.config.CutoverUnlock.Post(&client)
Expand Down
4 changes: 3 additions & 1 deletion target_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type TargetVerifier struct {
DB *sql.DB
BinlogStreamer *BinlogStreamer
StateTracker *StateTracker
EventsChecked int
}

func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error) {
Expand All @@ -36,7 +37,8 @@ func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error {
return err
}

// Ghostferry's annotation will alwaays be the first, if available
t.EventsChecked += 1
// Ghostferry's annotation will always be the first, if available
if annotation == "" || annotation != t.DB.Marginalia {
paginationKey, err := ev.PaginationKey()
if err != nil {
Expand Down
22 changes: 17 additions & 5 deletions test/integration/inline_verifier_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
Copy link
Author

@Lincoln23 Lincoln23 Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests will log target verifier did not check any events since there are no events on the target. I added a separate test to check target verifier events below so I will ignore that error here

Same changes for tests below

end

def test_same_decompressed_data_different_compressed_test_passes_inline_verification
Expand Down Expand Up @@ -430,7 +430,7 @@ def test_positive_negative_zero

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]

# Now we run the real test case.
target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
Expand Down Expand Up @@ -484,7 +484,7 @@ def test_null_vs_empty_string

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

def test_null_vs_null_string
Expand All @@ -507,7 +507,7 @@ def test_null_vs_null_string

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

def test_null_in_different_order
Expand All @@ -533,7 +533,19 @@ def test_null_in_different_order

assert verification_ran
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
end

def test_no_events_verified_on_target_will_log_error
seed_random_data(source_db, number_of_rows: 0)
seed_random_data(target_db, number_of_rows: 0)

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })

ghostferry.run

assert_equal "target verifier did not check any events", ghostferry.error_lines.last["msg"]
assert_equal "no events checked", ghostferry.error_lines.last["error"]
end

###################
Expand Down