Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-2124] Add support for response in JSON and legacy PPS LogMessage formats #9896

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 60 additions & 3 deletions src/server/logs/logs.go
Expand Up @@ -5,13 +5,17 @@ import (
"fmt"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/pachyderm/pachyderm/v2/src/logs"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
loki "github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client"
"github.com/pachyderm/pachyderm/v2/src/logs"
"github.com/pachyderm/pachyderm/v2/src/pps"
)

type ResponsePublisher interface {
Expand All @@ -29,6 +33,8 @@ var (
ErrUnimplemented = errors.New("unimplemented")
// ErrPublish is returned whenever publishing fails (say, due to a closed client).
ErrPublish = errors.New("error publishing")
// ErrLogFormat returned if log line does not match requested log format
ErrLogFormat = errors.New("error invalid log format")
)

// GetLogs gets logs according its request and publishes them. The pattern is
Expand Down Expand Up @@ -103,12 +109,63 @@ func (ls LogService) GetLogs(ctx context.Context, request *logs.GetLogsRequest,
Log: &logs.LogMessage{
LogType: &logs.LogMessage_Verbatim{
Verbatim: &logs.VerbatimLogMessage{
Line: []byte(e.Line),
Line: []byte(e.Line),
Timestamp: timestamppb.New(e.Timestamp),
},
},
},
},
}
case logs.LogFormat_LOG_FORMAT_PARSED_JSON:
resp = &logs.GetLogsResponse{
ResponseType: &logs.GetLogsResponse_Log{
Log: &logs.LogMessage{
LogType: &logs.LogMessage_Json{
Json: &logs.ParsedJSONLogMessage{
Verbatim: &logs.VerbatimLogMessage{
Line: []byte(e.Line),
Timestamp: timestamppb.New(e.Timestamp),
},
NativeTimestamp: timestamppb.New(e.Timestamp),
},
},
},
},
}
jsonStruct := new(structpb.Struct)
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, there needs to be some sophistication here to correct for broken Loki installs, which are common in the wild: https://github.com/pachyderm/pachyderm/blob/master/src/server/pps/server/api_server.go#L1780

Basically, we have to try and unmarshal and look for known fields. If the fields are "log" and "timestamp", it needs to be unwrapped from docker's logging wrappper (unlikely as k8s doesn't use docker anymore). If the line isn't JSON, then it's probably the CRI format, so that needs to be unwrapped and then reparsed.

log.Error(ctx, "failed to unmarshal json into protobuf Struct", zap.Error(err), zap.String("line", e.Line))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should figure out what to do here; I think it would be better to synthesize something like {"line": e.Line, "timestamp": e.Timestamp}. Clients won't be looking at the PPS message response if they requested JSON.

} else {
resp.GetLog().GetJson().Object = jsonStruct
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typical pattern one sees is to bail if err != nil. Do you want to continue in case of an error? If you do, do you want to log the error at least? I’m not certain if an empty response is terribly useful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
ppsLog := new(pps.LogMessage)
m := protojson.UnmarshalOptions{
AllowPartial: true,
DiscardUnknown: true,
}
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil {
log.Error(ctx, "failed to unmarshal json into PpsLogMessage", zap.Error(err), zap.String("line", e.Line))
} else {
resp.GetLog().GetJson().PpsLogMessage = ppsLog
}
case logs.LogFormat_LOG_FORMAT_PPS_LOGMESSAGE:
ppsLog := new(pps.LogMessage)
m := protojson.UnmarshalOptions{
AllowPartial: true,
DiscardUnknown: true,
}
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil {
return errors.Wrapf(ErrLogFormat, "log line cannot be formatted as %v", request.LogFormat, zap.String("line", e.Line))
}
resp = &logs.GetLogsResponse{
ResponseType: &logs.GetLogsResponse_Log{
Log: &logs.LogMessage{
LogType: &logs.LogMessage_PpsLogMessage{
PpsLogMessage: ppsLog,
},
},
},
}
default:
return errors.Wrapf(ErrUnimplemented, "%v not supported", request.LogFormat)
}
Expand Down