New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-2124] Add support for response in JSON and legacy PPS LogMessage formats #9896
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this is fine, other than the error handling. I kinda wonder if it’s worth extracting functions or methods for this, but that’s probably premature abstraction.
} | ||
jsonStruct := new(structpb.Struct) | ||
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err == nil { | ||
resp.GetLog().GetJson().Object = jsonStruct |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The typical pattern one sees is to bail if err != nil
. Do you want to continue in case of an error? If you do, do you want to log the error at least? I’m not certain if an empty response is terribly useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/server/logs/logs.go
Outdated
AllowPartial: true, | ||
DiscardUnknown: true, | ||
} | ||
if err := m.Unmarshal([]byte(e.Line), ppsLog); err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same same here.
src/server/logs/logs.go
Outdated
DiscardUnknown: true, | ||
} | ||
if err := m.Unmarshal([]byte(e.Line), ppsLog); err != nil { | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps you should log the error here?
17085fa
to
9fa7636
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should have some tests that cover the else
cases. You can get coverage locally with: https://github.com/pachyderm/pachyderm/blob/master/BAZEL.md#code-coverage or just get it out of the comment that CodeCov adds to the PR.
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
jsonStruct := new(structpb.Struct) | ||
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, there needs to be some sophistication here to correct for broken Loki installs, which are common in the wild: https://github.com/pachyderm/pachyderm/blob/master/src/server/pps/server/api_server.go#L1780
Basically, we have to try and unmarshal and look for known fields. If the fields are "log" and "timestamp", it needs to be unwrapped from docker's logging wrappper (unlikely as k8s doesn't use docker anymore). If the line isn't JSON, then it's probably the CRI format, so that needs to be unwrapped and then reparsed.
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
jsonStruct := new(structpb.Struct) | ||
if err := jsonStruct.UnmarshalJSON([]byte(e.Line)); err != nil { | ||
log.Error(ctx, "failed to unmarshal json into protobuf Struct", zap.Error(err), zap.String("line", e.Line)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should figure out what to do here; I think it would be better to synthesize something like {"line": e.Line, "timestamp": e.Timestamp}
. Clients won't be looking at the PPS message response if they requested JSON.
I removed INT and PFS; those reviewers were added because of a rebasing problem which is now fixed. |
No description provided.