Fixed a bug where AzCopy was not reporting performance numbers #2636

Merged: 12 commits, May 21, 2024
6 changes: 3 additions & 3 deletions cmd/credentialUtil.go
@@ -376,7 +376,7 @@ func isPublic(ctx context.Context, blobResourceURL string, cpkOptions common.Cpk
MaxRetryDelay: ste.UploadMaxRetryDelay,
}, policy.TelemetryOptions{
ApplicationID: glcm.AddUserAgentPrefix(common.UserAgent),
- }, nil, nil, ste.LogOptions{}, nil)
+ }, nil, ste.LogOptions{}, nil)

blobClient, _ := blob.NewClientWithNoCredential(bURLParts.String(), &blob.ClientOptions{ClientOptions: clientOptions})
bURLParts.BlobName = ""
@@ -409,7 +409,7 @@ func mdAccountNeedsOAuth(ctx context.Context, blobResourceURL string, cpkOptions
MaxRetryDelay: ste.UploadMaxRetryDelay,
}, policy.TelemetryOptions{
ApplicationID: glcm.AddUserAgentPrefix(common.UserAgent),
- }, nil, nil, ste.LogOptions{}, nil)
+ }, nil, ste.LogOptions{}, nil)

blobClient, _ := blob.NewClientWithNoCredential(blobResourceURL, &blob.ClientOptions{ClientOptions: clientOptions})
_, err := blobClient.GetProperties(ctx, &blob.GetPropertiesOptions{CPKInfo: cpkOptions.GetCPKInfo()})
@@ -603,7 +603,7 @@ func createClientOptions(logger common.ILoggerResetable, srcCred *common.ScopedC
MaxRetryDelay: ste.UploadMaxRetryDelay,
}, policy.TelemetryOptions{
ApplicationID: glcm.AddUserAgentPrefix(common.UserAgent),
- }, ste.NewAzcopyHTTPClient(frontEndMaxIdleConnectionsPerHost), nil, logOptions, srcCred)
+ }, ste.NewAzcopyHTTPClient(frontEndMaxIdleConnectionsPerHost), logOptions, srcCred)
}

const frontEndMaxIdleConnectionsPerHost = http.DefaultMaxIdleConnsPerHost
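All three hunks in this file make the same mechanical change: NewClientOptions (see ste/mgr-JobPartMgr.go below) loses its *PipelineNetworkStats parameter, so every call site drops one argument. A minimal before/after sketch of the call shape, with argument names abbreviated for illustration:

// before: the stats accumulator was bound into the pipeline at construction time
opts := ste.NewClientOptions(retry, telemetry, transport, nil /* statsAcc */, logOpts, srcCred)

// after: one argument fewer; the stats policy now reads the accumulator from each request's context
opts := ste.NewClientOptions(retry, telemetry, transport, logOpts, srcCred)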
11 changes: 11 additions & 0 deletions e2etest/newe2e_task_resourcemanagement.go
@@ -220,3 +220,14 @@ func ValidateErrorOutput(a Asserter, stdout AzCopyStdout, errorMsg string) {
fmt.Println(stdout.String())
a.Error("expected error message not found in azcopy output")
}

+ func ValidateStatsReturned(a Asserter, stdout AzCopyStdout) {
+ if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() {
+ return
+ }
+ csrStdout, ok := stdout.(*AzCopyParsedCopySyncRemoveStdout)
+ a.AssertNow("stdout must be AzCopyParsedCopySyncRemoveStdout", Equal{}, ok, true)
+ // Check for any of the stats. Average IOPS, server busy percentage, and network error percentage can all legitimately be 0, but average E2E milliseconds should never be 0.
+ statsFound := csrStdout.FinalStatus.AverageIOPS != 0 || csrStdout.FinalStatus.AverageE2EMilliseconds != 0 || csrStdout.FinalStatus.ServerBusyPercentage != 0 || csrStdout.FinalStatus.NetworkErrorPercentage != 0
+ a.Assert("stats must be returned", Equal{}, statsFound, true)
+ }
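For orientation, a sketch of how a scenario is expected to consume this helper (mirroring the test below; the key point is that RunAzCopy must hand back its parsed stdout):

stdout, _ := RunAzCopy(svm, AzCopyCommand{ /* verb, targets, flags */ })
ValidateStatsReturned(svm, stdout) // fails the scenario if no perf stats were parsed from the final status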
11 changes: 7 additions & 4 deletions e2etest/zt_newe2e_basic_functionality_test.go
@@ -11,7 +11,7 @@ func init() {

type BasicFunctionalitySuite struct{}

- func (s *BasicFunctionalitySuite) Scenario_SingleFileUploadDownload(svm *ScenarioVariationManager) {
+ func (s *BasicFunctionalitySuite) Scenario_SingleFile(svm *ScenarioVariationManager) {
azCopyVerb := ResolveVariation(svm, []AzCopyVerb{AzCopyVerbCopy, AzCopyVerbSync}) // Calculate verb early to create the destination object early
// Scale up from service to object
dstObj := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, ResolveVariation(svm, []common.Location{common.ELocation.Local(), common.ELocation.Blob()})), ResourceDefinitionContainer{}).GetObject(svm, "test", common.EEntityType.File())
@@ -32,15 +32,15 @@ func (s *BasicFunctionalitySuite) Scenario_SingleFileUploadDownload(svm *Scenari
Body: body,
})

- // no s2s, no local->local
- if srcObj.Location().IsRemote() == dstObj.Location().IsRemote() {
+ // no local->local
+ if srcObj.Location().IsLocal() && dstObj.Location().IsLocal() {
svm.InvalidateScenario()
return
}

sasOpts := GenericAccountSignatureValues{}

- RunAzCopy(
+ stdout, _ := RunAzCopy(
svm,
AzCopyCommand{
// Sync is not included at this moment, because sync requires
@@ -63,4 +63,7 @@ func (s *BasicFunctionalitySuite) Scenario_SingleFileUploadDownload(svm *Scenari
ValidateResource[ObjectResourceManager](svm, dstObj, ResourceDefinitionObject{
Body: body,
}, true)

+ // Validate that the network stats were updated
+ ValidateStatsReturned(svm, stdout)
}
13 changes: 7 additions & 6 deletions ste/mgr-JobPartMgr.go
@@ -129,12 +129,12 @@ func (d *dialRateLimiter) DialContext(ctx context.Context, network, address stri
return d.dialer.DialContext(ctx, network, address)
}

- func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptions, transport policy.Transporter, statsAcc *PipelineNetworkStats, log LogOptions, srcCred *common.ScopedCredential) azcore.ClientOptions {
+ func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptions, transport policy.Transporter, log LogOptions, srcCred *common.ScopedCredential) azcore.ClientOptions {
// Pipeline will look like
// [includeResponsePolicy, newAPIVersionPolicy (ignored), NewTelemetryPolicy, perCall, NewRetryPolicy, perRetry, NewLogPolicy, httpHeaderPolicy, bodyDownloadPolicy]
perCallPolicies := []policy.Policy{azruntime.NewRequestIDPolicy(), NewVersionPolicy(), newFileUploadRangeFromURLFixPolicy()}
// TODO : Default logging policy is not equivalent to old one. tracing HTTP request
- perRetryPolicies := []policy.Policy{newRetryNotificationPolicy(), newLogPolicy(log), newStatsPolicy(statsAcc)}
+ perRetryPolicies := []policy.Policy{newRetryNotificationPolicy(), newLogPolicy(log), newStatsPolicy()}
if srcCred != nil {
perRetryPolicies = append(perRetryPolicies, NewSourceAuthPolicy(srcCred))
}
@@ -184,10 +184,9 @@ type jobPartMgr struct
srcServiceClient *common.ServiceClient
dstServiceClient *common.ServiceClient


- credInfo common.CredentialInfo
- srcIsOAuth bool // true if source is authenticated via oauth
- credOption *common.CredentialOpOptions
+ credInfo common.CredentialInfo
+ srcIsOAuth bool // true if source is authenticated via oauth
+ credOption *common.CredentialOpOptions
// When the part is scheduled to run (in progress), the fields below are used
planMMF *JobPartPlanMMF // This Job part plan's MMF

@@ -364,6 +363,8 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {

// Each transfer gets its own context (so any chunk can cancel the whole transfer) based off the job's context
transferCtx, transferCancel := context.WithCancel(jobCtx)
+ // Add the pipeline network stats to the context. This will be manually unset for all sourceInfoProvider contexts.
+ transferCtx = withPipelineNetworkStats(transferCtx, jpm.jobMgr.PipelineNetworkStats())
// Initialize a job part transfer manager
jptm := &jobPartTransferMgr{
jobPartMgr: jpm,
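The context hand-off added above is the heart of this PR, and it is easiest to see in isolation. Below is a self-contained sketch of the same pattern using illustrative names rather than AzCopy's: a typed context key, a setter that can also mask an existing value with nil, and a reader that records only when a non-nil accumulator is present.

package main

import (
	"context"
	"fmt"
)

type statsKeyType struct{}

type netStats struct{ ops int }

// withStats attaches a stats accumulator to the context; passing nil
// deliberately masks any accumulator attached further up the chain.
func withStats(ctx context.Context, s *netStats) context.Context {
	return context.WithValue(ctx, statsKeyType{}, s)
}

// record mimics statsPolicy.Do below: it counts only when a non-nil accumulator is found.
func record(ctx context.Context) {
	if s, ok := ctx.Value(statsKeyType{}).(*netStats); ok && s != nil {
		s.ops++
	}
}

func main() {
	stats := &netStats{}
	transferCtx := withStats(context.Background(), stats) // as in ScheduleTransfers above
	record(transferCtx)                                   // destination-side request: counted

	srcCtx := withStats(transferCtx, nil) // as in the source info providers below
	record(srcCtx)                        // source-side request: not counted

	fmt.Println(stats.ops) // prints 1
}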
20 changes: 12 additions & 8 deletions ste/sourceInfoProvider-Blob.go
@@ -21,6 +21,7 @@
package ste

import (
"context"
"crypto/md5"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
@@ -33,6 +34,7 @@ import (
type blobSourceInfoProvider struct {
defaultRemoteSourceInfoProvider
source *blob.Client
+ ctx context.Context
}

func (p *blobSourceInfoProvider) IsDFSSource() bool {
@@ -48,17 +50,15 @@ func (p *blobSourceInfoProvider) RawSource() string {
}

func (p *blobSourceInfoProvider) ReadLink() (string, error) {
- ctx := p.jptm.Context()
-
- resp, err := p.source.DownloadStream(ctx, &blob.DownloadStreamOptions{
+ resp, err := p.source.DownloadStream(p.ctx, &blob.DownloadStreamOptions{
CPKInfo: p.jptm.CpkInfo(),
CPKScopeInfo: p.jptm.CpkScopeInfo(),
})
if err != nil {
return "", err
}

- symlinkBuf, err := io.ReadAll(resp.NewRetryReader(ctx, &blob.RetryReaderOptions{
+ symlinkBuf, err := io.ReadAll(resp.NewRetryReader(p.ctx, &blob.RetryReaderOptions{
MaxRetries: 5,
OnFailedRead: common.NewBlobReadLogFunc(p.jptm, p.jptm.Info().Source),
}))
@@ -125,6 +125,10 @@ func newBlobSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, e

ret.source = blobClient

+ ctx := jptm.Context()
+ ctx = withPipelineNetworkStats(ctx, nil)
+ ret.ctx = ctx

return ret, nil
}

@@ -136,7 +140,7 @@ func (p *blobSourceInfoProvider) AccessControl() (*string, error) {

sourceDatalakeClient := dsc.NewFileSystemClient(p.jptm.Info().SrcContainer).NewFileClient(p.jptm.Info().SrcFilePath)

- resp, err := sourceDatalakeClient.GetAccessControl(p.jptm.Context(), nil)
+ resp, err := sourceDatalakeClient.GetAccessControl(p.ctx, nil)
if err != nil {
return nil, err
}
@@ -156,7 +160,7 @@ func (p *blobSourceInfoProvider) BlobType() blob.BlobType {

func (p *blobSourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) {
// We can't set a custom LMT on HNS, so it doesn't make sense to swap here.
- properties, err := p.source.GetProperties(p.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: p.jptm.CpkInfo()})
+ properties, err := p.source.GetProperties(p.ctx, &blob.GetPropertiesOptions{CPKInfo: p.jptm.CpkInfo()})
if err != nil {
return time.Time{}, err
}
@@ -168,7 +172,7 @@ func (p *blobSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) {
if count <= common.MaxRangeGetSize {
rangeGetContentMD5 = to.Ptr(true)
}
- response, err := p.source.DownloadStream(p.jptm.Context(),
+ response, err := p.source.DownloadStream(p.ctx,
&blob.DownloadStreamOptions{
Range: blob.HTTPRange{Offset: offset, Count: count},
RangeGetContentMD5: rangeGetContentMD5,
@@ -182,7 +186,7 @@
return response.ContentMD5, nil
} else {
// compute md5
- body := response.NewRetryReader(p.jptm.Context(), &blob.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody})
+ body := response.NewRetryReader(p.ctx, &blob.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody})
defer body.Close()
h := md5.New()
if _, err = io.Copy(h, body); err != nil {
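The same three-line recipe recurs in the File, GCP, and S3 providers below. Passing nil shadows the per-transfer stats object attached in ScheduleTransfers, so source-side reads in service-to-service transfers do not skew the destination pipeline's performance numbers. The recurring shape (the field assignment varies slightly per provider):

ctx := jptm.Context()
ctx = withPipelineNetworkStats(ctx, nil) // opt source-side requests out of stats accounting
p.ctx = ctx                              // cached on the provider; subsequent source calls use p.ctx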
7 changes: 5 additions & 2 deletions ste/sourceInfoProvider-File.go
@@ -191,9 +191,12 @@ func newFileSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, e
// due to the REST parity feature added in 2019-02-02, the File APIs are no longer backward compatible
// so we must use the latest SDK version to stay safe
//TODO: Should we do that?
+ ctx := jptm.Context()
+ ctx = withPipelineNetworkStats(ctx, nil)
+
return &fileSourceInfoProvider{
defaultRemoteSourceInfoProvider: *base,
- ctx: jptm.Context(),
+ ctx: ctx,
cacheOnce: &sync.Once{},
srcShareClient: s.NewShareClient(jptm.Info().SrcContainer),
sourceURL: source.URL()}, nil
@@ -341,7 +344,7 @@ func (p *fileSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) {
return response.ContentMD5, nil
} else {
// compute md5
- body := response.NewRetryReader(p.jptm.Context(), &file.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody})
+ body := response.NewRetryReader(p.ctx, &file.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody})
defer body.Close()
h := md5.New()
if _, err = io.Copy(h, body); err != nil {
14 changes: 10 additions & 4 deletions ste/sourceInfoProvider-GCP.go
@@ -2,6 +2,7 @@ package ste

import (
gcpUtils "cloud.google.com/go/storage"
"context"
"crypto/md5"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/common"
@@ -21,6 +22,7 @@ type gcpSourceInfoProvider struct {

gcpClient *gcpUtils.Client
gcpURLParts common.GCPURLParts
+ ctx context.Context
}

var gcpClientFactory = common.NewGCPClientFactory()
@@ -39,8 +41,12 @@ func newGCPSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, er
return nil, err
}

+ ctx := jptm.Context()
+ ctx = withPipelineNetworkStats(ctx, nil)
+ p.ctx = ctx
+
p.gcpClient, err = gcpClientFactory.GetGCPClient(
- p.jptm.Context(),
+ p.ctx,
common.CredentialInfo{
CredentialType: common.ECredentialType.GoogleAppCredentials(),
GCPCredentialInfo: common.GCPCredentialInfo{},
@@ -88,7 +94,7 @@ func (p *gcpSourceInfoProvider) Properties() (*SrcProperties, error) {
SrcMetadata: p.transferInfo.SrcMetadata,
}
if p.transferInfo.S2SGetPropertiesInBackend {
- objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.jptm.Context())
+ objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.ctx)
if err != nil {
return nil, err
}
@@ -151,7 +157,7 @@ func (p *gcpSourceInfoProvider) IsLocal() bool {
}

func (p *gcpSourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) {
- objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.jptm.Context())
+ objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.ctx)
if err != nil {
return time.Time{}, err
}
@@ -164,7 +170,7 @@ func (p *gcpSourceInfoProvider) EntityType() common.EntityType {

func (p *gcpSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) {
// gcp does not support getting range md5
body, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).NewRangeReader(p.jptm.Context(), offset, count)
- body, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).NewRangeReader(p.jptm.Context(), offset, count)
+ body, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).NewRangeReader(p.ctx, offset, count)
return nil, err
}
4 changes: 3 additions & 1 deletion ste/sourceInfoProvider-S3.go
@@ -69,7 +69,9 @@ func newS3SourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, err
} else {
p.credType = common.ECredentialType.S3AccessKey()
}
- p.s3Client, err = s3ClientFactory.GetS3Client(p.jptm.Context(), common.CredentialInfo{
+ ctx := jptm.Context()
+ ctx = withPipelineNetworkStats(ctx, nil)
+ p.s3Client, err = s3ClientFactory.GetS3Client(ctx, common.CredentialInfo{
CredentialType: p.credType,
S3CredentialInfo: common.S3CredentialInfo{
Endpoint: p.s3URLPart.Endpoint,
32 changes: 21 additions & 11 deletions ste/xferStatsPolicy.go
@@ -22,6 +22,7 @@ package ste

import (
"bytes"
"context"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-storage-azcopy/v10/common"
"io"
@@ -173,33 +174,42 @@ func transparentlyReadBody(r *http.Response) string {
return string(buf) // copy to string
}

+ var pipelineNetworkStatsContextKey = contextKey{"pipelineNetworkStats"}
+
+ // withPipelineNetworkStats returns a context that carries a pipeline network stats object;
+ // the stats policy pulls it back off each request's context when recording request metrics.
+ func withPipelineNetworkStats(ctx context.Context, stats *PipelineNetworkStats) context.Context {
+ return context.WithValue(ctx, pipelineNetworkStatsContextKey, stats)
+ }
+
type statsPolicy struct {
- stats *PipelineNetworkStats
}

func (s statsPolicy) Do(req *policy.Request) (*http.Response, error) {
start := time.Now()

response, err := req.Next()
- if s.stats != nil {
- if s.stats.IsStarted() {
- atomic.AddInt64(&s.stats.atomicOperationCount, 1)
- atomic.AddInt64(&s.stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000))
+ // Grab the pipeline network stats out of the context and, if they're there, record against them
+ stats, ok := req.Raw().Context().Value(pipelineNetworkStatsContextKey).(*PipelineNetworkStats)
+ if ok && stats != nil {
+ if stats.IsStarted() {
+ atomic.AddInt64(&stats.atomicOperationCount, 1)
+ atomic.AddInt64(&stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000))

if err != nil && !isContextCancelledError(err) {
// no response from server
- atomic.AddInt64(&s.stats.atomicNetworkErrorCount, 1)
+ atomic.AddInt64(&stats.atomicNetworkErrorCount, 1)
}
}

// always look at retries, even if not started, because concurrency tuner needs to know about them
// TODO should we also count status 500? It is mentioned here as a timeout: https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets
if response != nil && response.StatusCode == http.StatusServiceUnavailable {
- s.stats.tunerInterface.recordRetry() // always tell the tuner
- if s.stats.IsStarted() { // but only count it here, if we have started
+ stats.tunerInterface.recordRetry() // always tell the tuner
+ if stats.IsStarted() { // but only count it here, if we have started
// To find out why the server was busy we need to look at the response
responseBodyText := transparentlyReadBody(response)
- s.stats.recordRetry(responseBodyText)
+ stats.recordRetry(responseBodyText)
}

}
@@ -208,6 +218,6 @@ func (s statsPolicy) Do(req *policy.Request) (*http.Response, error) {
return response, err
}

- func newStatsPolicy(accumulator *PipelineNetworkStats) policy.Policy {
- return statsPolicy{stats: accumulator}
+ func newStatsPolicy() policy.Policy {
+ return statsPolicy{}
}
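One definition this diff relies on but does not show is the contextKey type behind pipelineNetworkStatsContextKey; it is assumed to exist elsewhere in the ste package. A minimal definition compatible with the usage above would be:

// assumed definition, not part of this diff: an unexported named type
// keeps this package's context keys from colliding with other packages' keys
type contextKey struct {
	name string
}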