Skip to content

Commit

Permalink
move mockpachd's panic handler into a package; use in real and test p…
Browse files Browse the repository at this point in the history
…achds
  • Loading branch information
jrockway committed Mar 29, 2024
1 parent 19ddc28 commit 94d5f9d
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/internal/middleware/logging/BUILD.bazel
Expand Up @@ -13,6 +13,7 @@ go_library(
"//src/internal/errors",
"//src/internal/log",
"//src/internal/middleware/auth",
"//src/internal/middleware/recovery",
"//src/internal/pctx",
"@com_github_fatih_camelcase//:camelcase",
"@com_github_prometheus_client_golang//prometheus",
Expand Down
14 changes: 13 additions & 1 deletion src/internal/middleware/logging/interceptor.go
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"reflect"
"runtime/trace"
"strings"
"time"

"go.uber.org/zap"
Expand All @@ -19,6 +20,7 @@ import (

"github.com/pachyderm/pachyderm/v2/src/internal/log"
mauth "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
Expand Down Expand Up @@ -85,6 +87,14 @@ func isNilInterface(x interface{}) bool {
return x == nil || (val.Kind() == reflect.Ptr && val.IsNil())
}

// checkPanic upgrades panics to Error level.
func checkPanic(orig log.Level, err error) log.Level {
if s, ok := status.FromError(err); ok && s.Code() == recovery.PanicCode && strings.HasPrefix(s.Message(), "panic: ") {
return log.ErrorLevel
}
return orig
}

func getCommonLogger(ctx context.Context, service, method string) context.Context {
var f []log.Field
f = append(f, zap.String("service", service), zap.String("method", method))
Expand Down Expand Up @@ -155,7 +165,7 @@ func getResponseLogger(ctx context.Context, res any, sent, rcvd int, err error)
f = append(f, zap.Error(err))
}
// FromError is pretty weird. It returns (status=nil, ok=true) for nil errors. It's OK to
// call methods on a nil status, though. It also returns stats=Unknown, ok=false if the
// call methods on a nil status, though. It also returns status=Unknown, ok=false if the
// error doesn't have a gRPC code in it. So we want to copy status information into the log
// even when ok is false.
s, _ := status.FromError(err)
Expand Down Expand Up @@ -203,6 +213,7 @@ func (li *LoggingInterceptor) UnaryServerInterceptor(ctx context.Context, req in
if config.leveler != nil && !isNilInterface(resp) {
lvl = config.leveler(lvl, resp, retErr)
}
lvl = checkPanic(lvl, retErr)
li.logUnaryAfter(getResponseLogger(ctx, logResp, 1, 1, retErr), lvl, service, method, start, retErr)
}()

Expand Down Expand Up @@ -274,6 +285,7 @@ func (li *LoggingInterceptor) StreamServerInterceptor(srv interface{}, stream gr
if config.leveler != nil && !isNilInterface(resp) {
lvl = config.leveler(lvl, resp, retErr)
}
lvl = checkPanic(lvl, retErr)
li.logUnaryAfter(resCtx, lvl, service, method, start, retErr)
}()

Expand Down
13 changes: 13 additions & 0 deletions src/internal/middleware/recovery/BUILD.bazel
@@ -0,0 +1,13 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "recovery",
srcs = ["recovery.go"],
importpath = "github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery",
visibility = ["//src:__subpackages__"],
deps = [
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
47 changes: 47 additions & 0 deletions src/internal/middleware/recovery/recovery.go
@@ -0,0 +1,47 @@
// Package recovery implements a GRPC server interceptor that recovers from panicking RPCs.
package recovery

import (
"context"
"runtime"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
PanicCode = codes.Aborted
panicLen = 32768
)

// UnaryServerInterceptor implements grpc.UnaryServerInterceptor. It recovers from a panicking RPC
// handler.
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, panicLen)
n := runtime.Stack(stack, false)
stack = stack[:n]
retErr = status.Errorf(PanicCode, "panic: %v\n%s", err, stack)
}
}()
return handler(ctx, req)
}

var _ grpc.UnaryServerInterceptor = UnaryServerInterceptor

// StreamServerInterceptor implements grpc.StreamServerInterceptor. It recovers from a panicking
// RPC handler.
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, panicLen)
runtime.Stack(stack, false)
retErr = status.Errorf(PanicCode, "panic: %v\n%s", err, stack)
}
}()
return handler(srv, ss)
}

