forked from kopia/kopia
/
storage.go
284 lines (219 loc) · 7.99 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package blob
import (
"context"
"encoding/json"
"io"
"sync"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// Sentinel errors reported by blob storage implementations and the helpers in this package.
var (
	// ErrSetTimeUnsupported is returned by implementations of Storage that don't support SetTime.
	ErrSetTimeUnsupported = errors.New("SetTime is not supported")

	// ErrInvalidRange is returned when the requested blob offset or length is invalid.
	ErrInvalidRange = errors.New("invalid blob offset or length")

	// ErrBlobAlreadyExists is returned when attempting to put a blob that already exists.
	ErrBlobAlreadyExists = errors.New("Blob already exists")
)
// Bytes encapsulates a sequence of bytes, possibly stored in non-contiguous buffers,
// which can be written sequentially (via WriteTo) or consumed through the
// io.ReadSeekCloser returned by Reader.
type Bytes interface {
	io.WriterTo

	// Length returns the total number of bytes in the sequence.
	Length() int
	// Reader returns a reader positioned over the bytes of the sequence.
	Reader() io.ReadSeekCloser
}
// OutputBuffer is the destination that blob contents are written into; it is
// implemented by *gather.WriteBuffer.
type OutputBuffer interface {
	io.Writer

	// Length reports the length of the buffered data.
	Length() int
	// Reset clears the buffer (presumably so it can be reused for another read).
	Reset()
}
// Reader defines read access API to blob storage.
type Reader interface {
	// GetBlob retrieves full or partial contents of a blob with given ID into output.
	// If length>0, the function retrieves a range of bytes [offset,offset+length).
	// If length<0, the entire blob must be fetched.
	// Returns ErrInvalidRange if the fetched blob length is invalid.
	GetBlob(ctx context.Context, blobID ID, offset, length int64, output OutputBuffer) error

	// GetMetadata returns Metadata about a single blob.
	GetMetadata(ctx context.Context, blobID ID) (Metadata, error)

	// ListBlobs invokes the provided callback for each blob in the storage whose ID starts with blobIDPrefix.
	// Iteration continues until the callback returns an error or until all matching blobs have been reported.
	ListBlobs(ctx context.Context, blobIDPrefix ID, cb func(bm Metadata) error) error

	// ConnectionInfo returns JSON-serializable data structure containing information required to
	// connect to storage.
	ConnectionInfo() ConnectionInfo

	// DisplayName returns the name of the storage used for quick identification by humans.
	DisplayName() string
}
// PutOptions represents put-options for a single BLOB in a storage.
type PutOptions struct {
	// RetentionMode and RetentionPeriod request retention settings for the blob;
	// HasRetentionOptions reports whether either of them is set.
	RetentionMode   string
	RetentionPeriod time.Duration

	// if true, PutBlob will fail with ErrBlobAlreadyExists if a blob with the same ID exists.
	DoNotRecreate bool

	// if not zero, set the provided timestamp on the blob instead of the server-assigned one;
	// if that is unsupported by the server, ErrSetTimeUnsupported is returned.
	SetModTime time.Time

	GetModTime *time.Time // if != nil, populate the value pointed at with the actual modification time
}
// HasRetentionOptions returns true when blob-retention settings have been
// specified (a non-zero RetentionPeriod or a non-empty RetentionMode),
// otherwise returns false.
func (o PutOptions) HasRetentionOptions() bool {
	return o.RetentionPeriod != 0 || o.RetentionMode != ""
}
// Storage encapsulates API for connecting to blob storage.
//
// The underlying storage system must provide:
//
// * high durability, availability and bit-rot protection
// * read-after-write - blob written using PutBlob() must be immediately readable using GetBlob() and ListBlobs()
// * atomicity - it mustn't be possible to observe partial results of PutBlob() via either GetBlob() or ListBlobs()
// * timestamps that don't go back in time (small clock skew up to minutes is allowed)
// * reasonably low latency for retrievals
//
// The required semantics are provided by existing commercial cloud storage products (Google Cloud, AWS, Azure).
type Storage interface {
	Reader

	// PutBlob uploads the blob with given data to the repository, or replaces an existing blob
	// with the provided ID with the contents of data, according to opts (see PutOptions).
	PutBlob(ctx context.Context, blobID ID, data Bytes, opts PutOptions) error

	// DeleteBlob removes the blob from storage. Subsequent GetBlob() calls for it
	// will fail with ErrBlobNotFound.
	DeleteBlob(ctx context.Context, blobID ID) error

	// Close releases all resources associated with storage.
	Close(ctx context.Context) error

	// FlushCaches flushes any local caches associated with storage.
	FlushCaches(ctx context.Context) error
}
// ID is a string that represents blob identifier.
type ID string

// Metadata represents metadata about a single BLOB in a storage.
type Metadata struct {
	BlobID    ID        `json:"id"`
	Length    int64     `json:"length"`
	Timestamp time.Time `json:"timestamp"`
}

// String returns the JSON representation of the metadata, e.g.
// {"id":"abc","length":5,"timestamp":"2020-01-02T03:04:05Z"}.
func (m *Metadata) String() string {
	b, err := json.Marshal(m)
	if err != nil {
		// json.Marshal fails when Timestamp's year is outside [0,9999]
		// (time.Time.MarshalJSON rejects such values). Rather than silently
		// returning "", re-encode with the timestamp rendered as plain text.
		fallback := struct {
			BlobID    ID     `json:"id"`
			Length    int64  `json:"length"`
			Timestamp string `json:"timestamp"`
		}{m.BlobID, m.Length, m.Timestamp.String()}

		// Marshaling a struct of only string and integer fields cannot fail.
		b, _ = json.Marshal(fallback)
	}

	return string(b)
}
// ErrBlobNotFound is the sentinel error reported when a BLOB cannot be found in storage.
// Because helpers may wrap it, compare using errors.Is rather than ==.
var (
	ErrBlobNotFound = errors.New("BLOB not found")
)
// ListAllBlobs collects the Metadata of every blob in st whose ID starts with prefix.
// If listing fails, the blobs gathered before the failure are returned together with
// the wrapped error.
func ListAllBlobs(ctx context.Context, st Storage, prefix ID) ([]Metadata, error) {
	var all []Metadata

	collect := func(bm Metadata) error {
		all = append(all, bm)
		return nil
	}

	if err := st.ListBlobs(ctx, prefix, collect); err != nil {
		return all, errors.Wrap(err, "error listing all blobs")
	}

	return all, nil
}
// IterateAllPrefixesInParallel lists blobs under each of the provided prefixes and invokes
// callback for every blob found. Up to parallelism listings run concurrently (values <= 0
// are treated as 1), so callback may be invoked from multiple goroutines at the same time
// and must be safe for concurrent use.
//
// It returns the first error reported by any listing (for example one returned by the
// callback) or nil. Once a listing fails, the context passed to the other in-flight
// listings is canceled and no further prefixes are started. If ctx is canceled before all
// prefixes have been started, the context error is returned.
func IterateAllPrefixesInParallel(ctx context.Context, parallelism int, st Storage, prefixes []ID, callback func(Metadata) error) error {
	if len(prefixes) == 1 {
		// nolint:wrapcheck
		return st.ListBlobs(ctx, prefixes[0], callback)
	}

	if parallelism <= 0 {
		parallelism = 1
	}

	// Derived context lets the first failure stop the remaining listings.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)

	// reportErr records only the first error and cancels everything else.
	reportErr := func(err error) {
		errOnce.Do(func() {
			firstErr = err
			cancel()
		})
	}

	semaphore := make(chan struct{}, parallelism)

launch:
	for _, prefix := range prefixes {
		prefix := prefix

		select {
		case semaphore <- struct{}{}: // acquire semaphore
		case <-ctx.Done():
			// Either a listing already failed (its error is kept by errOnce)
			// or the caller's context was canceled.
			reportErr(ctx.Err())
			break launch
		}

		wg.Add(1)

		go func() {
			defer wg.Done()
			defer func() {
				<-semaphore // release semaphore
			}()

			if err := st.ListBlobs(ctx, prefix, callback); err != nil {
				reportErr(err)
			}
		}()
	}

	wg.Wait()

	return firstErr
}
// EnsureLengthExactly validates that gotLength (typically the length of a fetched slice)
// is exactly equal to length, and returns an error wrapping ErrInvalidRange if it is not.
// As a special case, length < 0 disables validation.
func EnsureLengthExactly(gotLength int, length int64) error {
	if length < 0 {
		return nil
	}

	// Compare in int64: converting length to int would truncate on 32-bit platforms
	// and could make mismatched lengths appear equal.
	if int64(gotLength) != length {
		return errors.Wrapf(ErrInvalidRange, "invalid length %v, expected %v", gotLength, length)
	}

	return nil
}
// IDsFromMetadata extracts the BlobID of every entry in mds, preserving order.
// The result always has one ID per input entry and is non-nil even for empty input.
func IDsFromMetadata(mds []Metadata) []ID {
	result := make([]ID, 0, len(mds))

	for i := range mds {
		result = append(result, mds[i].BlobID)
	}

	return result
}
// TotalLength returns the sum of the Length fields of all blobs in the Metadata slice
// (0 for an empty or nil slice).
func TotalLength(mds []Metadata) int64 {
	var total int64

	for _, md := range mds {
		total += md.Length
	}

	return total
}
// MinTimestamp returns the earliest Timestamp among blobs in the Metadata slice,
// or the zero time.Time when the slice is empty.
func MinTimestamp(mds []Metadata) time.Time {
	if len(mds) == 0 {
		return time.Time{}
	}

	// Seed from the first entry instead of using the zero time as an "unset" sentinel:
	// a sentinel makes the result order-dependent when an entry's Timestamp is itself zero.
	earliest := mds[0].Timestamp

	for _, md := range mds[1:] {
		if md.Timestamp.Before(earliest) {
			earliest = md.Timestamp
		}
	}

	return earliest
}
// MaxTimestamp returns the latest Timestamp among blobs in the Metadata slice,
// or the zero time.Time when the slice is empty.
func MaxTimestamp(mds []Metadata) time.Time {
	if len(mds) == 0 {
		return time.Time{}
	}

	// Seed from the first entry rather than the zero time so that timestamps
	// earlier than the zero time are still considered.
	latest := mds[0].Timestamp

	for _, md := range mds[1:] {
		if md.Timestamp.After(latest) {
			latest = md.Timestamp
		}
	}

	return latest
}
// DeleteMultiple deletes multiple blobs in parallel.
func DeleteMultiple(ctx context.Context, st Storage, ids []ID, parallelism int) error {
eg, ctx := errgroup.WithContext(ctx)
sem := make(chan struct{}, parallelism)
for _, id := range ids {
// acquire semaphore
sem <- struct{}{}
id := id
eg.Go(func() error {
defer func() {
<-sem // release semaphore
}()
return errors.Wrapf(st.DeleteBlob(ctx, id), "deleting %v", id)
})
}
return errors.Wrap(eg.Wait(), "error deleting blobs")
}
// PutBlobAndGetMetadata stores data under blobID using st.PutBlob and describes the result
// as Metadata: the blob ID, the length of data, and the modification time written to
// opts.GetModTime (a private destination is supplied when the caller passed nil).
//
// The Metadata is returned even if PutBlob fails; PutBlob's error is returned unwrapped.
func PutBlobAndGetMetadata(ctx context.Context, st Storage, blobID ID, data Bytes, opts PutOptions) (Metadata, error) {
	modTime := opts.GetModTime
	if modTime == nil {
		// Supply a destination so the storage can report the actual modification time.
		modTime = new(time.Time)
		opts.GetModTime = modTime
	}

	putErr := st.PutBlob(ctx, blobID, data, opts)

	md := Metadata{BlobID: blobID}
	md.Length = int64(data.Length())
	md.Timestamp = *modTime

	return md, putErr // nolint:wrapcheck
}