
Commit

add info logging for `uploadObjectDiskParts` and `downloadObjectDiskParts` operation
Slach committed Apr 26, 2024
1 parent 4f63c06 commit 3d12b63
Showing 3 changed files with 27 additions and 12 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.md
@@ -1,3 +1,7 @@
# v2.5.4
IMPROVEMENTS
- add `info` logging for `uploadObjectDiskParts` and `downloadObjectDiskParts` operation

# v2.5.3
BUG FIXES
- fixed `Unknown setting base_backup` for `use_embedded_backup_restore: true` and `create --diff-from-remote`, affected 2.5.0+ versions, fix [735](https://github.com/Altinity/clickhouse-backup/issues/735)
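The change below wraps each object-disk copy in a pair of `Info` messages carrying the elapsed time and the transferred size. A minimal, self-contained sketch of that pattern follows, using only the standard library; the real code logs through apex/log fields and formats values with the repo's `utils.HumanizeDuration` and `utils.FormatBytes`, and `copyObjectDiskParts` here is a hypothetical stand-in for the upload/download helpers.

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// copyObjectDiskParts is a hypothetical stand-in for uploadObjectDiskParts /
// downloadObjectDiskParts; it reports how many bytes were copied.
func copyObjectDiskParts() (int64, error) {
	time.Sleep(50 * time.Millisecond) // simulate a server-side copy
	return 3 << 20, nil               // pretend 3 MiB were transferred
}

func main() {
	// Log a "start" marker, time the copy, then log a "finish" marker that
	// carries the duration and the transferred size.
	log.Println("disk=s3 upload object_disk start")
	start := time.Now()
	size, err := copyObjectDiskParts()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("disk=s3 duration=%s size=%s upload object_disk finish",
		time.Since(start).Round(time.Millisecond),
		fmt.Sprintf("%.2f MiB", float64(size)/(1<<20)))
}
```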
3 changes: 2 additions & 1 deletion pkg/backup/create.go
@@ -757,11 +757,12 @@ func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string,
log.WithField("disk", disk.Name).Debug("shadow moved")
if len(parts) > 0 && (b.isDiskTypeObject(disk.Type) || b.isDiskTypeEncryptedObject(disk, diskList)) {
start := time.Now()
log.WithField("disk", disk.Name).Info("upload object_disk start")
if size, err = b.uploadObjectDiskParts(ctx, backupName, tablesDiffFromRemote[metadata.TableTitle{Database: table.Database, Table: table.Name}], backupShadowPath, disk); err != nil {
return disksToPartsMap, realSize, err
}
realSize[disk.Name] += size
log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data uploaded")
log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("upload object_disk finish")
}
// Clean all the files under the shadowPath, cause UNFREEZE unavailable
if version < 21004000 {
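In `AddTableToLocalBackup` the new timing wraps the object-disk upload for each disk, and the uploaded byte count is added to the `realSize` map for that disk, as the hunk above shows. A rough, self-contained sketch of that accumulation, with a hypothetical `uploadDisk` helper standing in for `uploadObjectDiskParts` and plain `fmt` output in place of apex/log:

```go
package main

import (
	"fmt"
	"time"
)

// uploadDisk is a hypothetical stand-in for uploadObjectDiskParts; it returns
// the number of bytes copied to object storage for one disk.
func uploadDisk(disk string) (int64, error) {
	return int64(len(disk)) << 20, nil
}

func main() {
	realSize := map[string]int64{}
	for _, disk := range []string{"s3_disk", "encrypted_s3"} {
		fmt.Printf("disk=%s upload object_disk start\n", disk)
		start := time.Now()
		size, err := uploadDisk(disk)
		if err != nil {
			panic(err)
		}
		// Accumulate the per-disk total, as the diff does with
		// realSize[disk.Name] += size.
		realSize[disk] += size
		fmt.Printf("disk=%s duration=%s size=%d upload object_disk finish\n",
			disk, time.Since(start).Round(time.Millisecond), size)
	}
	fmt.Println("per-disk uploaded bytes:", realSize)
}
```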
32 changes: 21 additions & 11 deletions pkg/backup/restore.go
@@ -1239,9 +1239,14 @@ func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName st
return fmt.Errorf("can't copy data to storage '%s.%s': %v", table.Database, table.Table, err)
}
log.Debug("data to 'storage' copied")
if err := b.downloadObjectDiskParts(ctx, backupName, backupMetadata, table, diskMap, diskTypes, disks); err != nil {
log.Info("download object_disks start")
var size int64
var err error
start := time.Now()
if size, err = b.downloadObjectDiskParts(ctx, backupName, backupMetadata, table, diskMap, diskTypes, disks); err != nil {
return fmt.Errorf("can't restore object_disk server-side copy data parts '%s.%s': %v", table.Database, table.Table, err)
}
log.WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("download object_disks finish")
if err := b.ch.AttachTable(ctx, table, dstTable); err != nil {
return fmt.Errorf("can't attach table '%s.%s': %v", table.Database, table.Table, err)
}
@@ -1253,16 +1258,21 @@ func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName str
return fmt.Errorf("can't copy data to detached '%s.%s': %v", table.Database, table.Table, err)
}
log.Debug("data to 'detached' copied")
if err := b.downloadObjectDiskParts(ctx, backupName, backupMetadata, table, diskMap, diskTypes, disks); err != nil {
log.Info("download object_disks start")
var size int64
var err error
start := time.Now()
if size, err = b.downloadObjectDiskParts(ctx, backupName, backupMetadata, table, diskMap, diskTypes, disks); err != nil {
return fmt.Errorf("can't restore object_disk server-side copy data parts '%s.%s': %v", table.Database, table.Table, err)
}
log.WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("download object_disks finish")
if err := b.ch.AttachDataParts(table, dstTable); err != nil {
return fmt.Errorf("can't attach data parts for table '%s.%s': %v", table.Database, table.Table, err)
}
return nil
}

func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, backupTable metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk) error {
func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, backupTable metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk) (int64, error) {
log := apexLog.WithFields(apexLog.Fields{
"operation": "downloadObjectDiskParts",
"table": fmt.Sprintf("%s.%s", backupTable.Database, backupTable.Table),
Expand All @@ -1276,7 +1286,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
for diskName, parts := range backupTable.Parts {
diskType, exists := diskTypes[diskName]
if !exists {
return fmt.Errorf("%s disk doesn't present in diskTypes: %v", diskName, diskTypes)
return 0, fmt.Errorf("%s disk doesn't present in diskTypes: %v", diskName, diskTypes)
}
isObjectDiskEncrypted := false
if diskType == "encrypted" {
@@ -1297,18 +1307,18 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
isObjectDisk := b.isDiskTypeObject(diskType)
if isObjectDisk || isObjectDiskEncrypted {
if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
return err
return 0, err
}
if _, exists := diskMap[diskName]; !exists {
for _, part := range parts {
if part.RebalancedDisk != "" {
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, part.RebalancedDisk); err != nil {
return err
return 0, err
}
}
}
} else if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, diskName); err != nil {
return err
return 0, err
}
start := time.Now()
downloadObjectDiskPartsWorkingGroup, downloadCtx := errgroup.WithContext(ctx)
@@ -1326,7 +1336,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
var findRecursiveErr error
srcBackupName, srcDiskName, findRecursiveErr = b.findObjectDiskPartRecursive(ctx, backupMetadata, backupTable, part, diskName, log)
if findRecursiveErr != nil {
return findRecursiveErr
return 0, findRecursiveErr
}
}
walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
@@ -1385,17 +1395,17 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
return nil
})
if walkErr != nil {
return walkErr
return 0, walkErr
}
}
if wgWaitErr := downloadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
return fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr)
return 0, fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr)
}
log.WithField("disk", diskName).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data downloaded")
}
}

return nil
return size, nil
}

func (b *Backuper) findObjectDiskPartRecursive(ctx context.Context, backup metadata.BackupMetadata, table metadata.TableMetadata, part metadata.Part, diskName string, log *apexLog.Entry) (string, string, error) {
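Most of the restore.go churn comes from changing `downloadObjectDiskParts` to return `(int64, error)` instead of just `error`: every early `return err` becomes `return 0, err`, and the accumulated `size` is returned on success so callers can include it in their "finish" log line. A simplified, hypothetical sketch of that refactor, with made-up part sizes in place of the real backup metadata:

```go
package main

import "fmt"

// downloadParts is a simplified, hypothetical stand-in for
// downloadObjectDiskParts after the signature change: it now returns the
// total number of bytes downloaded in addition to an error.
func downloadParts(parts map[string]int64) (int64, error) {
	var size int64
	for name, partSize := range parts {
		if partSize < 0 {
			// Every early-return path must now supply both values,
			// hence the repeated `return 0, err` edits in the diff.
			return 0, fmt.Errorf("invalid size for part %s", name)
		}
		size += partSize
	}
	return size, nil
}

func main() {
	size, err := downloadParts(map[string]int64{"all_1_1_0": 4096, "all_2_2_0": 8192})
	if err != nil {
		panic(err)
	}
	// The caller can now report the downloaded size in its "finish" message.
	fmt.Printf("download object_disks finish size=%d\n", size)
}
```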
