Skip to content

Commit

Permalink
Add jobloop for managed accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
SuperSandro2000 committed Apr 8, 2024
1 parent 2e7d1a9 commit ae9413b
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 9 deletions.
6 changes: 4 additions & 2 deletions cmd/janitor/main.go
Expand Up @@ -56,8 +56,9 @@ func run(cmd *cobra.Command, args []string) {
cfg := keppel.ParseConfiguration()
auditor := keppel.InitAuditTrail()

db := must.Return(keppel.InitDB(cfg.DatabaseURL))
ad := must.Return(keppel.NewAuthDriver(osext.MustGetenv("KEPPEL_DRIVER_AUTH"), nil))
amd := must.Return(keppel.NewAccountManagementDriver(osext.MustGetenv("KEPPEL_DRIVER_ACCOUNT_MANAGEMENT")))
db := must.Return(keppel.InitDB(cfg.DatabaseURL))
fd := must.Return(keppel.NewFederationDriver(osext.MustGetenv("KEPPEL_DRIVER_FEDERATION"), ad, cfg))
sd := must.Return(keppel.NewStorageDriver(osext.MustGetenv("KEPPEL_DRIVER_STORAGE"), ad, cfg))
icd := must.Return(keppel.NewInboundCacheDriver(osext.MustGetenv("KEPPEL_DRIVER_INBOUND_CACHE"), cfg))
Expand All @@ -67,9 +68,10 @@ func run(cmd *cobra.Command, args []string) {
ctx := httpext.ContextWithSIGINT(context.Background(), 10*time.Second)

// start task loops
janitor := tasks.NewJanitor(cfg, fd, sd, icd, db, auditor)
janitor := tasks.NewJanitor(cfg, fd, sd, icd, db, amd, auditor)
go janitor.AccountFederationAnnouncementJob(nil).Run(ctx)
go janitor.AbandonedUploadCleanupJob(nil).Run(ctx)
go janitor.CreateManagedAccounts(nil).Run(ctx)
go janitor.ManifestGarbageCollectionJob(nil).Run(ctx)
go janitor.BlobMountSweepJob(nil).Run(ctx)
go janitor.BlobSweepJob(nil).Run(ctx)
Expand Down
2 changes: 1 addition & 1 deletion internal/api/registry/manifests_test.go
Expand Up @@ -693,7 +693,7 @@ func TestImageManifestWrongBlobSize(t *testing.T) {

func TestImageManifestCmdEntrypointAsString(t *testing.T) {
testWithPrimary(t, nil, func(s test.Setup) {
j := tasks.NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next)
j := tasks.NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.AMD, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next)
j.DisableJitter()
validateManifestJob := j.ManifestValidationJob(s.Registry)

Expand Down
2 changes: 1 addition & 1 deletion internal/drivers/basic/account_management.go
Expand Up @@ -61,7 +61,7 @@ func init() {
func (a *AccountManagementDriver) PluginTypeID() string { return "basic" }

// ConfigureAccount implements the keppel.AccountManagementDriver interface.
func (a *AccountManagementDriver) Init(envToConfigFile string) error {
func (a *AccountManagementDriver) Init() error {
a.configPath = os.Getenv("KEPPEL_ACCOUNT_MANAGEMENT_FILE")
if a.configPath == "" {
return errors.New("KEPPEL_ACCOUNT_MANAGEMENT_FILE is not set")
Expand Down
17 changes: 16 additions & 1 deletion internal/keppel/account_management_driver.go
Expand Up @@ -19,6 +19,9 @@
package keppel

import (
"errors"

"github.com/sapcc/go-bits/logg"
"github.com/sapcc/go-bits/pluggable"

Check failure on line 25 in internal/keppel/account_management_driver.go

View workflow job for this annotation

GitHub Actions / Build & Lint

File is not `goimports`-ed with -local github.com/sapcc/keppel (goimports)
"github.com/sapcc/keppel/internal/models"
)
Expand All @@ -30,7 +33,7 @@ type AccountManagementDriver interface {
pluggable.Plugin
// Init is called before any other interface methods, and allows the plugin to
// perform first-time initialization.
Init(string) error
Init() error

// Called by a jobloop for every account every once in a while (e.g. every hour).
//
Expand All @@ -50,3 +53,15 @@ type AccountManagementDriver interface {

// AccountManagementDriverRegistry is a pluggable.Registry for AccountManagementDriver implementations.
var AccountManagementDriverRegistry pluggable.Registry[AccountManagementDriver]

// NewAccountManagementDriver creates a new AuthDriver using one of the plugins registered
// with AccountManagementDriver.
func NewAccountManagementDriver(pluginTypeID string) (AccountManagementDriver, error) {
logg.Debug("initializing account management driver %q...", pluginTypeID)

amd := AccountManagementDriverRegistry.Instantiate(pluginTypeID)
if amd == nil {
return nil, errors.New("no such account management driver: " + pluginTypeID)
}
return amd, amd.Init()
}
79 changes: 79 additions & 0 deletions internal/tasks/account_management.go
@@ -0,0 +1,79 @@
/******************************************************************************
*
* Copyright 2024 SAP SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package tasks

import (
"context"
"database/sql"
"slices"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sapcc/go-bits/jobloop"
)

// CreateManagedAccounts is a job. Each task creates newly discovered accounts from the driver.
func (j *Janitor) CreateManagedAccounts(registerer prometheus.Registerer) jobloop.Job {
return (&jobloop.ProducerConsumerJob[[]string]{
Metadata: jobloop.JobMetadata{
ReadableName: "create new managed accounts",
CounterOpts: prometheus.CounterOpts{
Name: "keppel_managed_account_creations",
Help: "Counter for managed account creations.",
},
},
DiscoverTask: j.discoverAccountsToCreate,
ProcessTask: j.createNewAccounts,
}).Setup(registerer)
}

func (j *Janitor) discoverAccountsToCreate(_ context.Context, _ prometheus.Labels) (accounts []string, err error) {
managedAccounts, err := j.amd.ManagedAccountNames()
if err != nil {
return nil, err
}

var existingAccounts []string
_, err = j.db.Select(&existingAccounts, "SELECT name FROM accounts WHERE is_managed = true")
if err != nil {
return nil, err
}

for _, account := range managedAccounts {
if !slices.Contains(existingAccounts, account) {
accounts = append(accounts, account)
}
}

if len(accounts) == 0 {
return nil, sql.ErrNoRows
}

return accounts, nil
}

func (j *Janitor) createNewAccounts(ctx context.Context, accounts []string, labels prometheus.Labels) error {
for _, account := range accounts {
_, err := j.db.Exec(accountAnnouncementDoneQuery, account, j.timeNow().Add(j.addJitter(1*time.Hour)))
if err != nil {
return err
}
}
return nil
}
5 changes: 3 additions & 2 deletions internal/tasks/janitor.go
Expand Up @@ -53,6 +53,7 @@ type Janitor struct {
sd keppel.StorageDriver
icd keppel.InboundCacheDriver
db *keppel.DB
amd keppel.AccountManagementDriver
auditor keppel.Auditor

// non-pure functions that can be replaced by deterministic doubles for unit tests
Expand All @@ -62,8 +63,8 @@ type Janitor struct {
}

// NewJanitor creates a new Janitor.
func NewJanitor(cfg keppel.Configuration, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, auditor keppel.Auditor) *Janitor {
j := &Janitor{cfg, fd, sd, icd, db, auditor, time.Now, keppel.GenerateStorageID, addJitter}
func NewJanitor(cfg keppel.Configuration, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, amd keppel.AccountManagementDriver, auditor keppel.Auditor) *Janitor {
j := &Janitor{cfg, fd, sd, icd, db, amd, auditor, time.Now, keppel.GenerateStorageID, addJitter}
return j
}

Expand Down
4 changes: 2 additions & 2 deletions internal/tasks/shared_test.go
Expand Up @@ -40,7 +40,7 @@ func setup(t *testing.T, opts ...test.SetupOption) (*Janitor, test.Setup) {
test.WithQuotas,
}
s := test.NewSetup(t, append(params, opts...)...)
j := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next)
j := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.AMD, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next)
j.DisableJitter()
return j, s
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func setupReplica(t *testing.T, s1 test.Setup, strategy string) (*Janitor, test.
test.WithQuotas,
)

j2 := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next)
j2 := NewJanitor(s.Config, s.FD, s.SD, s.ICD, s.DB, s.AMD, s.Auditor).OverrideTimeNow(s.Clock.Now).OverrideGenerateStorageID(s.SIDGenerator.Next)
j2.DisableJitter()
return j2, s
}
Expand Down
3 changes: 3 additions & 0 deletions internal/test/setup.go
Expand Up @@ -40,6 +40,7 @@ import (
keppelv1 "github.com/sapcc/keppel/internal/api/keppel"
peerv1 "github.com/sapcc/keppel/internal/api/peer"
registryv2 "github.com/sapcc/keppel/internal/api/registry"
"github.com/sapcc/keppel/internal/drivers/basic"
"github.com/sapcc/keppel/internal/drivers/trivial"
"github.com/sapcc/keppel/internal/keppel"
"github.com/sapcc/keppel/internal/models"
Expand Down Expand Up @@ -152,6 +153,7 @@ type Setup struct {
SIDGenerator *StorageIDGenerator
Auditor *Auditor
AD *AuthDriver
AMD *basic.AccountManagementDriver
FD *FederationDriver
SD *trivial.StorageDriver
ICD *InboundCacheDriver
Expand Down Expand Up @@ -301,6 +303,7 @@ func NewSetup(t *testing.T, opts ...SetupOption) Setup {
// setup essential test doubles
s.Clock = mock.NewClock()
s.SIDGenerator = &StorageIDGenerator{}
s.AMD = &basic.AccountManagementDriver{}
s.Auditor = &Auditor{}

// if we are secondary and we know the primary, share the clock with it
Expand Down

0 comments on commit ae9413b

Please sign in to comment.