Skip to content

Commit

Permalink
[SYSINFRA-1028] reflow/flow: Reflect resolved filesets of Exec depend…
Browse files Browse the repository at this point in the history
…encies in log output.

Summary:
Since the `scheduler` resolves filesets of the dependencies of an `Exec` flow
only during the `Load` stage, the resolved filesets are never reflected in the flow graph
(but only available in the scheduler task's config).

This change makese these resolved filesets available in the `Flow` for subsequent logging.

Test Plan:
Unit and integration tests.

See [[ https://jira.ti-apps.aws.grail.com/browse/SYSINFRA-1029 | SYSINFRA-1029 ]].

Reviewers: pboyapalli

Reviewed By: pboyapalli

Subscribers: dnicolaou

Differential Revision: https://phabricator.grailbio.com/D69257

fbshipit-source-id: 8cf72c6
  • Loading branch information
swami-m authored and prb2 committed Jan 31, 2022
1 parent 6e4528d commit 277cdaf
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
26 changes: 24 additions & 2 deletions flow/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,16 +1705,25 @@ var statusPrinters = [maxOp]struct {
fmt.Fprintln(w, " ", line)
}
fmt.Fprintln(w, "where:")
seen := make(map[string]bool)
for i, arg := range f.Argstrs {
if f.ExecArg(i).Out {
earg := f.ExecArg(i)
if earg.Out || seen[arg] {
continue
}
fmt.Fprintf(w, " %s = \n", arg)
if fs, ok := f.Deps[f.ExecArg(i).Index].Value.(reflow.Fileset); ok {
var fs reflow.Fileset
if len(f.resolvedFs) > earg.Index {
fs = f.resolvedFs[earg.Index]
}
if !fs.Empty() {
printFileset(w, " ", fs)
} else if fs, ok := f.Deps[earg.Index].Value.(reflow.Fileset); ok {
printFileset(w, " ", fs)
} else {
fmt.Fprintln(w, " (cached)")
}
seen[arg] = true
}
if f.State != Done {
return
Expand Down Expand Up @@ -1908,6 +1917,19 @@ func (e *Eval) taskWait(ctx context.Context, f *Flow, task *sched.Task) error {
}
// Grab the task's exec so that it can be logged properly.
f.Exec = task.Exec
if f.Op == Exec && f.Argmap != nil {
// If this is an Exec and f.Argmap is defined, then
// update the flow's resolved filesets.
n := f.NExecArg()
f.resolvedFs = make([]reflow.Fileset, n)
for i := 0; i < n; i++ {
earg, arg := f.ExecArg(i), task.Config.Args[i]
if earg.Out {
continue
}
f.resolvedFs[earg.Index] = *arg.Fileset
}
}
e.LogFlow(ctx, f)
if err := task.Wait(ctx, sched.TaskDone); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ type Flow struct {

// ArgMap maps exec arguments to dependencies. (OpExec).
Argmap []ExecArg
// resolvedFs maps exec arguments to resolved filesets. (OpExec).
resolvedFs []reflow.Fileset
// OutputIsDir tells whether the output i is a directory.
OutputIsDir []bool

Expand Down

0 comments on commit 277cdaf

Please sign in to comment.