Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move mockpachd's panic handler into a package; use in real and test pachds #9907

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/internal/middleware/logging/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ go_library(
deps = [
"//src/internal/errors",
"//src/internal/log",
"//src/internal/middleware/auth",
"//src/internal/middleware/recovery",
"//src/internal/pctx",
"@com_github_fatih_camelcase//:camelcase",
"@com_github_prometheus_client_golang//prometheus",
Expand Down
12 changes: 12 additions & 0 deletions src/internal/middleware/logging/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"reflect"
"runtime/trace"
"strings"
"time"

"go.uber.org/zap"
Expand All @@ -18,6 +19,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
Expand Down Expand Up @@ -84,6 +86,14 @@ func isNilInterface(x interface{}) bool {
return x == nil || (val.Kind() == reflect.Ptr && val.IsNil())
}

// checkPanic upgrades panics to Error level.
func checkPanic(orig log.Level, err error) log.Level {
if s, ok := status.FromError(err); ok && s.Code() == recovery.PanicCode && strings.HasPrefix(s.Message(), "panic: ") {
return log.ErrorLevel
}
return orig
}

func getCommonLogger(ctx context.Context, service, method string) context.Context {
var f []log.Field
f = append(f, zap.String("service", service), zap.String("method", method))
Expand Down Expand Up @@ -203,6 +213,7 @@ func (li *LoggingInterceptor) UnaryAnnounce(ctx context.Context, req interface{}
if config.leveler != nil && !isNilInterface(resp) {
lvl = config.leveler(lvl, resp, retErr)
}
lvl = checkPanic(lvl, retErr)
li.logUnaryAfter(getResponseLogger(ctx, logResp, 1, 1, retErr), lvl, service, method, start, retErr)
}()

Expand Down Expand Up @@ -282,6 +293,7 @@ func (li *LoggingInterceptor) StreamAnnounce(srv interface{}, stream grpc.Server
if config.leveler != nil && !isNilInterface(resp) {
lvl = config.leveler(lvl, resp, retErr)
}
lvl = checkPanic(lvl, retErr)
li.logUnaryAfter(resCtx, lvl, service, method, start, retErr)
}()
return handler(srv, wrapper)
Expand Down
13 changes: 13 additions & 0 deletions src/internal/middleware/recovery/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "recovery",
srcs = ["recovery.go"],
importpath = "github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery",
visibility = ["//src:__subpackages__"],
deps = [
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
47 changes: 47 additions & 0 deletions src/internal/middleware/recovery/recovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Package recovery implements a GRPC server interceptor that recovers from panicking RPCs.
package recovery

import (
"context"
"runtime"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
PanicCode = codes.Aborted
panicLen = 32768
)

// UnaryServerInterceptor implements grpc.UnaryServerInterceptor. It recovers from a panicking RPC
// handler.
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, panicLen)
n := runtime.Stack(stack, false)
stack = stack[:n]
retErr = status.Errorf(PanicCode, "panic: %v\n%s", err, stack)
}
}()
return handler(ctx, req)
}

var _ grpc.UnaryServerInterceptor = UnaryServerInterceptor

// StreamServerInterceptor implements grpc.StreamServerInterceptor. It recovers from a panicking
// RPC handler.
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, panicLen)
runtime.Stack(stack, false)
retErr = status.Errorf(PanicCode, "panic: %v\n%s", err, stack)
}
}()
return handler(srv, ss)
}

