From 9a2310f97941e651730fd5f72823580e3210d983 Mon Sep 17 00:00:00 2001 From: Timofey Kirillov Date: Wed, 13 Jul 2022 17:30:19 +0300 Subject: [PATCH] feat(telemetry): fix telemetry lags; telemetry logs; ignore commands list * Optionally write the telemetry log to the file given by WERF_TELEMETRY_LOG_FILE. * Introduce an ignore-commands list; werf will not send telemetry for the following commands: * werf version; * werf completion; * werf synchronization. * Lower the overall sending timeout to 1 second. Signed-off-by: Timofey Kirillov --- cmd/werf/common/telemetry.go | 46 +++++++++++++++++--------------- cmd/werf/main.go | 8 +----- pkg/telemetry/telemetry.go | 30 ++++++++++++++++++--- pkg/telemetry/telemetrywerfio.go | 19 ++++++++----- 4 files changed, 66 insertions(+), 37 deletions(-) diff --git a/cmd/werf/common/telemetry.go b/cmd/werf/common/telemetry.go index 92bdae207d..63a20cfb1f 100644 --- a/cmd/werf/common/telemetry.go +++ b/cmd/werf/common/telemetry.go @@ -3,7 +3,6 @@ package common import ( "context" "fmt" - "os" "strings" "github.com/go-git/go-git/v5/plumbing/transport" @@ -14,41 +13,49 @@ import ( "github.com/werf/werf/pkg/util" ) +var telemetryIgnoreCommands = []string{ + "werf version", + "werf synchronization", + "werf completion", +} + func InitTelemetry(ctx context.Context) { if err := telemetry.Init(ctx, telemetry.TelemetryOptions{ ErrorHandlerFunc: func(err error) { if err == nil { return } - logTelemetryError(err.Error()) + + telemetry.LogF("error: %s", err) }, }); err != nil { - logTelemetryError(fmt.Sprintf("unable to init: %s", err)) + telemetry.LogF("error: %s", err) } } func ShutdownTelemetry(ctx context.Context, exitCode int) { if err := telemetry.Shutdown(ctx); err != nil { - logTelemetryError(fmt.Sprintf("unable to shutdown: %s", err)) - } -} - -func logTelemetryError(msg string) { - if !telemetry.IsLogsEnabled() { - return + telemetry.LogF("unable to shutdown: %s", err) } - fmt.Fprintf(os.Stderr, "Telemetry error: %s\n", msg) } func TelemetryPreRun(cmd 
*cobra.Command, args []string) error { ctx := cmd.Context() - telemetry.GetTelemetryWerfIO().SetCommand(ctx, getTelemetryCommand(cmd)) + command := getTelemetryCommand(cmd) - if projectID, err := getTelemetryProjectID(ctx); err != nil { - if telemetry.IsLogsEnabled() { - fmt.Fprintf(os.Stderr, "Telemetry error: %s\n", err) + for _, c := range telemetryIgnoreCommands { + if command == c { + return nil } + } + + InitTelemetry(ctx) + + telemetry.GetTelemetryWerfIO().SetCommand(ctx, command) + + if projectID, err := getTelemetryProjectID(ctx); err != nil { + telemetry.LogF("error: %s", err) } else { telemetry.GetTelemetryWerfIO().SetProjectID(ctx, projectID) } @@ -84,9 +91,7 @@ func getTelemetryProjectID(ctx context.Context) (string, error) { } if repo, err := getTelemetryLocalRepo(ctx, workingDir, gitWorkTree); err != nil { - if telemetry.IsLogsEnabled() { - fmt.Fprintf(os.Stderr, "Telemetry: unable to open local repo: %s\n", err) - } + telemetry.LogF("unable to detect projectID: unable to open local repo: %s", err) } else { url, err := repo.RemoteOriginUrl(ctx) if err != nil { @@ -100,9 +105,8 @@ func getTelemetryProjectID(ctx context.Context) (string, error) { hashParts := []string{ep.Protocol, ep.Host, fmt.Sprintf("%d", ep.Port), ep.Path} - if telemetry.IsLogsEnabled() { - fmt.Fprintf(os.Stderr, "Telemetry: calculate projectID based on repo origin url\n") - } + telemetry.LogF("calculate projectID based on repo origin url") + projectID = util.Sha256Hash(hashParts...) 
} diff --git a/cmd/werf/main.go b/cmd/werf/main.go index 28746c05cb..1d0c87fe56 100644 --- a/cmd/werf/main.go +++ b/cmd/werf/main.go @@ -56,11 +56,8 @@ import ( func main() { ctx := common.GetContextWithLogger() - common.InitTelemetry(ctx) - shouldTerminate, err := common.ContainerBackendProcessStartupHook() if err != nil { - common.ShutdownTelemetry(ctx, 1) common.TerminateWithError(err.Error(), 1) } if shouldTerminate { @@ -72,7 +69,6 @@ func main() { logrus.StandardLogger().SetOutput(logboek.OutStream()) if err := process_exterminator.Init(); err != nil { - common.ShutdownTelemetry(ctx, 1) common.TerminateWithError(fmt.Sprintf("process exterminator initialization failed: %s", err), 1) } @@ -284,9 +280,7 @@ func setupTelemetryInit(rootCmd *cobra.Command) { cmd.RunE = func(cmd *cobra.Command, args []string) error { if err := common.TelemetryPreRun(cmd, args); err != nil { - if telemetry.IsLogsEnabled() { - fmt.Fprintf(os.Stderr, "Telemetry error: %s\n", err) - } + telemetry.LogF("error: %s\n", err) } if oldRunE != nil { diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 99a167b150..89f80156c4 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -3,6 +3,8 @@ package telemetry import ( "context" "fmt" + "os" + "time" "go.opentelemetry.io/otel" @@ -13,7 +15,10 @@ const ( TracesURL = "https://telemetry.werf.io/v1/traces" ) -var telemetrywerfio *TelemetryWerfIO +var ( + telemetrywerfio *TelemetryWerfIO + logFile *os.File +) func GetTelemetryWerfIO() TelemetryWerfIOInterface { if telemetrywerfio == nil { @@ -31,6 +36,14 @@ func Init(ctx context.Context, opts TelemetryOptions) error { return nil } + if path := os.Getenv("WERF_TELEMETRY_LOG_FILE"); path != "" { + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + return fmt.Errorf("unable to open log file %q: %w", path, err) + } + logFile = f + } + if t, err := NewTelemetryWerfIO(TracesURL, TelemetryWerfIOOptions{ HandleErrorFunc: 
opts.ErrorHandlerFunc, }); err != nil { @@ -60,6 +73,14 @@ func Shutdown(ctx context.Context) error { if !IsEnabled() { return nil } + if telemetrywerfio == nil { + return nil + } + + if logFile != nil { + defer logFile.Close() + } + return telemetrywerfio.Shutdown(ctx) } @@ -67,6 +88,9 @@ func IsEnabled() bool { return util.GetBoolEnvironmentDefaultFalse("WERF_TELEMETRY") } -func IsLogsEnabled() bool { - return util.GetBoolEnvironmentDefaultFalse("WERF_TELEMETRY_LOGS") +func LogF(f string, args ...interface{}) { + if logFile == nil { + return + } + fmt.Fprintf(logFile, "[%d][%s] Telemetry: %s\n", os.Getpid(), time.Now(), fmt.Sprintf(f, args...)) } diff --git a/pkg/telemetry/telemetrywerfio.go b/pkg/telemetry/telemetrywerfio.go index 515a7b433f..b9b51fbe87 100644 --- a/pkg/telemetry/telemetrywerfio.go +++ b/pkg/telemetry/telemetrywerfio.go @@ -52,8 +52,8 @@ func NewTelemetryWerfIO(url string, opts TelemetryWerfIOOptions) (*TelemetryWerf handleErrorFunc: opts.HandleErrorFunc, tracerProvider: sdktrace.NewTracerProvider( sdktrace.WithBatcher(e, - sdktrace.WithBatchTimeout(0), - sdktrace.WithExportTimeout(3*time.Second), + sdktrace.WithBatchTimeout(1*time.Millisecond), // send all available events immediately + sdktrace.WithExportTimeout(1000*time.Millisecond), ), ), traceExporter: e, @@ -62,6 +62,7 @@ func NewTelemetryWerfIO(url string, opts TelemetryWerfIOOptions) (*TelemetryWerf } func (t *TelemetryWerfIO) Start(ctx context.Context) error { + LogF("start trace exporter") if err := t.traceExporter.Start(ctx); err != nil { return fmt.Errorf("error starting telemetry trace exporter: %w", err) } @@ -69,18 +70,25 @@ func (t *TelemetryWerfIO) Start(ctx context.Context) error { } func (t *TelemetryWerfIO) Shutdown(ctx context.Context) error { + LogF("start shutdown") + + LogF("flush trace provider") if err := t.tracerProvider.ForceFlush(ctx); err != nil { return fmt.Errorf("unable to force flush tracer provider: %w", err) } + LogF("shutdown trace exporter") if err := 
t.traceExporter.Shutdown(ctx); err != nil { return fmt.Errorf("unable to shutdown trace exporter: %w", err) } + LogF("shutdown trace provider") if err := t.tracerProvider.Shutdown(ctx); err != nil { return fmt.Errorf("unable to shutdown trace provider: %w", err) } + LogF("shutdown complete") + return nil } @@ -117,6 +125,8 @@ func (t *TelemetryWerfIO) sendEvent(ctx context.Context, eventType EventType, ev trc := t.getTracer() _, span := trc.Start(ctx, spanName) + LogF("start sending event") + ts := time.Now().UnixMilli() span.SetAttributes(attribute.Key("ts").Int64(ts)) @@ -138,12 +148,9 @@ func (t *TelemetryWerfIO) sendEvent(ctx context.Context, eventType EventType, ev } span.SetAttributes(attribute.Key("eventData").String(string(rawEventData))) span.SetAttributes(attribute.Key("schemaVersion").Int64(schemaVersion)) - span.End() - if IsLogsEnabled() { - fmt.Printf("Telemetry: sent event: ts=%d executionID=%q projectID=%q command=%q attributes=%q eventType=%q eventData=%q schemaVersion=%d\n", ts, t.executionID, t.projectID, t.command, string(rawAttributes), string(eventType), string(rawEventData), schemaVersion) - } + LogF("sent event: ts=%d executionID=%q projectID=%q command=%q attributes=%q eventType=%q eventData=%q schemaVersion=%d", ts, t.executionID, t.projectID, t.command, string(rawAttributes), string(eventType), string(rawEventData), schemaVersion) return nil }