-
Notifications
You must be signed in to change notification settings - Fork 566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-2124] Add support for response in JSON and legacy PPS LogMessage formats #9896
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,17 @@ import ( | |
"fmt" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
"google.golang.org/protobuf/encoding/protojson" | ||
"google.golang.org/protobuf/proto" | ||
"google.golang.org/protobuf/types/known/structpb" | ||
"google.golang.org/protobuf/types/known/timestamppb" | ||
|
||
"github.com/pachyderm/pachyderm/v2/src/logs" | ||
|
||
"github.com/pachyderm/pachyderm/v2/src/internal/errors" | ||
"github.com/pachyderm/pachyderm/v2/src/internal/log" | ||
loki "github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client" | ||
"github.com/pachyderm/pachyderm/v2/src/logs" | ||
"github.com/pachyderm/pachyderm/v2/src/pps" | ||
) | ||
|
||
type ResponsePublisher interface { | ||
|
@@ -29,6 +33,8 @@ var ( | |
ErrUnimplemented = errors.New("unimplemented") | ||
// ErrPublish is returned whenever publishing fails (say, due to a closed client). | ||
ErrPublish = errors.New("error publishing") | ||
// ErrLogFormat returned if log line does not match requested log format | ||
ErrLogFormat = errors.New("error invalid log format") | ||
) | ||
|
||
// GetLogs gets logs according its request and publishes them. The pattern is | ||
|
@@ -103,12 +109,63 @@ func (ls LogService) GetLogs(ctx context.Context, request *logs.GetLogsRequest, | |
Log: &logs.LogMessage{ | ||
LogType: &logs.LogMessage_Verbatim{ | ||
Verbatim: &logs.VerbatimLogMessage{ | ||
Line: []byte(e.Line), | ||
Line: []byte(e.Line), | ||
Timestamp: timestamppb.New(e.Timestamp), | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
case logs.LogFormat_LOG_FORMAT_PARSED_JSON: | ||
resp = &logs.GetLogsResponse{ | ||
ResponseType: &logs.GetLogsResponse_Log{ | ||
Log: &logs.LogMessage{ | ||
LogType: &logs.LogMessage_Json{ | ||
Json: &logs.ParsedJSONLogMessage{ | ||
Verbatim: &logs.VerbatimLogMessage{ | ||
Line: []byte(e.Line), | ||
Timestamp: timestamppb.New(e.Timestamp), | ||
}, | ||
NativeTimestamp: timestamppb.New(e.Timestamp), | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
jsonStruct := new(structpb.Struct) | ||
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err != nil { | ||
log.Error(ctx, "failed to unmarshal json into protobuf Struct", zap.Error(err), zap.String("line", e.Line)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should figure out what to do here; I think it would be better to synthesize something like |
||
} else { | ||
resp.GetLog().GetJson().Object = jsonStruct | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The typical pattern one sees is to bail There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
ppsLog := new(pps.LogMessage) | ||
m := protojson.UnmarshalOptions{ | ||
AllowPartial: true, | ||
DiscardUnknown: true, | ||
} | ||
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil { | ||
log.Error(ctx, "failed to unmarshal json into PpsLogMessage", zap.Error(err), zap.String("line", e.Line)) | ||
} else { | ||
resp.GetLog().GetJson().PpsLogMessage = ppsLog | ||
} | ||
case logs.LogFormat_LOG_FORMAT_PPS_LOGMESSAGE: | ||
ppsLog := new(pps.LogMessage) | ||
m := protojson.UnmarshalOptions{ | ||
AllowPartial: true, | ||
DiscardUnknown: true, | ||
} | ||
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil { | ||
return errors.Wrapf(ErrLogFormat, "log line cannot be formatted as %v", request.LogFormat, zap.String("line", e.Line)) | ||
} | ||
resp = &logs.GetLogsResponse{ | ||
ResponseType: &logs.GetLogsResponse_Log{ | ||
Log: &logs.LogMessage{ | ||
LogType: &logs.LogMessage_PpsLogMessage{ | ||
PpsLogMessage: ppsLog, | ||
}, | ||
}, | ||
}, | ||
} | ||
default: | ||
return errors.Wrapf(ErrUnimplemented, "%v not supported", request.LogFormat) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, there needs to be some sophistication here to correct for broken Loki installs, which are common in the wild: https://github.com/pachyderm/pachyderm/blob/master/src/server/pps/server/api_server.go#L1780
Basically, we have to try and unmarshal and look for known fields. If the fields are "log" and "timestamp", it needs to be unwrapped from docker's logging wrappper (unlikely as k8s doesn't use docker anymore). If the line isn't JSON, then it's probably the CRI format, so that needs to be unwrapped and then reparsed.