Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-2256] Fix hint return and display #9968

Merged
merged 46 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
99d9efd
Fix hint return and display
robert-uhl Apr 22, 2024
54cca02
Don’t double-print from/to
robert-uhl Apr 22, 2024
a2a50c6
Use verbatim timestamp
robert-uhl Apr 22, 2024
837b1a9
Add offset to time range filter
robert-uhl Apr 23, 2024
d50282e
Merge branch 'master' into rau/core-2256-debug-improve-hints
robert-uhl Apr 23, 2024
d18537e
Attempt to avoid off-by-one error in pages
robert-uhl Apr 23, 2024
d833bba
Merge branch 'rau/core-2256-debug-improve-hints' of github.com:pachyd…
robert-uhl Apr 23, 2024
53824dd
Correct command hint
robert-uhl Apr 23, 2024
1c061f6
Merge remote-tracking branch 'origin/master' into rau/core-2256-debug…
robert-uhl Apr 23, 2024
fa6a0d1
Add first time range test case
robert-uhl Apr 24, 2024
f42ba09
Add test for expected log
robert-uhl Apr 24, 2024
97d1f86
Check for presence of hint
robert-uhl Apr 24, 2024
a4090c9
Check for expected older & newer hints
robert-uhl Apr 24, 2024
0061e89
Fix test
robert-uhl Apr 24, 2024
4f76888
Remove debug line and improve defaulting
robert-uhl Apr 24, 2024
911fb05
Add options when creating a testing Loki instance
robert-uhl Apr 24, 2024
a7d554e
Add a test for getting logs with no from or until
robert-uhl Apr 24, 2024
8da2348
Merge remote-tracking branch 'origin/master' into rau/core-2256-debug…
robert-uhl Apr 24, 2024
29ec4d0
Pre-merge commit
robert-uhl Apr 25, 2024
e3efe31
Merge remote-tracking branch 'origin/master' into rau/core-2256-debug…
robert-uhl Apr 25, 2024
3f2e8b0
Support limit and hint
robert-uhl Apr 25, 2024
d1e07ad
Remove unused errPublisher
robert-uhl Apr 25, 2024
7f3ff9f
Properly handle offset and limit
robert-uhl Apr 25, 2024
273670e
Add another offset-related test
robert-uhl Apr 25, 2024
bdb2659
Merge remote-tracking branch 'origin/master' into rau/core-2256-debug…
robert-uhl Apr 25, 2024
5381ad4
Test that older hint also work
robert-uhl Apr 25, 2024
0360588
Remove commented-out lines
robert-uhl Apr 25, 2024
9cdeb85
Add some backwards-running tests
robert-uhl Apr 26, 2024
d4c7c67
Merge remote-tracking branch 'origin/master' into rau/core-2256-debug…
robert-uhl Apr 26, 2024
1456559
Merge remote-tracking branch 'origin/master' into rau/core-2256-debug…
robert-uhl Apr 26, 2024
99d3cf0
Correct test & results
robert-uhl Apr 26, 2024
0737712
Correct test
robert-uhl Apr 26, 2024
d219581
Merge branch 'master' into rau/core-2256-debug-improve-hints
robert-uhl Apr 26, 2024
95f0c3f
Add debug logs for console team
robert-uhl Apr 29, 2024
b85e7e2
Merge branch 'rau/core-2256-debug-improve-hints' of github.com:pachyd…
robert-uhl Apr 29, 2024
8654f30
Another debugging log for console
robert-uhl Apr 29, 2024
5fa4568
Another debugging log
robert-uhl Apr 29, 2024
1301174
More debugging
robert-uhl Apr 29, 2024
ee257a2
Try a fix
robert-uhl Apr 29, 2024
0754524
|| ≠ &&
robert-uhl Apr 29, 2024
db728cd
Use getter
robert-uhl Apr 29, 2024
66bb4a4
Fix tests
robert-uhl Apr 29, 2024
c9dafc8
Remove missing test
robert-uhl Apr 29, 2024
bd67878
Merge branch 'master' of github.com:pachyderm/pachyderm into rau/core…
robert-uhl Apr 30, 2024
e7cb564
Merge branch 'master' into rau/core-2256-debug-improve-hints
robert-uhl Apr 30, 2024
d32ab66
Merge branch 'master' into rau/core-2256-debug-improve-hints
robert-uhl Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/logs/cmds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//src/internal/config",
"//src/internal/pachctl",
"//src/logs",
"@com_github_alessio_shellescape//:shellescape",
"@com_github_spf13_cobra//:cobra",
"@com_github_spf13_pflag//:pflag",
"@org_golang_google_grpc//codes",
Expand Down
13 changes: 9 additions & 4 deletions src/server/logs/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
"os"
"time"

