Skip to content

Commit

Permalink
Implement all repository functions and move away from mongoClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Hiep committed Aug 13, 2020
1 parent 8195778 commit c619baf
Show file tree
Hide file tree
Showing 18 changed files with 1,495 additions and 499 deletions.
11 changes: 6 additions & 5 deletions odin-engine/api/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (

"github.com/theycallmemac/odin/odin-engine/pkg/executor"
"github.com/theycallmemac/odin/odin-engine/pkg/fsm"
"github.com/valyala/fasthttp"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/valyala/fasthttp"
)

// ExecNode is a type to be used to unmarshal data into after a HTTP request
Expand All @@ -16,15 +17,15 @@ type ExecNode struct {
}

// Executor is used to execute the item at the head of the job queue
func Executor(ctx *fasthttp.RequestCtx) {
func Executor(repo repository.Repository, ctx *fasthttp.RequestCtx) {
var en ExecNode
json.Unmarshal(ctx.PostBody(), &en)
go executor.Execute(en.Items, 0, HTTPAddr, en.Store)
go executor.Execute(repo, en.Items, 0, HTTPAddr, en.Store)
}

// ExecuteYaml is used to execute a job passed to the command line tool
func ExecuteYaml(ctx *fasthttp.RequestCtx) {
func ExecuteYaml(repo repository.Repository, ctx *fasthttp.RequestCtx) {
var en ExecNode
json.Unmarshal(ctx.PostBody(), &en)
go executor.Execute(en.Items, 1, HTTPAddr, en.Store)
go executor.Execute(repo, en.Items, 1, HTTPAddr, en.Store)
}
131 changes: 66 additions & 65 deletions odin-engine/api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,124 +2,125 @@ package api

import (
"bytes"
"fmt"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/theycallmemac/odin/odin-engine/pkg/resources"
"github.com/valyala/fasthttp"

"go.mongodb.org/mongo-driver/bson"
"github.com/valyala/fasthttp"
)

// AddJob is used to create a new job
func AddJob(ctx *fasthttp.RequestCtx) {
body := ctx.PostBody()
func AddJob(repo repository.Repository, ctx *fasthttp.RequestCtx) {
body := ctx.PostBody()
path := jobs.SetupEnvironment(body)
client, err := jobs.SetupClient()
id, err := repo.CreateJob(ctx, body, path, "")
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprintf(ctx, "[FAILED] Job failed to deploy: %v\n", err)
} else {
status := jobs.InsertIntoMongo(client, body, path, "")
fmt.Fprintf(ctx, status)
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) deployed successfully\n", id)
}
}

// DeleteJob is used to delete a job
func DeleteJob(ctx *fasthttp.RequestCtx) {
func DeleteJob(repo repository.Repository, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), " ")
id, uid := args[0], args[1]
client, err := jobs.SetupClient()
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
os.RemoveAll("/etc/odin/jobs/" + id)
os.RemoveAll("/etc/odin/logs/" + id)
if err := repo.DeleteJob(ctx, id, uid); err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to remove job (%s): %v\n", id, err)
} else {
os.RemoveAll("/etc/odin/jobs/" + id)
os.RemoveAll("/etc/odin/logs/" + id)
if jobs.DeleteJobByValue(client, bson.M{"id": id}, uid) {
fmt.Fprintf(ctx, "Job removed!\n")
} else {
fmt.Fprintf(ctx, "Job with that ID does not exist!\n")
}
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) removed\n", id)
}
}

// UpdateJob is used to update a job
func UpdateJob(ctx *fasthttp.RequestCtx) {
func UpdateJob(repo repository.Repository, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), "_")
id, name, description, schedule, uid := args[0], args[1], args[2], args[3], args[4]
client, err := jobs.SetupClient()
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
job := &repository.Job{
ID: id,
UID: uid,
}
if resources.NotEmpty(name) {
job.Name = name
}
if resources.NotEmpty(description) {
job.Description = description
}
if resources.NotEmpty(schedule) {
ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654)
resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml")))
os.Remove(".tmp.yml")
job.Schedule = resp
}
if err := repo.UpdateJob(ctx, job); err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err)
} else {
job := jobs.GetJobByValue(client, bson.M{"id": id}, uid)
if resources.NotEmpty(name) {
job.Name = name
}
if resources.NotEmpty(description) {
job.Description = description
}
if resources.NotEmpty(schedule) {
ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654)
resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml")))
os.Remove(".tmp.yml")
job.Schedule = resp
}
_ = jobs.UpdateJobByValue(client, job)
fmt.Fprintf(ctx, "Updated job " + id + " successfully\n")
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id)
}
}

