Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-2238] Loki in testpachd fast-follow #9913

Draft
wants to merge 22 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
01b334b
add logs and proxy server
jrockway Mar 29, 2024
c0ef120
add MutateContext option to TestPachdOption
jrockway Mar 29, 2024
6b37e0c
add a TestPachdOption for logging to Loki
jrockway Mar 29, 2024
c531ee1
add -loki option to testpachd binary
jrockway Mar 29, 2024
73c5815
cleanup loki tmpdir
jrockway Mar 29, 2024
19ddc28
enable rpc logging; make sure testpachd binary's Run gets the loki co…
jrockway Mar 29, 2024
653b179
Merge branch 'master' into jonathan/core-2238-loki-in-testpachd
robert-uhl Apr 2, 2024
5b5ac2c
Unify context, wait on completion
robert-uhl Apr 3, 2024
de0c9a4
Handle verbosity
robert-uhl Apr 3, 2024
820a96c
Merge branch 'jonathan/core-2238-loki-in-testpachd' into rau/core-223…
robert-uhl Apr 3, 2024
eb2925c
Split log batching and cleaning
robert-uhl Apr 3, 2024
7fc1f43
Pull pachctl context restoration into its own function
robert-uhl Apr 3, 2024
fd7e870
Use Loki to log when building test pachd
robert-uhl Apr 3, 2024
969b2d8
Remove cleanup from main
robert-uhl Apr 3, 2024
08ee07f
Run testpachd in a goroutine
robert-uhl Apr 3, 2024
ebcae9c
Delint
robert-uhl Apr 3, 2024
182c7cd
Switch nontest dockertestenv PostgreSQL from cleanup
robert-uhl Apr 3, 2024
c820c29
Remove cleanup
robert-uhl Apr 3, 2024
0fc35f0
Gazelle
robert-uhl Apr 3, 2024
11530c9
Merge branch 'untangle-testpachd' into rau/core-2238-loki-in-testpachd
robert-uhl Apr 3, 2024
37b4b6f
Merge branch 'master' into jonathan/core-2238-loki-in-testpachd
robert-uhl Apr 3, 2024
e9ab0a8
Merge branch 'jonathan/core-2238-loki-in-testpachd' into rau/core-223…
robert-uhl Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions src/internal/lokiutil/testloki/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

# gazelle:go_test file

