Skip to content

Commit

Permalink
[CORE-2212] Implement pachctl logs2 --pipeline PIPELINE (#9847)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Rockway <2367+jrockway@users.noreply.github.com>
  • Loading branch information
robert-uhl and jrockway committed Mar 26, 2024
1 parent abf3e30 commit c87ec40
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/server/logs/cmds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ go_test(
"//src/internal/require",
"//src/internal/testpachd/realenv",
"//src/internal/testutil",
"//src/pfs",
],
)
29 changes: 28 additions & 1 deletion src/server/logs/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,29 @@ func newLogQLRequest(logQL string) *logs.GetLogsRequest {
}
}

func newPipelineRequest(project, pipeline string) *logs.GetLogsRequest {
return &logs.GetLogsRequest{
LogFormat: logs.LogFormat_LOG_FORMAT_VERBATIM_WITH_TIMESTAMP,
Query: &logs.LogQuery{
QueryType: &logs.LogQuery_User{
User: &logs.UserLogQuery{
UserType: &logs.UserLogQuery_Pipeline{
Pipeline: &logs.PipelineLogQuery{
Project: project,
Pipeline: pipeline,
},
},
},
},
},
}
}

func Cmds(pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command {
var commands []*cobra.Command

var logQL string
var logQL, pipeline string
var project = pachCtx.Project
logsCmd := &cobra.Command{
// TODO(CORE-2200): remove references to “new.”
Short: "New logs functionality",
Expand All @@ -74,7 +93,13 @@ func Cmds(pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command
var req *logs.GetLogsRequest
switch {
case logQL != "":
if !(project == pachCtx.Project && pipeline == "") {
fmt.Fprintln(os.Stderr, "only one of [--logQL | --project PROJECT --pipeline PIPELINE] may be set")
os.Exit(1)
}
req = newLogQLRequest(logQL)
case pipeline != "":
req = newPipelineRequest(project, pipeline)
case isAdmin:
req = newLogQLRequest(`{suite="pachyderm"}`)
default:
Expand Down Expand Up @@ -115,6 +140,8 @@ func Cmds(pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command
Use: "logs2",
}
logsCmd.Flags().StringVar(&logQL, "logql", "", "LogQL query")
logsCmd.Flags().StringVar(&project, "project", project, "Project for pipeline query.")
logsCmd.Flags().StringVar(&pipeline, "pipeline", project, "Pipeline for pipeline query.")
commands = append(commands, logsCmd)
return commands
}
173 changes: 170 additions & 3 deletions src/server/logs/cmds/cmds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/pachyderm/pachyderm/v2/src/admin"
"github.com/pachyderm/pachyderm/v2/src/auth"
"github.com/pachyderm/pachyderm/v2/src/pfs"

"github.com/pachyderm/pachyderm/v2/src/internal/dockertestenv"
"github.com/pachyderm/pachyderm/v2/src/internal/lokiutil"
Expand Down Expand Up @@ -48,7 +49,7 @@ func realEnvWithLoki(ctx context.Context, t testing.TB, entries []loki.Entry) *r
})
}

func TestGetLogs_noauth(t *testing.T) {
func TestGetLogs_default_noauth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}
Expand All @@ -74,7 +75,7 @@ func TestGetLogs_noauth(t *testing.T) {
).Run())
}

func TestGetLogs_nonadmin(t *testing.T) {
func TestGetLogs_default_nonadmin(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}
Expand Down Expand Up @@ -104,7 +105,7 @@ func TestGetLogs_nonadmin(t *testing.T) {
).Run())
}

