Skip to content

Commit

Permalink
[SYSINFRA-595] reflow: Refactoring and cleanup
Browse files Browse the repository at this point in the history
Summary:
Most of the changes here are done to facilitate the eventual move of `tool.Runner`
into the `reflow/runtime` package.  This change cleans up the dependency chain
so that the move itself is easier to manage.

There is no functionality change in this revision.

Test Plan: Unit and Integration tests.

Reviewers: ghorrell

Reviewed By: ghorrell

Subscribers: dnicolaou, pboyapalli

Differential Revision: https://phabricator.grailbio.com/D70851

fbshipit-source-id: 61dc1a9
  • Loading branch information
swami-m authored and spolakh committed Mar 11, 2022
1 parent 4a1b008 commit f284e87
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 164 deletions.
67 changes: 0 additions & 67 deletions runtime/cluster.go

This file was deleted.

117 changes: 117 additions & 0 deletions runtime/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package runtime

import (
"fmt"
"net/http"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/grailbio/infra"
"github.com/grailbio/infra/tls"
"github.com/grailbio/reflow/blob/s3blob"
"github.com/grailbio/reflow/ec2cluster"
"github.com/grailbio/reflow/errors"
infra2 "github.com/grailbio/reflow/infra"
"github.com/grailbio/reflow/repository/blobrepo"
repositoryhttp "github.com/grailbio/reflow/repository/http"
"github.com/grailbio/reflow/runner"
"github.com/grailbio/reflow/taskdb"
"golang.org/x/net/http2"
)

// ClusterInstance returns a configured cluster and sets up repository
// credentials so that remote repositories can be dialed.
func ClusterInstance(config infra.Config) (runner.Cluster, error) {
var cluster runner.Cluster
err := config.Instance(&cluster)
if err != nil {
return nil, err
}
if ec, ok := cluster.(*ec2cluster.Cluster); ok {
ec.Configuration = config
ec.ExportStats()
if err = ec.Verify(); err != nil {
return nil, err
}
} else {
return cluster, nil
}
var sess *session.Session
err = config.Instance(&sess)
if err != nil {
return nil, err
}
// TODO(marius): handle this more elegantly, perhaps by avoiding
// such global registration altogether. The current way of doing this
// also ties the binary to specific implementations (e.g., s3), which
// should be avoided.
blobrepo.Register("s3", s3blob.New(sess))
// TODO(swami): Why is this needed and can we avoid this?
repositoryhttp.HTTPClient, err = HttpClient(config)
if err != nil {
return nil, err
}
return cluster, nil
}

func HttpClient(config infra.Config) (*http.Client, error) {
var ca tls.Certs
err := config.Instance(&ca)
if err != nil {
return nil, err
}
clientConfig, _, err := ca.HTTPS()
if err != nil {
return nil, err
}
transport := &http.Transport{TLSClientConfig: clientConfig}
if err := http2.ConfigureTransport(transport); err != nil {
return nil, err
}
return &http.Client{Transport: transport}, nil
}

// PredictorConfig returns a PredictorConfig (possibly nil) and an error (if any).
func PredictorConfig(cfg infra.Config) (*infra2.PredictorConfig, error) {
var (
predConfig *infra2.PredictorConfig
sess *session.Session
tdb taskdb.TaskDB
)
if err := cfg.Instance(&tdb); err != nil {
return nil, errors.E("predictor config: no taskdb", err)
}
if err := cfg.Instance(&predConfig); err != nil {
return nil, errors.E("predictor config: no predconfig", err)
}
if err := cfg.Instance(&sess); err != nil || sess == nil {
return nil, errors.E("predictor config: no session", err)
}
return predConfig, validatePredictorConfig(sess, tdb, predConfig)
}

