From ae9413b92e20ec6af5eb4c1498dd264f1eab40d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandro=20J=C3=A4ckel?= Date: Fri, 22 Mar 2024 16:07:15 +0100 Subject: [PATCH] Add jobloop for managed accounts --- cmd/janitor/main.go | 6 +- internal/api/registry/manifests_test.go | 2 +- internal/drivers/basic/account_management.go | 2 +- internal/keppel/account_management_driver.go | 17 ++++- internal/tasks/account_management.go | 79 ++++++++++++++++++++ internal/tasks/janitor.go | 5 +- internal/tasks/shared_test.go | 4 +- internal/test/setup.go | 3 + 8 files changed, 109 insertions(+), 9 deletions(-) create mode 100644 internal/tasks/account_management.go diff --git a/cmd/janitor/main.go b/cmd/janitor/main.go index f46272d7..d8ee6d23 100644 --- a/cmd/janitor/main.go +++ b/cmd/janitor/main.go @@ -56,8 +56,9 @@ func run(cmd *cobra.Command, args []string) { cfg := keppel.ParseConfiguration() auditor := keppel.InitAuditTrail() - db := must.Return(keppel.InitDB(cfg.DatabaseURL)) ad := must.Return(keppel.NewAuthDriver(osext.MustGetenv("KEPPEL_DRIVER_AUTH"), nil)) + amd := must.Return(keppel.NewAccountManagementDriver(osext.MustGetenv("KEPPEL_DRIVER_ACCOUNT_MANAGEMENT"))) + db := must.Return(keppel.InitDB(cfg.DatabaseURL)) fd := must.Return(keppel.NewFederationDriver(osext.MustGetenv("KEPPEL_DRIVER_FEDERATION"), ad, cfg)) sd := must.Return(keppel.NewStorageDriver(osext.MustGetenv("KEPPEL_DRIVER_STORAGE"), ad, cfg)) icd := must.Return(keppel.NewInboundCacheDriver(osext.MustGetenv("KEPPEL_DRIVER_INBOUND_CACHE"), cfg)) @@ -67,9 +68,10 @@ func run(cmd *cobra.Command, args []string) { ctx := httpext.ContextWithSIGINT(context.Background(), 10*time.Second) // start task loops - janitor := tasks.NewJanitor(cfg, fd, sd, icd, db, auditor) + janitor := tasks.NewJanitor(cfg, fd, sd, icd, db, amd, auditor) go janitor.AccountFederationAnnouncementJob(nil).Run(ctx) go janitor.AbandonedUploadCleanupJob(nil).Run(ctx) + go janitor.CreateManagedAccounts(nil).Run(ctx) go janitor.ManifestGarbageCollectionJob(nil).Run(ctx) go janitor.BlobMountSweepJob(nil).Run(ctx) go janitor.BlobSweepJob(nil).Run(ctx) diff --git a/internal/api/registry/manifests_test.go b/internal/api/registry/manifests_test.go index 99cd2868..36d92f31 100644 --- a/internal/api/registry/manifests_test.go +++ b/internal/api/registry/manifests_test.go @@ -693,7 +693,7 @@ func TestImageManifestWrongBlobSize(t *testing.T) { func TestImageManifestCmdEntrypointAsString(t *testing.T) { testWithPrimary(t, nil, func(s test.Setup) { - j := tasks.NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next) + j := tasks.NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.AMD, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next) j.DisableJitter() validateManifestJob := j.ManifestValidationJob(s.Registry) diff --git a/internal/drivers/basic/account_management.go b/internal/drivers/basic/account_management.go index b47fb640..6cc9d0c3 100644 --- a/internal/drivers/basic/account_management.go +++ b/internal/drivers/basic/account_management.go @@ -61,7 +61,7 @@ func init() { func (a *AccountManagementDriver) PluginTypeID() string { return "basic" } // ConfigureAccount implements the keppel.AccountManagementDriver interface. -func (a *AccountManagementDriver) Init(envToConfigFile string) error { +func (a *AccountManagementDriver) Init() error { a.configPath = os.Getenv("KEPPEL_ACCOUNT_MANAGEMENT_FILE") if a.configPath == "" { return errors.New("KEPPEL_ACCOUNT_MANAGEMENT_FILE is not set") diff --git a/internal/keppel/account_management_driver.go b/internal/keppel/account_management_driver.go index 2ccba2e8..ac121e4a 100644 --- a/internal/keppel/account_management_driver.go +++ b/internal/keppel/account_management_driver.go @@ -19,6 +19,9 @@ package keppel import ( + "errors" + + "github.com/sapcc/go-bits/logg" "github.com/sapcc/go-bits/pluggable" "github.com/sapcc/keppel/internal/models" ) @@ -30,7 +33,7 @@ type AccountManagementDriver interface { pluggable.Plugin // Init is called before any other interface methods, and allows the plugin to // perform first-time initialization. - Init(string) error + Init() error // Called by a jobloop for every account every once in a while (e.g. every hour). // @@ -50,3 +53,15 @@ type AccountManagementDriver interface { // AccountManagementDriverRegistry is a pluggable.Registry for AccountManagementDriver implementations. var AccountManagementDriverRegistry pluggable.Registry[AccountManagementDriver] + +// NewAccountManagementDriver creates a new AuthDriver using one of the plugins registered +// with AccountManagementDriver. +func NewAccountManagementDriver(pluginTypeID string) (AccountManagementDriver, error) { + logg.Debug("initializing account management driver %q...", pluginTypeID) + + amd := AccountManagementDriverRegistry.Instantiate(pluginTypeID) + if amd == nil { + return nil, errors.New("no such account management driver: " + pluginTypeID) + } + return amd, amd.Init() +} diff --git a/internal/tasks/account_management.go b/internal/tasks/account_management.go new file mode 100644 index 000000000..7f5b8215 --- /dev/null +++ b/internal/tasks/account_management.go @@ -0,0 +1,79 @@ +/****************************************************************************** +* +* Copyright 2024 SAP SE +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +******************************************************************************/ + +package tasks + +import ( + "context" + "database/sql" + "slices" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-bits/jobloop" +) + +// CreateManagedAccounts is a job. Each task creates newly discovered accounts from the driver. +func (j *Janitor) CreateManagedAccounts(registerer prometheus.Registerer) jobloop.Job { + return (&jobloop.ProducerConsumerJob[[]string]{ + Metadata: jobloop.JobMetadata{ + ReadableName: "create new managed accounts", + CounterOpts: prometheus.CounterOpts{ + Name: "keppel_managed_account_creations", + Help: "Counter for managed account creations.", + }, + }, + DiscoverTask: j.discoverAccountsToCreate, + ProcessTask: j.createNewAccounts, + }).Setup(registerer) +} + +func (j *Janitor) discoverAccountsToCreate(_ context.Context, _ prometheus.Labels) (accounts []string, err error) { + managedAccounts, err := j.amd.ManagedAccountNames() + if err != nil { + return nil, err + } + + var existingAccounts []string + _, err = j.db.Select(&existingAccounts, "SELECT name FROM accounts WHERE is_managed = true") + if err != nil { + return nil, err + } + + for _, account := range managedAccounts { + if !slices.Contains(existingAccounts, account) { + accounts = append(accounts, account) + } + } + + if len(accounts) == 0 { + return nil, sql.ErrNoRows + } + + return accounts, nil +} + +func (j *Janitor) createNewAccounts(ctx context.Context, accounts []string, labels prometheus.Labels) error { + for _, account := range accounts { + _, err := j.db.Exec(accountAnnouncementDoneQuery, account, j.timeNow().Add(j.addJitter(1*time.Hour))) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/tasks/janitor.go b/internal/tasks/janitor.go index 0aa9ee99..ea674228 100644 --- a/internal/tasks/janitor.go +++ b/internal/tasks/janitor.go @@ -53,6 +53,7 @@ type Janitor struct { sd keppel.StorageDriver icd keppel.InboundCacheDriver db *keppel.DB + amd keppel.AccountManagementDriver auditor keppel.Auditor // non-pure functions that can be replaced by deterministic doubles for unit tests @@ -62,8 +63,8 @@ type Janitor struct { } // NewJanitor creates a new Janitor. -func NewJanitor(cfg keppel.Configuration, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, auditor keppel.Auditor) *Janitor { - j := &Janitor{cfg, fd, sd, icd, db, auditor, time.Now, keppel.GenerateStorageID, addJitter} +func NewJanitor(cfg keppel.Configuration, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, amd keppel.AccountManagementDriver, auditor keppel.Auditor) *Janitor { + j := &Janitor{cfg, fd, sd, icd, db, amd, auditor, time.Now, keppel.GenerateStorageID, addJitter} return j } diff --git a/internal/tasks/shared_test.go b/internal/tasks/shared_test.go index 69f404c1..b133b02e 100644 --- a/internal/tasks/shared_test.go +++ b/internal/tasks/shared_test.go @@ -40,7 +40,7 @@ func setup(t *testing.T, opts ...test.SetupOption) (*Janitor, test.Setup) { test.WithQuotas, } s := test.NewSetup(t, append(params, opts...)...) - j := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next) + j := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.AMD, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next) j.DisableJitter() return j, s } @@ -75,7 +75,7 @@ func setupReplica(t *testing.T, s1 test.Setup, strategy string) (*Janitor, test. test.WithQuotas, ) - j2 := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next) + j2 := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.AMD, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next) j2.DisableJitter() return j2, s } diff --git a/internal/test/setup.go b/internal/test/setup.go index 51a582bc..3690faff 100644 --- a/internal/test/setup.go +++ b/internal/test/setup.go @@ -40,6 +40,7 @@ import ( keppelv1 "github.com/sapcc/keppel/internal/api/keppel" peerv1 "github.com/sapcc/keppel/internal/api/peer" registryv2 "github.com/sapcc/keppel/internal/api/registry" + "github.com/sapcc/keppel/internal/drivers/basic" "github.com/sapcc/keppel/internal/drivers/trivial" "github.com/sapcc/keppel/internal/keppel" "github.com/sapcc/keppel/internal/models" @@ -152,6 +153,7 @@ type Setup struct { SIDGenerator *StorageIDGenerator Auditor *Auditor AD *AuthDriver + AMD *basic.AccountManagementDriver FD *FederationDriver SD *trivial.StorageDriver ICD *InboundCacheDriver @@ -301,6 +303,7 @@ func NewSetup(t *testing.T, opts ...SetupOption) Setup { // setup essential test doubles s.Clock = mock.NewClock() s.SIDGenerator = &StorageIDGenerator{} + s.AMD = &basic.AccountManagementDriver{} s.Auditor = &Auditor{} // if we are secondary and we know the primary, share the clock with it