Skip to content

Commit

Permalink
enable rpc logging; make sure testpachd binary's Run gets the loki co…
Browse files Browse the repository at this point in the history
…ntext
  • Loading branch information
jrockway committed Apr 5, 2024
1 parent 2a16528 commit f02a218
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 32 deletions.
75 changes: 48 additions & 27 deletions src/internal/lokiutil/testloki/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testloki

import (
"context"
"fmt"
"time"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
Expand All @@ -25,15 +26,24 @@ func WithTestLoki(l *TestLoki) pachd.TestPachdOption {
config.LokiLogging = true
},
MutateContext: func(ctx context.Context) context.Context {
return pctx.Child(ctx, "", pctx.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewTee(c, l.NewZapCore(ctx))
})))
templateHash := randutil.UniqueString("")[0:10]
procHash := randutil.UniqueString("")[0:5]
return pctx.Child(ctx, "", l.WithLoki(ctx, map[string]string{
"host": "localhost",
"app": "pachd",
"container": "pachd",
"node_name": "localhost",
"pod": fmt.Sprintf("pachd-%v-%v", templateHash, procHash),
"pod_template_hash": templateHash,
"stream": "stderr",
"suite": "pachyderm",
}))
},
}
}

// NewZapCore returns a zapcore.Core that sends logs to this Loki instance.
func (l *TestLoki) NewZapCore(ctx context.Context) zapcore.Core {
// newZapCore returns a zapcore.Core that sends logs to this Loki instance.
func (l *TestLoki) newZapCore(ctx context.Context) *lokiCore {
return &lokiCore{
ctx: ctx,
l: l,
Expand All @@ -51,16 +61,29 @@ func (l *TestLoki) NewZapCore(ctx context.Context) zapcore.Core {
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}),
podTemplateHash: randutil.UniqueString("")[0:10],
labels: map[string]string{"suite": "pachyderm"},
}
}

// WithLoki returns a pctx context option that will send logs to this Loki instance.
func (l *TestLoki) WithLoki(sendCtx context.Context, lokiLabels map[string]string) pctx.Option {
lc := l.newZapCore(sendCtx)
lc.labels = lokiLabels
return pctx.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewTee(c, lc)
}))
}

// lokiCore sends JSON log messages to the provided TestLoki instance. If you were going to use
// this code in production, you would want to skip the JSON serialization step, buffer lines to send
// them in batches, and read the time from the Entry instead of using time.Now(). (We use
// time.Now() here because it's more like what promtail does; reads the line and adds its own
// timestamp.)
type lokiCore struct {
ctx context.Context
l *TestLoki
enc zapcore.Encoder
podTemplateHash string
fields []zapcore.Field
ctx context.Context
l *TestLoki
enc zapcore.Encoder
labels map[string]string
}

var _ zapcore.Core = (*lokiCore)(nil)
Expand All @@ -71,11 +94,12 @@ func (c *lokiCore) Enabled(zapcore.Level) bool {
}