"github.com/alessio/shellescape"
robert-uhl marked this conversation as resolved.
Show resolved Hide resolved
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/pachyderm/pachyderm/v2/src/auth"
"github.com/pachyderm/pachyderm/v2/src/logs"

"github.com/pachyderm/pachyderm/v2/src/internal/client"
"github.com/pachyderm/pachyderm/v2/src/internal/cmdutil"
"github.com/pachyderm/pachyderm/v2/src/internal/config"
"github.com/pachyderm/pachyderm/v2/src/internal/pachctl"
"github.com/pachyderm/pachyderm/v2/src/logs"
)

func isAdmin(ctx context.Context, client *client.APIClient) (bool, error) {
Expand Down Expand Up @@ -197,13 +199,16 @@ func toFlags(flags map[string]string, hint *logs.GetLogsRequest) string {
var result string

if from := hint.GetFilter().GetTimeRange().GetFrom(); !from.AsTime().IsZero() {
flags["from"] = from.AsTime().Format(time.RFC3339Nano)
result += " --from " + shellescape.Quote(from.AsTime().Format(time.RFC3339Nano))
}
if until := hint.GetFilter().GetTimeRange().GetUntil(); !until.AsTime().IsZero() {
flags["to"] = until.AsTime().Format(time.RFC3339Nano)
result += " --to " + shellescape.Quote(until.AsTime().Format(time.RFC3339Nano))
}
for flag, arg := range flags {
result += " --" + flag + " " + arg
if flag == "from" || flag == "to" {
continue
}
result += " --" + flag + " " + shellescape.Quote(arg)
}
return result
}
Expand Down
67 changes: 49 additions & 18 deletions src/server/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (ls LogService) GetLogs(ctx context.Context, request *logs.GetLogsRequest,
}
}
if request.WantPagingHint {
var newer, older loki.Entry
var newer, older time.Time
switch direction {
case forwardLogDirection:
newer = adapter.last
Expand All @@ -117,30 +117,60 @@ func (ls LogService) GetLogs(ctx context.Context, request *logs.GetLogsRequest,
return errors.Errorf("invalid direction %q", direction)
}
// request a record immediately prior to the page
err := doQuery(ctx, c, logQL, 1, start.Add(-700*time.Hour), start, backwardLogDirection, func(ctx context.Context, e loki.Entry) (bool, error) {
older = e
err := doQuery(ctx, c, logQL, 1, start.Add(-700*time.Hour), start, backwardLogDirection, func(ctx context.Context, entry loki.Entry) (bool, error) {

var msg = &logs.LogMessage{
Verbatim: &logs.VerbatimLogMessage{
Line: []byte(entry.Line),
Timestamp: timestamppb.New(entry.Timestamp),
},
}
msg.Object = new(structpb.Struct)
if err := msg.Object.UnmarshalJSON([]byte(entry.Line)); err != nil {
log.Error(ctx, "failed to unmarshal json into protobuf Struct", zap.Error(err), zap.String("line", entry.Line))
msg.Object = nil
} else if val := msg.Object.Fields["time"].GetStringValue(); val != "" {
if t, err := time.Parse(time.RFC3339Nano, val); err == nil {
msg.NativeTimestamp = timestamppb.New(t)
}
}
msg.PpsLogMessage = new(pps.LogMessage)
m := protojson.UnmarshalOptions{
AllowPartial: true,
DiscardUnknown: true,
}
if err := m.Unmarshal([]byte(entry.Line), msg.PpsLogMessage); err != nil {
log.Error(ctx, "failed to unmarshal json into PpsLogMessage", zap.Error(err), zap.String("line", entry.Line))
msg.PpsLogMessage = nil
} else if msg.PpsLogMessage.Ts != nil {
msg.NativeTimestamp = msg.PpsLogMessage.Ts
}

older = msg.NativeTimestamp.AsTime()
return false, nil
})
if err != nil {
return errors.Wrap(err, "hint doQuery failed")
}
if older.IsZero() {
older = request.GetFilter().TimeRange.From.AsTime()
}
if newer.IsZero() {
newer = request.GetFilter().TimeRange.Until.AsTime()
}
hint := &logs.PagingHint{
Older: proto.Clone(request).(*logs.GetLogsRequest),
Newer: proto.Clone(request).(*logs.GetLogsRequest),
}
if !older.Timestamp.IsZero() {
hint.Older.Filter.TimeRange.Until = timestamppb.New(older.Timestamp)
}
if !newer.Timestamp.IsZero() {
hint.Newer.Filter.TimeRange.From = timestamppb.New(newer.Timestamp)
}
hint.Older.Filter.TimeRange.Until = timestamppb.New(older)
hint.Newer.Filter.TimeRange.From = timestamppb.New(newer)
if request.Filter.TimeRange.From != nil && request.Filter.TimeRange.Until != nil {
delta := request.Filter.TimeRange.Until.AsTime().Sub(request.Filter.TimeRange.From.AsTime())
if !older.Timestamp.IsZero() {
hint.Older.Filter.TimeRange.From = timestamppb.New(older.Timestamp.Add(-delta))
if !older.IsZero() {
hint.Older.Filter.TimeRange.From = timestamppb.New(older.Add(-delta))
}
if !newer.Timestamp.IsZero() {
hint.Newer.Filter.TimeRange.Until = timestamppb.New(newer.Timestamp.Add(delta))
if !newer.IsZero() {
hint.Newer.Filter.TimeRange.Until = timestamppb.New(newer.Add(delta))
}
}
if err := publisher.Publish(ctx, &logs.GetLogsResponse{
Expand Down Expand Up @@ -215,7 +245,7 @@ type ResponsePublisher interface {
// An adapter publishes log entries to a ResponsePublisher in a specified format.
type adapter struct {
responsePublisher ResponsePublisher
first, last loki.Entry
first, last time.Time
gotFirst bool
pass func(*logs.LogMessage) bool
}
Expand All @@ -227,10 +257,6 @@ func (a *adapter) publish(ctx context.Context, entry loki.Entry) (bool, error) {
Timestamp: timestamppb.New(entry.Timestamp),
},
}
if !a.gotFirst {
a.gotFirst = true
a.first = entry
}
msg.Object = new(structpb.Struct)
if err := msg.Object.UnmarshalJSON([]byte(entry.Line)); err != nil {
log.Error(ctx, "failed to unmarshal json into protobuf Struct", zap.Error(err), zap.String("line", entry.Line))
Expand Down Expand Up @@ -262,5 +288,10 @@ func (a *adapter) publish(ctx context.Context, entry loki.Entry) (bool, error) {
}); err != nil {
return false, errors.WithStack(fmt.Errorf("%w response with parsed json object: %w", ErrPublish, err))
}
if !a.gotFirst {
a.gotFirst = true
a.first = msg.NativeTimestamp.AsTime()
}
a.last = msg.NativeTimestamp.AsTime()
return false, nil
}