Skip to content

Commit

Permalink
refactor: synchronization server
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Igrychev <alexey.igrychev@flant.com>
  • Loading branch information
alexey-igrychev committed Mar 17, 2022
1 parent 660a937 commit bd03900
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 17 deletions.
6 changes: 3 additions & 3 deletions cmd/werf/synchronization/main.go
Expand Up @@ -104,7 +104,7 @@ func runSynchronization() error {
}

var distributedLockerBackendFactoryFunc func(clientID string) (distributed_locker.DistributedLockerBackend, error)
var stagesStorageCacheFactoryFunc func(clientID string) (storage.StagesStorageCache, error)
var stagesStorageCacheFactoryFunc func(clientID string) (synchronization_server.StagesStorageCacheInterface, error)

if cmdData.Kubernetes {
if err := kube.Init(kube.InitOptions{kube.KubeConfigOptions{
Expand Down Expand Up @@ -137,7 +137,7 @@ func runSynchronization() error {
return distributed_locker.NewOptimisticLockingStorageBasedBackend(store), nil
}

stagesStorageCacheFactoryFunc = func(clientID string) (storage.StagesStorageCache, error) {
stagesStorageCacheFactoryFunc = func(clientID string) (synchronization_server.StagesStorageCacheInterface, error) {
return storage.NewKubernetesStagesStorageCache("werf-synchronization", kube.Client, func(projectName string) string {
return fmt.Sprintf("werf-%s", clientID)
}), nil
Expand All @@ -153,7 +153,7 @@ func runSynchronization() error {
return distributed_locker.NewOptimisticLockingStorageBasedBackend(store), nil
}

stagesStorageCacheFactoryFunc = func(clientID string) (storage.StagesStorageCache, error) {
stagesStorageCacheFactoryFunc = func(clientID string) (synchronization_server.StagesStorageCacheInterface, error) {
return storage.NewFileStagesStorageCache(filepath.Join(stagesStorageCacheBaseDir, clientID)), nil
}
}
Expand Down
@@ -1,12 +1,12 @@
package storage
package synchronization_server

import (
"context"

"github.com/werf/werf/pkg/image"
)

type StagesStorageCache interface {
type StagesStorageCacheInterface interface {
GetAllStages(ctx context.Context, projectName string) (bool, []image.StageID, error)
DeleteAllStages(ctx context.Context, projectName string) error
GetStagesByDigest(ctx context.Context, projectName, digest string) (bool, []image.StageID, error)
Expand Down
Expand Up @@ -6,11 +6,10 @@ import (

"github.com/werf/logboek"
"github.com/werf/werf/pkg/image"
"github.com/werf/werf/pkg/storage"
"github.com/werf/werf/pkg/util"
)

func NewStagesStorageCacheHttpHandler(stagesStorageCache storage.StagesStorageCache) *StagesStorageCacheHttpHandler {
func NewStagesStorageCacheHttpHandler(stagesStorageCache StagesStorageCacheInterface) *StagesStorageCacheHttpHandler {
handler := &StagesStorageCacheHttpHandler{
StagesStorageCache: stagesStorageCache,
ServeMux: http.NewServeMux(),
Expand All @@ -25,7 +24,7 @@ func NewStagesStorageCacheHttpHandler(stagesStorageCache storage.StagesStorageCa

type StagesStorageCacheHttpHandler struct {
*http.ServeMux
StagesStorageCache storage.StagesStorageCache
StagesStorageCache StagesStorageCacheInterface
}

type GetAllStagesRequest struct {
Expand Down
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/werf/logboek"
"github.com/werf/werf/pkg/image"
"github.com/werf/werf/pkg/storage"
"github.com/werf/werf/pkg/util"
)

Expand All @@ -15,7 +14,7 @@ type StageIDLegacy struct {
UniqueID int64 `json:"uniqueID"`
}

func NewStagesStorageCacheHttpHandlerLegacy(stagesStorageCache storage.StagesStorageCache) *StagesStorageCacheHttpHandlerLegacy {
func NewStagesStorageCacheHttpHandlerLegacy(stagesStorageCache StagesStorageCacheInterface) *StagesStorageCacheHttpHandlerLegacy {
handler := &StagesStorageCacheHttpHandlerLegacy{
StagesStorageCache: stagesStorageCache,
ServeMux: http.NewServeMux(),
Expand All @@ -30,7 +29,7 @@ func NewStagesStorageCacheHttpHandlerLegacy(stagesStorageCache storage.StagesSto

type StagesStorageCacheHttpHandlerLegacy struct {
*http.ServeMux
StagesStorageCache storage.StagesStorageCache
StagesStorageCache StagesStorageCacheInterface
}

type GetAllStagesRequestLegacy struct {
Expand Down
11 changes: 5 additions & 6 deletions pkg/storage/synchronization_server/synchronization_server.go
Expand Up @@ -11,11 +11,10 @@ import (

"github.com/werf/lockgate/pkg/distributed_locker"
"github.com/werf/logboek"
"github.com/werf/werf/pkg/storage"
"github.com/werf/werf/pkg/util"
)

func RunSynchronizationServer(_ context.Context, ip, port string, distributedLockerBackendFactoryFunc func(clientID string) (distributed_locker.DistributedLockerBackend, error), stagesStorageCacheFactoryFunc func(clientID string) (storage.StagesStorageCache, error)) error {
func RunSynchronizationServer(_ context.Context, ip, port string, distributedLockerBackendFactoryFunc func(clientID string) (distributed_locker.DistributedLockerBackend, error), stagesStorageCacheFactoryFunc func(clientID string) (StagesStorageCacheInterface, error)) error {
handler := NewSynchronizationServerHandler(distributedLockerBackendFactoryFunc, stagesStorageCacheFactoryFunc)
return http.ListenAndServe(fmt.Sprintf("%s:%s", ip, port), handler)
}
Expand All @@ -24,13 +23,13 @@ type SynchronizationServerHandler struct {
*http.ServeMux

DistributedLockerBackendFactoryFunc func(clientID string) (distributed_locker.DistributedLockerBackend, error)
StagesStorageCacheFactoryFunc func(clientID string) (storage.StagesStorageCache, error)
StagesStorageCacheFactoryFunc func(clientID string) (StagesStorageCacheInterface, error)

mux sync.Mutex
SynchronizationServerByClientID map[string]*SynchronizationServerHandlerByClientID
}

func NewSynchronizationServerHandler(distributedLockerBackendFactoryFunc func(clientID string) (distributed_locker.DistributedLockerBackend, error), stagesStorageCacheFactoryFunc func(requestID string) (storage.StagesStorageCache, error)) *SynchronizationServerHandler {
func NewSynchronizationServerHandler(distributedLockerBackendFactoryFunc func(clientID string) (distributed_locker.DistributedLockerBackend, error), stagesStorageCacheFactoryFunc func(requestID string) (StagesStorageCacheInterface, error)) *SynchronizationServerHandler {
srv := &SynchronizationServerHandler{
ServeMux: http.NewServeMux(),
DistributedLockerBackendFactoryFunc: distributedLockerBackendFactoryFunc,
Expand Down Expand Up @@ -158,10 +157,10 @@ type SynchronizationServerHandlerByClientID struct {
ClientID string

DistributedLockerBackend distributed_locker.DistributedLockerBackend
StagesStorageCache storage.StagesStorageCache
StagesStorageCache StagesStorageCacheInterface
}

func NewSynchronizationServerHandlerByClientID(clientID string, distributedLockerBackend distributed_locker.DistributedLockerBackend, stagesStorageCache storage.StagesStorageCache) *SynchronizationServerHandlerByClientID {
func NewSynchronizationServerHandlerByClientID(clientID string, distributedLockerBackend distributed_locker.DistributedLockerBackend, stagesStorageCache StagesStorageCacheInterface) *SynchronizationServerHandlerByClientID {
srv := &SynchronizationServerHandlerByClientID{
ServeMux: http.NewServeMux(),
ClientID: clientID,
Expand Down

0 comments on commit bd03900

Please sign in to comment.