var _ grpc.StreamServerInterceptor = StreamServerInterceptor
1 change: 1 addition & 0 deletions src/internal/pachd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"//src/internal/middleware/errors",
"//src/internal/middleware/logging",
"//src/internal/middleware/logging/client",
"//src/internal/middleware/recovery",
"//src/internal/middleware/validation",
"//src/internal/middleware/version",
"//src/internal/migrations",
Expand Down
5 changes: 5 additions & 0 deletions src/internal/pachd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
authmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
errorsmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/errors"
loggingmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/validation"
version_middleware "github.com/pachyderm/pachyderm/v2/src/internal/middleware/version"
"github.com/pachyderm/pachyderm/v2/src/internal/migrations"
Expand Down Expand Up @@ -177,6 +178,7 @@ func (b *builder) initInternalServer(ctx context.Context) error {
b.authInterceptor.InterceptUnary,
b.loggingInterceptor.UnaryAnnounce,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
),
grpc.ChainStreamInterceptor(
b.loggingInterceptor.StreamSetup,
Expand All @@ -185,6 +187,7 @@ func (b *builder) initInternalServer(ctx context.Context) error {
b.authInterceptor.InterceptStream,
b.loggingInterceptor.StreamAnnounce,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
),
)
return err
Expand All @@ -203,6 +206,7 @@ func (b *builder) initExternalServer(ctx context.Context) error {
b.authInterceptor.InterceptUnary,
b.loggingInterceptor.UnaryAnnounce,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
),
grpc.ChainStreamInterceptor(
b.loggingInterceptor.StreamSetup,
Expand All @@ -212,6 +216,7 @@ func (b *builder) initExternalServer(ctx context.Context) error {
b.authInterceptor.InterceptStream,
b.loggingInterceptor.StreamAnnounce,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
),
)
return err
Expand Down
3 changes: 3 additions & 0 deletions src/internal/pachd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
auth_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
errorsmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/errors"
log_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/validation"
version_middleware "github.com/pachyderm/pachyderm/v2/src/internal/middleware/version"
"github.com/pachyderm/pachyderm/v2/src/internal/migrations"
Expand Down Expand Up @@ -246,6 +247,7 @@ func newServeGRPC(authInterceptor *auth_interceptor.Interceptor, l net.Listener,
authInterceptor.InterceptUnary,
loggingInterceptor.UnaryAnnounce,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
),
grpc.ChainStreamInterceptor(
baseContextInterceptor.StreamServerInterceptor,
Expand All @@ -256,6 +258,7 @@ func newServeGRPC(authInterceptor *auth_interceptor.Interceptor, l net.Listener,
authInterceptor.InterceptStream,
loggingInterceptor.StreamAnnounce,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
),
)
reg(gs)
Expand Down
3 changes: 1 addition & 2 deletions src/internal/testpachd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//src/internal/middleware/auth",
"//src/internal/middleware/errors",
"//src/internal/middleware/logging",
"//src/internal/middleware/recovery",
"//src/internal/middleware/validation",
"//src/internal/pctx",
"//src/internal/require",
Expand All @@ -37,8 +38,6 @@ go_library(
"//src/transaction",
"//src/version/versionpb",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/emptypb",
"@org_golang_x_sync//errgroup",
],
Expand Down
27 changes: 3 additions & 24 deletions src/internal/testpachd/mock_pachd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ package testpachd
import (
"context"
"net"
"runtime"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/pachyderm/pachyderm/v2/src/admin"
Expand All @@ -31,6 +28,7 @@ import (
"github.com/pachyderm/pachyderm/v2/src/internal/grpcutil"
errorsmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/errors"
loggingmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/transactionenv/txncontext"
authserver "github.com/pachyderm/pachyderm/v2/src/server/auth"
version "github.com/pachyderm/pachyderm/v2/src/version/versionpb"
Expand Down Expand Up @@ -2296,31 +2294,10 @@ func NewMockPachd(ctx context.Context, port uint16, options ...InterceptorOption

loggingInterceptor := loggingmw.NewLoggingInterceptor(ctx)
unaryOpts := []grpc.UnaryServerInterceptor{
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 16384)
n := runtime.Stack(stack, false)
stack = stack[:n]
retErr = status.Errorf(codes.Aborted, "panic: %v\n%s", err, stack)
}
}()
return handler(ctx, req)
},
errorsmw.UnaryServerInterceptor,
loggingInterceptor.UnarySetup,
}
streamOpts := []grpc.StreamServerInterceptor{
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 16384)
runtime.Stack(stack, false)
retErr = status.Errorf(codes.Aborted, "panic: %v\n%s", err, stack)
}
}()
return handler(srv, ss)
},
errorsmw.StreamServerInterceptor,
loggingInterceptor.StreamSetup,
}
Expand All @@ -2336,10 +2313,12 @@ func NewMockPachd(ctx context.Context, port uint16, options ...InterceptorOption
unaryOpts = append(unaryOpts,
loggingInterceptor.UnaryAnnounce,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
)
streamOpts = append(streamOpts,
loggingInterceptor.StreamAnnounce,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
)
server, err := grpcutil.NewServer(ctx, false,
grpc.ChainUnaryInterceptor(
Expand Down