Skip to content

Commit

Permalink
[CORE-2213] Implement pachctl logs2 --project PROJECT (#9848)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Rockway <2367+jrockway@users.noreply.github.com>
  • Loading branch information
robert-uhl and jrockway committed Mar 26, 2024
1 parent c87ec40 commit 9245e70
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 10 deletions.
25 changes: 21 additions & 4 deletions src/server/logs/cmds/cmds.go
Expand Up @@ -67,6 +67,21 @@ func newPipelineRequest(project, pipeline string) *logs.GetLogsRequest {
}
}

func newProjectRequest(project string) *logs.GetLogsRequest {
return &logs.GetLogsRequest{
LogFormat: logs.LogFormat_LOG_FORMAT_VERBATIM_WITH_TIMESTAMP,
Query: &logs.LogQuery{
QueryType: &logs.LogQuery_User{
User: &logs.UserLogQuery{
UserType: &logs.UserLogQuery_Project{
Project: project,
},
},
},
},
}
}

func Cmds(pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command {
var commands []*cobra.Command

Expand All @@ -92,14 +107,16 @@ func Cmds(pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command

var req *logs.GetLogsRequest
switch {
case logQL != "":
if !(project == pachCtx.Project && pipeline == "") {
case cmd.Flag("logql").Changed:
if cmd.Flag("project").Changed || cmd.Flag("pipeline").Changed {
fmt.Fprintln(os.Stderr, "only one of [--logQL | --project PROJECT --pipeline PIPELINE] may be set")
os.Exit(1)
}
req = newLogQLRequest(logQL)
case pipeline != "":
case cmd.Flag("pipeline").Changed:
req = newPipelineRequest(project, pipeline)
case cmd.Flag("project").Changed:
req = newProjectRequest(project)
case isAdmin:
req = newLogQLRequest(`{suite="pachyderm"}`)
default:
Expand Down Expand Up @@ -141,7 +158,7 @@ func Cmds(pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command
}
logsCmd.Flags().StringVar(&logQL, "logql", "", "LogQL query")
logsCmd.Flags().StringVar(&project, "project", project, "Project for pipeline query.")
logsCmd.Flags().StringVar(&pipeline, "pipeline", project, "Pipeline for pipeline query.")
logsCmd.Flags().StringVar(&pipeline, "pipeline", pipeline, "Pipeline for pipeline query.")
commands = append(commands, logsCmd)
return commands
}
148 changes: 142 additions & 6 deletions src/server/logs/cmds/cmds_test.go
Expand Up @@ -155,7 +155,6 @@ func TestGetLogs_pipeline_noauth(t *testing.T) {
)
mockInspectCluster(env)

// TODO(CORE-2123): check for real logs
require.NoError(t, testutil.PachctlBashCmdCtx(ctx, t, c, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
Expand Down Expand Up @@ -216,7 +215,6 @@ func TestGetLogs_pipeline_user(t *testing.T) {
alice := testutil.UniqueString("robot:alice")
aliceClient := testutil.AuthenticatedPachClient(t, c, alice, peerPort)

// TODO(CORE-2123): check for real logs
require.NoError(t, testutil.PachctlBashCmdCtx(aliceClient.Ctx(), t, aliceClient, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
Expand Down Expand Up @@ -252,14 +250,145 @@ func TestGetLogs_pipeline_user(t *testing.T) {
).Run())
}

func TestGetLogs_project_noauth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}

var (
ctx = pctx.TestContext(t)
buildEntries = func() []loki.Entry {
var entries []loki.Entry
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("%v %s", i, t.Name()),
})
}
return entries
}
env = realEnvWithLoki(ctx, t, buildEntries())
c = env.PachClient
)
mockInspectCluster(env)

require.NoError(t, testutil.PachctlBashCmdCtx(ctx, t, c, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
{
"pipeline": {
"project": {
"name": "{{.ProjectName | js}}"
},
"name": "{{.PipelineName | js}}"
},
"transform": {
"cmd": ["cp", "r", "/pfs/in", "/pfs/out"]
},
"input": {
"pfs": {
"project": "default",
"repo": "{{.RepoName | js}}",
"glob": "/*",
"name": "in"
}
},
"resource_requests": {
"cpu": null,
"disk": "187Mi"
},
"autoscaling": false
}
EOF
pachctl logs2 --project {{.ProjectName}} | match "16 {{.TestName}}"`,
"ProjectName", pfs.DefaultProjectName,
"RepoName", "input",
"PipelineName", "pipeline",
"TestName", t.Name(),
).Run())
}

func TestGetLogs_project_user(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}

var (
ctx = pctx.TestContext(t)
buildEntries = func() []loki.Entry {
var entries []loki.Entry
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("%v %s", i, t.Name()),
})
}
return entries
}
env = realEnvWithLoki(ctx, t, buildEntries())
c = env.PachClient
)
mockInspectCluster(env)
peerPort := strconv.Itoa(int(env.ServiceEnv.Config().PeerPort))
alice := testutil.UniqueString("robot:alice")
aliceClient := testutil.AuthenticatedPachClient(t, c, alice, peerPort)

require.NoError(t, testutil.PachctlBashCmdCtx(aliceClient.Ctx(), t, aliceClient, `
pachctl create repo {{.RepoName}}
pachctl create pipeline <<EOF
{
"pipeline": {
"project": {
"name": "{{.ProjectName | js}}"
},
"name": "{{.PipelineName | js}}"
},
"transform": {
"cmd": ["cp", "r", "/pfs/in", "/pfs/out"]
},
"input": {
"pfs": {
"project": "default",
"repo": "{{.RepoName | js}}",
"glob": "/*",
"name": "in"
}
},
"resource_requests": {
"cpu": null,
"disk": "187Mi"
},
"autoscaling": false
}
EOF
pachctl logs2 --project {{.ProjectName}} | match "8 {{.TestName}}"`,
"ProjectName", pfs.DefaultProjectName,
"RepoName", "input",
"PipelineName", "pipeline",
"TestName", t.Name(),
).Run())
}

func TestGetLogs_combination_error(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}

ctx := pctx.TestContext(t)
env := realenv.NewRealEnvWithIdentity(ctx, t, dockertestenv.NewTestDBConfig(t).PachConfigOption)
c := env.PachClient
var (
ctx = pctx.TestContext(t)
buildEntries = func() []loki.Entry {
var entries []loki.Entry
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("%v %s", i, t.Name()),
})
}
return entries
}
env = realEnvWithLoki(ctx, t, buildEntries())
c = env.PachClient
)
mockInspectCluster(env)
peerPort := strconv.Itoa(int(env.ServiceEnv.Config().PeerPort))
alice := testutil.UniqueString("robot:alice")
Expand Down Expand Up @@ -293,9 +422,16 @@ func TestGetLogs_combination_error(t *testing.T) {
"autoscaling": false
}
EOF
pachctl logs2 --logql '{}' --pipeline {{.PipelineName}} | match "GetLogs dummy response"`,
pachctl logs2 --logql '{}' --pipeline {{.PipelineName}} | match "1 {{.TestName}}"`,
"ProjectName", pfs.DefaultProjectName,
"RepoName", "input",
"PipelineName", "pipeline",
"TestName", t.Name(),
).Run())

require.YesError(t, testutil.PachctlBashCmdCtx(aliceClient.Ctx(), t, aliceClient, `
pachctl logs2 --logql '{}' --project {{.ProjectName}}} | match "16 {{.TestName}}"`,
"ProjectName", pfs.DefaultProjectName,
"TestName", t.Name(),
).Run())
}

0 comments on commit 9245e70

Please sign in to comment.