Skip to content

Commit

Permalink
Implement all repository functions and move away from mongoClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Hiep committed Aug 6, 2020
1 parent 8195778 commit b7d7907
Show file tree
Hide file tree
Showing 6 changed files with 783 additions and 47 deletions.
2 changes: 2 additions & 0 deletions odin-engine/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/theycallmemac/odin/odin-engine

go 1.13

replace github.com/hidal-go/hidalgo => github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06

require (
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/hashicorp/raft v1.1.2
Expand Down
4 changes: 4 additions & 0 deletions odin-engine/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:hBE4LGxApbZiV/3YoEPv7uYlUMWOogG1hwtkpiU87zQ=
github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0=
github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:PmwFwRyxsWbp6bGa+cNbfDdJm3cyz41ut83kblKc6o8=
github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0=
github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06 h1:ZKIptNmc3Mfb8pNDMz3kM2dZAWliF53qZFno8XcqbRw=
github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
33 changes: 5 additions & 28 deletions odin-engine/pkg/jobs/mongoClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type JobStats struct {
Value string
}


// getHome is used to get the path to the user's home directory
// parameters: nil
// return string (the path to the user's home)
Expand Down Expand Up @@ -112,34 +111,13 @@ func InsertIntoMongo(client *mongo.Client, d []byte, path string, uid string) st
if string(GetJobByValue(client, bson.M{"id": string(job.ID)}, uid).ID) == string(job.ID) {
return "Job with ID: " + job.ID + " already exists\n"
}
collection := client.Database("odin").Collection("jobs")
_, err := collection.InsertOne(context.TODO(), job)
client.Disconnect(context.TODO())
if err != nil {
log.Fatalln("Error on inserting new job", err)
}
return "Job: " + job.ID + " deployed successfully\n"
}

// GetJobStats is used to retrieve the stats associated with each job from the MongoDB instance
// parameters: client (a *mongo.Client), id (a string representation of a job's id)
// returns: []JobStats (the collection of fetched job stats)
func GetJobStats(client *mongo.Client, id string) []JobStats {
var statMap map[string]string
var jobStats JobStats
var statsList []JobStats
collection := client.Database("odin").Collection("observability")
documents, _ := collection.Find(context.TODO(), bson.M{"id": id})
collection := client.Database("odin").Collection("jobs")
_, err := collection.InsertOne(context.TODO(), job)
client.Disconnect(context.TODO())
for documents.Next(context.TODO()) {
documents.Decode(&statMap)
jobStats.ID = statMap["id"]
jobStats.Description = statMap["desc"]
jobStats.Type = statMap["type"]
jobStats.Value = statMap["value"]
statsList = append(statsList, jobStats)
if err != nil {
log.Fatalln("Error on inserting new job", err)
}
return statsList
return "Job: " + job.ID + " deployed successfully\n"
}

// GetJobByValue is used to return a job in MongoDB by filtering on a certain value pertaining to that job
Expand Down Expand Up @@ -289,7 +267,6 @@ func DeleteJobLink(client *mongo.Client, from string, to string, uid string) int
return updateResult.ModifiedCount
}


// RunLinks is used to run jobs linked to a job which has just been executed
// parameters: links (a string array of Job ID's to execute), uid (a uint32 of that user's id), httpAddr (a string port of the master node), store (a fsm.Store containing information about other nodes)
// returns: nil
Expand Down
183 changes: 171 additions & 12 deletions odin-engine/pkg/repository/nosql/nosql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package nosql

import (
"context"
"encoding/json"
"fmt"

"github.com/hidal-go/hidalgo/legacy/nosql/mongo"

Expand All @@ -27,30 +29,156 @@ func init() {
})
}

// Repository is implementation of Repository interface
type Repository struct {
db nosql.Database
}

