Skip to content

Commit

Permalink
Handle OCI-Chunk-Max-Length header field in blob pushes
Browse files Browse the repository at this point in the history
This header comes from the proposal of adding a way for registries to
limit chunk sizes from the client.

The implementation is simple enough: we just wrap the input in an
io.LimitReader so that it never reads more bytes than the limit
specified by the registry.

Signed-off-by: Gabi Villalonga Simon <gvillalongasimon@cloudflare.com>
  • Loading branch information
gabivlj committed Oct 31, 2023
1 parent 6c694cb commit f7355f4
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 0 deletions.
12 changes: 12 additions & 0 deletions internal/client/blob_writer.go
Expand Up @@ -23,6 +23,10 @@ type httpBlobUpload struct {
location string // always the last value of the location header.
offset int64
closed bool

// maxRange is a way to control the maximum size of the chunk that httpBlobUpload will send to the registry.
// Every ReadFrom and Write call won't read more bytes than the quantity specified in this field
maxRange int64
}

func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
Expand All @@ -37,6 +41,10 @@ func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
}

func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
if hbu.maxRange != 0 {
r = io.LimitReader(r, hbu.maxRange)
}

req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, io.NopCloser(r))
if err != nil {
return 0, err
Expand Down Expand Up @@ -73,6 +81,10 @@ func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
}

func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
if hbu.maxRange != 0 && hbu.maxRange < int64(len(p)) {
p = p[:hbu.maxRange]
}

req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, bytes.NewReader(p))
if err != nil {
return 0, err
Expand Down
144 changes: 144 additions & 0 deletions internal/client/blob_writer_test.go
Expand Up @@ -3,7 +3,9 @@ package client
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"testing"

Expand Down Expand Up @@ -500,3 +502,145 @@ func TestUploadWrite(t *testing.T) {
t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected)
}
}

// tests the case of sending only the bytes that we're limiting on
// TestUploadLimitRange verifies that httpBlobUpload honors maxRange:
// both the io.Reader path (ReadFrom) and the []byte path (Write) must
// upload at most maxRange bytes per PATCH request, so a blob larger
// than maxRange is pushed as a sequence of fixed-size chunks followed
// by one short trailing chunk.
func TestUploadLimitRange(t *testing.T) {
	const numberOfBlobs = 10
	const blobSize = 5
	const lastBlobOffset = 2 // length of the final, short chunk

	// Total payload: numberOfBlobs full chunks plus a trailing partial
	// chunk. Derived from the constants above (rather than a repeated
	// literal) so they cannot drift apart.
	_, b := newRandomBlob(numberOfBlobs*blobSize + lastBlobOffset)
	repo := "test/upload/write"
	locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo)

	// API version check the client performs before uploading.
	requests := []testutil.RequestResponseMapping{
		{
			Request: testutil.Request{
				Method: http.MethodGet,
				Route:  "/v2/",
			},
			Response: testutil.Response{
				StatusCode: http.StatusOK,
				Headers: http.Header(map[string][]string{
					"Docker-Distribution-API-Version": {"registry/2.0"},
				}),
			},
		},
	}

	// One PATCH per full chunk; each request must carry exactly
	// blobSize bytes of the payload, in order.
	for blob := 0; blob < numberOfBlobs; blob++ {
		start := blob * blobSize
		end := ((blob + 1) * blobSize) - 1

		requests = append(requests, testutil.RequestResponseMapping{
			Request: testutil.Request{
				Method: http.MethodPatch,
				Route:  locationPath,
				Body:   b[start : end+1],
			},
			Response: testutil.Response{
				StatusCode: http.StatusAccepted,
				Headers: http.Header(map[string][]string{
					"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
					"Location":           {locationPath},
					"Range":              {fmt.Sprintf("%d-%d", start, end)},
				}),
			},
		})
	}

	// The final PATCH carries the short trailing chunk.
	requests = append(requests, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPatch,
			Route:  locationPath,
			Body:   b[numberOfBlobs*blobSize:],
		},
		Response: testutil.Response{
			StatusCode: http.StatusAccepted,
			Headers: http.Header(map[string][]string{
				"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
				"Location":           {locationPath},
				"Range":              {fmt.Sprintf("%d-%d", numberOfBlobs*blobSize, len(b)-1)},
			}),
		},
	})

	t.Run("reader chunked upload", func(t *testing.T) {
		m := testutil.RequestResponseMap(requests)
		e, c := testServer(m)
		defer c()

		blobUpload := &httpBlobUpload{
			ctx:      context.Background(),
			client:   &http.Client{},
			maxRange: int64(blobSize),
		}

		reader := bytes.NewBuffer(b)
		for i := 0; i < numberOfBlobs; i++ {
			blobUpload.location = e + locationPath
			n, err := blobUpload.ReadFrom(reader)
			if err != nil {
				t.Fatalf("Error calling ReadFrom: %s", err)
			}

			// Each call must be truncated to exactly one chunk.
			if n != blobSize {
				t.Fatalf("Unexpected n %v != %v blobSize", n, blobSize)
			}
		}

		n, err := blobUpload.ReadFrom(reader)
		if err != nil {
			t.Fatalf("Error calling ReadFrom: %s", err)
		}

		if n != lastBlobOffset {
			t.Fatalf("Expected last write to have written %v but wrote %v", lastBlobOffset, n)
		}

		// The source must be fully drained after the final chunk.
		_, err = reader.Read([]byte{0, 0, 0})
		if !errors.Is(err, io.EOF) {
			t.Fatalf("Expected io.EOF when reading blob as the test should've read the whole thing")
		}
	})

	t.Run("buffer chunked upload", func(t *testing.T) {
		buff := b
		m := testutil.RequestResponseMap(requests)
		e, c := testServer(m)
		defer c()

		blobUpload := &httpBlobUpload{
			ctx:      context.Background(),
			client:   &http.Client{},
			maxRange: int64(blobSize),
		}

		for i := 0; i < numberOfBlobs; i++ {
			blobUpload.location = e + locationPath
			// Write is handed the whole remaining buffer; maxRange must
			// truncate each request to a single chunk.
			n, err := blobUpload.Write(buff)
			if err != nil {
				t.Fatalf("Error calling Write: %s", err)
			}

			if n != blobSize {
				t.Fatalf("Unexpected n %v != %v blobSize", n, blobSize)
			}

			buff = buff[n:]
		}

		n, err := blobUpload.Write(buff)
		if err != nil {
			t.Fatalf("Error calling Write: %s", err)
		}

		if n != lastBlobOffset {
			t.Fatalf("Expected last write to have written %v but wrote %v", lastBlobOffset, n)
		}

		buff = buff[n:]
		if len(buff) != 0 {
			t.Fatalf("Expected length 0 on the buffer body as we should've read the whole thing, but got %v", len(buff))
		}
	})
}
6 changes: 6 additions & 0 deletions internal/client/repository.go
Expand Up @@ -809,13 +809,19 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO
return nil, err
}