func TestGetLogs_admin(t *testing.T) {
func TestGetLogs_default_admin(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}
Expand Down Expand Up @@ -132,3 +133,169 @@ func TestGetLogs_admin(t *testing.T) {
pachctl logs2 | match "12 baz"`,
).Run())
}

func TestGetLogs_pipeline_noauth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}
var (
ctx = pctx.TestContext(t)
buildEntries = func() []loki.Entry {
var entries []loki.Entry
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("%v quux", i),
})
}
return entries
}
env = realEnvWithLoki(ctx, t, buildEntries())
c = env.PachClient
)
mockInspectCluster(env)

// TODO(CORE-2123): check for real logs
require.NoError(t, testutil.PachctlBashCmdCtx(ctx, t, c, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
{
"pipeline": {
"project": {
"name": "{{.ProjectName | js}}"
},
"name": "{{.PipelineName | js}}"
},
"transform": {
"cmd": ["cp", "r", "/pfs/in", "/pfs/out"]
},
"input": {
"pfs": {
"project": "default",
"repo": "{{.RepoName | js}}",
"glob": "/*",
"name": "in"
}
},
"resource_requests": {
"cpu": null,
"disk": "187Mi"
},
"autoscaling": false
}
EOF
pachctl logs2 --pipeline {{.PipelineName}} | match "24 quux"`,
"ProjectName", pfs.DefaultProjectName,
"RepoName", "input",
"PipelineName", "pipeline",
).Run())
}

func TestGetLogs_pipeline_user(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}

var (
ctx = pctx.TestContext(t)
buildEntries = func() []loki.Entry {
var entries []loki.Entry
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("%v foo", i),
})
}
return entries
}
env = realEnvWithLoki(ctx, t, buildEntries())
c = env.PachClient
)
mockInspectCluster(env)
peerPort := strconv.Itoa(int(env.ServiceEnv.Config().PeerPort))
alice := testutil.UniqueString("robot:alice")
aliceClient := testutil.AuthenticatedPachClient(t, c, alice, peerPort)

// TODO(CORE-2123): check for real logs
require.NoError(t, testutil.PachctlBashCmdCtx(aliceClient.Ctx(), t, aliceClient, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
{
"pipeline": {
"project": {
"name": "{{.ProjectName | js}}"
},
"name": "{{.PipelineName | js}}"
},
"transform": {
"cmd": ["cp", "r", "/pfs/in", "/pfs/out"]
},
"input": {
"pfs": {
"project": "default",
"repo": "{{.RepoName | js}}",
"glob": "/*",
"name": "in"
}
},
"resource_requests": {
"cpu": null,
"disk": "187Mi"
},
"autoscaling": false
}
EOF
pachctl logs2 --pipeline {{.PipelineName}} | match "64 foo"`,
"ProjectName", pfs.DefaultProjectName,
"RepoName", "input",
"PipelineName", "pipeline",
).Run())
}

func TestGetLogs_combination_error(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}

ctx := pctx.TestContext(t)
env := realenv.NewRealEnvWithIdentity(ctx, t, dockertestenv.NewTestDBConfig(t).PachConfigOption)
c := env.PachClient
mockInspectCluster(env)
peerPort := strconv.Itoa(int(env.ServiceEnv.Config().PeerPort))
alice := testutil.UniqueString("robot:alice")
aliceClient := testutil.AuthenticatedPachClient(t, c, alice, peerPort)

require.YesError(t, testutil.PachctlBashCmdCtx(aliceClient.Ctx(), t, aliceClient, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
{
"pipeline": {
"project": {
"name": "{{.ProjectName | js}}"
},
"name": "{{.PipelineName | js}}"
},
"transform": {
"cmd": ["cp", "r", "/pfs/in", "/pfs/out"]
},
"input": {
"pfs": {
"project": "default",
"repo": "{{.RepoName | js}}",
"glob": "/*",
"name": "in"
}
},
"resource_requests": {
"cpu": null,
"disk": "187Mi"
},
"autoscaling": false
}
EOF
pachctl logs2 --logql '{}' --pipeline {{.PipelineName}} | match "GetLogs dummy response"`,
"ProjectName", pfs.DefaultProjectName,
"RepoName", "input",
"PipelineName", "pipeline",
).Run())
}

0 comments on commit c87ec40

Please sign in to comment.