Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand PutBlob API to allow for idempotent creates #341

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/command_repository_validate_provider.go
Expand Up @@ -23,6 +23,7 @@ func (c *commandRepositoryValidateProvider) setup(svc advancedAppServices, paren
cmd.Flag("put-blob-workers", "Number of PutBlob workers").IntVar(&c.opt.NumPutBlobWorkers)
cmd.Flag("get-blob-workers", "Number of GetBlob workers").IntVar(&c.opt.NumGetBlobWorkers)
cmd.Flag("get-metadata-workers", "Number of GetMetadata workers").IntVar(&c.opt.NumGetMetadataWorkers)
cmd.Flag("support-idempotent-creates", "Whether store supports idempotent blob creates").BoolVar(&c.opt.SupportIdempotentCreates)
c.out.setup(svc)

cmd.Action(c.out.svc.directRepositoryWriteAction(c.run))
Expand Down
5 changes: 4 additions & 1 deletion internal/blobtesting/map.go
Expand Up @@ -76,8 +76,11 @@ func (s *mapStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata
}

func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error {
if opts.HasRetentionOptions() {
switch {
case opts.HasRetentionOptions():
return errors.New("setting blob-retention is not supported")
case opts.DoNotRecreate:
return errors.New("setting blob do-not-recreate is not supported")
}

s.mutex.Lock()
Expand Down
28 changes: 18 additions & 10 deletions internal/blobtesting/verify.go
Expand Up @@ -108,14 +108,21 @@ func VerifyStorage(ctx context.Context, t *testing.T, r blob.Storage, opts blob.
})

t.Run("OverwriteBlobs", func(t *testing.T) {
newContents := []byte{99}

for _, b := range blocks {
b := b

t.Run(string(b.blk), func(t *testing.T) {
t.Parallel()

require.NoErrorf(t, r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{}), "can't put blob: %v", b)
AssertGetBlob(ctx, t, r, b.blk, b.contents)
err := r.PutBlob(ctx, b.blk, gather.FromSlice(newContents), opts)
if opts.DoNotRecreate {
require.ErrorIsf(t, err, blob.ErrBlobAlreadyExists, "overwrote blob: %v", b)
AssertGetBlob(ctx, t, r, b.blk, b.contents)
} else {
require.NoErrorf(t, err, "can't put blob: %v", b)
AssertGetBlob(ctx, t, r, b.blk, newContents)
}
})
}
})
Expand Down Expand Up @@ -209,11 +216,12 @@ func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.St
// TestValidationOptions is the set of options used when running provider validation from tests.
// It mirrors providervalidation.DefaultOptions but with a shorter concurrency-test duration
// suitable for CI runs.
// nolint:gomnd
var TestValidationOptions = providervalidation.Options{
	MaxClockDrift:            3 * time.Minute,
	ConcurrencyTestDuration:  15 * time.Second,
	NumPutBlobWorkers:        3,
	NumGetBlobWorkers:        3,
	NumGetMetadataWorkers:    3,
	NumListBlobsWorkers:      3,
	MaxBlobLength:            10e6,
	SupportIdempotentCreates: false,
}
28 changes: 20 additions & 8 deletions internal/providervalidation/providervalidation.go
Expand Up @@ -31,18 +31,21 @@ type Options struct {
NumGetMetadataWorkers int
NumListBlobsWorkers int
MaxBlobLength int

SupportIdempotentCreates bool
}

// DefaultOptions is the default set of options used by ValidateProvider when the
// caller does not override them.
// nolint:gomnd
var DefaultOptions = Options{
	MaxClockDrift:            3 * time.Minute,
	ConcurrencyTestDuration:  30 * time.Second,
	NumPutBlobWorkers:        3,
	NumGetBlobWorkers:        3,
	NumGetMetadataWorkers:    3,
	NumListBlobsWorkers:      3,
	MaxBlobLength:            10e6,
	SupportIdempotentCreates: false,
}

const blobIDLength = 16
Expand All @@ -51,7 +54,7 @@ var log = logging.Module("providervalidation")

// ValidateProvider runs a series of tests against provided storage to validate that
// it can be used with Kopia.
// nolint:gomnd,funlen,gocyclo
// nolint:gomnd,funlen,gocyclo,cyclop
func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
if os.Getenv("KOPIA_SKIP_PROVIDER_VALIDATION") != "" {
return nil
Expand Down Expand Up @@ -183,6 +186,15 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
return errors.Wrap(err, "error validating concurrency")
}

log(ctx).Infof("Validating blob recreation...")

if !opt.SupportIdempotentCreates {
err := st.PutBlob(ctx, "dummy_id", gather.FromSlice([]byte{99}), blob.PutOptions{DoNotRecreate: true})
if err == nil {
return errors.New("store should not support put-blob-no-overwrite, expected error")
}
}

log(ctx).Infof("All good.")

return nil
Expand Down
5 changes: 4 additions & 1 deletion repo/blob/azure/azure_storage.go
Expand Up @@ -113,8 +113,11 @@ func translateError(err error) error {
}

func (az *azStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error {
if opts.HasRetentionOptions() {
switch {
case opts.HasRetentionOptions():
return errors.New("setting blob-retention is not supported")
case opts.DoNotRecreate:
return errors.New("setting blob do-not-recreate is not supported")
}

ctx, cancel := context.WithCancel(ctx)
Expand Down
8 changes: 6 additions & 2 deletions repo/blob/gcs/gcs_storage.go
Expand Up @@ -84,8 +84,11 @@ func translateError(err error) error {
var ae *googleapi.Error

if errors.As(err, &ae) {
if ae.Code == http.StatusRequestedRangeNotSatisfiable {
switch ae.Code {
case http.StatusRequestedRangeNotSatisfiable:
return blob.ErrInvalidRange
case http.StatusPreconditionFailed:
return blob.ErrBlobAlreadyExists
Comment on lines +90 to +91

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this implementation, it is the case that a precondition failure means the blob already exists, since it is the only condition being set.

But a StatusPreconditionFailed could be returned if a different precondition were set. How can we future proof this so a bug is not inadvertently introduced if for some reason we end up setting other pre condition checks?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion @julio-lopez, the context needed for this is not available within translateError(), so we'd have to pass the put options opts into translateError(), or do some special error handling outside said function. Neither solution is particularly neat, but I am leaning towards the former. Thoughts?

Copy link

@Shrekster Shrekster Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate to say this, but how about a simple error string match?

I think passing in an error code or wrapping the error with more context in the same function where the pre-conditions are set is a better option, but I would avoid putting too much scope on this one PR.

Doing the wrap closer to the condition setup is the key. Passing in put-options is also a neat idea, but it may not be quite intuitive to code readers why a generic-op translateError() needs the put-opts. Not saying it cannot/should not be done though.

}
}

Expand All @@ -106,7 +109,8 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes,

ctx, cancel := context.WithCancel(ctx)

obj := gcs.bucket.Object(gcs.getObjectNameString(b))
conds := gcsclient.Conditions{DoesNotExist: opts.DoNotRecreate}
obj := gcs.bucket.Object(gcs.getObjectNameString(b)).If(conds)
writer := obj.NewWriter(ctx)
writer.ChunkSize = writerChunkSize
writer.ContentType = "application/x-kopia"
Expand Down
14 changes: 12 additions & 2 deletions repo/blob/gcs/gcs_storage_test.go
Expand Up @@ -44,9 +44,19 @@ func TestGCSStorage(t *testing.T) {
defer st.Close(ctx)
defer blobtesting.CleanupOldData(ctx, t, st, 0)

blobtesting.VerifyStorage(ctx, t, st, blob.PutOptions{})
options := []blob.PutOptions{
{},
{DoNotRecreate: true},
}

for _, opt := range options {
blobtesting.VerifyStorage(ctx, t, st, opt)
}

blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
validateOpts := blobtesting.TestValidationOptions
validateOpts.SupportIdempotentCreates = true
require.NoError(t, providervalidation.ValidateProvider(ctx, st, validateOpts))
}

func TestGCSStorageInvalid(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions repo/blob/retrying/retrying_storage.go
Expand Up @@ -71,6 +71,9 @@ func isRetriable(err error) bool {
case errors.Is(err, blob.ErrSetTimeUnsupported):
return false

case errors.Is(err, blob.ErrBlobAlreadyExists):
return false

default:
return true
}
Expand Down
5 changes: 4 additions & 1 deletion repo/blob/s3/s3_storage.go
Expand Up @@ -122,7 +122,10 @@ func (s *s3Storage) getVersionMetadata(ctx context.Context, b blob.ID, version s
}

func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error {
if !opts.SetModTime.IsZero() {
switch {
case opts.DoNotRecreate:
return errors.New("setting blob do-not-recreate is not supported")
case !opts.SetModTime.IsZero():
return blob.ErrSetTimeUnsupported
}

Expand Down
5 changes: 4 additions & 1 deletion repo/blob/sharded/sharded.go
Expand Up @@ -178,8 +178,11 @@ func (s *Storage) GetMetadata(ctx context.Context, blobID blob.ID) (blob.Metadat

// PutBlob implements blob.Storage.
func (s *Storage) PutBlob(ctx context.Context, blobID blob.ID, data blob.Bytes, opts blob.PutOptions) error {
if opts.HasRetentionOptions() {
switch {
case opts.HasRetentionOptions():
return errors.New("setting blob-retention is not supported")
case opts.DoNotRecreate:
return errors.New("setting blob do-not-recreate is not supported")
Comment on lines +184 to +185

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be a function of the actual storage backend? It is true that currently the sharded provider is only used with storage backends that do not support DoNotRecreate, but still, it seems that it should be relayed to the backend.

Copy link
Author

@adowair adowair Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point @julio-lopez. I can push this logic down to the individual storage providers.

@Shrekster , I see you implement handling of opts.HasRetentionOpts() in a similar way to me. Should that be pushed down to the storage implementations too?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

Yes, my bad. I should have taken opts.HasRetentionOpts() down to the three implementations of the shared.Impl.PutBlobInPath interface.

@adowair Can you move both the checks to the storage backend implementations of the shared interface ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd may be a good idea to break down and restructure the PR in self-contained changes (commits) that can be individually reviewed. For example, the change about passing down the options to the storage backends can be one of those steps. Merely a suggestion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the right way to do things. Rebasing upstream.

}

dirPath, filePath, err := s.GetShardedPathAndFilePath(ctx, blobID)
Expand Down
6 changes: 6 additions & 0 deletions repo/blob/storage.go
Expand Up @@ -17,6 +17,9 @@ var ErrSetTimeUnsupported = errors.Errorf("SetTime is not supported")
// ErrInvalidRange is returned when the requested blob offset or length is invalid.
var ErrInvalidRange = errors.Errorf("invalid blob offset or length")

// ErrBlobAlreadyExists is returned when attempting to put a blob that already exists.
var ErrBlobAlreadyExists = errors.New("Blob already exists")

// Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers,
// which can be written sequentially or treated as a io.Reader.
type Bytes interface {
Expand Down Expand Up @@ -62,6 +65,9 @@ type PutOptions struct {
RetentionMode string
RetentionPeriod time.Duration

// if true, PutBlob will fail with ErrBlobAlreadyExists if a blob with the same ID exists.
DoNotRecreate bool

// if not empty, set the provided timestamp on the blob instead of server-assigned,
// if unsupported by the server return ErrSetTimeUnsupported
SetModTime time.Time
Expand Down