Add logic to detect and optimise random reads.
Signed-off-by: Bjorn Leffler <leffler@google.com>
bjornleffler committed Mar 26, 2019
1 parent 8b519ab commit 4c9f926
57 changes: 46 additions & 11 deletions internal/gcsx/random_reader.go
@@ -22,9 +22,22 @@ import (
"golang.org/x/net/context"
)

+const MB = 1 << 20

+// Min read size in bytes for random reads.
+// We will not send a request to GCS for less than this many bytes (unless the
+// end of the object comes first).
+const minReadSize = MB

+// Max read size in bytes for random reads.
+// If the average read size (between seeks) is below this number, reads will
+// be optimised for random access.
// We will skip forwards in a GCS response at most this many bytes.
// About 6 MB of data is buffered anyway, so 8 MB seems like a good round number.
-const skipForwardThreshold = 8 * 1 << 20
+const maxReadSize = 8 * MB

+// Minimum number of seeks before evaluating if the read pattern is random.
+const minSeeksForRandom = 2

// An object that knows how to read ranges within a particular generation of a
// particular GCS object. Optimised for (large) sequential reads.
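The sizing rule implied by these constants: take the average read size between seeks, round it up to the next whole MB, and clamp the result to [minReadSize, maxReadSize]. As a worked illustration, here is a hypothetical standalone helper (not part of this commit) that mirrors the sizing logic added to startRead further down:

package main

import "fmt"

const MB = 1 << 20
const minReadSize = MB
const maxReadSize = 8 * MB

// chunkSize mirrors the sizing this commit adds to startRead: round the
// average read size up to the next MB, then clamp to the allowed range.
func chunkSize(averageReadBytes uint64) int64 {
	size := int64(((averageReadBytes / MB) + 1) * MB)
	if size < minReadSize {
		size = minReadSize
	}
	if size > maxReadSize {
		size = maxReadSize
	}
	return size
}

func main() {
	fmt.Println(chunkSize(300 * 1024)) // 300 KB average -> 1 MB chunks
	fmt.Println(chunkSize(5 * MB / 2)) // 2.5 MB average -> 3 MB chunks
}

In the commit itself this sizing only applies once the average read size is below maxReadSize; larger averages keep the read-to-end-of-object behaviour.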
@@ -56,6 +69,8 @@ func NewRandomReader(
bucket: bucket,
start: -1,
limit: -1,
+		seeks: 0,
+		totalReadBytes: 0,
}

return
@@ -79,6 +94,8 @@ type randomReader struct {
// INVARIANT: limit < 0 implies reader != nil
start int64
limit int64
+	seeks uint64
+	totalReadBytes uint64
}

func (rr *randomReader) CheckInvariants() {
@@ -114,12 +131,12 @@ func (rr *randomReader) ReadAt(
// concurrent reads, often by only a few 128kB fuse read requests. The aim is to
// re-use the GCS connection and avoid throwing away already read data.
// For parallel sequential reads to a single file, not throwing away the connections
-	// is a 15x improvement in throughput.
-	if rr.reader != nil && rr.start < offset && offset - rr.start < skipForwardThreshold {
-		bytes_to_skip := int64(offset - rr.start)
-		p := make([]byte, bytes_to_skip)
-		n, _ := rr.reader.Read(p)
-		rr.start += int64(n)
+	// is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s.
+	if rr.reader != nil && rr.start < offset && offset - rr.start < maxReadSize {
+		bytes_to_skip := int64(offset - rr.start)
+		p := make([]byte, bytes_to_skip)
+		n, _ := rr.reader.Read(p)
+		rr.start += int64(n)
}
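This skip-forward path reads and discards the gap instead of closing the connection. A minimal sketch of the same idea, for illustration only (not from this commit; io.Discard requires Go 1.16 or later, older toolchains use ioutil.Discard):

package main

import (
	"io"
	"strings"
)

// skipForward discards gap bytes from r, leaving the stream positioned
// at the requested offset without tearing down the connection.
func skipForward(r io.Reader, gap int64) (int64, error) {
	return io.CopyN(io.Discard, r, gap)
}

func main() {
	r := strings.NewReader("0123456789")
	skipForward(r, 4)
	buf := make([]byte, 2)
	r.Read(buf) // buf now holds "45"
}

The commit's version tolerates a short Read: rr.start advances only by the bytes actually skipped, and if that leaves the reader at the wrong offset it is simply closed and reopened by the code below.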

// If we have an existing reader but it's positioned at the wrong place,
@@ -128,6 +145,7 @@
rr.reader.Close()
rr.reader = nil
rr.cancel = nil
+		rr.seeks += 1
}

// If we don't have a reader, start a read operation.
@@ -148,6 +166,7 @@
p = p[tmp:]
rr.start += int64(tmp)
offset += int64(tmp)
+	rr.totalReadBytes += uint64(tmp)

// Sanity check.
if rr.start > rr.limit {
@@ -257,15 +276,31 @@ func (rr *randomReader) startRead(
return
}

-	// GCS requests are expensive. Always issue read requests to the end of
+	// GCS requests are expensive. Prefer to issue read requests to the end of
// the object. Sequential reads will simply sip from the fire hose
// with each call to ReadAt. In practice, GCS will fill the TCP buffers
// with about 6 MB of data. Requests from outside GCP will be charged
// about 6 MB of egress data, even if less data is read. Inside GCP
// regions, GCS egress is free. This logic should limit the number of
// GCS read requests, which are not free.

-	// TODO: Investigate impact on random read workloads.
+	// But if we notice random read patterns after a minimum number of seeks,
+	// optimise for random reads. Random reads will read data in chunks of
+	// (average read size in bytes rounded up to the next MB).
+	end := int64(rr.object.Size)
+	if rr.seeks >= minSeeksForRandom {
+		averageReadBytes := rr.totalReadBytes / rr.seeks
+		if averageReadBytes < maxReadSize {
+			randomReadSize := int64(((averageReadBytes / MB) + 1) * MB)
+			if randomReadSize < minReadSize {
+				randomReadSize = minReadSize
+			}
+			if randomReadSize > maxReadSize {
+				randomReadSize = maxReadSize
+			}
+			end = start + randomReadSize
+		}
+	}

// Begin the read.
ctx, cancel := context.WithCancel(context.Background())
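End to end, the detection behaves like this small self-contained simulation (hypothetical, not part of the commit): each time an already-open reader sits at the wrong offset it is closed and seeks is incremented, and once minSeeksForRandom is reached the average bytes read per seek decides whether requests are capped:

package main

import "fmt"

const (
	MB                = 1 << 20
	maxReadSize       = 8 * MB
	minSeeksForRandom = 2
)

func main() {
	var seeks, totalReadBytes uint64
	// Three 128 KiB reads at scattered offsets, as fuse might issue them.
	for i, n := range []uint64{128 * 1024, 128 * 1024, 128 * 1024} {
		if i > 0 {
			seeks++ // an existing reader was at the wrong offset
		}
		totalReadBytes += n
	}
	if seeks >= minSeeksForRandom && totalReadBytes/seeks < maxReadSize {
		fmt.Println("random read pattern detected") // average 192 KiB per seek
	}
}

Sequential workloads never increment seeks (the reader either stays correctly positioned or is skipped forward), so they retain the full read-to-end-of-object requests.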
@@ -276,7 +311,7 @@
Generation: rr.object.Generation,
Range: &gcs.ByteRange{
Start: uint64(start),
-			Limit: uint64(rr.object.Size),
+			Limit: uint64(end),
},
})

@@ -288,7 +323,7 @@
rr.reader = rc
rr.cancel = cancel
rr.start = start
-	rr.limit = int64(rr.object.Size)
+	rr.limit = end

return
}
