Skip to content

Commit

Permalink
Add pluggable repository layer
Browse files Browse the repository at this point in the history
  • Loading branch information
Hiep committed Jul 25, 2020
1 parent b024f1d commit 7f1cf53
Show file tree
Hide file tree
Showing 10 changed files with 577 additions and 58 deletions.
88 changes: 45 additions & 43 deletions odin-engine/api/main.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package api

import (
"os"
"syscall"
"os"
"syscall"

"github.com/theycallmemac/odin/odin-engine/pkg/fsm"
"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/valyala/fasthttp"
"github.com/theycallmemac/odin/odin-engine/pkg/fsm"
"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
)

var (
// HTTPAddr contains the port used by this node
HTTPAddr string
// HTTPAddr contains the port used by this node
HTTPAddr string
)


// SetOdinEnv is used to set variables to be used by running jobs via Odin SDK
// parameters: mongoURL (a string of the address for the MongoDB instance)
// returns: nil
Expand All @@ -31,15 +31,17 @@ func SetOdinEnv(mongoURL string) {
type Service struct {
addr string
store fsm.Store
repo repository.Repository
}

// NewService is used to initialize a new service struct
// parameters: addr (a string of a http address), store (a store of node details)
// returns: *Service (a newly initialized service struct)
func NewService(addr string, store fsm.Store) *Service {
func NewService(addr string, store fsm.Store, repo repository.Repository) *Service {
return &Service{
addr: addr,
store: store,
repo: repo,
}
}

Expand All @@ -49,44 +51,44 @@ func NewService(addr string, store fsm.Store) *Service {
func (service *Service) Start() {
routes := func(ctx *fasthttp.RequestCtx) {
switch string(ctx.Path()) {
case "/cluster/join":
service.JoinCluster(ctx)
case "/cluster/leave":
service.LeaveCluster(ctx)
case "/execute":
Executor(ctx)
case "/execute/yaml":
ExecuteYaml(ctx)
case "/jobs/add":
AddJob(ctx)
case "/jobs/delete":
DeleteJob(ctx)
case "/jobs/info/update":
UpdateJob(ctx)
case "/jobs/info/description":
GetJobDescription(ctx)
case "/jobs/info/runs":
UpdateJobRuns(ctx)
case "/jobs/list":
ListJobs(ctx)
case "/jobs/logs":
GetJobLogs(ctx)
case "/links/add":
LinkJobs(ctx)
case "/links/delete":
UnlinkJobs(ctx)
case "/schedule":
GetJobSchedule(ctx)
case "/stats/add":
AddJobStats(ctx)
case "/stats/get":
GetJobStats(ctx)
case "/cluster/join":
service.JoinCluster(ctx)
case "/cluster/leave":
service.LeaveCluster(ctx)
case "/execute":
Executor(ctx)
case "/execute/yaml":
ExecuteYaml(ctx)
case "/jobs/add":
AddJob(ctx)
case "/jobs/delete":
DeleteJob(ctx)
case "/jobs/info/update":
UpdateJob(ctx)
case "/jobs/info/description":
GetJobDescription(ctx)
case "/jobs/info/runs":
UpdateJobRuns(ctx)
case "/jobs/list":
ListJobs(ctx)
case "/jobs/logs":
GetJobLogs(ctx)
case "/links/add":
LinkJobs(ctx)
case "/links/delete":
UnlinkJobs(ctx)
case "/schedule":
GetJobSchedule(ctx)
case "/stats/add":
AddJobStats(ctx)
case "/stats/get":
GetJobStats(service.repo, ctx)
}
}

// start the countdown timer for the execution until the first job
go jobs.StartTicker(service.store, service.addr)
// start the countdown timer for the execution until the first job
go jobs.StartTicker(service.store, service.addr)

HTTPAddr = service.addr
HTTPAddr = service.addr
fasthttp.ListenAndServe(HTTPAddr, routes)
}
17 changes: 8 additions & 9 deletions odin-engine/api/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package api

import (
"context"
"fmt"
"fmt"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/valyala/fasthttp"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/valyala/fasthttp"

"go.mongodb.org/mongo-driver/mongo"
)
Expand All @@ -29,9 +30,9 @@ func AddJobStats(ctx *fasthttp.RequestCtx) {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
} else {
if InsertIntoMongo(client, typeOfValue, desc, value, id, timestamp) {
fmt.Fprintf(ctx,"200")
fmt.Fprintf(ctx, "200")
} else {
fmt.Fprintf(ctx,"500")
fmt.Fprintf(ctx, "500")
}
}
}
Expand All @@ -56,13 +57,11 @@ func InsertIntoMongo(client *mongo.Client, typeOfValue string, desc string, valu
}

// GetJobStats is used to show stats collected by a specified job
func GetJobStats(ctx *fasthttp.RequestCtx) {
client, err := jobs.SetupClient()
func GetJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) {
statsList, err := repo.GetJobStats(ctx, string(ctx.PostBody()))
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprintf(ctx, "[ERROR] Cannot get job stats: %v", err)
} else {
statsList := jobs.GetJobStats(client, string(ctx.PostBody()))
fmt.Fprintf(ctx, jobs.Format("ID", "DESCRIPTION", "TYPE", "VALUE"))
for _, stat := range statsList {
fmt.Fprintf(ctx, jobs.Format(stat.ID, stat.Description, stat.Type, stat.Value))
}
Expand Down
28 changes: 22 additions & 6 deletions odin-engine/cmd/odin-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/theycallmemac/odin/odin-engine/api"
"github.com/theycallmemac/odin/odin-engine/pkg/fsm"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
_ "github.com/theycallmemac/odin/odin-engine/pkg/repository/all"
"github.com/theycallmemac/odin/odin-engine/pkg/resources"
)

Expand All @@ -28,10 +30,12 @@ const (

// define variables to be used in setting up the engine
var (
httpAddr string
raftAddr string
joinAddr string
nodeID string
httpAddr string
raftAddr string
joinAddr string
nodeID string
storageName string
storageAddr string
)

// init is used to define options to be used when running the engine
Expand All @@ -42,6 +46,8 @@ func init() {
flag.StringVar(&raftAddr, "raft", DefaultRaftAddr, "Set Raft bind address")
flag.StringVar(&joinAddr, "join", "", "Set join address, if any")
flag.StringVar(&nodeID, "id", "", "Node ID")
flag.StringVar(&storageName, "storage_name", "", "Storage name (e.g mongodb)")
flag.StringVar(&storageAddr, "storage_address", "", "Storage address")
}

// main is used to setup the odin-engine as a single node cluster, as well as allowing for further nodes to joining it. This is all done with the use of flags when initially running the engine.
Expand All @@ -67,15 +73,25 @@ func main() {
log.Fatalf("%v", err)
}

reg, err := repository.GetRegistration(storageName)
if err != nil {
log.Fatal(err)
}
// TODO: options is nil for now. Pass proper storage options
repo, err := reg.OpenFunc(storageAddr, nil)
if err != nil {
log.Fatal(err)
}

if httpAddr == "" {
usr, _ := user.Current()
config := resources.UnmarsharlYaml(resources.ReadFileBytes(usr.HomeDir + "/odin-config.yml"))
api.SetOdinEnv(config.Mongo.Address)
httpAddr = config.OdinVars.Master + ":" + config.OdinVars.Port
}
service := api.NewService(httpAddr, *s)
service := api.NewService(httpAddr, *s, repo)
go service.Start()
fmt.Println("ADDR:",httpAddr,"JOIN:",joinAddr, "RAFT:", raftAddr)
fmt.Println("ADDR:", httpAddr, "JOIN:", joinAddr, "RAFT:", raftAddr)
if joinAddr != "" {
if err := join(joinAddr, raftAddr, nodeID); err != nil {
log.Fatalf("failed to join node at %s: %s", joinAddr, err.Error())
Expand Down
2 changes: 2 additions & 0 deletions odin-engine/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.13
require (
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/hashicorp/raft v1.1.2
github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa
github.com/lnquy/cron v1.0.1
github.com/stretchr/testify v1.3.0
github.com/valyala/fasthttp v1.14.0
go.mongodb.org/mongo-driver v1.3.4
gopkg.in/yaml.v2 v2.3.0
Expand Down

0 comments on commit 7f1cf53

Please sign in to comment.