maxRange, err := v2.GetOCIMaxRange(resp)
if err != nil {
return nil, err
}

return &httpBlobUpload{
ctx: ctx,
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
maxRange: maxRange,
}, nil
default:
return nil, HandleHTTPResponseError(resp)
Expand Down
157 changes: 157 additions & 0 deletions internal/client/repository_test.go
Expand Up @@ -551,6 +551,163 @@ func TestBlobUploadChunked(t *testing.T) {
}
}

// TestBlobUploadChunkedOCIChunkMaxLength verifies the end-to-end
// chunked-upload flow when the registry advertises an
// OCI-Chunk-Max-Length header on upload creation: the client must pick
// up the limit from the POST response and then split every Write into
// chunks no larger than that limit.
func TestBlobUploadChunkedOCIChunkMaxLength(t *testing.T) {
	const numberOfBlobs = 10
	const blobSize = 5
	const lastBlobOffset = 2
	const totalSize = numberOfBlobs*blobSize + lastBlobOffset
	dgst, b1 := newRandomBlob(totalSize)
	originalBlob := b1
	var m testutil.RequestResponseMap
	repo, _ := reference.WithName("test.example.com/uploadrepo")
	uuids := []string{uuid.NewString()}

	// POST creates the upload and advertises the chunk limit.
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPost,
			Route:  "/v2/" + repo.Name() + "/blobs/uploads/",
		},
		Response: testutil.Response{
			StatusCode: http.StatusAccepted,
			Headers: http.Header(map[string][]string{
				"Content-Length":       {"0"},
				"Location":             {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[0]},
				"Docker-Upload-UUID":   {uuids[0]},
				"Range":                {"0-0"},
				"OCI-Chunk-Max-Length": {fmt.Sprintf("%d", blobSize)},
			}),
		},
	})

	// One PATCH per full chunk; each response moves the upload to a
	// fresh location/UUID, which the client must follow.
	for blob := 0; blob < numberOfBlobs; blob++ {
		start := blob * blobSize
		end := ((blob + 1) * blobSize) - 1
		uuids = append(uuids, uuid.NewString())
		m = append(m, testutil.RequestResponseMapping{
			Request: testutil.Request{
				Method: http.MethodPatch,
				Route:  "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[blob],
				Body:   b1[start : end+1],
			},
			Response: testutil.Response{
				StatusCode: http.StatusAccepted,
				Headers: http.Header(map[string][]string{
					"Content-Length":     {"0"},
					"Location":           {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[blob+1]},
					"Docker-Upload-UUID": {uuids[blob+1]},
					"Range":              {fmt.Sprintf("%d-%d", start, end)},
				}),
			},
		})
	}

	// Final PATCH carries the short trailing chunk.
	uuids = append(uuids, uuid.NewString())
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPatch,
			Route:  "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-2],
			Body:   b1[numberOfBlobs*blobSize:],
		},
		Response: testutil.Response{
			StatusCode: http.StatusAccepted,
			Headers: http.Header(map[string][]string{
				"Content-Length":     {"0"},
				"Location":           {"/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-1]},
				"Docker-Upload-UUID": {uuids[len(uuids)-1]},
				"Range":              {fmt.Sprintf("%d-%d", numberOfBlobs*blobSize, len(b1)-1)},
			}),
		},
	})

	// PUT commits the upload with the final digest.
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodPut,
			Route:  "/v2/" + repo.Name() + "/blobs/uploads/" + uuids[len(uuids)-1],
			QueryParams: map[string][]string{
				"digest": {dgst.String()},
			},
		},
		Response: testutil.Response{
			StatusCode: http.StatusCreated,
			Headers: http.Header(map[string][]string{
				"Content-Length":        {"0"},
				"Docker-Content-Digest": {dgst.String()},
				"Content-Range":         {fmt.Sprintf("0-%d", totalSize-1)},
			}),
		},
	})

	// HEAD stat of the committed blob.
	m = append(m, testutil.RequestResponseMapping{
		Request: testutil.Request{
			Method: http.MethodHead,
			Route:  "/v2/" + repo.Name() + "/blobs/" + dgst.String(),
		},
		Response: testutil.Response{
			StatusCode: http.StatusOK,
			Headers: http.Header(map[string][]string{
				"Content-Length": {fmt.Sprint(totalSize)},
				"Last-Modified":  {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
			}),
		},
	})

	e, c := testServer(m)
	defer c()

	ctx := context.Background()
	r, err := NewRepository(repo, e, nil)
	if err != nil {
		t.Fatal(err)
	}
	l := r.Blobs(ctx)

	upload, err := l.Create(ctx)
	if err != nil {
		t.Fatal(err)
	}

	// BUG FIX: this used log.Fatalf, which calls os.Exit and bypasses
	// the testing framework entirely; t.Fatalf is the correct call.
	if upload.ID() != uuids[0] {
		t.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uuids[0])
	}

	// Hand Write the whole remaining payload each time; the advertised
	// OCI-Chunk-Max-Length must truncate each request to blobSize.
	for i := 0; i < numberOfBlobs; i++ {
		n, err := upload.Write(b1)
		if err != nil {
			t.Fatalf("unexpected error in blob %v: %+v", i, err)
		}

		if n != blobSize {
			t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, blobSize)
		}

		b1 = b1[n:]
	}

	n, err := upload.Write(b1)
	if err != nil {
		t.Fatal(err)
	}

	if n != lastBlobOffset {
		t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, lastBlobOffset)
	}

	b1 = b1[n:]

	// BUG FIX: b1 has been consumed to length 0 by this point, so the
	// original's Size: int64(len(b1)) described an empty blob. The
	// descriptor must carry the full size of the committed blob.
	blob, err := upload.Commit(ctx, distribution.Descriptor{
		Digest: dgst,
		Size:   int64(totalSize),
	})
	if err != nil {
		t.Fatal(err)
	}

	if blob.Size != int64(len(originalBlob)) {
		t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Size, len(originalBlob))
	}

	if len(b1) != 0 {
		t.Fatalf("Expected to have consumed to the entire buffer at the end, got size %v", len(b1))
	}
}

func TestBlobUploadMonolithic(t *testing.T) {
dgst, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap
Expand Down

0 comments on commit f7355f4

Please sign in to comment.