// GetJobDescription is used to show a job's description
func GetJobDescription(ctx *fasthttp.RequestCtx) {
func GetJobDescription(repo repository.Repository, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), "_")
id, uid := args[0], args[1]
client, err := jobs.SetupClient()
job, err := repo.GetJobById(ctx, id, uid)
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprintf(ctx, "[FAILED] Failed to get job (%s): %v", id, err)
} else {
job := jobs.GetJobByValue(client, bson.M{"id": id}, uid)
fmt.Fprintf(ctx, job.Name + " - " + job.Description + "\n")
fmt.Fprintf(ctx, job.Name+" - "+job.Description+"\n")
}
}

// UpdateJobRuns is used to update a job's run number
func UpdateJobRuns(ctx *fasthttp.RequestCtx) {
// TODO: Get and update job should be done as a transaction
func UpdateJobRuns(repo repository.Repository, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), " ")
id, runs, uid := args[0], args[1], args[2]
client, err := jobs.SetupClient()
job, err := repo.GetJobById(ctx, id, uid)
if err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", id, err)
return
}
inc, err := strconv.Atoi(runs)
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprint(ctx, "Invalid run")
return
}
job.Runs = job.Runs + inc
if err := repo.UpdateJob(ctx, job); err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err)
} else {
job := jobs.GetJobByValue(client, bson.M{"id": id}, uid)
inc, _ := strconv.Atoi(runs)
job.Runs = job.Runs + inc
_ = jobs.UpdateJobByValue(client, job)
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id)
}
}

// ListJobs is used to list the current jobs running
func ListJobs(ctx *fasthttp.RequestCtx) {
client, err := jobs.SetupClient()
func ListJobs(repo repository.Repository, ctx *fasthttp.RequestCtx) {
uid := string(ctx.PostBody())
jobList, err := repo.GetUserJobs(ctx, uid)
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
} else {
jobList := jobs.GetUserJobs(client, string(ctx.PostBody()))
fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE"))
for _, job := range jobList {
linkLen := len(job.Links) - 1
if linkLen < 0 {
linkLen = 0
}
fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1]))
fmt.Fprintf(ctx, "[FAILED] Failed to get jobs for user %s\n", uid)
return
}
fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE"))
for _, job := range jobList {
linkLen := len(job.Links) - 1
if linkLen < 0 {
linkLen = 0
}
fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1]))
}
}

// GetJobLogs is used to retrieve the logs for a job
func GetJobLogs(ctx *fasthttp.RequestCtx) {
log, _ := ioutil.ReadFile("/etc/odin/logs/" + string(ctx.PostBody()))
fmt.Fprintf(ctx, "\n" + string(log) + "\n")
fmt.Fprintf(ctx, "\n"+string(log)+"\n")
}
26 changes: 11 additions & 15 deletions odin-engine/api/links.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,31 @@
package api

import (
"fmt"
"fmt"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/valyala/fasthttp"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/valyala/fasthttp"
)