var _ grpc.StreamServerInterceptor = StreamServerInterceptor
1 change: 1 addition & 0 deletions src/internal/pachd/BUILD.bazel
Expand Up @@ -41,6 +41,7 @@ go_library(
"//src/internal/middleware/errors",
"//src/internal/middleware/logging",
"//src/internal/middleware/logging/client",
"//src/internal/middleware/recovery",
"//src/internal/middleware/validation",
"//src/internal/middleware/version",
"//src/internal/migrations",
Expand Down
5 changes: 5 additions & 0 deletions src/internal/pachd/builder.go
Expand Up @@ -26,6 +26,7 @@ import (
authmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
errorsmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/errors"
loggingmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/validation"
version_middleware "github.com/pachyderm/pachyderm/v2/src/internal/middleware/version"
"github.com/pachyderm/pachyderm/v2/src/internal/migrations"
Expand Down Expand Up @@ -174,13 +175,15 @@ func (b *builder) initInternalServer(ctx context.Context) error {
b.authInterceptor.InterceptUnary,
b.loggingInterceptor.UnaryServerInterceptor,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
),
grpc.ChainStreamInterceptor(
errorsmw.StreamServerInterceptor,
tracing.StreamServerInterceptor(),
b.authInterceptor.InterceptStream,
b.loggingInterceptor.StreamServerInterceptor,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
),
)
return err
Expand All @@ -198,6 +201,7 @@ func (b *builder) initExternalServer(ctx context.Context) error {
b.authInterceptor.InterceptUnary,
b.loggingInterceptor.UnaryServerInterceptor,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
),
grpc.ChainStreamInterceptor(
errorsmw.StreamServerInterceptor,
Expand All @@ -206,6 +210,7 @@ func (b *builder) initExternalServer(ctx context.Context) error {
b.authInterceptor.InterceptStream,
b.loggingInterceptor.StreamServerInterceptor,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
),
)
return err
Expand Down
3 changes: 3 additions & 0 deletions src/internal/pachd/setup.go
Expand Up @@ -13,6 +13,7 @@ import (
auth_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
errorsmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/errors"
log_interceptor "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/validation"
version_middleware "github.com/pachyderm/pachyderm/v2/src/internal/middleware/version"
"github.com/pachyderm/pachyderm/v2/src/internal/migrations"
Expand Down Expand Up @@ -231,6 +232,7 @@ func newServeGRPC(authInterceptor *auth_interceptor.Interceptor, l net.Listener,
baseContextInterceptor.UnaryServerInterceptor,
loggingInterceptor.UnaryServerInterceptor,
validation.UnaryServerInterceptor,
recovery.UnaryServerInterceptor,
),
grpc.ChainStreamInterceptor(
errorsmw.StreamServerInterceptor,
Expand All @@ -240,6 +242,7 @@ func newServeGRPC(authInterceptor *auth_interceptor.Interceptor, l net.Listener,
baseContextInterceptor.StreamServerInterceptor,
loggingInterceptor.StreamServerInterceptor,
validation.StreamServerInterceptor,
recovery.StreamServerInterceptor,
),
)
reg(gs)
Expand Down
3 changes: 1 addition & 2 deletions src/internal/testpachd/BUILD.bazel
Expand Up @@ -20,6 +20,7 @@ go_library(
"//src/internal/middleware/auth",
"//src/internal/middleware/errors",
"//src/internal/middleware/logging",
"//src/internal/middleware/recovery",
"//src/internal/middleware/validation",
"//src/internal/pctx",
"//src/internal/require",
Expand All @@ -36,8 +37,6 @@ go_library(
"//src/transaction",
"//src/version/versionpb",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/emptypb",
"@org_golang_x_sync//errgroup",
],
Expand Down
27 changes: 3 additions & 24 deletions src/internal/testpachd/mock_pachd.go
Expand Up @@ -3,11 +3,8 @@ package testpachd
import (
"context"
"net"
"runtime"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/pachyderm/pachyderm/v2/src/admin"
Expand All @@ -30,6 +27,7 @@ import (
"github.com/pachyderm/pachyderm/v2/src/internal/grpcutil"
errorsmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/errors"
loggingmw "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging"
"github.com/pachyderm/pachyderm/v2/src/internal/middleware/recovery"
"github.com/pachyderm/pachyderm/v2/src/internal/transactionenv/txncontext"
authserver "github.com/pachyderm/pachyderm/v2/src/server/auth"
version "github.com/pachyderm/pachyderm/v2/src/version/versionpb"
Expand Down Expand Up @@ -2217,32 +2215,13 @@ func NewMockPachd(ctx context.Context, port uint16, options ...InterceptorOption
errorsmw.UnaryServerInterceptor,
loggingInterceptor.UnaryServerInterceptor,
validation.UnaryServerInterceptor,
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 16384)
n := runtime.Stack(stack, false)
stack = stack[:n]
retErr = status.Errorf(codes.Aborted, "panic: %v\n%s", err, stack)
}
}()
return handler(ctx, req)
},
recovery.UnaryServerInterceptor,
}
streamOpts := []grpc.StreamServerInterceptor{
errorsmw.StreamServerInterceptor,
loggingInterceptor.StreamServerInterceptor,
validation.StreamServerInterceptor,
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (retErr error) {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 16384)
runtime.Stack(stack, false)
retErr = status.Errorf(codes.Aborted, "panic: %v\n%s", err, stack)
}
}()
return handler(srv, ss)
},
recovery.StreamServerInterceptor,
}
for _, opt := range options {
interceptor := opt(mock)
Expand Down

0 comments on commit 94d5f9d

Please sign in to comment.