Skip to content

Commit

Permalink
log line format errors
Browse files Browse the repository at this point in the history
  • Loading branch information
zmajeed committed Apr 2, 2024
1 parent b7358b6 commit 17085fa
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/server/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"fmt"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/pachyderm/pachyderm/v2/src/logs"
"github.com/pachyderm/pachyderm/v2/src/pps"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
loki "github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client"
"github.com/pachyderm/pachyderm/v2/src/logs"
"github.com/pachyderm/pachyderm/v2/src/pps"
)

type ResponsePublisher interface {
Expand All @@ -32,6 +33,8 @@ var (
ErrUnimplemented = errors.New("unimplemented")
// ErrPublish is returned whenever publishing fails (say, due to a closed client).
ErrPublish = errors.New("error publishing")
// ErrLogFormat returned if log line does not match requested log format
ErrLogFormat = errors.New("error invalid log format")
)

// GetLogs gets logs according its request and publishes them. The pattern is
Expand Down Expand Up @@ -130,15 +133,19 @@ func (ls LogService) GetLogs(ctx context.Context, request *logs.GetLogsRequest,
},
}
jsonStruct := new(structpb.Struct)
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err == nil {
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err != nil {
log.Error(ctx, "failed to unmarshal json into protobuf Struct", zap.Error(err), zap.String("line", e.Line))
} else {
resp.GetLog().GetJson().Object = jsonStruct
}
ppsLog := new(pps.LogMessage)
m := protojson.UnmarshalOptions{
AllowPartial: true,
DiscardUnknown: true,
}
if err := m.Unmarshal([]byte(e.Line), ppsLog); err == nil {
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil {
log.Error(ctx, "failed to unmarshal json into PpsLogMessage", zap.Error(err), zap.String("line", e.Line))
} else {
resp.GetLog().GetJson().PpsLogMessage = ppsLog
}
case logs.LogFormat_LOG_FORMAT_PPS_LOGMESSAGE:
Expand All @@ -148,7 +155,7 @@ func (ls LogService) GetLogs(ctx context.Context, request *logs.GetLogsRequest,
DiscardUnknown: true,
}
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil {
continue
return errors.Wrapf(ErrLogFormat, "log line cannot be formatted as %v", request.LogFormat, zap.String("line", e.Line))
}
resp = &logs.GetLogsResponse{
ResponseType: &logs.GetLogsResponse_Log{
Expand Down

0 comments on commit 17085fa

Please sign in to comment.