// LinkJobs is used to link two jobs together
func LinkJobs(ctx *fasthttp.RequestCtx) {
func LinkJobs(repo repository.Repository, ctx *fasthttp.RequestCtx) {
split := strings.Split(string(ctx.PostBody()), "_")
from, to, uid := split[0], split[1], split[2]
client, _ := jobs.SetupClient()
updated := jobs.AddJobLink(client, from, to, uid)
if updated == 1 {
fmt.Fprintf(ctx, "Job " + from + " linked to " + to + "!\n")
if err := repo.AddJobLink(ctx, from, to, uid); err != nil {
fmt.Fprintf(ctx, "[FAILED] Job %s could not be linked to %s: %v\n", from, to, err)
} else {
fmt.Fprintf(ctx, "Job " + from + " could not be linked to " + to + ".\n")
fmt.Fprintf(ctx, "[SUCCESS] Job %s linked to %s\n", from, to)
}
}

// UnlinkJobs is used to delete a job link
func UnlinkJobs(ctx *fasthttp.RequestCtx) {
func UnlinkJobs(repo repository.Repository, ctx *fasthttp.RequestCtx) {
split := strings.Split(string(ctx.PostBody()), "_")
from, to, uid := split[0], split[1], split[2]
client, _ := jobs.SetupClient()
updated := jobs.DeleteJobLink(client, from, to, uid)
if updated == 1 {
fmt.Fprintf(ctx, "Job " + to + " unlinked from " + from + "!\n")
if err := repo.DeleteJobLink(ctx, from, to, uid); err != nil {
fmt.Fprintf(ctx, "[FAILED] Link %s could not be unlinked from %s: %v\n", to, from, err)
} else {
fmt.Fprintf(ctx, "Job " + to + " has no links!\n")
fmt.Fprintf(ctx, "[SUCCESS] Link %s unlinked from %s\n", to, from)
}
}
24 changes: 12 additions & 12 deletions odin-engine/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,38 @@ func (service *Service) Start() {
case "/cluster/leave":
service.LeaveCluster(ctx)
case "/execute":
Executor(ctx)
Executor(service.repo, ctx)
case "/execute/yaml":
ExecuteYaml(ctx)
ExecuteYaml(service.repo, ctx)
case "/jobs/add":
AddJob(ctx)
AddJob(service.repo, ctx)
case "/jobs/delete":
DeleteJob(ctx)
DeleteJob(service.repo, ctx)
case "/jobs/info/update":
UpdateJob(ctx)
UpdateJob(service.repo, ctx)
case "/jobs/info/description":
GetJobDescription(ctx)
GetJobDescription(service.repo, ctx)
case "/jobs/info/runs":
UpdateJobRuns(ctx)
UpdateJobRuns(service.repo, ctx)
case "/jobs/list":
ListJobs(ctx)
ListJobs(service.repo, ctx)
case "/jobs/logs":
GetJobLogs(ctx)
case "/links/add":
LinkJobs(ctx)
LinkJobs(service.repo, ctx)
case "/links/delete":
UnlinkJobs(ctx)
UnlinkJobs(service.repo, ctx)
case "/schedule":
GetJobSchedule(ctx)
case "/stats/add":
AddJobStats(ctx)
AddJobStats(service.repo, ctx)
case "/stats/get":
GetJobStats(service.repo, ctx)
}
}

// start the countdown timer for the execution until the first job
go jobs.StartTicker(service.store, service.addr)
go jobs.StartTicker(service.repo, service.store, service.addr)

HTTPAddr = service.addr
fasthttp.ListenAndServe(HTTPAddr, routes)
Expand Down
44 changes: 12 additions & 32 deletions odin-engine/api/stats.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package api

import (
"context"
"fmt"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/valyala/fasthttp"

"go.mongodb.org/mongo-driver/mongo"
)

// JobStats is a type to be used for accessing and storing job stats information
Expand All @@ -22,45 +19,28 @@ type JobStats struct {
}

// AddJobStats is used to parse collected metrics
func AddJobStats(ctx *fasthttp.RequestCtx) {
func AddJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), ",")
typeOfValue, desc, value, id, timestamp := args[0], args[1], args[2], args[3], args[4]
client, err := jobs.SetupClient()
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
} else {
if InsertIntoMongo(client, typeOfValue, desc, value, id, timestamp) {
fmt.Fprintf(ctx, "200")
} else {
fmt.Fprintf(ctx, "500")
}
js := &repository.JobStats{
ID: id,
Description: desc,
Type: typeOfValue,
Value: value,
Timestamp: timestamp,
}
}

// InsertIntoMongo is used to add collected metrics to the observability collection
// parameters: client (a *mongo.Client), typeOfValue (a string of the type of value being stored), desc (a string describing the value being stored), value (a string of the value being stored), id (a string of the associated Job ID), timestamp (a string of the unix time at which the operation took place)
// returns: bool (true is successful, false if otherwise)
func InsertIntoMongo(client *mongo.Client, typeOfValue string, desc string, value string, id string, timestamp string) bool {
var js JobStats
js.ID = id
js.Description = desc
js.Type = typeOfValue
js.Value = value
js.Timestamp = timestamp
collection := client.Database("odin").Collection("observability")
_, err := collection.InsertOne(context.TODO(), js)
client.Disconnect(context.TODO())
if err != nil {
return false
if err := repo.CreateJobStats(ctx, js); err != nil {
fmt.Fprintf(ctx, "500")
} else {
fmt.Fprintf(ctx, "200")
}
return true
}

// GetJobStats is used to show stats collected by a specified job
func GetJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) {
statsList, err := repo.GetJobStats(ctx, string(ctx.PostBody()))
if err != nil {
fmt.Fprintf(ctx, "[ERROR] Cannot get job stats: %v", err)
fmt.Fprintf(ctx, "[FAILED] Cannot get job stats: %v\n", err)
} else {
for _, stat := range statsList {
fmt.Fprintf(ctx, jobs.Format(stat.ID, stat.Description, stat.Type, stat.Value))
Expand Down

0 comments on commit c619baf

Please sign in to comment.