Skip to content

Commit

Permalink
Merge branch 'master' into jonathan/core-2238-loki-in-testpachd
Browse files Browse the repository at this point in the history
  • Loading branch information
robert-uhl committed Apr 3, 2024
2 parents 653b179 + 410f2b2 commit 37b4b6f
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 39 deletions.
6 changes: 3 additions & 3 deletions src/internal/pachd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (b *builder) registerPFSServer(ctx context.Context) error {
if err != nil {
return err
}
apiServer, err := pfs_server.NewAPIServer(*env)
apiServer, err := pfs_server.NewAPIServer(ctx, *env)
if err != nil {
return err
}
Expand Down Expand Up @@ -432,7 +432,7 @@ func (b *builder) startPFSWorker(ctx context.Context) error {
config := pfs_server.WorkerConfig{
Storage: b.env.Config().StorageConfiguration,
}
w, err := pfs_server.NewWorker(*env, config)
w, err := pfs_server.NewWorker(ctx, *env, config)
if err != nil {
return err
}
Expand All @@ -450,7 +450,7 @@ func (b *builder) startPFSMaster(ctx context.Context) error {
if err != nil {
return err
}
m, err := pfs_server.NewMaster(*env)
m, err := pfs_server.NewMaster(ctx, *env)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/pachd/pachw.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (pachwb *pachwBuilder) registerPFSServer(ctx context.Context) error {
if err != nil {
return err
}
apiServer, err := pfs_server.NewAPIServer(*env)
apiServer, err := pfs_server.NewAPIServer(ctx, *env)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions src/internal/pachd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ func initPFSAPIServer(out *pfs.APIServer, outMaster **pfs_server.Master, env fun
return setupStep{
Name: "initPFSAPIServer",
Fn: func(ctx context.Context) error {
apiServer, err := pfs_server.NewAPIServer(env())
apiServer, err := pfs_server.NewAPIServer(ctx, env())
if err != nil {
return errors.Wrap(err, "pfs api server")
}
*out = apiServer
master, err := pfs_server.NewMaster(env())
master, err := pfs_server.NewMaster(ctx, env())
if err != nil {
return errors.Wrap(err, "pfs master")
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func initPFSWorker(out **pfs_server.Worker, config pachconfig.StorageConfigurati
return setupStep{
Name: "initPFSWorker",
Fn: func(ctx context.Context) error {
w, err := pfs_server.NewWorker(env(), pfs_server.WorkerConfig{Storage: config})
w, err := pfs_server.NewWorker(ctx, env(), pfs_server.WorkerConfig{Storage: config})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/pachd/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (sb *sidecarBuilder) registerPFSServer(ctx context.Context) error {
if err != nil {
return err
}
apiServer, err := pfs_server.NewAPIServer(*env)
apiServer, err := pfs_server.NewAPIServer(ctx, *env)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions src/internal/storage/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"database/sql"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/obj"
Expand Down Expand Up @@ -38,13 +39,13 @@ type Server struct {
}

// New creates a new Server
func New(env Env, config pachconfig.StorageConfiguration) (*Server, error) {
func New(ctx context.Context, env Env, config pachconfig.StorageConfiguration) (*Server, error) {
// Setup tracker
tracker := track.NewPostgresTracker(env.DB)

// chunk
keyStore := chunk.NewPostgresKeyStore(env.DB)
secret, err := getOrCreateKey(context.TODO(), keyStore, "default")
secret, err := getOrCreateKey(ctx, keyStore, "default")
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions src/internal/storage/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ func TestServer(t *testing.T) {
fileset.NewTestStorage(ctx, t, db, tracker)

var config pachconfig.StorageConfiguration
s, err := New(Env{
DB: db,
Bucket: b,
}, config)
s, err := New(ctx,
Env{
DB: db,
Bucket: b,
},
config)
require.NoError(t, err)

w := s.Filesets.NewWriter(ctx)
Expand Down
24 changes: 14 additions & 10 deletions src/internal/testpachd/realenv/real_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,24 +225,28 @@ func newRealEnv(ctx context.Context, t testing.TB, mockPPSTransactionServer bool
pfsEnv, err := pachd.PFSEnv(realEnv.ServiceEnv, txnEnv)
require.NoError(t, err)
pfsEnv.EtcdPrefix = ""
realEnv.PFSServer, err = pfsserver.NewAPIServer(*pfsEnv)
realEnv.PFSServer, err = pfsserver.NewAPIServer(ctx, *pfsEnv)
require.NoError(t, err)
w, err := pfsserver.NewWorker(pfsserver.WorkerEnv{
DB: pfsEnv.DB,
ObjClient: pfsEnv.ObjectClient,
Bucket: pfsEnv.Bucket,
TaskService: pfsEnv.TaskService,
}, pfsserver.WorkerConfig{
Storage: pfsEnv.StorageConfig,
})
w, err := pfsserver.NewWorker(
ctx,
pfsserver.WorkerEnv{
DB: pfsEnv.DB,
ObjClient: pfsEnv.ObjectClient,
Bucket: pfsEnv.Bucket,
TaskService: pfsEnv.TaskService,
},
pfsserver.WorkerConfig{
Storage: pfsEnv.StorageConfig,
},
)
require.NoError(t, err)
go func() {
if err := w.Run(ctx); err != nil {
log.Error(ctx, "from worker", zap.Error(err))
}
}()
realEnv.ServiceEnv.SetPfsServer(realEnv.PFSServer)
pfsMaster, err := pfsserver.NewMaster(*pfsEnv)
pfsMaster, err := pfsserver.NewMaster(ctx, *pfsEnv)
require.NoError(t, err)
go pfsMaster.Run(ctx) //nolint:errcheck

Expand Down
4 changes: 2 additions & 2 deletions src/server/pfs/server/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type apiServer struct {
driver *driver
}

func newAPIServer(env Env) (*apiServer, error) {
d, err := newDriver(env)
func newAPIServer(ctx context.Context, env Env) (*apiServer, error) {
d, err := newDriver(ctx, env)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions src/server/pfs/server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ type driver struct {
cache *fileset.Cache
}

func newDriver(env Env) (*driver, error) {
func newDriver(ctx context.Context, env Env) (*driver, error) {
// test object storage.
if err := func() error {
ctx, cf := context.WithTimeout(pctx.Background("newDriver"), 30*time.Second)
ctx, cf := context.WithTimeout(pctx.Child(ctx, "newDriver"), 30*time.Second)
defer cf()
return obj.TestStorage(ctx, env.Bucket, env.ObjectClient)
}(); err != nil {
Expand All @@ -111,7 +111,7 @@ func newDriver(env Env) (*driver, error) {
} else {
storageEnv.ObjectStore = env.ObjectClient
}
storageSrv, err := storage.New(storageEnv, env.StorageConfig)
storageSrv, err := storage.New(ctx, storageEnv, env.StorageConfig)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/pfs/server/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type Master struct {
driver *driver
}

func NewMaster(env Env) (*Master, error) {
d, err := newDriver(env)
func NewMaster(ctx context.Context, env Env) (*Master, error) {
d, err := newDriver(ctx, env)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/pfs/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type Env struct {
}

// NewAPIServer creates an APIServer.
func NewAPIServer(env Env) (pfsserver.APIServer, error) {
a, err := newAPIServer(env)
func NewAPIServer(ctx context.Context, env Env) (pfsserver.APIServer, error) {
a, err := newAPIServer(ctx, env)
if err != nil {
return nil, err
}
Expand Down
14 changes: 8 additions & 6 deletions src/server/pfs/server/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ type Worker struct {
storage *storage.Server
}

func NewWorker(env WorkerEnv, config WorkerConfig) (*Worker, error) {
ss, err := storage.New(storage.Env{
DB: env.DB,
Bucket: env.Bucket,
ObjectStore: env.ObjClient,
}, config.Storage)
func NewWorker(ctx context.Context, env WorkerEnv, config WorkerConfig) (*Worker, error) {
ss, err := storage.New(ctx,
storage.Env{
DB: env.DB,
Bucket: env.Bucket,
ObjectStore: env.ObjClient,
},
config.Storage)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 37b4b6f

Please sign in to comment.