diff --git a/cmd/werf/bundle/copy/copy.go b/cmd/werf/bundle/copy/copy.go index 4e76d31e02..a4fe6e32e6 100644 --- a/cmd/werf/bundle/copy/copy.go +++ b/cmd/werf/bundle/copy/copy.go @@ -8,17 +8,23 @@ import ( "github.com/spf13/cobra" helm_v3 "helm.sh/helm/v3/cmd/helm" + "github.com/werf/logboek" "github.com/werf/werf/cmd/werf/common" "github.com/werf/werf/pkg/deploy/bundles" + "github.com/werf/werf/pkg/docker_registry" "github.com/werf/werf/pkg/werf" "github.com/werf/werf/pkg/werf/global_warnings" ) var cmdData struct { - Repo *common.RepoData + // TODO(2.0): Legacy { + Repo string Tag string - To *common.RepoData ToTag string + // TODO(2.0): } Legacy + + From string + To string } var commonCmdData common.CmdData @@ -57,22 +63,16 @@ func NewCmd(ctx context.Context) *cobra.Command { common.SetupInsecureHelmDependencies(&commonCmdData, cmd) common.SetupSkipTlsVerifyRegistry(&commonCmdData, cmd) - cmdData.Repo = common.NewRepoData("repo", common.RepoDataOptions{OnlyAddress: true}) - cmdData.Repo.SetupCmd(cmd) - - cmdData.To = common.NewRepoData("to", common.RepoDataOptions{OnlyAddress: true}) - cmdData.To.SetupCmd(cmd) - common.SetupLogOptions(&commonCmdData, cmd) common.SetupLogProjectDir(&commonCmdData, cmd) common.SetupPlatform(&commonCmdData, cmd) - defaultTag := os.Getenv("WERF_TAG") - if defaultTag == "" { - defaultTag = "latest" - } - cmd.Flags().StringVarP(&cmdData.Tag, "tag", "", defaultTag, "Provide from tag version of the bundle to copy ($WERF_TAG or latest by default)") - cmd.Flags().StringVarP(&cmdData.ToTag, "to-tag", "", "", "Provide to tag version of the bundle to copy ($WERF_TO_TAG or same as --tag by default)") + cmd.Flags().StringVarP(&cmdData.Repo, "repo", "", os.Getenv("WERF_REPO"), "Deprecated param, use --from=ADDR instead. Source address of bundle which should be copied.") + cmd.Flags().StringVarP(&cmdData.Tag, "tag", "", os.Getenv("WERF_TAG"), "Deprecated param, use --from=REPO:TAG instead. Provide from tag version of the bundle to copy ($WERF_TAG or latest by default).") + cmd.Flags().StringVarP(&cmdData.ToTag, "to-tag", "", os.Getenv("WERF_TO_TAG"), "Deprecated param, use --to=REPO:TAG instead. Provide to tag version of the bundle to copy ($WERF_TO_TAG or same as --tag by default).") + + cmd.Flags().StringVarP(&cmdData.From, "from", "", os.Getenv("WERF_FROM"), "Source address of the bundle to copy, specify bundle archive using schema `archive:PATH_TO_ARCHIVE.tar.gz`, specify remote bundle with schema `[docker://]REPO:TAG` or without schema.") + cmd.Flags().StringVarP(&cmdData.To, "to", "", os.Getenv("WERF_TO"), "Destination address of the bundle to copy, specify bundle archive using schema `archive:PATH_TO_ARCHIVE.tar.gz`, specify remote bundle with schema `[docker://]REPO:TAG` or without schema.") return cmd } @@ -93,26 +93,68 @@ func runCopy(ctx context.Context) error { return err } - if *cmdData.Repo.Address == "" { - return fmt.Errorf("--repo=ADDRESS param required") + fromAddrRaw := cmdData.From + if fromAddrRaw != "" && cmdData.Repo != "" { + return fmt.Errorf("unable to use --repo=ADDRESS and --from=ADDRESS params at the same time: please specify only --from=ADDRESS param, --repo param has been deprecated") + } else if cmdData.Repo != "" { + logboek.Context(ctx).Warn().LogF("Please use --from=ADDRESS param instead of deprecated --repo=ADDRESS param\n") + fromAddrRaw = cmdData.Repo + } + if fromAddrRaw == "" { + return fmt.Errorf("--from=ADDRESS param required") } - if *cmdData.To.Address == "" { + + toAddrRaw := cmdData.To + if toAddrRaw == "" { return fmt.Errorf("--to=ADDRESS param required") } - fromRegistry, err := cmdData.Repo.CreateDockerRegistry(ctx, *commonCmdData.InsecureRegistry, *commonCmdData.SkipTlsVerifyRegistry) + fromAddr, err := bundles.ParseAddr(fromAddrRaw) if err != nil { - return fmt.Errorf("error creating container registry accessor for repo %s: %w", *cmdData.Repo.Address, err) + return fmt.Errorf("invalid from addr %q: %w", fromAddrRaw, err) } - fromTag := cmdData.Tag - toTag := cmdData.ToTag - if toTag == "" { - toTag = fromTag + toAddr, err := bundles.ParseAddr(toAddrRaw) + if err != nil { + return fmt.Errorf("invalid to addr %q: %w", toAddrRaw, err) } - fromRef := fmt.Sprintf("%s:%s", *cmdData.Repo.Address, fromTag) - toRef := fmt.Sprintf("%s:%s", *cmdData.To.Address, toTag) + var fromRegistry, toRegistry docker_registry.Interface - return bundles.Copy(ctx, fromRef, toRef, bundlesRegistryClient, fromRegistry) + if fromAddr.RegistryAddress != nil { + // TODO(2.0): remove legacy compatibility param + if cmdData.Tag != "" { + logboek.Context(ctx).Warn().LogF("Please use --from=REPO:TAG tag specification instead of deprecated --tag=TAG param\n") + fromAddr.RegistryAddress.Tag = cmdData.Tag + } + + fromRegistry, err = common.CreateDockerRegistry(fromAddr.RegistryAddress.Repo, *commonCmdData.InsecureRegistry, *commonCmdData.SkipTlsVerifyRegistry) + if err != nil { + return err + } + } + + if toAddr.RegistryAddress != nil { + // TODO(2.0): remove legacy compatibility param + if cmdData.ToTag != "" { + logboek.Context(ctx).Warn().LogF("Please use --to=REPO:TAG tag specification instead of deprecated --to-tag=TAG param\n") + toAddr.RegistryAddress.Tag = cmdData.ToTag + } + + toRegistry, err = common.CreateDockerRegistry(toAddr.RegistryAddress.Repo, *commonCmdData.InsecureRegistry, *commonCmdData.SkipTlsVerifyRegistry) + if err != nil { + return err + } + } + + return logboek.Context(ctx).LogProcess("Copy bundle").DoError(func() error { + logboek.Context(ctx).LogFDetails("From: %s\n", fromAddr.String()) + logboek.Context(ctx).LogFDetails("To: %s\n", toAddr.String()) + + return bundles.Copy(ctx, fromAddr, toAddr, bundles.CopyOptions{ + BundlesRegistryClient: bundlesRegistryClient, + FromRegistry: fromRegistry, + ToRegistry: toRegistry, + }) + }) } diff --git a/cmd/werf/common/repo_data.go b/cmd/werf/common/repo_data.go index 6c2dd235dd..865d671e92 100644 --- a/cmd/werf/common/repo_data.go +++ b/cmd/werf/common/repo_data.go @@ -14,6 +14,20 @@ import ( "github.com/werf/werf/pkg/storage" ) +func CreateDockerRegistry(addr string, insecureRegistry, skipTlsVerifyRegistry bool) (docker_registry.Interface, error) { + regOpts := docker_registry.DockerRegistryOptions{ + InsecureRegistry: insecureRegistry, + SkipTlsVerifyRegistry: skipTlsVerifyRegistry, + } + + dockerRegistry, err := docker_registry.NewDockerRegistry(addr, "", regOpts) + if err != nil { + return nil, fmt.Errorf("error creating container registry accessor for repo %q: %w", addr, err) + } + + return dockerRegistry, nil +} + func (repoData *RepoData) CreateDockerRegistry(ctx context.Context, insecureRegistry, skipTlsVerifyRegistry bool) (docker_registry.Interface, error) { addr, err := repoData.GetAddress() if err != nil { diff --git a/go.mod b/go.mod index cba1e79041..c7d669e54b 100644 --- a/go.mod +++ b/go.mod @@ -314,6 +314,6 @@ replace k8s.io/helm => github.com/werf/helm v0.0.0-20210202111118-81e74d46da0f replace github.com/deislabs/oras => github.com/werf/third-party-oras v0.9.1-0.20210927171747-6d045506f4c8 -replace helm.sh/helm/v3 => github.com/werf/3p-helm/v3 v3.0.0-20220823144404-27eb1367786c +replace helm.sh/helm/v3 => github.com/werf/3p-helm/v3 v3.0.0-20220902145201-6265178e3c32 replace github.com/go-git/go-git/v5 => github.com/ZauberNerd/go-git/v5 v5.4.3-0.20220315170230-29ec1bc1e5db diff --git a/go.sum b/go.sum index e0d46ad08b..29e6137ffd 100644 --- a/go.sum +++ b/go.sum @@ -2050,8 +2050,8 @@ github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59b github.com/weppos/publicsuffix-go v0.4.0/go.mod h1:z3LCPQ38eedDQSwmsSRW4Y7t2L8Ln16JPQ02lHAdn5k= github.com/weppos/publicsuffix-go v0.5.0 h1:rutRtjBJViU/YjcI5d80t4JAVvDltS6bciJg2K1HrLU= github.com/weppos/publicsuffix-go v0.5.0/go.mod h1:z3LCPQ38eedDQSwmsSRW4Y7t2L8Ln16JPQ02lHAdn5k= -github.com/werf/3p-helm/v3 v3.0.0-20220823144404-27eb1367786c h1:/cMYGHrCXCohS+2Thjj+7fiVVG1a/LmGC4BxZOK1sYY= -github.com/werf/3p-helm/v3 v3.0.0-20220823144404-27eb1367786c/go.mod h1:NxtE2KObf2PrzDl6SIamPFPKyAqWi10iWuvKlQn/Yao= +github.com/werf/3p-helm/v3 v3.0.0-20220902145201-6265178e3c32 h1:Z3XL0banUFF7IkIzY82onwWA2qiTvuQ0pBMERA/scis= +github.com/werf/3p-helm/v3 v3.0.0-20220902145201-6265178e3c32/go.mod h1:NxtE2KObf2PrzDl6SIamPFPKyAqWi10iWuvKlQn/Yao= github.com/werf/copy-recurse v0.2.4 h1:kEyGUKhgS8WdEOjInNQKgk4lqPWzP2AgR27F3dcGsVc= github.com/werf/copy-recurse v0.2.4/go.mod h1:KVHSQ90p19xflWW0B7BJhLBwmSbEtuxIaBnjlUYRPhk= github.com/werf/helm v0.0.0-20210202111118-81e74d46da0f h1:81YscYTF9mmTf0ULOsCmm42YWQp+qWDzWi1HjWniZrg= diff --git a/pkg/deploy/bundles/addr.go b/pkg/deploy/bundles/addr.go new file mode 100644 index 0000000000..b8a8b4845f --- /dev/null +++ b/pkg/deploy/bundles/addr.go @@ -0,0 +1,75 @@ +package bundles + +import ( + "fmt" + "strings" + + "github.com/werf/werf/pkg/deploy/bundles/registry" +) + +const ( + ArchiveSchema = "archive:" + RegistrySchema = "docker://" +) + +type Addr struct { + *ArchiveAddress + *RegistryAddress +} + +func (addr *Addr) String() string { + if addr.RegistryAddress != nil { + return addr.RegistryAddress.FullName() + } + if addr.ArchiveAddress != nil { + return addr.ArchiveAddress.Path + } + return "" +} + +type ArchiveAddress struct { + Path string +} + +type RegistryAddress struct { + *registry.Reference +} + +func ParseAddr(addr string) (*Addr, error) { + switch { + case strings.HasPrefix(addr, ArchiveSchema): + return &Addr{ArchiveAddress: parseArchiveAddress(addr)}, nil + case strings.HasPrefix(addr, RegistrySchema): + if regAddr, err := parseRegistryAddress(addr); err != nil { + return nil, fmt.Errorf("unable to parse registry address %q: %w", addr, err) + } else { + return &Addr{RegistryAddress: regAddr}, nil + } + default: + if regAddr, err := parseRegistryAddress(addr); err != nil { + return nil, fmt.Errorf("unable to parse registry address %q: %w", addr, err) + } else { + return &Addr{RegistryAddress: regAddr}, nil + } + } +} + +func parseRegistryAddress(addr string) (*RegistryAddress, error) { + cleanAddr := strings.TrimPrefix(addr, RegistrySchema) + + ref, err := registry.ParseReference(cleanAddr) + if err != nil { + return nil, err + } + + if ref.Tag == "" { + ref.Tag = "latest" + } + + return &RegistryAddress{Reference: ref}, nil +} + +func parseArchiveAddress(addr string) *ArchiveAddress { + path := strings.TrimPrefix(addr, ArchiveSchema) + return &ArchiveAddress{Path: path} +} diff --git a/pkg/deploy/bundles/addr_test.go b/pkg/deploy/bundles/addr_test.go new file mode 100644 index 0000000000..e0fa305344 --- /dev/null +++ b/pkg/deploy/bundles/addr_test.go @@ -0,0 +1,44 @@ +package bundles + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Bundle addr", func() { + It("should parse different bundle address schemas", func() { + { + addr, err := ParseAddr("registry.werf.io/group/image:mytag") + Expect(err).NotTo(HaveOccurred()) + Expect(addr.RegistryAddress).NotTo(BeNil()) + Expect(addr.ArchiveAddress).To(BeNil()) + Expect(addr.Repo).To(Equal("registry.werf.io/group/image")) + Expect(addr.Tag).To(Equal("mytag")) + } + + { + addr, err := ParseAddr("registry.werf.io/group/image") + Expect(err).NotTo(HaveOccurred()) + Expect(addr.RegistryAddress).NotTo(BeNil()) + Expect(addr.ArchiveAddress).To(BeNil()) + Expect(addr.Repo).To(Equal("registry.werf.io/group/image")) + Expect(addr.Tag).To(Equal("latest")) + } + + { + addr, err := ParseAddr("archive:path/to/file.tar.gz") + Expect(err).NotTo(HaveOccurred()) + Expect(addr.RegistryAddress).To(BeNil()) + Expect(addr.ArchiveAddress).NotTo(BeNil()) + Expect(addr.Path).To(Equal("path/to/file.tar.gz")) + } + + { + addr, err := ParseAddr("archive:/absolute/path/to/file.tar.gz") + Expect(err).NotTo(HaveOccurred()) + Expect(addr.RegistryAddress).To(BeNil()) + Expect(addr.ArchiveAddress).NotTo(BeNil()) + Expect(addr.Path).To(Equal("/absolute/path/to/file.tar.gz")) + } + }) +}) diff --git a/pkg/deploy/bundles/bundle.go b/pkg/deploy/bundles/bundle.go new file mode 100644 index 0000000000..83d10c7b8b --- /dev/null +++ b/pkg/deploy/bundles/bundle.go @@ -0,0 +1,302 @@ +package bundles + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "fmt" + "io" + "os" + "time" + + "github.com/google/uuid" + + bundles_registry "github.com/werf/werf/pkg/deploy/bundles/registry" +) + +const ( + chartArchiveFileName = "chart.tar.gz" +) + +type Bundle interface{} + +type BundleArchiveWriter interface { + Open() error + WriteChartArchive(data []byte) error + WriteImageArchive(imageTag string, data []byte) error + Save() error +} + +type BundleArchiveReader interface { + ReadChartArchive() ([]byte, error) + GetImageArchiveOpener(imageTag string) *ImageArchiveOpener + ReadImageArchive(imageTag string) (*ImageArchiveReadCloser, error) +} + +type RemoteBundle struct { + RegistryClient *bundles_registry.Client +} + +func NewRemoteBundle(registryClient *bundles_registry.Client) *RemoteBundle { + return &RemoteBundle{ + RegistryClient: registryClient, + } +} + +type BundleArchive struct { + Path string + + tmpArchivePath string + tmpArchiveWriter *tar.Writer + tmpArchiveCloser func() error +} + +func NewBundleArchive(path string) *BundleArchive { + return &BundleArchive{ + Path: path, + } +} + +func (bundle *BundleArchive) Open() error { + p := fmt.Sprintf("%s.%s.tmp", bundle.Path, uuid.New().String()) + + f, err := os.Create(p) + if err != nil { + return fmt.Errorf("unable to open tmp archive file %q: %w", p, err) + } + + zipper := gzip.NewWriter(f) + zipper.Header.Comment = "bundle-archive" + twriter := tar.NewWriter(zipper) + + bundle.tmpArchivePath = p + bundle.tmpArchiveWriter = twriter + bundle.tmpArchiveCloser = func() error { + if err := twriter.Close(); err != nil { + return fmt.Errorf("unable to close tar writer for %q: %w", bundle.tmpArchivePath, err) + } + if err := zipper.Close(); err != nil { + return fmt.Errorf("unable to close zipper for %q: %w", bundle.tmpArchivePath, err) + } + if err := f.Close(); err != nil { + return fmt.Errorf("unable to close %q: %w", bundle.tmpArchivePath, err) + } + return nil + } + + now := time.Now() + header := &tar.Header{ + Name: "images", + Typeflag: tar.TypeDir, + Mode: 0o777, + ModTime: now, + AccessTime: now, + ChangeTime: now, + } + if err := bundle.tmpArchiveWriter.WriteHeader(header); err != nil { + return fmt.Errorf("unable to write images dir header: %w", err) + } + + return nil +} + +func (bundle *BundleArchive) Save() error { + if bundle.tmpArchiveWriter == nil { + panic(fmt.Sprintf("bundle archive %q is not opened", bundle.Path)) + } + + if err := bundle.tmpArchiveCloser(); err != nil { + return fmt.Errorf("unable to close tmp archive %q: %w", bundle.tmpArchivePath, err) + } + + if err := os.RemoveAll(bundle.Path); err != nil { + return fmt.Errorf("unable to cleanup destination archive path %q: %w", bundle.Path, err) + } + + if err := os.Rename(bundle.tmpArchivePath, bundle.Path); err != nil { + return fmt.Errorf("unable to rename tmp bundle archive %q to %q: %w", bundle.tmpArchivePath, bundle.Path, err) + } + + return nil +} + +func (bundle *BundleArchive) WriteChartArchive(data []byte) error { + now := time.Now() + header := &tar.Header{ + Name: chartArchiveFileName, + Typeflag: tar.TypeReg, + Mode: 0o777, + Size: int64(len(data)), + ModTime: now, + AccessTime: now, + ChangeTime: now, + } + + if err := bundle.tmpArchiveWriter.WriteHeader(header); err != nil { + return fmt.Errorf("unable to write %q header: %w", chartArchiveFileName, err) + } + + if _, err := bundle.tmpArchiveWriter.Write(data); err != nil { + return fmt.Errorf("unable to write %q data: %w", chartArchiveFileName, err) + } + + return nil +} + +func (bundle *BundleArchive) WriteImageArchive(imageTag string, data []byte) error { + now := time.Now() + + header := &tar.Header{ + Name: fmt.Sprintf("images/%s.tar.gz", imageTag), + Typeflag: tar.TypeReg, + Mode: 0o777, + Size: int64(len(data)), + ModTime: now, + AccessTime: now, + ChangeTime: now, + } + + if err := bundle.tmpArchiveWriter.WriteHeader(header); err != nil { + return fmt.Errorf("unable to write chart.tar.gz header: %w", err) + } + + if _, err := bundle.tmpArchiveWriter.Write(data); err != nil { + return fmt.Errorf("unable to write chart.tar.gz data: %w", err) + } + + return nil +} + +func (bundle *BundleArchive) openForReading() (*tar.Reader, func() error, error) { + f, err := os.Open(bundle.Path) + if err != nil { + return nil, func() error { return nil }, err + } + + unzipper, err := gzip.NewReader(f) + if err != nil { + return nil, f.Close, fmt.Errorf("unable to open bundle archive gzip %q: %w", bundle.Path, err) + } + + closer := func() error { + if err := unzipper.Close(); err != nil { + return fmt.Errorf("unable to close gzipper for %q: %w", bundle.Path, err) + } + if err := f.Close(); err != nil { + return fmt.Errorf("unable to close %q: %w", bundle.Path, err) + } + return nil + } + + return tar.NewReader(unzipper), closer, nil +} + +func (bundle *BundleArchive) ReadChartArchive() ([]byte, error) { + treader, closer, err := bundle.openForReading() + defer closer() + + if err != nil { + return nil, fmt.Errorf("unable to open bundle archive: %w", err) + } + + b := bytes.NewBuffer(nil) + + for { + header, err := treader.Next() + if err == io.EOF { + return nil, fmt.Errorf("no chart archive found in the bundle archive %q", bundle.Path) + } + if err != nil { + return nil, fmt.Errorf("error reading tar archive: %w", err) + } + + if header.Typeflag != tar.TypeReg { + continue + } + if header.Name != chartArchiveFileName { + continue + } + + if _, err := io.Copy(b, treader); err != nil { + return nil, fmt.Errorf("unable to read chart archive %q from the bundle archive %q: %w", chartArchiveFileName, bundle.Path, err) + } + + return b.Bytes(), nil + } +} + +func (bundle *BundleArchive) GetImageArchiveOpener(imageTag string) *ImageArchiveOpener { + return NewImageArchiveOpener(bundle, imageTag) +} + +func (bundle *BundleArchive) ReadImageArchive(imageTag string) (*ImageArchiveReadCloser, error) { + treader, closer, err := bundle.openForReading() + if err != nil { + defer closer() + return nil, fmt.Errorf("unable to open bundle archive: %w", err) + } + + for { + header, err := treader.Next() + if err == io.EOF { + return nil, fmt.Errorf("no image tag %q found in the bundle archive %q", imageTag, bundle.Path) + } + if err != nil { + return nil, fmt.Errorf("error reading tar archive: %w", err) + } + + if header.Typeflag != tar.TypeReg { + continue + } + + if header.Name == fmt.Sprintf("images/%s.tar.gz", imageTag) { + unzipper, err := gzip.NewReader(treader) + if err != nil { + return nil, fmt.Errorf("unable to create gzip reader for image archive: %w", err) + } + + return NewImageArchiveReadCloser(unzipper, func() error { + if err := unzipper.Close(); err != nil { + return fmt.Errorf("unable to close gzip reader for image archive: %w", err) + } + return closer() + }), nil + } + } +} + +type ImageArchiveOpener struct { + Archive *BundleArchive + ImageTag string +} + +func NewImageArchiveOpener(archive *BundleArchive, imageTag string) *ImageArchiveOpener { + return &ImageArchiveOpener{ + Archive: archive, + ImageTag: imageTag, + } +} + +func (opener *ImageArchiveOpener) Open() (io.ReadCloser, error) { + return opener.Archive.ReadImageArchive(opener.ImageTag) +} + +type ImageArchiveReadCloser struct { + reader io.Reader + closer func() error +} + +func NewImageArchiveReadCloser(reader io.Reader, closer func() error) *ImageArchiveReadCloser { + return &ImageArchiveReadCloser{ + reader: reader, + closer: closer, + } +} + +func (opener *ImageArchiveReadCloser) Read(p []byte) (int, error) { + return opener.reader.Read(p) +} + +func (opener *ImageArchiveReadCloser) Close() error { + return opener.closer() +} diff --git a/pkg/deploy/bundles/chart_helpers.go b/pkg/deploy/bundles/chart_helpers.go new file mode 100644 index 0000000000..d2eb232f07 --- /dev/null +++ b/pkg/deploy/bundles/chart_helpers.go @@ -0,0 +1,38 @@ +package bundles + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "fmt" + + "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/chart/loader" + "helm.sh/helm/v3/pkg/chartutil" +) + +func ChartToBytes(ch *chart.Chart) ([]byte, error) { + chartBytes := bytes.NewBuffer(nil) + zipper := gzip.NewWriter(chartBytes) + chartutil.SetGzipWriterMeta(zipper) + twriter := tar.NewWriter(zipper) + + if err := chartutil.SaveIntoTar(twriter, ch, chartutil.SaveIntoTarOptions{}); err != nil { + return nil, fmt.Errorf("unable to save chart to tar: %w", err) + } + + if err := twriter.Close(); err != nil { + return nil, fmt.Errorf("unable to close chart tar: %w", err) + } + + if err := zipper.Close(); err != nil { + return nil, fmt.Errorf("unable to close chart gzip: %w", err) + } + + return chartBytes.Bytes(), nil +} + +func BytesToChart(data []byte) (*chart.Chart, error) { + dataReader := bytes.NewBuffer(data) + return loader.LoadArchiveWithOptions(dataReader, loader.LoadOptions{}) +} diff --git a/pkg/deploy/bundles/copy.go b/pkg/deploy/bundles/copy.go index 14009be43b..3a8fb065b2 100644 --- a/pkg/deploy/bundles/copy.go +++ b/pkg/deploy/bundles/copy.go @@ -1,34 +1,219 @@ package bundles import ( + "bytes" + "compress/gzip" "context" "fmt" "strings" v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/otiai10/copy" "helm.sh/helm/v3/pkg/chart" "helm.sh/helm/v3/pkg/chartutil" "sigs.k8s.io/yaml" "github.com/werf/logboek" - "github.com/werf/werf/pkg/deploy/bundles/registry" + bundles_registry "github.com/werf/werf/pkg/deploy/bundles/registry" "github.com/werf/werf/pkg/docker_registry" + "github.com/werf/werf/pkg/image" "github.com/werf/werf/pkg/util" ) -func Copy(ctx context.Context, fromAddr, toAddr string, bundlesRegistryClient *registry.Client, fromRegistry docker_registry.Interface) error { - fromRef, err := registry.ParseReference(fromAddr) +type CopyOptions struct { + BundlesRegistryClient *bundles_registry.Client + FromRegistry, ToRegistry docker_registry.Interface +} + +func Copy(ctx context.Context, fromAddr, toAddr *Addr, opts CopyOptions) error { + switch { + case fromAddr.RegistryAddress != nil && toAddr.ArchiveAddress != nil: + return copyFromRegistryToArchive(ctx, fromAddr.RegistryAddress, toAddr.ArchiveAddress, opts.BundlesRegistryClient, opts.FromRegistry) + case fromAddr.ArchiveAddress != nil && toAddr.RegistryAddress != nil: + return copyFromArchiveToRegistry(ctx, fromAddr.ArchiveAddress, toAddr.RegistryAddress, opts.BundlesRegistryClient, opts.ToRegistry) + case fromAddr.RegistryAddress != nil && toAddr.RegistryAddress != nil: + return copyFromRegistryToRegistry(ctx, fromAddr.RegistryAddress, toAddr.RegistryAddress, opts.BundlesRegistryClient, opts.FromRegistry, opts.ToRegistry) + case fromAddr.ArchiveAddress != nil && toAddr.ArchiveAddress != nil: + return copyFromArchiveToArchive(ctx, fromAddr.ArchiveAddress, toAddr.ArchiveAddress) + default: + panic(fmt.Sprintf("unexpected from %v and to %v", fromAddr, toAddr)) + } +} + +func copyFromArchiveToArchive(ctx context.Context, from, to *ArchiveAddress) error { + logboek.Context(ctx).Debug().LogF("-- copyFromArchiveToArchive\n") + if err := copy.Copy(from.Path, to.Path); err != nil { + return err + } + return nil +} + +func copyFromArchiveToRegistry(ctx context.Context, from *ArchiveAddress, to *RegistryAddress, bundlesRegistryClient *bundles_registry.Client, toRegistry docker_registry.Interface) error { + logboek.Context(ctx).Debug().LogF("-- copyFromArchiveToRegistry\n") + + bundleArchive := NewBundleArchive(from.Path) + + chartBytes, err := bundleArchive.ReadChartArchive() if err != nil { - return fmt.Errorf("unable to parse source address %q: %w", fromAddr, err) + return fmt.Errorf("unable to read chart from the bundle archive at %q: %w", bundleArchive.Path, err) } - toRef, err := registry.ParseReference(toAddr) + + ch, err := BytesToChart(chartBytes) if err != nil { - return fmt.Errorf("unable to parse destination address %q: %w", toAddr, err) + return fmt.Errorf("unable to read chart from the bundle archive %q: %w", bundleArchive.Path, err) + } + + if werfVals, ok := ch.Values["werf"].(map[string]interface{}); ok { + if imageVals, ok := werfVals["image"].(map[string]interface{}); ok { + newImageVals := make(map[string]interface{}) + + for imageName, v := range imageVals { + if imageRef, ok := v.(string); ok { + ref, err := bundles_registry.ParseReference(imageRef) + if err != nil { + return fmt.Errorf("unable to parse bundle image %s: %w", imageRef, err) + } + ref.Repo = to.Repo + + if imageRef != ref.FullName() { + if err := logboek.Context(ctx).LogProcess("Copy image from bundle archive").DoError(func() error { + logboek.Context(ctx).Default().LogFDetails("Destination: %s\n", ref.FullName()) + + imageArchiveOpener := bundleArchive.GetImageArchiveOpener(ref.Tag) + + if err := toRegistry.PushImageArchive(ctx, imageArchiveOpener, ref.FullName()); err != nil { + return fmt.Errorf("error copying image from bundle archive %q into %q: %w", bundleArchive.Path, ref.FullName(), err) + } + + return nil + }); err != nil { + return err + } + } + + newImageVals[imageName] = ref.FullName() + } else { + return fmt.Errorf("unexpected value .Values.werf.image.%s=%v", imageName, v) + } + } + + werfVals["image"] = newImageVals + } + + werfVals["repo"] = to.Repo + } + + ch.Metadata.Name = util.Reverse(strings.SplitN(util.Reverse(to.Repo), "/", 2)[0]) + + if err := logboek.Context(ctx).LogProcess("Saving bundle %s", to.FullName()).DoError(func() error { + if err := bundlesRegistryClient.SaveChart(ch, to.Reference); err != nil { + return fmt.Errorf("unable to save bundle %s to the local chart helm cache: %w", to.FullName(), err) + } + return nil + }); err != nil { + return err + } + + if err := logboek.Context(ctx).LogProcess("Pushing bundle %s", to.FullName()).DoError(func() error { + if err := bundlesRegistryClient.PushChart(to.Reference); err != nil { + return fmt.Errorf("unable to push bundle %s: %w", to.FullName(), err) + } + return nil + }); err != nil { + return err } - if err := logboek.Context(ctx).LogProcess("Pulling bundle %s", fromRef.FullName()).DoError(func() error { - if err := bundlesRegistryClient.PullChartToCache(fromRef); err != nil { - return fmt.Errorf("unable to pull bundle %s: %w", fromRef.FullName(), err) + return nil +} + +func copyFromRegistryToArchive(ctx context.Context, from *RegistryAddress, to *ArchiveAddress, bundlesRegistryClient *bundles_registry.Client, fromRegistry docker_registry.Interface) error { + logboek.Context(ctx).Debug().LogF("-- copyFromRegistryToArchive\n") + + if err := logboek.Context(ctx).LogProcess("Pulling bundle %s", from.FullName()).DoError(func() error { + if err := bundlesRegistryClient.PullChartToCache(from.Reference); err != nil { + return fmt.Errorf("unable to pull bundle %s: %w", from.FullName(), err) + } + return nil + }); err != nil { + return err + } + + var ch *chart.Chart + if err := logboek.Context(ctx).LogProcess("Loading bundle %s", from.FullName()).DoError(func() error { + var err error + ch, err = bundlesRegistryClient.LoadChart(from.Reference) + if err != nil { + return fmt.Errorf("unable to load pulled bundle %s: %w", from.FullName(), err) + } + return nil + }); err != nil { + return err + } + + b := NewBundleArchive(to.Path) + + if err := b.Open(); err != nil { + return fmt.Errorf("unable to open target bundle archive: %w", err) + } + + if err := logboek.Context(ctx).LogProcess("Saving bundle %s into archive", from.FullName()).DoError(func() error { + chartBytes, err := ChartToBytes(ch) + if err != nil { + return fmt.Errorf("uanble to dump chart to bytes: %w", err) + } + + if err := b.WriteChartArchive(chartBytes); err != nil { + return fmt.Errorf("unable to write chart archive into bundle archive: %w", err) + } + + return nil + }); err != nil { + return err + } + + if werfVals, ok := ch.Values["werf"].(map[string]interface{}); ok { + if imageVals, ok := werfVals["image"].(map[string]interface{}); ok { + for imageName, v := range imageVals { + if imageRef, ok := v.(string); ok { + logboek.Context(ctx).Default().LogFDetails("Saving image %s\n", imageRef) + + _, tag := image.ParseRepositoryAndTag(imageRef) + + // TODO: maybe save into tmp file archive OR read resulting image size from the registry before pulling + imageBytes := bytes.NewBuffer(nil) + zipper := gzip.NewWriter(imageBytes) + + if err := fromRegistry.PullImageArchive(ctx, zipper, imageRef); err != nil { + return fmt.Errorf("error pulling image %q archive: %w", imageRef, err) + } + + if err := zipper.Close(); err != nil { + return fmt.Errorf("unable to close gzip writer: %w", err) + } + + if err := b.WriteImageArchive(tag, imageBytes.Bytes()); err != nil { + return fmt.Errorf("error writing image %q into bundle archive: %w", imageRef, err) + } + } else { + return fmt.Errorf("unexpected value .Values.werf.image.%s=%v", imageName, v) + } + } + } + } + + if err := b.Save(); err != nil { + return fmt.Errorf("error saving destination bundle archive: %w", err) + } + + return nil +} + +func copyFromRegistryToRegistry(ctx context.Context, from, to *RegistryAddress, bundlesRegistryClient *bundles_registry.Client, fromRegistry, toRegistry docker_registry.Interface) error { + logboek.Context(ctx).Debug().LogF("-- copyFromRegistryToRegistry\n") + + if err := logboek.Context(ctx).LogProcess("Pulling bundle %s", from.FullName()).DoError(func() error { + if err := bundlesRegistryClient.PullChartToCache(from.Reference); err != nil { + return fmt.Errorf("unable to pull bundle %s: %w", from.FullName(), err) } return nil }); err != nil { @@ -36,10 +221,11 @@ func Copy(ctx context.Context, fromAddr, toAddr string, bundlesRegistryClient *r } var ch *chart.Chart - if err := logboek.Context(ctx).LogProcess("Loading bundle %s", fromRef.FullName()).DoError(func() error { - ch, err = bundlesRegistryClient.LoadChart(fromRef) + if err := logboek.Context(ctx).LogProcess("Loading bundle %s", from.FullName()).DoError(func() error { + var err error + ch, err = bundlesRegistryClient.LoadChart(from.Reference) if err != nil { - return fmt.Errorf("unable to load pulled bundle %s: %w", fromRef.FullName(), err) + return fmt.Errorf("unable to load pulled bundle %s: %w", from.FullName(), err) } return nil }); err != nil { @@ -57,12 +243,12 @@ func Copy(ctx context.Context, fromAddr, toAddr string, bundlesRegistryClient *r for imageName, v := range imageVals { if image, ok := v.(string); ok { - ref, err := registry.ParseReference(image) + ref, err := bundles_registry.ParseReference(image) if err != nil { return fmt.Errorf("unable to parse bundle image %s: %w", image, err) } - ref.Repo = toRef.Repo + ref.Repo = to.Repo // TODO: copy images in parallel if image != ref.FullName() { @@ -88,7 +274,7 @@ func Copy(ctx context.Context, fromAddr, toAddr string, bundlesRegistryClient *r werfVals["image"] = newImageVals } - werfVals["repo"] = toRef.Repo + werfVals["repo"] = to.Repo } valuesRaw, err := yaml.Marshal(ch.Values) @@ -104,20 +290,20 @@ func Copy(ctx context.Context, fromAddr, toAddr string, bundlesRegistryClient *r } } - ch.Metadata.Name = util.Reverse(strings.SplitN(util.Reverse(toRef.Repo), "/", 2)[0]) + ch.Metadata.Name = util.Reverse(strings.SplitN(util.Reverse(to.Repo), "/", 2)[0]) - if err := logboek.Context(ctx).LogProcess("Saving bundle %s", toRef.FullName()).DoError(func() error { - if err := bundlesRegistryClient.SaveChart(ch, toRef); err != nil { - return fmt.Errorf("unable to save bundle %s to the local chart helm cache: %w", toRef.FullName(), err) + if err := logboek.Context(ctx).LogProcess("Saving bundle %s", to.FullName()).DoError(func() error { + if err := bundlesRegistryClient.SaveChart(ch, to.Reference); err != nil { + return fmt.Errorf("unable to save bundle %s to the local chart helm cache: %w", to.FullName(), err) } return nil }); err != nil { return err } - if err := logboek.Context(ctx).LogProcess("Pushing bundle %s", toRef.FullName()).DoError(func() error { - if err := bundlesRegistryClient.PushChart(toRef); err != nil { - return fmt.Errorf("unable to push bundle %s: %w", toRef.FullName(), err) + if err := logboek.Context(ctx).LogProcess("Pushing bundle %s", to.FullName()).DoError(func() error { + if err := bundlesRegistryClient.PushChart(to.Reference); err != nil { + return fmt.Errorf("unable to push bundle %s: %w", to.FullName(), err) } return nil }); err != nil { diff --git a/pkg/deploy/bundles/suite_test.go b/pkg/deploy/bundles/suite_test.go new file mode 100644 index 0000000000..7115c5d2a2 --- /dev/null +++ b/pkg/deploy/bundles/suite_test.go @@ -0,0 +1,13 @@ +package bundles + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestStage(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "deploy/bundles suite") +} diff --git a/pkg/docker_registry/api.go b/pkg/docker_registry/api.go index 6aeab7bc9d..163119cf56 100644 --- a/pkg/docker_registry/api.go +++ b/pkg/docker_registry/api.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "errors" "fmt" + "io" "math/rand" "net/http" "os" @@ -19,6 +20,7 @@ import ( "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/remote/transport" + "github.com/google/go-containerregistry/pkg/v1/tarball" "github.com/werf/logboek" "github.com/werf/werf/pkg/docker_registry/container_registry_extensions" @@ -236,37 +238,12 @@ type PushImageOptions struct { } func (api *api) PushImage(ctx context.Context, reference string, opts *PushImageOptions) error { - retriesLimit := 5 - -attemptLoop: - for attempt := 1; attempt <= retriesLimit; attempt++ { - if err := api.pushImage(ctx, reference, opts); err != nil { - for _, substr := range []string{ - "REDACTED: UNKNOWN", - "http2: server sent GOAWAY and closed the connection", - "http2: Transport received Server's graceful shutdown GOAWAY", - } { - if strings.Contains(err.Error(), substr) { - seconds := rand.Intn(5) + 1 - - msg := fmt.Sprintf("Retrying publishing in %d seconds (%d/%d) ...\n", seconds, attempt, retriesLimit) - logboek.Context(ctx).Warn().LogLn(msg) - - time.Sleep(time.Duration(seconds) * time.Second) - continue attemptLoop - } - } - - return err - } - - return nil - } - - return nil + return api.pushWithRetry(ctx, func() error { + return api.pushImage(ctx, reference, opts) + }) } -func (api *api) pushImage(_ context.Context, reference string, opts *PushImageOptions) error { +func (api *api) pushImage(ctx context.Context, reference string, opts *PushImageOptions) error { ref, err := name.ParseReference(reference, api.parseReferenceOptions()...) if err != nil { return fmt.Errorf("parsing reference %q: %w", reference, err) @@ -276,15 +253,9 @@ func (api *api) pushImage(_ context.Context, reference string, opts *PushImageOp if opts != nil { labels = opts.Labels } - img := container_registry_extensions.NewManifestOnlyImage(labels) - oldDefaultTransport := http.DefaultTransport - http.DefaultTransport = api.getHttpTransport() - err = remote.Write(ref, img, remote.WithAuthFromKeychain(authn.DefaultKeychain)) - http.DefaultTransport = oldDefaultTransport - - if err != nil { + if err := api.writeToRemote(ctx, ref, img); err != nil { return fmt.Errorf("write to the remote %s have failed: %w", ref.String(), err) } @@ -388,6 +359,120 @@ func (api *api) parseReferenceParts(reference string) (referenceParts, error) { return referenceParts, nil } +func (api *api) PushImageArchive(ctx context.Context, archiveOpener ArchiveOpener, reference string) error { + tag, err := name.NewTag(reference, api.parseReferenceOptions()...) + if err != nil { + return fmt.Errorf("unable to parse reference %q: %w", reference, err) + } + + img, err := tarball.Image(archiveOpener.Open, nil) + if err != nil { + return fmt.Errorf("unable to open tarball image: %w", err) + } + + return api.pushWithRetry(ctx, func() error { + if err := api.writeToRemote(ctx, tag, img); err != nil { + return fmt.Errorf("write to the remote %s have failed: %w", tag.String(), err) + } + return nil + }) +} + +func (api *api) pushWithRetry(ctx context.Context, pusher func() error) error { + const retriesLimit = 5 + var err error + +attemptLoop: + for attempt := 1; attempt <= retriesLimit; attempt++ { + err = pusher() + + if err != nil { + for _, substr := range []string{ + "REDACTED: UNKNOWN", + "http2: server sent GOAWAY and closed the connection", + "http2: Transport received Server's graceful shutdown GOAWAY", + } { + if strings.Contains(err.Error(), substr) { + seconds := rand.Intn(5) + 1 + + msg := fmt.Sprintf("Retrying publishing in %d seconds (%d/%d) ...\n", seconds, attempt, retriesLimit) + logboek.Context(ctx).Warn().LogLn(msg) + + time.Sleep(time.Duration(seconds) * time.Second) + continue attemptLoop + } + } + + return err + } + + return nil + } + + return fmt.Errorf("retries limit reached: %w", err) +} + +func (api *api) writeToRemote(ctx context.Context, ref name.Reference, img v1.Image) error { + oldDefaultTransport := http.DefaultTransport + http.DefaultTransport = api.getHttpTransport() + defer func() { + http.DefaultTransport = oldDefaultTransport + }() + + c := make(chan v1.Update, 200) + + go remote.Write(ref, img, remote.WithAuthFromKeychain(authn.DefaultKeychain), remote.WithProgress(c)) + + for upd := range c { + switch { + case upd.Error != nil && errors.Is(upd.Error, io.EOF): + logboek.Context(ctx).Debug().LogF("(%d/%d) done pushing image %q\n", upd.Complete, upd.Total, ref.String()) + return nil + case upd.Error != nil: + return fmt.Errorf("error pushing image: %w", upd.Error) + default: + logboek.Context(ctx).Debug().LogF("(%d/%d) pushing image %s is in progress\n", upd.Complete, upd.Total, ref.String()) + } + } + + return nil +} + +func (api *api) PullImageArchive(ctx context.Context, archiveWriter io.Writer, reference string) error { + ref, err := name.ParseReference(reference, api.parseReferenceOptions()...) + if err != nil { + return fmt.Errorf("unable to parse reference %q: %w", reference, err) + } + + desc, err := remote.Get(ref, api.defaultRemoteOptions()...) + if err != nil { + return fmt.Errorf("getting reference %q: %w", reference, err) + } + + img, err := desc.Image() + if err != nil { + return fmt.Errorf("unable to resolve image manifest for reference %q: %w", reference, err) + } + + c := make(chan v1.Update, 200) + + go tarball.Write(ref, img, archiveWriter, tarball.WithProgress(c)) + + for upd := range c { + switch { + case upd.Error != nil && errors.Is(upd.Error, io.EOF): + logboek.Context(ctx).Debug().LogF("(%d/%d) done pulling image %s to archive\n", upd.Complete, upd.Total, reference) + return nil + case upd.Error != nil: + return fmt.Errorf("error receiving image data: %w", upd.Error) + default: + logboek.Context(ctx).Debug().LogF("(%d/%d) pulling image %s is in progress\n", upd.Complete, upd.Total, reference) + } + } + + return nil +} + func ValidateRepositoryReference(reference string) error { reg := regexp.MustCompile(`^` + dockerReference.NameRegexp.String() + `$`) if !reg.MatchString(reference) { diff --git a/pkg/docker_registry/interface.go b/pkg/docker_registry/interface.go index 937901cc3a..76dd165fd9 100644 --- a/pkg/docker_registry/interface.go +++ b/pkg/docker_registry/interface.go @@ -2,6 +2,7 @@ package docker_registry import ( "context" + "io" v1 "github.com/google/go-containerregistry/pkg/v1" @@ -19,6 +20,10 @@ type Interface interface { DeleteRepoImage(ctx context.Context, repoImage *image.Info) error PushImage(ctx context.Context, reference string, opts *PushImageOptions) error MutateAndPushImage(ctx context.Context, sourceReference, destinationReference string, mutateConfigFunc func(v1.Config) (v1.Config, error)) error + + PushImageArchive(ctx context.Context, archiveOpener ArchiveOpener, reference string) error + PullImageArchive(ctx context.Context, archiveWriter io.Writer, reference string) error + String() string parseReferenceParts(reference string) (referenceParts, error) @@ -27,3 +32,7 @@ type Interface interface { type ApiInterface interface { GetRepoImageConfigFile(ctx context.Context, reference string) (*v1.ConfigFile, error) } + +type ArchiveOpener interface { + Open() (io.ReadCloser, error) +}