diff --git a/profiler/profiler.go b/profiler/profiler.go index fdcfc2dd60f..54f2a98669d 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -35,6 +35,7 @@ package profiler import ( + "bufio" "bytes" "context" "errors" @@ -44,6 +45,7 @@ import ( "regexp" "runtime" "runtime/pprof" + "strconv" "sync" "time" @@ -78,6 +80,7 @@ var ( dialGRPC = gtransport.DialPool onGCE = gcemd.OnGCE serviceRegexp = regexp.MustCompile(`^[a-z]([-a-z0-9_.]{0,253}[a-z0-9])?$`) + mutexHeap sync.Mutex // serializes heap profile collection across goroutines // For testing only. // When the profiling loop has exited without error and this channel is @@ -99,6 +102,9 @@ const ( maxBackoff = time.Hour backoffMultiplier = 1.3 // Backoff envelope increases by this factor on each retry. retryInfoMetadata = "google.rpc.retryinfo-bin" + memLimitFile = "/sys/fs/cgroup/memory/memory.limit_in_bytes" + memUsedFile = "/sys/fs/cgroup/memory/memory.usage_in_bytes" + eventLabel = "event" ) // Config is the profiler configuration. @@ -183,6 +189,19 @@ type Config struct { // the profile collection loop exits.When numProfiles is 0, profiles will // be collected for the duration of the program. For testing only. numProfiles int + + // OOMProfile, when true, makes the agent collect a heap profile when + // memory utilization reaches the threshold specified in + // OOMProfileThreshold. Used only when running in containers. + OOMProfile bool + + // OOMProfileThreshold sets the memory utilization threshold at or above + // which a heap profile is collected. Defaults to 0.98 when left at zero. + // Used only when running in containers. + OOMProfileThreshold float32 + + // OOMCheckInterval is the time interval between two consecutive memory + // utilization checks. 
+ OOMCheckInterval time.Duration } // allowUntilSuccess is an object that will perform action till @@ -256,6 +275,7 @@ func start(cfg Config, options ...option.ClientOption) error { return err } go pollProfilerService(withXGoogHeader(ctx), a) + go pollMemoryUtilization(ctx, a) return nil } @@ -380,6 +400,8 @@ func (a *agent) profileAndUpload(ctx context.Context, p *pb.Profile) { sleep(ctx, duration) stopCPUProfile() case pb.ProfileType_HEAP: + mutexHeap.Lock() + defer mutexHeap.Unlock() if err := heapProfile(&prof); err != nil { debugLog("failed to write heap profile: %v", err) return @@ -594,6 +616,11 @@ func initializeConfig(cfg Config) error { if config.APIAddr == "" { config.APIAddr = apiAddress } + + // Fall back to a 0.98 utilization threshold when none is configured. + if config.OOMProfileThreshold == 0 { + config.OOMProfileThreshold = 0.98 + } + return nil } @@ -612,3 +639,95 @@ func pollProfilerService(ctx context.Context, a *agent) { profilingDone <- true } } + +// readValueFromFile opens file and parses its first line as a base-10 +// int64. It returns an error when the file cannot be opened or the first +// line is not a valid integer. ctx is not used. +// NOTE(review): an empty file, or a scan that fails before the first line, +// yields (0, nil) because fscanner.Err() is never checked. +func readValueFromFile(ctx context.Context, file string) (int64, error) { + fh, err := os.Open(file) + if err != nil { + return 0, fmt.Errorf("failed to read file: %v", err) + } + defer fh.Close() + + fscanner := bufio.NewScanner(fh) + + var value int64 + for fscanner.Scan() { + value, err = strconv.ParseInt(fscanner.Text(), 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to read number from file: %v", err) + } + break + } + return value, nil +} + +// calculateMemoryUtilization returns the value read from memUsedFile divided +// by the value read from memLimitFile. An error from either read is wrapped +// and returned. +// NOTE(review): a limit of 0 is not rejected; the floating-point division +// then produces an infinity or NaN instead of an error. +func calculateMemoryUtilization(ctx context.Context) (float32, error) { + memLimitBytes, err := readValueFromFile(ctx, memLimitFile) + if err != nil { + return 0, fmt.Errorf("failed to read value from file: %v", err) + } + + memUsedBytes, err := readValueFromFile(ctx, memUsedFile) + if err != nil { + return 0, fmt.Errorf("failed to read value from file: %v", err) + } + return float32(float64(memUsedBytes) / float64(memLimitBytes)), nil +} + +// pollMemoryUtilization periodically calculates memory utilization. When the +// utilization crosses OOMProfileThreshold, a heap profile is collected and +// uploaded. 
+func pollMemoryUtilization(ctx context.Context, a *agent) { + // Check HEAP profile is not disabled. + heapEnabled := false + for _, enabled := range a.profileTypes { + if enabled == pb.ProfileType_HEAP { + heapEnabled = true + break + } + } + if !heapEnabled { + debugLog("HEAP profile collection is disabled. OOM profile collection will be disabled") + return + } + + // Start ticker + tick := time.NewTicker(config.OOMCheckInterval) + var ( + utilization float32 + err error + prof bytes.Buffer + ) + for range tick.C { + utilization, err = calculateMemoryUtilization(ctx) + if err != nil { + debugLog("failed to calculate memory utilization: %v", err) + } + if utilization >= config.OOMProfileThreshold { + mutexHeap.Lock() + if err := heapProfile(&prof); err != nil { + debugLog("failed to write OOM heap profile: %v", err) + } + mutexHeap.Unlock() + req := pb.CreateOfflineProfileRequest{ + Parent: "projects/" + a.deployment.ProjectId, + Profile: &pb.Profile{ + ProfileType: pb.ProfileType_HEAP, + Deployment: &pb.Deployment{ + ProjectId: a.deployment.ProjectId, + Target: a.deployment.Target + "-event", + Labels: a.deployment.Labels, + }, + ProfileBytes: prof.Bytes(), + Labels: map[string]string{ + eventLabel: "oom", + }, + }, + } + p, err := a.client.CreateOfflineProfile(ctx, &req) + if err != nil { + debugLog("failed to upload event profile: %v", err) + } + debugLog("uploaded event profile: %s", p.Name) + } + } +}