Skip to content

Commit

Permalink
refactor(cleanup): do custom tag deletion in parallel and support dry…
Browse files Browse the repository at this point in the history
… run

Signed-off-by: Alexey Igrychev <alexey.igrychev@flant.com>
  • Loading branch information
alexey-igrychev committed Apr 1, 2022
1 parent b56d6e4 commit acfb70a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 18 deletions.
6 changes: 2 additions & 4 deletions pkg/cleaning/cleanup.go
Expand Up @@ -832,10 +832,8 @@ func (m *cleanupManager) deleteUnusedCustomTags(ctx context.Context) error {
if len(customTagListToDelete) != 0 {
header := fmt.Sprintf("Deleting unused custom tags (%d/%d)", len(customTagListToDelete), numberOfCustomTags)
if err := logboek.Context(ctx).LogProcess(header).DoError(func() error {
for _, customTag := range customTagListToDelete {
if err := deleteCustomTag(ctx, m.StorageManager.GetStagesStorage(), customTag); err != nil {
return err
}
if err := deleteCustomTags(ctx, m.StorageManager, customTagListToDelete, m.DryRun); err != nil {
return err
}

return nil
Expand Down
41 changes: 27 additions & 14 deletions pkg/cleaning/purge.go
Expand Up @@ -30,7 +30,7 @@ func newPurgeManager(projectName string, storageManager *manager.StorageManager,
}

type purgeManager struct {
StorageManager *manager.StorageManager
StorageManager manager.StorageManagerInterface
ProjectName string
RmContainersThatUseWerfImages bool
DryRun bool
Expand Down Expand Up @@ -167,12 +167,13 @@ func (m *purgeManager) deleteCustomTags(ctx context.Context) error {
}

if err := logboek.Context(ctx).LogProcess("Deleting custom tags").DoError(func() error {
for _, customTagList := range stageIDCustomTagList {
for _, customTag := range customTagList {
if err := deleteCustomTag(ctx, m.StorageManager.GetStagesStorage(), customTag); err != nil {
return err
}
}
var customTagList []string
for _, list := range stageIDCustomTagList {
customTagList = append(customTagList, list...)
}

if err := deleteCustomTags(ctx, m.StorageManager, customTagList, m.DryRun); err != nil {
return err
}

return nil
Expand All @@ -183,18 +184,30 @@ func (m *purgeManager) deleteCustomTags(ctx context.Context) error {
return nil
}

func deleteCustomTag(ctx context.Context, stagesStorage storage.StagesStorage, customTag string) error {
err := stagesStorage.DeleteStageCustomTag(ctx, customTag)
if err != nil {
if err := handleDeletionError(err); err != nil {
return err
func deleteCustomTags(ctx context.Context, storageManager manager.StorageManagerInterface, customTagList []string, dryRun bool) error {
if dryRun {
for _, customTag := range customTagList {
logboek.Context(ctx).Default().LogFDetails(" tag: %s\n", customTag)
logboek.Context(ctx).Default().LogOptionalLn()
}

return nil
}

logboek.Context(ctx).Default().LogFDetails(" tag: %s\n", customTag)
logboek.Context(ctx).Default().LogOptionalLn()
if err := storageManager.ForEachDeleteStageCustomTag(ctx, customTagList, func(ctx context.Context, tag string, err error) error {
if err != nil {
if err := handleDeletionError(err); err != nil {
return err
}
}

logboek.Context(ctx).Default().LogFDetails(" tag: %s\n", tag)
logboek.Context(ctx).Default().LogOptionalLn()

return nil
}); err != nil {
return err
}

return nil
}
11 changes: 11 additions & 0 deletions pkg/storage/manager/storage_manager.go
Expand Up @@ -81,6 +81,7 @@ type StorageManagerInterface interface {
ForEachGetImportMetadata(ctx context.Context, projectName string, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.ImportMetadata, err error) error) error
ForEachRmImportMetadata(ctx context.Context, projectName string, ids []string, f func(ctx context.Context, id string, err error) error) error
ForEachGetStageCustomTagMetadata(ctx context.Context, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.CustomTagMetadata, err error) error) error
ForEachDeleteStageCustomTag(ctx context.Context, ids []string, f func(ctx context.Context, tag string, err error) error) error
}

func RetryOnUnexpectedStagesStorageState(_ context.Context, _ StorageManagerInterface, f func() error) error {
Expand Down Expand Up @@ -1008,6 +1009,16 @@ func (m *StorageManager) ForEachRmImportMetadata(ctx context.Context, projectNam
})
}

func (m *StorageManager) ForEachDeleteStageCustomTag(ctx context.Context, ids []string, f func(ctx context.Context, tag string, err error) error) error {
return parallel.DoTasks(ctx, len(ids), parallel.DoTasksOptions{
MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
}, func(ctx context.Context, taskId int) error {
id := ids[taskId]
err := m.StagesStorage.DeleteStageCustomTag(ctx, id)
return f(ctx, id, err)
})
}

func (m *StorageManager) ForEachGetStageCustomTagMetadata(ctx context.Context, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.CustomTagMetadata, err error) error) error {
return parallel.DoTasks(ctx, len(ids), parallel.DoTasksOptions{
MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
Expand Down

0 comments on commit acfb70a

Please sign in to comment.