Skip to content

Commit

Permalink
Changed from using StateTracker to Progress
Browse files Browse the repository at this point in the history
  • Loading branch information
EtienneBerubeShopify committed Aug 17, 2021
1 parent 8061908 commit 38b631c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 46 deletions.
23 changes: 12 additions & 11 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,6 @@ func (f *Ferry) Initialize() (err error) {
}
}

f.initializeTotalRowsAndBytes()

if f.Config.DataIterationBatchSizePerTableOverride != nil {
err = f.Config.DataIterationBatchSizePerTableOverride.UpdateBatchSizes(f.SourceDB, f.Tables)
if err != nil {
Expand Down Expand Up @@ -965,6 +963,7 @@ func (f *Ferry) Progress() *Progress {
}

tables := f.Tables.AsSlice()
totalRowsPerTable, totalBytesPerTable := f.GetTotalRowsAndBytesMap()

for _, table := range tables {
var currentAction string
Expand All @@ -989,8 +988,8 @@ func (f *Ferry) Progress() *Progress {
BatchSize: f.DataIterator.CursorConfig.GetBatchSize(table.Schema, table.Name),
RowsWritten: rowWrittenStats.NumRows,
BytesWritten: rowWrittenStats.NumBytes,
TotalBytes: f.StateTracker.TotalBytesPerTable(tableName),
TotalRows: f.StateTracker.TotalRowsPerTable(tableName),
TotalBytes: totalBytesPerTable[tableName],
TotalRows: totalRowsPerTable[tableName],
}
}

Expand Down Expand Up @@ -1163,7 +1162,10 @@ func (f *Ferry) checkSourceForeignKeyConstraints() error {
return nil
}

func (f *Ferry) initializeTotalRowsAndBytes() {
func (f *Ferry) GetTotalRowsAndBytesMap() (totalRowsPerTable map[string]uint64, totalBytesPerTable map[string]uint64) {
totalRowsPerTable = make(map[string]uint64)
totalBytesPerTable = make(map[string]uint64)

for _, table := range f.Tables {
query := fmt.Sprintf(`
SELECT table_rows, data_length
Expand All @@ -1185,14 +1187,13 @@ func (f *Ferry) initializeTotalRowsAndBytes() {
err = rows.Scan(&totalRows, &totalBytes)

if err != nil {
f.StateTracker.UpdateTotalBytesPerTable(table.Name, 0)
f.StateTracker.UpdateTotalRowsPerTable(table.Name, 0)
totalRowsPerTable[table.String()] = 0
totalBytesPerTable[table.String()] = 0
} else {
f.StateTracker.UpdateTotalBytesPerTable(table.String(), uint64(totalBytes))
f.StateTracker.UpdateTotalRowsPerTable(table.String(), uint64(totalRows))
totalRowsPerTable[table.String()] = uint64(totalRows)
totalBytesPerTable[table.String()] = uint64(totalBytes)
}

}
}

return totalRowsPerTable, totalBytesPerTable
}
35 changes: 0 additions & 35 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type SerializableState struct {
BinlogVerifyStore BinlogVerifySerializedStore
LastStoredBinlogPositionForInlineVerifier mysql.Position
LastStoredBinlogPositionForTargetVerifier mysql.Position
TotalRowsPerTable map[string]uint64 `json:",omitempty"`
TotalBytesPerTable map[string]uint64 `json:",omitempty"`
}

func (s *SerializableState) MinSourceBinlogPosition() mysql.Position {
Expand Down Expand Up @@ -101,8 +99,6 @@ type StateTracker struct {
// as it confuses the focus of this struct.
iterationSpeedLog *ring.Ring
rowStatsWrittenPerTable map[string]RowStats
totalRowsPerTable map[string]uint64
totalBytesPerTable map[string]uint64
}

func NewStateTracker(speedLogCount int) *StateTracker {
Expand All @@ -114,8 +110,6 @@ func NewStateTracker(speedLogCount int) *StateTracker {
completedTables: make(map[string]bool),
iterationSpeedLog: newSpeedLogRing(speedLogCount),
rowStatsWrittenPerTable: make(map[string]RowStats),
totalRowsPerTable: make(map[string]uint64),
totalBytesPerTable: make(map[string]uint64),
}
}

Expand All @@ -128,28 +122,9 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri
s.lastWrittenBinlogPosition = serializedState.LastWrittenBinlogPosition
s.lastStoredBinlogPositionForInlineVerifier = serializedState.LastStoredBinlogPositionForInlineVerifier
s.lastStoredBinlogPositionForTargetVerifier = serializedState.LastStoredBinlogPositionForTargetVerifier

// TODO: talk with shuhao on serialized states
s.totalBytesPerTable = serializedState.TotalBytesPerTable
s.totalRowsPerTable = serializedState.TotalRowsPerTable
return s
}

func (s *StateTracker) UpdateTotalRowsPerTable(table string, totalRows uint64) {
s.CopyRWMutex.RLock()
defer s.CopyRWMutex.RUnlock()

s.totalRowsPerTable[table] = totalRows

}

func (s *StateTracker) UpdateTotalBytesPerTable(table string, totalBytes uint64) {
s.CopyRWMutex.RLock()
defer s.CopyRWMutex.RUnlock()

s.totalBytesPerTable[table] = totalBytes
}

func (s *StateTracker) UpdateLastResumableSourceBinlogPosition(pos mysql.Position) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()
Expand Down Expand Up @@ -187,14 +162,6 @@ func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginatio
s.updateSpeedLog(deltaPaginationKey)
}

func (s StateTracker) TotalRowsPerTable(table string) uint64 {
return s.totalRowsPerTable[table]
}

func (s StateTracker) TotalBytesPerTable(table string) uint64 {
return s.totalBytesPerTable[table]
}

func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats {
s.CopyRWMutex.RLock()
defer s.CopyRWMutex.RUnlock()
Expand Down Expand Up @@ -301,8 +268,6 @@ func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, bin
LastWrittenBinlogPosition: s.lastWrittenBinlogPosition,
LastStoredBinlogPositionForInlineVerifier: s.lastStoredBinlogPositionForInlineVerifier,
LastStoredBinlogPositionForTargetVerifier: s.lastStoredBinlogPositionForTargetVerifier,
TotalRowsPerTable: s.totalRowsPerTable,
TotalBytesPerTable: s.totalBytesPerTable,
}

if binlogVerifyStore != nil {
Expand Down

0 comments on commit 38b631c

Please sign in to comment.