// validatePredictorConfig validates if the Predictor can be used by reflow.
// The Predictor can only be used if the following conditions are true:
// 1. A taskdb is present in the provided config, for querying tasks.
// (and the taskdb must return a valid `Repository()`)
// 2. Reflow is being run from an ec2 instance OR the Predictor config (using NonEC2Ok)
// gives explicit permission to run the Predictor on non-ec2-instance machines.
// This is because the Predictor is network-intensive and its performance will be hampered by poor network.
func validatePredictorConfig(sess *session.Session, tdb taskdb.TaskDB, predConfig *infra2.PredictorConfig) error {
if tdb == nil {
return fmt.Errorf("validatePredictorConfig: no taskdb")
}
if tdb.Repository() == nil {
return fmt.Errorf("validatePredictorConfig: no repo")
}
if predConfig == nil {
return fmt.Errorf("validatePredictorConfig: no predconfig")
}
if !predConfig.NonEC2Ok {
if md := ec2metadata.New(sess, &aws.Config{MaxRetries: aws.Int(3)}); !md.Available() {
return fmt.Errorf("not running on ec2 instance (and nonEc2Ok is not true)")
}
}
return nil
}
3 changes: 2 additions & 1 deletion tool/pred.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grailbio/base/traverse"
"github.com/grailbio/reflow"
"github.com/grailbio/reflow/predictor"
"github.com/grailbio/reflow/runtime"
"github.com/grailbio/reflow/taskdb"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ Valid values for -name are "mem" and "duration".
flags.Usage()
}

