diff --git a/cli/command_repository_validate_provider.go b/cli/command_repository_validate_provider.go index dd0f8b4fcf..caf48dbdac 100644 --- a/cli/command_repository_validate_provider.go +++ b/cli/command_repository_validate_provider.go @@ -23,6 +23,7 @@ func (c *commandRepositoryValidateProvider) setup(svc advancedAppServices, paren cmd.Flag("put-blob-workers", "Number of PutBlob workers").IntVar(&c.opt.NumPutBlobWorkers) cmd.Flag("get-blob-workers", "Number of GetBlob workers").IntVar(&c.opt.NumGetBlobWorkers) cmd.Flag("get-metadata-workers", "Number of GetMetadata workers").IntVar(&c.opt.NumGetMetadataWorkers) + cmd.Flag("support-idempotent-creates", "Whether store supports idempotent blob creates").BoolVar(&c.opt.SupportIdempotentCreates) c.out.setup(svc) cmd.Action(c.out.svc.directRepositoryWriteAction(c.run)) diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 76a7c8a7f5..ba778c46d6 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -76,8 +76,11 @@ func (s *mapStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata } func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error { - if opts.HasRetentionOptions() { + switch { + case opts.HasRetentionOptions(): return errors.New("setting blob-retention is not supported") + case opts.DoNotRecreate: + return errors.New("setting blob do-not-recreate is not supported") } s.mutex.Lock() diff --git a/internal/blobtesting/verify.go b/internal/blobtesting/verify.go index 664f118ec6..9d8c865100 100644 --- a/internal/blobtesting/verify.go +++ b/internal/blobtesting/verify.go @@ -108,14 +108,21 @@ func VerifyStorage(ctx context.Context, t *testing.T, r blob.Storage, opts blob. }) t.Run("OverwriteBlobs", func(t *testing.T) { + newContents := []byte{99} + for _, b := range blocks { b := b t.Run(string(b.blk), func(t *testing.T) { t.Parallel() - - require.NoErrorf(t, r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{}), "can't put blob: %v", b) - AssertGetBlob(ctx, t, r, b.blk, b.contents) + err := r.PutBlob(ctx, b.blk, gather.FromSlice(newContents), opts) + if opts.DoNotRecreate { + require.ErrorIsf(t, err, blob.ErrBlobAlreadyExists, "overwrote blob: %v", b) + AssertGetBlob(ctx, t, r, b.blk, b.contents) + } else { + require.NoErrorf(t, err, "can't put blob: %v", b) + AssertGetBlob(ctx, t, r, b.blk, newContents) + } }) } }) @@ -209,11 +216,12 @@ func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.St // TestValidationOptions is the set of options used when running providing validation from tests. // nolint:gomnd var TestValidationOptions = providervalidation.Options{ - MaxClockDrift: 3 * time.Minute, - ConcurrencyTestDuration: 15 * time.Second, - NumPutBlobWorkers: 3, - NumGetBlobWorkers: 3, - NumGetMetadataWorkers: 3, - NumListBlobsWorkers: 3, - MaxBlobLength: 10e6, + MaxClockDrift: 3 * time.Minute, + ConcurrencyTestDuration: 15 * time.Second, + NumPutBlobWorkers: 3, + NumGetBlobWorkers: 3, + NumGetMetadataWorkers: 3, + NumListBlobsWorkers: 3, + MaxBlobLength: 10e6, + SupportIdempotentCreates: false, } diff --git a/internal/providervalidation/providervalidation.go b/internal/providervalidation/providervalidation.go index 31c0d41708..39d762c38e 100644 --- a/internal/providervalidation/providervalidation.go +++ b/internal/providervalidation/providervalidation.go @@ -31,18 +31,21 @@ type Options struct { NumGetMetadataWorkers int NumListBlobsWorkers int MaxBlobLength int + + SupportIdempotentCreates bool } // DefaultOptions is the default set of options. // nolint:gomnd var DefaultOptions = Options{ - MaxClockDrift: 3 * time.Minute, - ConcurrencyTestDuration: 30 * time.Second, - NumPutBlobWorkers: 3, - NumGetBlobWorkers: 3, - NumGetMetadataWorkers: 3, - NumListBlobsWorkers: 3, - MaxBlobLength: 10e6, + MaxClockDrift: 3 * time.Minute, + ConcurrencyTestDuration: 30 * time.Second, + NumPutBlobWorkers: 3, + NumGetBlobWorkers: 3, + NumGetMetadataWorkers: 3, + NumListBlobsWorkers: 3, + MaxBlobLength: 10e6, + SupportIdempotentCreates: false, } const blobIDLength = 16 @@ -51,7 +54,7 @@ var log = logging.Module("providervalidation") // ValidateProvider runs a series of tests against provided storage to validate that // it can be used with Kopia. -// nolint:gomnd,funlen,gocyclo +// nolint:gomnd,funlen,gocyclo,cyclop func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { if os.Getenv("KOPIA_SKIP_PROVIDER_VALIDATION") != "" { return nil @@ -183,6 +186,15 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { return errors.Wrap(err, "error validating concurrency") } + log(ctx).Infof("Validating blob recreation...") + + if !opt.SupportIdempotentCreates { + err := st.PutBlob(ctx, "dummy_id", gather.FromSlice([]byte{99}), blob.PutOptions{DoNotRecreate: true}) + if err == nil { + return errors.New("store should not support put-blob-no-overwrite, expected error") + } + } + log(ctx).Infof("All good.") return nil diff --git a/repo/blob/azure/azure_storage.go b/repo/blob/azure/azure_storage.go index cd03c47020..39a0c6a212 100644 --- a/repo/blob/azure/azure_storage.go +++ b/repo/blob/azure/azure_storage.go @@ -113,8 +113,11 @@ func translateError(err error) error { } func (az *azStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error { - if opts.HasRetentionOptions() { + switch { + case opts.HasRetentionOptions(): return errors.New("setting blob-retention is not supported") + case opts.DoNotRecreate: + return errors.New("setting blob do-not-recreate is not supported") } ctx, cancel := context.WithCancel(ctx) diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go index b63bb798e9..a98998e600 100644 --- a/repo/blob/gcs/gcs_storage.go +++ b/repo/blob/gcs/gcs_storage.go @@ -84,8 +84,11 @@ func translateError(err error) error { var ae *googleapi.Error if errors.As(err, &ae) { - if ae.Code == http.StatusRequestedRangeNotSatisfiable { + switch ae.Code { + case http.StatusRequestedRangeNotSatisfiable: return blob.ErrInvalidRange + case http.StatusPreconditionFailed: + return blob.ErrBlobAlreadyExists } } @@ -106,7 +109,8 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, ctx, cancel := context.WithCancel(ctx) - obj := gcs.bucket.Object(gcs.getObjectNameString(b)) + conds := gcsclient.Conditions{DoesNotExist: opts.DoNotRecreate} + obj := gcs.bucket.Object(gcs.getObjectNameString(b)).If(conds) writer := obj.NewWriter(ctx) writer.ChunkSize = writerChunkSize writer.ContentType = "application/x-kopia" diff --git a/repo/blob/gcs/gcs_storage_test.go b/repo/blob/gcs/gcs_storage_test.go index 8dee07b2c2..10b062797a 100644 --- a/repo/blob/gcs/gcs_storage_test.go +++ b/repo/blob/gcs/gcs_storage_test.go @@ -44,9 +44,19 @@ func TestGCSStorage(t *testing.T) { defer st.Close(ctx) defer blobtesting.CleanupOldData(ctx, t, st, 0) - blobtesting.VerifyStorage(ctx, t, st, blob.PutOptions{}) + options := []blob.PutOptions{ + {}, + {DoNotRecreate: true}, + } + + for _, opt := range options { + blobtesting.VerifyStorage(ctx, t, st, opt) + } + blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) - require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) + validateOpts := blobtesting.TestValidationOptions + validateOpts.SupportIdempotentCreates = true + require.NoError(t, providervalidation.ValidateProvider(ctx, st, validateOpts)) } func TestGCSStorageInvalid(t *testing.T) { diff --git a/repo/blob/retrying/retrying_storage.go b/repo/blob/retrying/retrying_storage.go index 918a012f44..ec796d025a 100644 --- a/repo/blob/retrying/retrying_storage.go +++ b/repo/blob/retrying/retrying_storage.go @@ -71,6 +71,9 @@ func isRetriable(err error) bool { case errors.Is(err, blob.ErrSetTimeUnsupported): return false + case errors.Is(err, blob.ErrBlobAlreadyExists): + return false + default: return true } diff --git a/repo/blob/s3/s3_storage.go b/repo/blob/s3/s3_storage.go index a3c31ce985..bf3ddd32e5 100644 --- a/repo/blob/s3/s3_storage.go +++ b/repo/blob/s3/s3_storage.go @@ -122,7 +122,10 @@ func (s *s3Storage) getVersionMetadata(ctx context.Context, b blob.ID, version s } func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error { - if !opts.SetModTime.IsZero() { + switch { + case opts.DoNotRecreate: + return errors.New("setting blob do-not-recreate is not supported") + case !opts.SetModTime.IsZero(): return blob.ErrSetTimeUnsupported } diff --git a/repo/blob/sharded/sharded.go b/repo/blob/sharded/sharded.go index 8da012b0b3..d04c7eed35 100644 --- a/repo/blob/sharded/sharded.go +++ b/repo/blob/sharded/sharded.go @@ -178,8 +178,11 @@ func (s *Storage) GetMetadata(ctx context.Context, blobID blob.ID) (blob.Metadat // PutBlob implements blob.Storage. func (s *Storage) PutBlob(ctx context.Context, blobID blob.ID, data blob.Bytes, opts blob.PutOptions) error { - if opts.HasRetentionOptions() { + switch { + case opts.HasRetentionOptions(): return errors.New("setting blob-retention is not supported") + case opts.DoNotRecreate: + return errors.New("setting blob do-not-recreate is not supported") } dirPath, filePath, err := s.GetShardedPathAndFilePath(ctx, blobID) diff --git a/repo/blob/storage.go b/repo/blob/storage.go index 5c236a2498..654f807e5f 100644 --- a/repo/blob/storage.go +++ b/repo/blob/storage.go @@ -17,6 +17,9 @@ var ErrSetTimeUnsupported = errors.Errorf("SetTime is not supported") // ErrInvalidRange is returned when the requested blob offset or length is invalid. var ErrInvalidRange = errors.Errorf("invalid blob offset or length") +// ErrBlobAlreadyExists is returned when attempting to put a blob that already exists. +var ErrBlobAlreadyExists = errors.New("Blob already exists") + // Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers, // which can be written sequentially or treated as a io.Reader. type Bytes interface { @@ -62,6 +65,9 @@ type PutOptions struct { RetentionMode string RetentionPeriod time.Duration + // if true, PutBlob will fail with ErrBlobAlreadyExists if a blob with the same ID exists. + DoNotRecreate bool + // if not empty, set the provided timestamp on the blob instead of server-assigned, // if unsupported by the server return ErrSetTimeUnsupported SetModTime time.Time