Skip to content

Commit

Permalink
Added catchup mechanism and BinlogReconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jan 28, 2019
1 parent 9d2d676 commit c8e4d54
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 2 deletions.
223 changes: 223 additions & 0 deletions binlog_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package ghostferry

import (
"database/sql"
"fmt"
"strings"

"github.com/Masterminds/squirrel"
"github.com/siddontang/go-mysql/schema"
"github.com/sirupsen/logrus"
)

// This event generates a REPLACE INTO SQL statement to overwrite a row if an
// INSERT or UPDATE occured. For a DELETE statement, it generates the
// corresponding DELETE statement based on the primary key.
//
// This is used during the resume when we are catching up to the binlog events
// missed by Ghostferry while it is down.
type ReconcilationDMLEvent struct {
*DMLEventBase
newValues RowData
pk uint64
}

func (e *ReconcilationDMLEvent) OldValues() RowData {
return nil
}

func (e *ReconcilationDMLEvent) NewValues() RowData {
return e.newValues
}

func (e *ReconcilationDMLEvent) PK() (uint64, error) {
return e.pk, nil
}

func (e *ReconcilationDMLEvent) AsSQLString(target *schema.Table) (string, error) {
var query string
if e.newValues != nil {
columns, err := loadColumnsForTable(&e.table, e.newValues)
if err != nil {
return "", err
}

query = "REPLACE INTO " +
QuotedTableNameFromString(target.Schema, target.Name) +
" (" + strings.Join(columns, ",") + ")" +
" VALUES (" + buildStringListForValues(e.newValues) + ")"
} else {
pkColumnName := e.TableSchema().GetPKColumn(0).Name
if pkColumnName == "" {
return "", fmt.Errorf("cannot get PK column for table %s", e.Table())
}

pkColumnName = quoteField(pkColumnName)
query = "DELETE FROM " + QuotedTableNameFromString(target.Schema, target.Name) +
" WHERE " + buildStringMapForWhere([]string{pkColumnName}, []interface{}{e.pk})
}

return query, nil
}

func NewReconciliationDMLEvent(table *schema.Table, pk uint64, row RowData) DMLEvent {
return &ReconcilationDMLEvent{
DMLEventBase: &DMLEventBase{table: *table},
pk: pk,
newValues: row,
}
}

// Instead of replacing/deleting every row, we first store all the rows changed
// during the downtime and perform all the operations at the end. This maybe
// faster (trading memory usage tho) and it is crucial for schema changes (as
// we can simply delete a key from this map when we realize a table has
// changed).
type UniqueRowMap map[TableIdentifier]map[uint64]struct{}

func (m UniqueRowMap) AddRow(table *schema.Table, pk uint64) bool {
tableId := NewTableIdentifierFromSchemaTable(table)
if _, exists := m[tableId]; !exists {
m[tableId] = make(map[uint64]struct{})
}

if _, exists := m[tableId][pk]; !exists {
m[tableId][pk] = struct{}{}
return true
}

return false
}

type BinlogReconciler struct {
BatchSize int
TableSchemaCache TableSchemaCache

SourceDB *sql.DB
BinlogWriter *BinlogWriter

modifiedRows UniqueRowMap
logger *logrus.Entry
}

func (r *BinlogReconciler) Initialize() {
r.modifiedRows = make(UniqueRowMap)
r.logger = logrus.WithField("tag", "binlogreconcile")
}

func (r *BinlogReconciler) AddRowsToStore(events []DMLEvent) error {
for _, ev := range events {
pk, err := ev.PK()
if err != nil {
return err
}

r.modifiedRows.AddRow(ev.TableSchema(), pk)
}

return nil
}

func (r *BinlogReconciler) ReplaceModifiedRowsAfterCatchup() error {
batch := make([]DMLEvent, 0, r.BatchSize)

totalModifiedRows := 0
for _, pkSet := range r.modifiedRows {
totalModifiedRows += len(pkSet)
}

r.logger.WithField("row_count", totalModifiedRows).Info("begin replacing modified rows")

count := 0
for tableId, pkSet := range r.modifiedRows {
table := r.TableSchemaCache.Get(tableId.SchemaName, tableId.TableName)

for pk, _ := range pkSet {
count++

if len(batch) == r.BatchSize {
r.logger.WithField("rows_replaced", count).Debug("replacing batch")
err := r.replaceBatch(batch)
if err != nil {
return err
}

batch = make([]DMLEvent, 0, r.BatchSize)
}

row, err := r.selectRowFromSource(table, pk)
if err != nil {
r.logger.WithError(err).Error("failed to select row from source")
return err
}

batch = append(batch, NewReconciliationDMLEvent(table, pk, row))
}
}

if len(batch) > 0 {
err := r.replaceBatch(batch)
if err != nil {
return err
}
}

r.logger.WithField("row_count", totalModifiedRows).Info("replaced modified rows")

return nil
}

func (r *BinlogReconciler) replaceBatch(batch []DMLEvent) error {
err := r.BinlogWriter.WriteEvents(batch)
if err != nil {
r.logger.WithError(err).Error("cannot replace batch")
return err
}

return nil
}