func (repo *Repository) ensureIndex(ctx context.Context) error {
return repo.db.EnsureIndex(
if err := repo.db.EnsureIndex(
ctx,
base.ObservabilityTable,
nosql.Index{
Fields: []string{"_id"},
Fields: []string{"id"},
Type: nosql.StringExact,
},
[]nosql.Index{
{
Fields: []string{"id"},
Type: nosql.StringExact,
},
nil,
); err != nil {
return err
}

if err := repo.db.EnsureIndex(
ctx,
base.JobTable,
nosql.Index{
Fields: []string{"id"},
Type: nosql.StringExact,
},
nil,
); err != nil {
return err
}

return nil
}

// CreateJob creates a new job for an user
func (repo *Repository) CreateJob(ctx context.Context, data []byte, path string, uid string) error {
job := &base.Job{}
if err := json.Unmarshal(data, job); err != nil {
return err
}
if _, err := repo.GetJobById(ctx, job.ID, uid); err == nil {
return fmt.Errorf("job with id %s exists", job.ID)
} else if err != nosql.ErrNotFound {
return err
}
job.File = path
job.Runs = 0
doc := marshalJob(job)
_, err := repo.db.Insert(ctx, base.JobTable, []string{job.ID}, doc)
return err
}

// GetJobById returns a job by filtering on a certain value pertaining to that job
func (repo *Repository) GetJobById(ctx context.Context, id string, uid string) (*base.Job, error) {
doc, err := repo.db.Query(base.JobTable).WithFields(
nosql.FieldFilter{
Path: []string{"_id"},
Filter: nosql.Equal,
Value: nosql.String(id),
},
)
nosql.FieldFilter{
Path: []string{"uid"},
Filter: nosql.Equal,
Value: nosql.String(uid),
},
).One(ctx)

if err != nil {
return nil, err
}

job := &base.Job{}
unmarshalJob(doc, job)
return job, nil
}

func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobStats, error) {
// GetUserJobs returns all jobs belonging to an user
func (repo *Repository) GetUserJobs(ctx context.Context, uid string) ([]*base.Job, error) {
iter := repo.db.Query(base.JobTable).WithFields(nosql.FieldFilter{
Path: []string{"uid"},
Filter: nosql.Equal,
Value: nosql.String(uid),
}).Iterate()

if iter.Err() != nil {
return nil, iter.Err()
}
defer iter.Close()

results := make([]*base.Job, 0)
for iter.Next(ctx) {
doc := iter.Doc()
job := &base.Job{}
unmarshalJob(doc, job)
results = append(results, job)
}
return results, nil
}

// GetAll returns all jobs
func (repo *Repository) GetAll(ctx context.Context) ([]*base.Job, error) {
iter := repo.db.Query(base.JobTable).Iterate()
if iter.Err() != nil {
return nil, iter.Err()
}
defer iter.Close()

results := make([]*base.Job, 0)
for iter.Next(ctx) {
doc := iter.Doc()
job := &base.Job{}
unmarshalJob(doc, job)
results = append(results, job)
}
return results, nil
}

// UpdateJob modifies a job
func (repo *Repository) UpdateJob(ctx context.Context, job *base.Job) error {
key := []string{job.ID}
doc, err := repo.db.FindByKey(ctx, base.JobTable, key)
if err != nil {
return err
}
doc["name"] = nosql.String(job.Name)
doc["description"] = nosql.String(job.Description)
doc["schedule"] = nosql.String(job.Schedule)
doc["runs"] = nosql.Int(job.Runs)
return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx)
}

// DeleteJob deletes an user's job
func (repo *Repository) DeleteJob(ctx context.Context, id string, uid string) error {
return repo.db.Delete(base.JobTable).WithFields(
nosql.FieldFilter{
Path: []string{"_id"},
Filter: nosql.Equal,
Value: nosql.String(id),
},
nosql.FieldFilter{
Path: []string{"uid"},
Filter: nosql.Equal,
Value: nosql.String(uid),
},
).Do(ctx)
}

// GetJobStats returns stats of a job given the job id
func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]*base.JobStats, error) {
iter := repo.db.Query(base.ObservabilityTable).WithFields(nosql.FieldFilter{
Path: []string{"id"},
Path: []string{"_id"},
Filter: nosql.Equal,
Value: nosql.String(id),
}).Iterate()
Expand All @@ -60,10 +188,10 @@ func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobS
}
defer iter.Close()

results := make([]base.JobStats, 0)
results := make([]*base.JobStats, 0)
for iter.Next(ctx) {
doc := iter.Doc()
jobStats := base.JobStats{
jobStats := &base.JobStats{
ID: string(doc["id"].(nosql.String)),
Description: string(doc["desc"].(nosql.String)),
Type: string(doc["type"].(nosql.String)),
Expand All @@ -74,6 +202,37 @@ func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobS
return results, nil
}

// Close closes db connection
func (repo *Repository) Close() error {
return repo.db.Close()
}

func unmarshalJob(doc nosql.Document, job *base.Job) {
job.ID = string(doc["id"].(nosql.String))
job.UID = string(doc["uid"].(nosql.String))
job.GID = string(doc["gid"].(nosql.String))
job.Name = string(doc["name"].(nosql.String))
job.Description = string(doc["description"].(nosql.String))
job.Language = string(doc["language"].(nosql.String))
job.File = string(doc["file"].(nosql.String))
job.Stats = string(doc["stats"].(nosql.String))
job.Schedule = string(doc["schedule"].(nosql.String))
job.Runs = int(doc["runs"].(nosql.Int))
job.Links = string(doc["links"].(nosql.String))
}

func marshalJob(job *base.Job) nosql.Document {
return nosql.Document{
"id": nosql.String(job.ID),
"uid": nosql.String(job.UID),
"gid": nosql.String(job.GID),
"name": nosql.String(job.Name),
"description": nosql.String(job.Description),
"language": nosql.String(job.Language),
"file": nosql.String(job.File),
"stats": nosql.String(job.Stats),
"schedule": nosql.String(job.Schedule),
"runs": nosql.Int(job.Runs),
"links": nosql.String(job.Links),
}
}

0 comments on commit b7d7907

Please sign in to comment.