go_library(
name = "testloki",
testonly = 1,
srcs = ["testloki.go"],
srcs = [
"logger.go",
"testloki.go",
],
data = [
"config.yaml",
"//tools/loki",
Expand All @@ -14,10 +18,14 @@ go_library(
"//src/internal/errors",
"//src/internal/log",
"//src/internal/lokiutil/client",
"//src/internal/pachconfig",
"//src/internal/pachd",
"//src/internal/pctx",
"//src/internal/promutil",
"//src/internal/randutil",
"@in_gopkg_yaml_v3//:yaml_v3",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
"@rules_go//go/runfiles:go_default_library",
"@rules_go//go/tools/bazel:go_default_library",
],
Expand All @@ -34,3 +42,16 @@ go_test(
"@com_github_google_go_cmp//cmp",
],
)

# Test target for logger_test.go, which starts a test pachd that logs to a
# test Loki instance and reads a log line back through the logs API.
go_test(
    name = "logger_test",
    size = "small",
    srcs = ["logger_test.go"],
    deps = [
        ":testloki",
        "//src/internal/pachd",
        "//src/internal/pctx",
        "//src/logs",
        "@org_golang_google_protobuf//encoding/protojson",
    ],
)
138 changes: 138 additions & 0 deletions src/internal/lokiutil/testloki/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package testloki

import (
"context"
"fmt"
"time"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client"
"github.com/pachyderm/pachyderm/v2/src/internal/pachconfig"
"github.com/pachyderm/pachyderm/v2/src/internal/pachd"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/randutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// WithTestLoki returns a pachd.TestPachdOption that connects a test pachd to
// the given TestLoki instance. It does three things:
//
//   - MutateEnv makes env.GetLokiClient hand back l.Client.
//   - MutateConfig turns on config.LokiLogging.
//   - MutateContext returns a child context whose logger additionally sends
//     every entry to l, tagged with a fixed set of pod-like labels.
func WithTestLoki(l *TestLoki) pachd.TestPachdOption {
	var opt pachd.TestPachdOption

	opt.MutateEnv = func(env *pachd.Env) {
		env.GetLokiClient = func() (*client.Client, error) {
			return l.Client, nil
		}
	}

	opt.MutateConfig = func(config *pachconfig.PachdFullConfiguration) {
		config.LokiLogging = true
	}

	opt.MutateContext = func(ctx context.Context) context.Context {
		// New random suffixes are drawn on every call, so each mutated
		// context logs under its own pod name.
		tmplHash := randutil.UniqueString("")[0:10]
		procHash := randutil.UniqueString("")[0:5]
		podName := fmt.Sprintf("pachd-%v-%v", tmplHash, procHash)

		labels := map[string]string{
			"app":               "pachd",
			"container":         "pachd",
			"host":              "localhost",
			"node_name":         "localhost",
			"pod":               podName,
			"pod_template_hash": tmplHash,
			"stream":            "stderr",
			"suite":             "pachyderm",
		}
		return pctx.Child(ctx, "", l.WithLoki(ctx, labels))
	}

	return opt
}

// newZapCore builds a lokiCore that JSON-encodes log entries and sends them to
// this Loki instance using ctx. The returned core carries only the
// suite=pachyderm label; callers may replace the label set afterwards.
func (l *TestLoki) newZapCore(ctx context.Context) *lokiCore {
	encoderConfig := zapcore.EncoderConfig{
		MessageKey:     "message",
		LevelKey:       "severity",
		TimeKey:        "time",
		NameKey:        "logger",
		CallerKey:      "caller",
		FunctionKey:    zapcore.OmitKey,
		StacktraceKey:  "stacktrace",
		LineEnding:     zapcore.DefaultLineEnding,
		EncodeLevel:    zapcore.LowercaseLevelEncoder,
		EncodeTime:     zapcore.RFC3339NanoTimeEncoder,
		EncodeDuration: zapcore.SecondsDurationEncoder,
		EncodeCaller:   zapcore.ShortCallerEncoder,
	}

	core := new(lokiCore)
	core.ctx = ctx
	core.l = l
	core.enc = zapcore.NewJSONEncoder(encoderConfig)
	core.labels = map[string]string{"suite": "pachyderm"}
	return core
}

// WithLoki returns a pctx option that tees the logger's existing core with one
// that sends every entry to this Loki instance. sendCtx is the context used to
// send logs; lokiLabels is the label set attached to each log line.
func (l *TestLoki) WithLoki(sendCtx context.Context, lokiLabels map[string]string) pctx.Option {
	lokiSink := l.newZapCore(sendCtx)
	lokiSink.labels = lokiLabels

	// Keep the original core and add the Loki core alongside it.
	addLoki := func(existing zapcore.Core) zapcore.Core {
		return zapcore.NewTee(existing, lokiSink)
	}
	return pctx.WithOptions(zap.WrapCore(addLoki))
}

// lokiCore sends JSON log messages to the provided TestLoki instance.  If you were going to use
// this code in production, you would want to skip the JSON serialization step, buffer lines to send
// them in batches, and read the time from the Entry instead of using time.Now().  (We use
// time.Now() here because it's more like what promtail does; reads the line and adds its own
// timestamp.)
type lokiCore struct {
	// ctx is passed to AddLog for every write; once it is done, Write drops
	// entries instead of sending them.
	ctx context.Context
	// l is the TestLoki instance that receives the logs.
	l *TestLoki
	// enc serializes each entry; With clones it and adds fields to the clone.
	enc zapcore.Encoder
	// labels is the label set attached to every log line sent by this core.
	labels map[string]string
}

// Compile-time check that *lokiCore implements zapcore.Core.
var _ zapcore.Core = (*lokiCore)(nil)

// Enabled implements zapcore.LevelEnabler. It reports true for every level;
// this core does no level filtering of its own.
func (c *lokiCore) Enabled(_ zapcore.Level) bool {
	return true
}

// With implements zapcore.Core. It returns a new lokiCore sharing this core's
// context, Loki instance and labels, whose encoder is a clone of this one with
// the given fields already added. The receiver is left unmodified.
func (c *lokiCore) With(fields []zapcore.Field) zapcore.Core {
	child := &lokiCore{
		ctx:    c.ctx,
		l:      c.l,
		enc:    c.enc.Clone(),
		labels: c.labels,
	}
	for i := range fields {
		fields[i].AddTo(child.enc)
	}
	return child
}

// Check implements zapcore.Core. When the entry's level is enabled, this core
// is added to the checked entry so that Write is called for it; otherwise ce is
// returned untouched.
func (c *lokiCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
	if !c.Enabled(e.Level) {
		return ce
	}
	return ce.AddCore(e, c)
}

// Write implements zapcore.Core. It encodes the entry (plus fields) with the
// core's encoder and sends the resulting line to Loki via AddLog, stamped with
// the current time and this core's labels.
//
// If the core's context is already done, the entry is silently dropped and nil
// is returned. Otherwise an error is returned when encoding or sending fails.
func (c *lokiCore) Write(e zapcore.Entry, fields []zapcore.Field) error {
	select {
	case <-c.ctx.Done():
		// For tests, we do not care about writes failing because the root context is done.
		return nil
	default:
	}
	buf, err := c.enc.EncodeEntry(e, fields)
	if err != nil {
		return errors.Wrap(err, "encode log entry")
	}
	// String copies the encoded bytes, so the pooled buffer can be handed back
	// to zap's buffer pool right away; buf must not be touched after Free.
	msg := buf.String()
	buf.Free()
	if err := c.l.AddLog(c.ctx, &Log{
		// time.Now() rather than e.Time: the line is timestamped on receipt.
		Time:    time.Now(),
		Labels:  c.labels,
		Message: msg,
	}); err != nil {
		return errors.Wrap(err, "send log to loki")
	}
	return nil
}

// Sync implements zapcore.Core. It is a no-op that always returns nil: Write
// hands each entry to AddLog as it arrives, so this core holds nothing to flush.
func (c *lokiCore) Sync() error {
	return nil
}
54 changes: 54 additions & 0 deletions src/internal/lokiutil/testloki/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package testloki_test

import (
"context"
"testing"
"time"

"github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/testloki"
"github.com/pachyderm/pachyderm/v2/src/internal/pachd"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/logs"
"google.golang.org/protobuf/encoding/protojson"
)

// TestTestPachd starts a test Loki instance and a test pachd configured to log
// to it, then verifies that at least one log line can be read back through
// pachd's logs API within five seconds.
func TestTestPachd(t *testing.T) {
	ctx := pctx.TestContext(t)

	loki, err := testloki.New(ctx, t.TempDir())
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		if closeErr := loki.Close(); closeErr != nil {
			t.Fatalf("close loki: %v", closeErr)
		}
	})

	pd := pachd.NewTestPachd(t, testloki.WithTestLoki(loki))

	// Ask for a single line from any stream that has a non-empty host label.
	logql := &logs.AdminLogQuery{
		AdminType: &logs.AdminLogQuery_Logql{
			Logql: `{host=~ ".+"}`,
		},
	}
	req := &logs.GetLogsRequest{
		Query: &logs.LogQuery{
			QueryType: &logs.LogQuery_Admin{Admin: logql},
		},
		Filter:    &logs.LogFilter{Limit: 1},
		LogFormat: logs.LogFormat_LOG_FORMAT_VERBATIM_WITH_TIMESTAMP,
	}

	queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	stream, err := pd.LogsClient.GetLogs(queryCtx, req)
	if err != nil {
		t.Fatalf("GetLogs: %v", err)
	}
	first, err := stream.Recv()
	if err != nil {
		// io.EOF is an error here.
		t.Fatalf("Recv: %v", err)
	}
	t.Log(protojson.Format(first))
}
1 change: 1 addition & 0 deletions src/internal/pachd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_library(
"//src/internal/middleware/auth",
"//src/internal/middleware/errors",
"//src/internal/middleware/logging",
"//src/internal/middleware/logging/client",
"//src/internal/middleware/validation",
"//src/internal/middleware/version",
"//src/internal/migrations",
Expand Down
49 changes: 47 additions & 2 deletions src/internal/pachd/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,33 @@ import (
"github.com/pachyderm/pachyderm/v2/src/internal/grpcutil"
lokiclient "github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client"
auth_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
clientlog_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging/client"
"github.com/pachyderm/pachyderm/v2/src/internal/obj"
"github.com/pachyderm/pachyderm/v2/src/internal/pachconfig"
"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/task"
"github.com/pachyderm/pachyderm/v2/src/internal/transactionenv"
"github.com/pachyderm/pachyderm/v2/src/license"
"github.com/pachyderm/pachyderm/v2/src/logs"
"github.com/pachyderm/pachyderm/v2/src/metadata"
"github.com/pachyderm/pachyderm/v2/src/pfs"
"github.com/pachyderm/pachyderm/v2/src/pps"
"github.com/pachyderm/pachyderm/v2/src/proxy"
admin_server "github.com/pachyderm/pachyderm/v2/src/server/admin/server"
auth_iface "github.com/pachyderm/pachyderm/v2/src/server/auth"
auth_server "github.com/pachyderm/pachyderm/v2/src/server/auth/server"
debug_server "github.com/pachyderm/pachyderm/v2/src/server/debug/server"
entiface "github.com/pachyderm/pachyderm/v2/src/server/enterprise"
ent_server "github.com/pachyderm/pachyderm/v2/src/server/enterprise/server"
license_server "github.com/pachyderm/pachyderm/v2/src/server/license/server"
logs_server "github.com/pachyderm/pachyderm/v2/src/server/logs/server"
metadata_server "github.com/pachyderm/pachyderm/v2/src/server/metadata/server"
pfsiface "github.com/pachyderm/pachyderm/v2/src/server/pfs"
pfs_server "github.com/pachyderm/pachyderm/v2/src/server/pfs/server"
ppsiface "github.com/pachyderm/pachyderm/v2/src/server/pps"
pps_server "github.com/pachyderm/pachyderm/v2/src/server/pps/server"
proxy_server "github.com/pachyderm/pachyderm/v2/src/server/proxy/server"
txn_server "github.com/pachyderm/pachyderm/v2/src/server/transaction/server"
"github.com/pachyderm/pachyderm/v2/src/transaction"
version_server "github.com/pachyderm/pachyderm/v2/src/version"
Expand Down Expand Up @@ -194,6 +199,8 @@ type Full struct {
enterpriseSrv enterprise.APIServer
licenseSrv license.APIServer
debugSrv debug.DebugServer
proxySrv proxy.APIServer
logsSrv logs.APIServer

pfsWorker *pfs_server.Worker
ppsWorker *pps_server.Worker
Expand Down Expand Up @@ -423,6 +430,28 @@ func NewFull(env Env, config pachconfig.PachdFullConfiguration) *Full {
return nil
},
},
setupStep{
Name: "initProxyServer",
Fn: func(ctx context.Context) error {
pd.proxySrv = proxy_server.NewAPIServer(proxy_server.Env{
Listener: pd.dbListener,
})
return nil
},
},
setupStep{
Name: "initLogsServer",
Fn: func(ctx context.Context) error {
var err error
pd.logsSrv, err = logs_server.NewAPIServer(logs_server.Env{
GetLokiClient: env.GetLokiClient,
})
if err != nil {
return errors.Wrap(err, "logs_server.NewAPIServer")
}
return nil
},
},

// Workers
initPFSWorker(&pd.pfsWorker, config.StorageConfiguration, func() pfs_server.WorkerEnv {
Expand Down Expand Up @@ -454,9 +483,11 @@ func NewFull(env Env, config pachconfig.PachdFullConfiguration) *Full {
enterprise.RegisterAPIServer(gs, pd.enterpriseSrv)
grpc_health_v1.RegisterHealthServer(gs, pd.healthSrv)
license.RegisterAPIServer(gs, pd.licenseSrv)
logs.RegisterAPIServer(gs, pd.logsSrv)
metadata.RegisterAPIServer(gs, pd.metadataSrv)
pfs.RegisterAPIServer(gs, pd.pfsSrv)
pps.RegisterAPIServer(gs, pd.ppsSrv)
proxy.RegisterAPIServer(gs, pd.proxySrv)
version.RegisterAPIServer(gs, pd.version)
}))
pd.addBackground("bootstrap", func(ctx context.Context) error {
Expand All @@ -483,7 +514,14 @@ func (pd *Full) PachClient(ctx context.Context) (*client.APIClient, error) {
if err != nil {
return nil, errors.Wrap(err, "parse pachd address")
}
c, err := client.NewFromPachdAddress(ctx, addr)
c, err := client.NewFromPachdAddress(ctx, addr,
client.WithAdditionalUnaryClientInterceptors(
clientlog_interceptor.LogUnary,
),
client.WithAdditionalStreamClientInterceptors(
clientlog_interceptor.LogStream,
),
)
if err != nil {
return nil, errors.Wrap(err, "NewPachdFromAddress")
}
Expand All @@ -499,7 +537,14 @@ func (pd *Full) mustGetPachClient(ctx context.Context) *client.APIClient {
if err != nil {
panic(fmt.Sprintf("parse pachd address: %v", err))
}
c, err := client.NewFromPachdAddress(ctx, addr)
c, err := client.NewFromPachdAddress(ctx, addr,
client.WithAdditionalUnaryClientInterceptors(
clientlog_interceptor.LogUnary,
),
client.WithAdditionalStreamClientInterceptors(
clientlog_interceptor.LogStream,
),
)
if err != nil {
panic(fmt.Sprintf("NewFromPachdAddress: %v", err))
}
Expand Down