cfg, err := getPredictorConfig(c.Config, false)
cfg, err := runtime.PredictorConfig(c.Config)
if err != nil {
c.Fatal(err)
}
Expand Down
11 changes: 3 additions & 8 deletions tool/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/grailbio/reflow/runner"
"github.com/grailbio/reflow/runtime"
"github.com/grailbio/reflow/syntax"
"github.com/grailbio/reflow/taskdb"
"github.com/grailbio/reflow/trace"
"github.com/grailbio/reflow/wg"
)
Expand Down Expand Up @@ -130,7 +129,8 @@ func (c *Cmd) runCommon(ctx context.Context, runFlags RunFlags, file string, arg
r.status = c.Status

// Set up run transcript and log files.
base := c.Runbase(r.RunID)
base, err := r.Runbase()
c.must(err)
c.must(os.MkdirAll(filepath.Dir(base), 0777))
var (
logfile, dotfile *os.File
Expand Down Expand Up @@ -191,18 +191,13 @@ func (c *Cmd) runCommon(ctx context.Context, runFlags RunFlags, file string, arg

// rundir returns the directory that stores run state, creating it if necessary.
func (c *Cmd) rundir() string {
rundir, err := Rundir()
rundir, err := reflow.Rundir()
if err != nil {
c.Fatalf("failed to create temporary directory: %v", err)
}
return rundir
}

// Runbase returns the base path for the run with the provided name
func (c Cmd) Runbase(runID taskdb.RunID) string {
return Runbase(c.rundir(), runID)
}

// WaitForBackgroundTasks waits until all background tasks complete, or if the provided
// timeout expires.
func (c Cmd) WaitForBackgroundTasks(wg *wg.WaitGroup, timeout time.Duration) {
Expand Down
86 changes: 4 additions & 82 deletions tool/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"time"

aws2 "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/grailbio/base/digest"
"github.com/grailbio/base/state"
Expand Down Expand Up @@ -49,7 +46,7 @@ func NewRunner(infraRunConfig infra.Config, runConfig RunConfig, logger *log.Log
// Configure the Predictor.
var pred *predictor.Predictor
if runConfig.RunFlags.Pred {
if cfg, cerr := getPredictorConfig(infraRunConfig, false); cerr != nil {
if cfg, cerr := runtime.PredictorConfig(infraRunConfig); cerr != nil {
logger.Errorf("error while configuring predictor: %s", cerr)
} else {
pred = predictor.New(scheduler.TaskDB, logger.Tee(nil, "predictor: "), cfg.MinData, cfg.MaxInspect, cfg.MemPercentile)
Expand Down Expand Up @@ -283,8 +280,7 @@ func (r *Runner) setRunComplete(ctx context.Context, tdb taskdb.TaskDB, endTime
runLog, dotFile, trace digest.Digest
rc io.ReadCloser
)
runbase, err := r.Runbase()
if err == nil {
if runbase, err := r.Runbase(); err == nil {
if rc, err = os.Open(runbase + ".runlog"); err == nil {
pctx, cancel := context.WithTimeout(ctx, 30*time.Second)
if runLog, err = r.repo.Put(pctx, rc); err != nil {
Expand Down Expand Up @@ -313,7 +309,7 @@ func (r *Runner) setRunComplete(ctx context.Context, tdb taskdb.TaskDB, endTime
} else {
r.Log.Debugf("unable to determine runbase: %v", err)
}
err = tdb.SetRunComplete(ctx, r.RunID, runLog, dotFile, trace, endTime)
err := tdb.SetRunComplete(ctx, r.RunID, runLog, dotFile, trace, endTime)
if err == nil {
var ds []string
if !runLog.IsZero() {
Expand Down Expand Up @@ -384,80 +380,6 @@ func (r Runner) waitForBackgroundTasks(timeout time.Duration) {
}
}

// Runbase returns the base path for the run
func (r Runner) Runbase() (string, error) {
rundir, err := Rundir()
if err != nil {
return "", err
}
return Runbase(rundir, r.RunID), nil
}

func Rundir() (string, error) {
var rundir string
if home, ok := os.LookupEnv("HOME"); ok {
rundir = filepath.Join(home, ".reflow", "runs")
if err := os.MkdirAll(rundir, 0777); err != nil {
return "", err
}
} else {
var err error
rundir, err = ioutil.TempDir("", "prefix")
if err != nil {
return "", errors.E("failed to create temporary directory: %v", err)
}
}
return rundir, nil
}

func Runbase(rundir string, runID taskdb.RunID) string {
return filepath.Join(rundir, runID.Hex())
}

// getPredictorConfig gets a PredictorConfig and/or an error (if any).
func getPredictorConfig(cfg infra.Config, nonEC2ok bool) (*infra2.PredictorConfig, error) {
var (
predConfig *infra2.PredictorConfig
repo reflow.Repository
sess *session.Session
tdb taskdb.TaskDB
)
if err := cfg.Instance(&tdb); err != nil {
return nil, errors.E("predictor config: no taskdb", err)
}
if err := cfg.Instance(&repo); err != nil {
return nil, errors.E("predictor config: no repo", err)
}
if err := cfg.Instance(&predConfig); err != nil {
return nil, errors.E("predictor config: no predconfig", err)
}
if err := cfg.Instance(&sess); err != nil || sess == nil {
return nil, errors.E("predictor config: no session", err)
}
return predConfig, validatePredictorConfig(sess, repo, tdb, predConfig, nonEC2ok)
}

// validatePredictorConfig validates if the Predictor can be used by reflow.
// The Predictor can only be used if the following conditions are true:
// 1. A repo is provided for retrieving cached ExecInspects.
// 2. A taskdb is present in the provided config, for querying tasks.
// 3. Reflow is being run from an ec2 instance OR either the Predictor config (using NonEC2Ok)
// or the flag nonEC2ok gives explicit permission to run the Predictor non-ec2-instance machines.
// This is because the Predictor is network-intensive and its performance will be hampered by poor network.
func validatePredictorConfig(sess *session.Session, repo reflow.Repository, tdb taskdb.TaskDB, predConfig *infra2.PredictorConfig, nonEC2ok bool) error {
if tdb == nil {
return fmt.Errorf("validatePredictorConfig: no taskdb")
}
if repo == nil {
return fmt.Errorf("validatePredictorConfig: no repo")
}
if predConfig == nil {
return fmt.Errorf("validatePredictorConfig: no predconfig")
}
if !predConfig.NonEC2Ok && !nonEC2ok {
if md := ec2metadata.New(sess, &aws2.Config{MaxRetries: aws2.Int(3)}); !md.Available() {
return fmt.Errorf("not running on ec2 instance (and nonEc2Ok is not true)")
}
}
return nil
return reflow.Runbase(digest.Digest(r.RunID))
}
13 changes: 7 additions & 6 deletions trace/localtrace/localtracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"sync/atomic"
"time"

"github.com/grailbio/base/digest"
"github.com/grailbio/infra"
"github.com/grailbio/reflow"
"github.com/grailbio/reflow/taskdb"
"github.com/grailbio/reflow/tool"
"github.com/grailbio/reflow/trace"
)

Expand All @@ -34,12 +35,12 @@ type LocalTracer struct {
// Init implements infra.Provider and gets called when an instance of LocalTracer
// is created with infra.Config.Instance(...)
func (lt *LocalTracer) Init(runID *taskdb.RunID) error {
dir, err := tool.Rundir()
if err == nil {
base := tool.Runbase(dir, *runID)
lt.tracefilepath = base + ".trace"
base, err := reflow.Runbase(digest.Digest(*runID))
if err != nil {
return err
}
return err
lt.tracefilepath = base + ".trace"
return nil
}

// New returns a new LocalTracer instance. This function is intended for cases
Expand Down

0 comments on commit f284e87

Please sign in to comment.