Skip to content

Commit

Permalink
Merge pull request #47629 from vvoland/tarexport-tracing-ctx-cancel
Browse files Browse the repository at this point in the history
tarexport: Plumb ctx, add OTEL spans, handle cancellation
  • Loading branch information
thaJeztah committed May 14, 2024
2 parents 5505c85 + ad0f263 commit ae976b9
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 21 deletions.
4 changes: 2 additions & 2 deletions daemon/images/image_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// outStream is the writer which the images are written to.
func (i *ImageService) ExportImage(ctx context.Context, names []string, outStream io.Writer) error {
imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i)
return imageExporter.Save(names, outStream)
return imageExporter.Save(ctx, names, outStream)
}

func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Container, fn func(root string) error) error {
Expand Down Expand Up @@ -46,5 +46,5 @@ func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Conta
// ball containing images and metadata.
func (i *ImageService) LoadImage(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i)
return imageExporter.Load(inTar, outStream, quiet)
return imageExporter.Load(ctx, inTar, outStream, quiet)
}
5 changes: 3 additions & 2 deletions image/image.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package image // import "github.com/docker/docker/image"

import (
"context"
"encoding/json"
"errors"
"io"
Expand Down Expand Up @@ -279,9 +280,9 @@ func NewHistory(author, comment, createdBy string, isEmptyLayer bool) History {

// Exporter provides interface for loading and saving images
type Exporter interface {
Load(io.ReadCloser, io.Writer, bool) error
Load(context.Context, io.ReadCloser, io.Writer, bool) error
// TODO: Load(net.Context, io.ReadCloser, <- chan StatusMessage) error
Save([]string, io.Writer) error
Save(context.Context, []string, io.Writer) error
}

// NewFromJSON creates an Image configuration from json.
Expand Down
47 changes: 41 additions & 6 deletions image/tarexport/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"reflect"
"runtime"

"github.com/containerd/containerd/tracing"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/distribution"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/image"
v1 "github.com/docker/docker/image/v1"
"github.com/docker/docker/internal/ioutils"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/chrootarchive"
Expand All @@ -28,7 +30,13 @@ import (
"github.com/opencontainers/go-digest"
)

func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
func (l *tarexporter) Load(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) (outErr error) {
ctx, span := tracing.StartSpan(ctx, "tarexport.Load")
defer span.End()
defer func() {
span.SetStatus(outErr)
}()

var progressOutput progress.Output
if !quiet {
progressOutput = streamformatter.NewJSONProgressOutput(outStream, false)
Expand All @@ -41,9 +49,10 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
}
defer os.RemoveAll(tmpDir)

if err := chrootarchive.Untar(inTar, tmpDir, nil); err != nil {
if err := untar(ctx, inTar, tmpDir); err != nil {
return err
}

// read manifest, if no file then load in legacy mode
manifestPath, err := safePath(tmpDir, manifestFileName)
if err != nil {
Expand Down Expand Up @@ -72,6 +81,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
var imageRefCount int

for _, m := range manifest {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
configPath, err := safePath(tmpDir, m.Config)
if err != nil {
return err
Expand All @@ -95,6 +109,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
}

for i, diffID := range img.RootFS.DiffIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
layerPath, err := safePath(tmpDir, m.Layers[i])
if err != nil {
return err
Expand All @@ -103,7 +122,7 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
r.Append(diffID)
newLayer, err := l.lss.Get(r.ChainID())
if err != nil {
newLayer, err = l.loadLayer(layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput)
newLayer, err = l.loadLayer(ctx, layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput)
if err != nil {
return err
}
Expand Down Expand Up @@ -155,6 +174,15 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
return nil
}

func untar(ctx context.Context, inTar io.ReadCloser, tmpDir string) error {
_, trace := tracing.StartSpan(ctx, "chrootarchive.Untar")
defer trace.End()

err := chrootarchive.Untar(ioutils.NewCtxReader(ctx, inTar), tmpDir, nil)
trace.SetStatus(err)
return err
}

func (l *tarexporter) setParentID(id, parentID image.ID) error {
img, err := l.is.Get(id)
if err != nil {
Expand All @@ -170,7 +198,14 @@ func (l *tarexporter) setParentID(id, parentID image.ID) error {
return l.is.SetParent(id, parentID)
}

func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (layer.Layer, error) {
func (l *tarexporter) loadLayer(ctx context.Context, filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (_ layer.Layer, outErr error) {
ctx, span := tracing.StartSpan(ctx, "loadLayer")
span.SetAttributes(tracing.Attribute("image.id", id))
defer span.End()
defer func() {
span.SetStatus(outErr)
}()

// We use sequential file access to avoid depleting the standby list on Windows.
// On Linux, this equates to a regular os.Open.
rawTar, err := sequential.Open(filename)
Expand All @@ -193,7 +228,7 @@ func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string,
r = rawTar
}

inflatedLayerData, err := archive.DecompressStream(r)
inflatedLayerData, err := archive.DecompressStream(ioutils.NewCtxReader(ctx, r))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -332,7 +367,7 @@ func (l *tarexporter) legacyLoadImage(oldID, sourceDir string, loadedMap map[str
if err != nil {
return err
}
newLayer, err := l.loadLayer(layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput)
newLayer, err := l.loadLayer(context.TODO(), layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput)
if err != nil {
return err
}
Expand Down
68 changes: 57 additions & 11 deletions image/tarexport/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"time"

"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/distribution"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/image"
v1 "github.com/docker/docker/image/v1"
"github.com/docker/docker/internal/ioutils"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/system"
Expand All @@ -42,20 +44,20 @@ type saveSession struct {
savedConfigs map[string]struct{}
}

func (l *tarexporter) Save(names []string, outStream io.Writer) error {
images, err := l.parseNames(names)
func (l *tarexporter) Save(ctx context.Context, names []string, outStream io.Writer) error {
images, err := l.parseNames(ctx, names)
if err != nil {
return err
}

// Release all the image top layer references
defer l.releaseLayerReferences(images)
return (&saveSession{tarexporter: l, images: images}).save(outStream)
return (&saveSession{tarexporter: l, images: images}).save(ctx, outStream)
}

// parseNames will parse the image names to a map which contains image.ID to *imageDescriptor.
// Each imageDescriptor holds an image top layer reference named 'layerRef'. It is taken here, should be released later.
func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescriptor, rErr error) {
func (l *tarexporter) parseNames(ctx context.Context, names []string) (desc map[image.ID]*imageDescriptor, rErr error) {
imgDescr := make(map[image.ID]*imageDescriptor)
defer func() {
if rErr != nil {
Expand Down Expand Up @@ -92,6 +94,12 @@ func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescri
}

for _, name := range names {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

ref, err := reference.ParseAnyReference(name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,7 +187,7 @@ func (l *tarexporter) releaseLayerReferences(imgDescr map[image.ID]*imageDescrip
return nil
}

func (s *saveSession) save(outStream io.Writer) error {
func (s *saveSession) save(ctx context.Context, outStream io.Writer) error {
s.savedConfigs = make(map[string]struct{})
s.savedLayers = make(map[layer.DiffID]distribution.Descriptor)

Expand All @@ -199,7 +207,13 @@ func (s *saveSession) save(outStream io.Writer) error {
var manifestDescriptors []ocispec.Descriptor

for id, imageDescr := range s.images {
foreignSrcs, err := s.saveImage(id)
select {
case <-ctx.Done():
return ctx.Err()
default:
}

foreignSrcs, err := s.saveImage(ctx, id)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,17 +384,34 @@ func (s *saveSession) save(outStream io.Writer) error {
return errors.Wrap(err, "error writing oci index file")
}

return s.writeTar(ctx, tempDir, outStream)
}

func (s *saveSession) writeTar(ctx context.Context, tempDir string, outStream io.Writer) error {
ctx, span := tracing.StartSpan(ctx, "writeTar")
defer span.End()

fs, err := archive.Tar(tempDir, archive.Uncompressed)
if err != nil {
span.SetStatus(err)
return err
}
defer fs.Close()

_, err = io.Copy(outStream, fs)
_, err = ioutils.CopyCtx(ctx, outStream, fs)

span.SetStatus(err)
return err
}

func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Descriptor, error) {
func (s *saveSession) saveImage(ctx context.Context, id image.ID) (_ map[layer.DiffID]distribution.Descriptor, outErr error) {
ctx, span := tracing.StartSpan(ctx, "saveImage")
span.SetAttributes(tracing.Attribute("image.id", id.String()))
defer span.End()
defer func() {
span.SetStatus(outErr)
}()

img := s.images[id].image
if len(img.RootFS.DiffIDs) == 0 {
return nil, fmt.Errorf("empty export - not implemented")
Expand All @@ -390,6 +421,11 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
var layers []layer.DiffID
var foreignSrcs map[layer.DiffID]distribution.Descriptor
for i, diffID := range img.RootFS.DiffIDs {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
v1ImgCreated := time.Unix(0, 0)
v1Img := image.V1Image{
// This is for backward compatibility used for
Expand All @@ -412,7 +448,7 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
}

v1Img.OS = img.OS
src, err := s.saveConfigAndLayer(rootFS.ChainID(), v1Img, img.Created)
src, err := s.saveConfigAndLayer(ctx, rootFS.ChainID(), v1Img, img.Created)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -457,7 +493,17 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
return foreignSrcs, nil
}

func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (distribution.Descriptor, error) {
func (s *saveSession) saveConfigAndLayer(ctx context.Context, id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (_ distribution.Descriptor, outErr error) {
ctx, span := tracing.StartSpan(ctx, "saveConfigAndLayer")
span.SetAttributes(
tracing.Attribute("layer.id", id.String()),
tracing.Attribute("image.id", legacyImg.ID),
)
defer span.End()
defer func() {
span.SetStatus(outErr)
}()

outDir := filepath.Join(s.outDir, ocispec.ImageBlobsDir)

if _, ok := s.savedConfigs[legacyImg.ID]; !ok {
Expand Down Expand Up @@ -512,7 +558,7 @@ func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Ima
digester := digest.Canonical.Digester()
digestedArch := io.TeeReader(arch, digester.Hash())

tarSize, err := io.Copy(tarFile, digestedArch)
tarSize, err := ioutils.CopyCtx(ctx, tarFile, digestedArch)
if err != nil {
return distribution.Descriptor{}, err
}
Expand Down
57 changes: 57 additions & 0 deletions internal/ioutils/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package ioutils

import (
"context"
"io"
)

// CopyCtx copies from src to dst until either EOF is reached on src or a context is cancelled.
// The writer is not closed when the context is cancelled.
//
// After CopyCtx exits due to context cancellation, the goroutine that performed
// the copy may still be running if either the reader or writer blocks.
func CopyCtx(ctx context.Context, dst io.Writer, src io.Reader) (n int64, err error) {
copyDone := make(chan struct{})

src = &readerCtx{ctx: ctx, r: src}

go func() {
n, err = io.Copy(dst, src)
close(copyDone)
}()

select {
case <-ctx.Done():
return -1, ctx.Err()
case <-copyDone:
}

return n, err
}

type readerCtx struct {
ctx context.Context
r io.Reader
}

// NewCtxReader wraps the given reader with a reader that doesn't proceed with
// reading if the context is done.
//
// Note: Read will still block if the underlying reader blocks.
func NewCtxReader(ctx context.Context, r io.Reader) io.Reader {
return &readerCtx{ctx: ctx, r: r}
}

func (r *readerCtx) Read(p []byte) (n int, err error) {
if err := r.ctx.Err(); err != nil {
return 0, err
}

n, outErr := r.r.Read(p)

if err := r.ctx.Err(); err != nil {
return 0, err
}

return n, outErr
}

0 comments on commit ae976b9

Please sign in to comment.