func (r *BinlogReconciler) selectRowFromSource(table *schema.Table, pk uint64) (RowData, error) {
quotedPK := quoteField(table.GetPKColumn(0).Name)

query, args, err := squirrel.
Select("*").
From(QuotedTableName(table)).
Where(squirrel.Eq{quotedPK: pk}).ToSql()

if err != nil {
return nil, err
}

// TODO: make this cached for faster reconciliation
stmt, err := r.SourceDB.Prepare(query)
if err != nil {
return nil, err
}

defer stmt.Close()

rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}
defer rows.Close()

var rowData RowData = nil
count := 0

for rows.Next() {
rowData, err = ScanGenericRow(rows, len(table.Columns))
if err != nil {
return nil, err
}

count++
}

if count > 1 {
return nil, fmt.Errorf("multiple rows detected when only one or zero is expected for %s %v", table.String(), pk)
}

return rowData, rows.Err()
}
5 changes: 5 additions & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (s *BinlogStreamer) IsAlmostCaughtUp() bool {
return time.Now().Sub(s.lastProcessedEventTime) < caughtUpThreshold
}

func (s *BinlogStreamer) FlushToTargetBinlogPositionAndStop(target mysql.Position) {
s.targetBinlogPosition = target
s.stopRequested = true
}

func (s *BinlogStreamer) FlushAndStop() {
s.logger.Info("requesting binlog streamer to stop")
// Must first read the binlog position before requesting stop
Expand Down
4 changes: 2 additions & 2 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (b *BinlogWriter) Run() {
}

err := WithRetries(b.WriteRetries, 0, b.logger, "write events to target", func() error {
return b.writeEvents(batch)
return b.WriteEvents(batch)
})
if err != nil {
b.ErrorHandler.Fatal("binlog_writer", err)
Expand All @@ -75,7 +75,7 @@ func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error {
return nil
}

func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
func (b *BinlogWriter) WriteEvents(events []DMLEvent) error {
WaitForThrottle(b.Throttler)

queryBuffer := []byte("BEGIN;\n")
Expand Down
96 changes: 96 additions & 0 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,102 @@ func (f *Ferry) Initialize() (err error) {
return nil
}

func (f *Ferry) CatchUpOnMissedBinlogsOnResume() error {
if f.StateToResumeFrom == nil {
return nil
}

tableSchemaCache := f.StateToResumeFrom.LastKnownTableSchemaCache

// Step 1: setup the binlog reconciler
binlogReconciler := &BinlogReconciler{
BatchSize: f.Config.BinlogEventBatchSize,
TableSchemaCache: tableSchemaCache,

SourceDB: f.SourceDB,
BinlogWriter: f.BinlogWriter,
}
binlogReconciler.Initialize()

// TODO: Step 2: setup the iterative verifier reconciler

// Step 3: initialize the binlog streamer and the throttler
binlogStreamer := f.NewBinlogStreamer()
binlogStreamer.TableSchema = tableSchemaCache

// Step 4: attach the reconcilers' binlog event listeners
binlogStreamer.AddEventListener(binlogReconciler.AddRowsToStore)
// TODO: attach the iterative verifier reconciler binlog listener

// Step 5: Connect the binlog streamer from the resuming coordinate
binlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.LastWrittenBinlogPosition)

// Step 6: Start the throttler
ctx, shutdown := context.WithCancel(context.Background())
throttlerWg := &sync.WaitGroup{}
throttlerWg.Add(1)
go func() {
defer throttlerWg.Done()

err := f.Throttler.Run(ctx)
if err != nil {
f.ErrorHandler.Fatal("throttler", err)
}
}()
defer func() {
shutdown()
throttlerWg.Wait()
}()

// Step 7: run the binlog streamer until the current position
currentBinlogPos, err := ShowMasterStatusBinlogPosition(f.SourceDB)
if err != nil {
return err
}

f.logger.WithFields(logrus.Fields{
"from": f.StateToResumeFrom.LastWrittenBinlogPosition,
"to": currentBinlogPos,
}).Info("catching up binlogs")

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
binlogStreamer.Run()
}()

binlogStreamer.FlushToTargetBinlogPositionAndStop(currentBinlogPos)
wg.Wait()

f.logger.WithFields(logrus.Fields{
"streamed_pos": binlogStreamer.GetLastStreamedBinlogPosition(),
"target_pos": currentBinlogPos,
}).Info("binlog catch up completed")

// defensive programming
if binlogStreamer.GetLastStreamedBinlogPosition().Compare(currentBinlogPos) < 0 {
return fmt.Errorf("programming error: last streamed position %v is less than target position %v?", binlogStreamer.GetLastStreamedBinlogPosition(), currentBinlogPos)
}

// Step 8: REPLACE rows that have been modified
err = binlogReconciler.ReplaceModifiedRowsAfterCatchup()
if err != nil {
f.logger.WithError(err).Error("failed to replace modified rows")
return err
}

// TODO: Step 9: perform iterative verifier catchups

// Step 10: update the state to resume from so the rest of Ferry can pretend
// as if it was interrupted at currentBinlogPos as this method took care of
// all possible issues (including schema changes, although this is TODO).
f.StateToResumeFrom.LastWrittenBinlogPosition = currentBinlogPos
f.StateTracker.UpdateLastWrittenBinlogPosition(currentBinlogPos)

return nil
}

// Determine the binlog coordinates, table mapping for the pending
// Ghostferry run.
func (f *Ferry) Start() error {
Expand Down

0 comments on commit c8e4d54

Please sign in to comment.