feat(build): speed up builds with runtime caching for stages
Signed-off-by: Alexey Igrychev <alexey.igrychev@flant.com>
alexey-igrychev committed Mar 17, 2022
1 parent cbb31b2 commit a13a7b0
Showing 5 changed files with 56 additions and 75 deletions.
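At its core, this change caches registry tag lists in the running process, so repeated stage lookups within a single build no longer each pay a registry round-trip. A minimal, self-contained sketch of that idea — `tagCache` and `fetch` are invented names for illustration, not werf APIs:

```go
package main

import (
	"fmt"
	"sync"
)

// tagCache is a hypothetical per-process cache of registry tag lists,
// keyed by repository address.
type tagCache struct {
	mu   sync.Mutex
	tags map[string][]string
}

func (c *tagCache) Tags(repo string, fetch func(string) []string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if tags, ok := c.tags[repo]; ok {
		return tags // runtime cache hit: no registry round-trip
	}
	tags := fetch(repo) // first lookup pays the network cost
	c.tags[repo] = tags
	return tags
}

func main() {
	c := &tagCache{tags: map[string][]string{}}
	fetch := func(repo string) []string { return []string{"abc-1", "abc-2"} }
	fmt.Println(c.Tags("registry.example.com/demo", fetch)) // fetches
	fmt.Println(c.Tags("registry.example.com/demo", fetch)) // served from cache
}
```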
79 changes: 25 additions & 54 deletions pkg/build/build_phase.go
@@ -412,7 +412,6 @@ func (phase *BuildPhase) onImageStage(ctx context.Context, img *Image, stg stage
return err
}

// Stage is cached in the stages storage
if foundSuitableStage {
logboek.Context(ctx).Default().LogFHighlight("Use cache image for %s\n", stg.LogDetailedName())
container_runtime.LogImageInfo(ctx, stg.GetImage(), phase.getPrevNonEmptyStageImageSize())
@@ -467,6 +466,7 @@ func (phase *BuildPhase) onImageStage(ctx context.Context, img *Image, stg stage
func (phase *BuildPhase) findAndFetchStageFromSecondaryStagesStorage(ctx context.Context, img *Image, stg stage.Interface) (bool, error) {
foundSuitableStage := false

storageManager := phase.Conveyor.StorageManager
atomicCopySuitableStageFromSecondaryStagesStorage := func(secondaryStageDesc *imagePkg.StageDescription, secondaryStagesStorage storage.StagesStorage) error {
// Lock the primary stages storage
var stageUnlocked bool
@@ -484,66 +484,42 @@ func (phase *BuildPhase) findAndFetchStageFromSecondaryStagesStorage(ctx context
defer unlockStage()
}

// Query the primary stages storage for suitable stage again.
// Suitable stage can be found this time and should be used in this case
if stages, err := phase.Conveyor.StorageManager.GetStagesByDigest(ctx, stg.LogDetailedName(), stg.GetDigest()); err != nil {
return err
} else {
if stageDesc, err := phase.Conveyor.StorageManager.SelectSuitableStage(ctx, phase.Conveyor, stg, stages); err != nil {
return err
} else if stageDesc != nil {
i := phase.Conveyor.GetOrCreateStageImage(castToStageImage(phase.StagesIterator.GetPrevImage(img, stg)), stageDesc.Info.Name)
i.SetStageDescription(stageDesc)
err := logboek.Context(ctx).Default().LogProcess("Copy suitable stage from secondary %s", secondaryStagesStorage.String()).DoError(func() error {
// Copy suitable stage from a secondary stages storage to the primary stages storage
// while primary stages storage lock for this digest is held
if copiedStageDesc, err := storageManager.CopySuitableByDigestStage(ctx, secondaryStageDesc, secondaryStagesStorage, storageManager.GetStagesStorage(), phase.Conveyor.ContainerRuntime); err != nil {
return fmt.Errorf("unable to copy suitable stage %s from %s to %s: %s", secondaryStageDesc.StageID.String(), secondaryStagesStorage.String(), storageManager.GetStagesStorage().String(), err)
} else {
i := phase.Conveyor.GetOrCreateStageImage(castToStageImage(phase.StagesIterator.GetPrevImage(img, stg)), copiedStageDesc.Info.Name)
i.SetStageDescription(copiedStageDesc)
stg.SetImage(i)

logboek.Context(ctx).Default().LogFHighlight("Use cache image for %s\n", stg.LogDetailedName())
container_runtime.LogImageInfo(ctx, stg.GetImage(), phase.getPrevNonEmptyStageImageSize())

return nil
}
})
if err != nil {
return err
}

err = logboek.Context(ctx).Default().LogProcess("Copy suitable stage from secondary %s", secondaryStagesStorage.String()).DoError(func() error {
// Copy suitable stage from a secondary stages storage to the primary stages storage
// while primary stages storage lock for this digest is held
if copiedStageDesc, err := phase.Conveyor.StorageManager.CopySuitableByDigestStage(ctx, secondaryStageDesc, secondaryStagesStorage, phase.Conveyor.StorageManager.GetStagesStorage(), phase.Conveyor.ContainerRuntime); err != nil {
return fmt.Errorf("unable to copy suitable stage %s from %s to %s: %s", secondaryStageDesc.StageID.String(), secondaryStagesStorage.String(), phase.Conveyor.StorageManager.GetStagesStorage().String(), err)
} else {
i := phase.Conveyor.GetOrCreateStageImage(castToStageImage(phase.StagesIterator.GetPrevImage(img, stg)), copiedStageDesc.Info.Name)
i.SetStageDescription(copiedStageDesc)
stg.SetImage(i)

var stageIDs []imagePkg.StageID
for _, stageDesc := range stages {
stageIDs = append(stageIDs, *stageDesc.StageID)
}
stageIDs = append(stageIDs, *copiedStageDesc.StageID)

logboek.Context(ctx).Default().LogFHighlight("Use cache image for %s\n", stg.LogDetailedName())
container_runtime.LogImageInfo(ctx, stg.GetImage(), phase.getPrevNonEmptyStageImageSize())

return nil
}
})
if err != nil {
return err
}

unlockStage()

if err := phase.Conveyor.StorageManager.CopyStageIntoCacheStorages(ctx, stg, phase.Conveyor.ContainerRuntime); err != nil {
return fmt.Errorf("unable to copy stage %s into cache storages: %s", stg.GetImage().GetStageDescription().StageID.String(), err)
}
unlockStage()

return nil
if err := storageManager.CopyStageIntoCacheStorages(ctx, stg, phase.Conveyor.ContainerRuntime); err != nil {
return fmt.Errorf("unable to copy stage %s into cache storages: %s", stg.GetImage().GetStageDescription().StageID.String(), err)
}

return nil
}

ScanSecondaryStagesStorageList:
for _, secondaryStagesStorage := range phase.Conveyor.StorageManager.GetSecondaryStagesStorageList() {
if secondaryStages, err := phase.Conveyor.StorageManager.GetStagesByDigestFromStagesStorage(ctx, stg.LogDetailedName(), stg.GetDigest(), secondaryStagesStorage); err != nil {
for _, secondaryStagesStorage := range storageManager.GetSecondaryStagesStorageList() {
secondaryStages, err := storageManager.GetStagesByDigestFromStagesStorageWithCache(ctx, stg.LogDetailedName(), stg.GetDigest(), secondaryStagesStorage)
if err != nil {
return false, err
} else {
if secondaryStageDesc, err := phase.Conveyor.StorageManager.SelectSuitableStage(ctx, phase.Conveyor, stg, secondaryStages); err != nil {
if secondaryStageDesc, err := storageManager.SelectSuitableStage(ctx, phase.Conveyor, stg, secondaryStages); err != nil {
return false, err
} else if secondaryStageDesc != nil {
if err := atomicCopySuitableStageFromSecondaryStagesStorage(secondaryStageDesc, secondaryStagesStorage); err != nil {
@@ -601,10 +577,11 @@ func (phase *BuildPhase) calculateStage(ctx context.Context, img *Image, stg sta
Do(phase.Conveyor.GetStageDigestMutex(stg.GetDigest()).Lock)

foundSuitableStage := false
if stages, err := phase.Conveyor.StorageManager.GetStagesByDigest(ctx, stg.LogDetailedName(), stageDigest); err != nil {
storageManager := phase.Conveyor.StorageManager
if stages, err := storageManager.GetStagesByDigestWithCache(ctx, stg.LogDetailedName(), stageDigest); err != nil {
return false, phase.Conveyor.GetStageDigestMutex(stg.GetDigest()).Unlock, err
} else {
if stageDesc, err := phase.Conveyor.StorageManager.SelectSuitableStage(ctx, phase.Conveyor, stg, stages); err != nil {
if stageDesc, err := storageManager.SelectSuitableStage(ctx, phase.Conveyor, stg, stages); err != nil {
return false, phase.Conveyor.GetStageDigestMutex(stg.GetDigest()).Unlock, err
} else if stageDesc != nil {
i := phase.Conveyor.GetOrCreateStageImage(castToStageImage(phase.StagesIterator.GetPrevImage(img, stg)), stageDesc.Info.Name)
@@ -811,12 +788,6 @@ func (phase *BuildPhase) atomicBuildStageImage(ctx context.Context, img *Image,
return err
}

var stageIDs []imagePkg.StageID
for _, stageDesc := range stages {
stageIDs = append(stageIDs, *stageDesc.StageID)
}
stageIDs = append(stageIDs, *stageImage.GetStageDescription().StageID)

unlockStage()

if err := phase.Conveyor.StorageManager.CopyStageIntoCacheStorages(ctx, stg, phase.Conveyor.ContainerRuntime); err != nil {
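The refactored findAndFetchStageFromSecondaryStagesStorage above copies the suitable stage from the secondary storage into the primary one while the per-digest lock is held, then releases the lock before populating the cache storages, so the slower cache copies no longer block concurrent builders. A condensed sketch of that sequence, using stand-in names rather than werf's real helpers:

```go
package main

import (
	"fmt"
	"sync"
)

// digestLocks maps a stage digest to its mutex; a stand-in for werf's
// per-digest locking in the primary stages storage.
var digestLocks sync.Map

func copySuitableStage(digest string) error {
	muIface, _ := digestLocks.LoadOrStore(digest, &sync.Mutex{})
	mu := muIface.(*sync.Mutex)

	mu.Lock()
	// Under the lock: copy secondary -> primary so concurrent builders
	// do not race to publish the same digest.
	stage := "stage@" + digest
	mu.Unlock()

	// After the lock: fanning the stage out to extra cache storages can be
	// slow and does not need to block other builders.
	fmt.Println("copied into cache storages:", stage)
	return nil
}

func main() {
	_ = copySuitableStage("sha256-abc")
}
```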
4 changes: 2 additions & 2 deletions pkg/storage/docker_server_stages_storage.go
@@ -56,7 +56,7 @@ func (storage *DockerServerStagesStorage) ConstructStageImageName(projectName, d
return fmt.Sprintf(LocalStage_ImageFormat, projectName, digest, uniqueID)
}

func (storage *DockerServerStagesStorage) GetStagesIDs(ctx context.Context, projectName string) ([]image.StageID, error) {
func (storage *DockerServerStagesStorage) GetStagesIDs(ctx context.Context, projectName string, _ ...Option) ([]image.StageID, error) {
filterSet := localStagesStorageFilterSetBase(projectName)
images, err := docker.Images(ctx, types.ImageListOptions{Filters: filterSet})
if err != nil {
@@ -163,7 +163,7 @@ func (storage *DockerServerStagesStorage) GetManagedImages(_ context.Context, _
return []string{}, nil
}

func (storage *DockerServerStagesStorage) GetStagesIDsByDigest(ctx context.Context, projectName, digest string) ([]image.StageID, error) {
func (storage *DockerServerStagesStorage) GetStagesIDsByDigest(ctx context.Context, projectName, digest string, _ ...Option) ([]image.StageID, error) {
filterSet := filters.NewArgs()
filterSet.Add("reference", fmt.Sprintf(LocalStage_ImageRepoFormat, projectName))
// NOTE digest already depends on build-cache-version
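The local Docker server backend accepts the widened variadic signature but discards the options (the `_ ...Option` parameters above): a local daemon has no remote tag list worth caching. A minimal sketch of such a no-op implementer, with stand-in types:

```go
package main

import "fmt"

// Option is a stand-in for werf's storage.Option.
type Option func()

type localStorage struct{}

// GetStagesIDs satisfies the widened signature but ignores the options,
// mirroring the `_ ...Option` parameters in the diff above.
func (localStorage) GetStagesIDs(projectName string, _ ...Option) []string {
	return []string{projectName + "-stage"}
}

func main() {
	fmt.Println(localStorage{}.GetStagesIDs("demo"))
}
```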
34 changes: 21 additions & 13 deletions pkg/storage/manager/storage_manager.go
@@ -61,7 +61,9 @@ type StorageManagerInterface interface {

LockStageImage(ctx context.Context, imageName string) error
GetStagesByDigest(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error)
GetStagesByDigestWithCache(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error)
GetStagesByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error)
GetStagesByDigestFromStagesStorageWithCache(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error)
GetStageDescriptionList(ctx context.Context) ([]*image.StageDescription, error)
GetStageDescriptionListWithCache(ctx context.Context) ([]*image.StageDescription, error)
GetFinalStageDescriptionList(ctx context.Context) ([]*image.StageDescription, error)
@@ -694,27 +696,33 @@ func (m *StorageManager) SelectSuitableStage(ctx context.Context, c stage.Convey
return stageDesc, nil
}

func (m *StorageManager) GetStagesByDigestWithCache(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error) {
return m.GetStagesByDigestFromStagesStorageWithCache(ctx, stageName, stageDigest, m.StagesStorage)
}

func (m *StorageManager) GetStagesByDigest(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error) {
stageIDs, err := m.getStagesIDsByDigestFromStagesStorage(ctx, stageName, stageDigest, m.StagesStorage)
if err != nil {
return nil, fmt.Errorf("unable to get stages ids from %s by digest %s for stage %s: %s", m.StagesStorage.String(), stageDigest, stageName, err)
}
return m.GetStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, m.StagesStorage)
}

validStages, err := m.getStagesDescriptions(ctx, stageIDs, m.StagesStorage, m.CacheStagesStorageList)
func (m *StorageManager) GetStagesByDigestFromStagesStorageWithCache(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error) {
cachedStageDescriptionList, err := m.getStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage, storage.WithCache())
if err != nil {
return nil, fmt.Errorf("unable to get stage descriptions by ids from %s: %s", m.StagesStorage.String(), err)
return nil, err
}

var validStageIDs []image.StageID
for _, s := range validStages {
validStageIDs = append(validStageIDs, *s.StageID)
if len(cachedStageDescriptionList) != 0 {
return cachedStageDescriptionList, nil
}

return validStages, nil
return m.getStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage)
}

func (m *StorageManager) GetStagesByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error) {
stageIDs, err := m.getStagesIDsByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage)
return m.getStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage)
}

func (m *StorageManager) getStagesByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage, opts ...storage.Option) ([]*image.StageDescription, error) {
stageIDs, err := m.getStagesIDsByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage, opts...)
if err != nil {
return nil, fmt.Errorf("unable to get stages ids from %s by digest %s for stage %s: %s", stagesStorage.String(), stageDigest, stageName, err)
}
@@ -757,12 +765,12 @@ func (m *StorageManager) getWithLocalManifestCacheOption() bool {
return m.StagesStorage.Address() != storage.LocalStorageAddress
}

func (m *StorageManager) getStagesIDsByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]image.StageID, error) {
func (m *StorageManager) getStagesIDsByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage, opts ...storage.Option) ([]image.StageID, error) {
var stageIDs []image.StageID
if err := logboek.Context(ctx).Info().LogProcess("Get %s stages by digest %s from storage", stageName, stageDigest).
DoError(func() error {
var err error
stageIDs, err = stagesStorage.GetStagesIDsByDigest(ctx, m.ProjectName, stageDigest)
stageIDs, err = stagesStorage.GetStagesIDsByDigest(ctx, m.ProjectName, stageDigest, opts...)
if err != nil {
return fmt.Errorf("error getting project %s stage %s images from storage: %s", m.StagesStorage.String(), stageDigest, err)
}
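Note the fallback semantics of the new *WithCache lookups above: a non-empty cached answer is trusted, while an empty one is treated as a possible false miss (the runtime cache may be cold or stale) and triggers a live query. A self-contained illustration of that behavior, with stand-in names:

```go
package main

import "fmt"

// lookup simulates a stage-ID query; cached=true consults only the runtime
// cache, which may legitimately be empty even when the registry has tags.
func lookup(digest string, cached bool) ([]string, error) {
	if cached {
		return nil, nil // simulate a cold or stale runtime cache
	}
	return []string{digest + "-1647500000000"}, nil // live registry answer
}

// lookupWithCache mirrors GetStagesByDigestFromStagesStorageWithCache:
// trust a non-empty cached answer, but treat an empty one as a possible
// false miss and fall through to the live query.
func lookupWithCache(digest string) ([]string, error) {
	ids, err := lookup(digest, true)
	if err != nil {
		return nil, err
	}
	if len(ids) != 0 {
		return ids, nil
	}
	return lookup(digest, false)
}

func main() {
	ids, _ := lookupWithCache("sha256-abc")
	fmt.Println(ids)
}
```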
10 changes: 6 additions & 4 deletions pkg/storage/repo_stages_storage.go
@@ -86,10 +86,11 @@ func (storage *RepoStagesStorage) ConstructStageImageName(_, digest string, uniq
return fmt.Sprintf(RepoStage_ImageFormat, storage.RepoAddress, digest, uniqueID)
}

func (storage *RepoStagesStorage) GetStagesIDs(ctx context.Context, _ string) ([]image.StageID, error) {
func (storage *RepoStagesStorage) GetStagesIDs(ctx context.Context, _ string, opts ...Option) ([]image.StageID, error) {
var res []image.StageID

if tags, err := storage.DockerRegistry.Tags(ctx, storage.RepoAddress); err != nil {
o := makeOptions(opts...)
if tags, err := storage.DockerRegistry.Tags(ctx, storage.RepoAddress, o.dockerRegistryOptions...); err != nil {
return nil, fmt.Errorf("unable to fetch tags for repo %q: %s", storage.RepoAddress, err)
} else {
logboek.Context(ctx).Debug().LogF("-- RepoStagesStorage.GetStagesIDs fetched tags for %q: %#v\n", storage.RepoAddress, tags)
@@ -184,10 +185,11 @@ func (storage *RepoStagesStorage) DeleteRepo(ctx context.Context) error {
return storage.DockerRegistry.DeleteRepo(ctx, storage.RepoAddress)
}

func (storage *RepoStagesStorage) GetStagesIDsByDigest(ctx context.Context, _, digest string) ([]image.StageID, error) {
func (storage *RepoStagesStorage) GetStagesIDsByDigest(ctx context.Context, _, digest string, opts ...Option) ([]image.StageID, error) {
var res []image.StageID

if tags, err := storage.DockerRegistry.Tags(ctx, storage.RepoAddress); err != nil {
o := makeOptions(opts...)
if tags, err := storage.DockerRegistry.Tags(ctx, storage.RepoAddress, o.dockerRegistryOptions...); err != nil {
return nil, fmt.Errorf("unable to fetch tags for repo %q: %s", storage.RepoAddress, err)
} else {
var rejectedStages []image.StageID
4 changes: 2 additions & 2 deletions pkg/storage/stages_storage.go
@@ -24,8 +24,8 @@ func IsErrBrokenImage(err error) bool {
}

type StagesStorage interface {
GetStagesIDs(ctx context.Context, projectName string) ([]image.StageID, error)
GetStagesIDsByDigest(ctx context.Context, projectName, digest string) ([]image.StageID, error)
GetStagesIDs(ctx context.Context, projectName string, opts ...Option) ([]image.StageID, error)
GetStagesIDsByDigest(ctx context.Context, projectName, digest string, opts ...Option) ([]image.StageID, error)
GetStageDescription(ctx context.Context, projectName, digest string, uniqueID int64) (*image.StageDescription, error)
ExportStage(ctx context.Context, stageDescription *image.StageDescription, destinationReference string) error
DeleteStage(ctx context.Context, stageDescription *image.StageDescription, options DeleteImageOptions) error
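The diff uses storage.WithCache() and makeOptions() without showing their definitions. Assuming they follow Go's conventional functional-options shape and feed DockerRegistry.Tags via dockerRegistryOptions (as in repo_stages_storage.go above), they would look roughly like this — the concrete option value is invented:

```go
package storage

// DockerRegistryOption stands in for the real option type consumed by
// DockerRegistry.Tags; its actual definition is not part of this diff.
type DockerRegistryOption string

type options struct {
	dockerRegistryOptions []DockerRegistryOption
}

// Option matches the variadic parameter added to the StagesStorage methods.
type Option func(*options)

// WithCache presumably flags the registry client to serve tag lists from
// its runtime cache; the concrete value appended here is a placeholder.
func WithCache() Option {
	return func(o *options) {
		o.dockerRegistryOptions = append(o.dockerRegistryOptions, "cached-tags")
	}
}

func makeOptions(opts ...Option) *options {
	o := &options{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}
```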