// With implements zapcore.Core.
func (c *lokiCore) With(f []zapcore.Field) zapcore.Core {
var fields []zapcore.Field
fields = append(fields, c.fields...)
fields = append(fields, f...)
return &lokiCore{ctx: c.ctx, l: c.l, enc: c.enc, podTemplateHash: c.podTemplateHash, fields: fields}
func (c *lokiCore) With(fields []zapcore.Field) zapcore.Core {
enc := c.enc.Clone()
for _, f := range fields {
f.AddTo(enc)
}
return &lokiCore{ctx: c.ctx, l: c.l, enc: enc, labels: c.labels}
}

// Check implements zapcore.Core.
Expand All @@ -88,22 +112,19 @@ func (c *lokiCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.Che

// Write implements zapcore.Core.
func (c *lokiCore) Write(e zapcore.Entry, fields []zapcore.Field) error {
select {
case <-c.ctx.Done():
// For tests, we do not care about writes failing beacuse the root context is done.
return nil
default:
}
buf, err := c.enc.EncodeEntry(e, fields)
if err != nil {
return errors.Wrap(err, "encode log entry")
}
if err := c.l.AddLog(c.ctx, &Log{
Time: time.Now(),
Labels: map[string]string{
"host": "localhost",
"app": "pachd",
"container": "pachd",
"node_name": "localhost",
"pod": "pachd-" + c.podTemplateHash + c.podTemplateHash[0:5],
"pod_template_hash": c.podTemplateHash,
"stream": "stderr",
"suite": "pachyderm",
},
Time: time.Now(),
Labels: c.labels,
Message: string(buf.Bytes()),
}); err != nil {
return errors.Wrap(err, "send log to loki")
Expand Down
1 change: 1 addition & 0 deletions src/internal/pachd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_library(
"//src/internal/middleware/auth",
"//src/internal/middleware/errors",
"//src/internal/middleware/logging",
"//src/internal/middleware/logging/client",
"//src/internal/middleware/validation",
"//src/internal/middleware/version",
"//src/internal/migrations",
Expand Down
19 changes: 17 additions & 2 deletions src/internal/pachd/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pachyderm/pachyderm/v2/src/internal/grpcutil"
lokiclient "github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client"
auth_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
clientlog_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging/client"
"github.com/pachyderm/pachyderm/v2/src/internal/obj"
"github.com/pachyderm/pachyderm/v2/src/internal/pachconfig"
"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
Expand Down Expand Up @@ -513,7 +514,14 @@ func (pd *Full) PachClient(ctx context.Context) (*client.APIClient, error) {
if err != nil {
return nil, errors.Wrap(err, "parse pachd address")
}
c, err := client.NewFromPachdAddress(ctx, addr)
c, err := client.NewFromPachdAddress(ctx, addr,
client.WithAdditionalUnaryClientInterceptors(
clientlog_interceptor.LogUnary,
),
client.WithAdditionalStreamClientInterceptors(
clientlog_interceptor.LogStream,
),
)
if err != nil {
return nil, errors.Wrap(err, "NewPachdFromAddress")
}
Expand All @@ -529,7 +537,14 @@ func (pd *Full) mustGetPachClient(ctx context.Context) *client.APIClient {
if err != nil {
panic(fmt.Sprintf("parse pachd address: %v", err))
}
c, err := client.NewFromPachdAddress(ctx, addr)
c, err := client.NewFromPachdAddress(ctx, addr,
client.WithAdditionalUnaryClientInterceptors(
clientlog_interceptor.LogUnary,
),
client.WithAdditionalStreamClientInterceptors(
clientlog_interceptor.LogStream,
),
)
if err != nil {
panic(fmt.Sprintf("NewFromPachdAddress: %v", err))
}
Expand Down
6 changes: 5 additions & 1 deletion src/internal/pachd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,16 @@ func initMetadataServer(out *metadata.APIServer, env func() metadata_server.Env)
// reg is called to register functions with the server.
func newServeGRPC(authInterceptor *auth_interceptor.Interceptor, l net.Listener, reg func(gs grpc.ServiceRegistrar)) func(ctx context.Context) error {
return func(ctx context.Context) error {
loggingInterceptor := log_interceptor.NewBaseContextInterceptor(ctx)
baseContextInterceptor := log_interceptor.NewBaseContextInterceptor(ctx)
loggingInterceptor := log_interceptor.NewLoggingInterceptor(ctx)
loggingInterceptor.Level = log.DebugLevel
gs := grpc.NewServer(
grpc.ChainUnaryInterceptor(
errorsmw.UnaryServerInterceptor,
version_middleware.UnaryServerInterceptor,
tracing.UnaryServerInterceptor(),
authInterceptor.InterceptUnary,
baseContextInterceptor.UnaryServerInterceptor,
loggingInterceptor.UnaryServerInterceptor,
validation.UnaryServerInterceptor,
),
Expand All @@ -234,6 +237,7 @@ func newServeGRPC(authInterceptor *auth_interceptor.Interceptor, l net.Listener,
version_middleware.StreamServerInterceptor,
tracing.StreamServerInterceptor(),
authInterceptor.InterceptStream,
baseContextInterceptor.StreamServerInterceptor,
loggingInterceptor.StreamServerInterceptor,
validation.StreamServerInterceptor,
),
Expand Down
7 changes: 5 additions & 2 deletions src/testing/testpachd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func main() {

// Handle cleaning up on exit.
ctx, cancel := pctx.Interactive()
runCtx := ctx
var exitErr error
var clean cleanup.Cleaner
defer func() {
Expand Down Expand Up @@ -73,7 +74,9 @@ func main() {
return
}
clean.AddCleanup("loki", l.Close)
opts = append(opts, testloki.WithTestLoki(l))
opt := testloki.WithTestLoki(l)
opts = append(opts, opt)
runCtx = opt.MutateContext(runCtx)
}

// Build pachd.
Expand All @@ -90,7 +93,7 @@ func main() {
errCh := make(chan error)
go func() {
defer close(errCh)
if err := pd.Run(ctx); err != nil {
if err := pd.Run(runCtx); err != nil {
// If pachd exits, send the error on errCh. errCh is read after the context
// that cancel() cancels is done, so cancel it first so the write doesn't
// block.
Expand Down

0 comments on commit f02a218

Please sign in to comment.