Skip to content

Commit

Permalink
Merge pull request #122 from t25kim/c_mgt
Browse files Browse the repository at this point in the history
Calculate scores by collecting resource information from all edge devices
  • Loading branch information
MoonkiHong committed Sep 10, 2020
2 parents 0fd7e52 + 05e7edd commit 22ce49b
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 8 deletions.
62 changes: 62 additions & 0 deletions src/controller/scoringmgr/scoringmgr.go 100644 → 100755
Expand Up @@ -20,6 +20,7 @@ package scoringmgr

import (
"common/resourceutil"
"errors"
"math"
)

Expand All @@ -31,6 +32,8 @@ const (
// Scoring is the interface to apply application specific scoring functions
type Scoring interface {
GetScore(ID string) (scoreValue float64, err error)
GetScoreWithResource(resource map[string]interface{}) (scoreValue float64, err error)
GetResource(ID string) (resource map[string]interface{}, err error)
}

// ScoringImpl structure
Expand Down Expand Up @@ -62,6 +65,65 @@ func (ScoringImpl) GetScore(ID string) (scoreValue float64, err error) {
return
}

// GetResource provides resource value for running applications on local device
func (ScoringImpl) GetResource(ID string) (resource map[string]interface{}, err error) {
resource = make(map[string]interface{})
cpuUsage, err := resourceIns.GetResource(resourceutil.CPUUsage)
if err != nil {
resource["error"] = INVALID_SCORE
return
} else {
resource["cpuUsage"] = cpuUsage
}

cpuCount, err := resourceIns.GetResource(resourceutil.CPUCount)
if err != nil {
resource["error"] = INVALID_SCORE
return
} else {
resource["cpuCount"] = cpuCount
}

cpuFreq, err := resourceIns.GetResource(resourceutil.CPUFreq)
if err != nil {
resource["error"] = INVALID_SCORE
return
} else {
resource["cpuFreq"] = cpuFreq
}

netBandwidth, err := resourceIns.GetResource(resourceutil.NetBandwidth)
if err != nil {
resource["error"] = INVALID_SCORE
return
} else {
resource["netBandwidth"] = netBandwidth
}

resourceIns.SetDeviceID(ID)
rtt, err := resourceIns.GetResource(resourceutil.NetRTT)
if err != nil {
resource["error"] = INVALID_SCORE
return
} else {
resource["rtt"] = rtt
}

return
}

// GetScoreWithResource provides score value of an edge device
func (ScoringImpl) GetScoreWithResource(resource map[string]interface{}) (scoreValue float64, err error) {
if _, found := resource["error"]; found {
return INVALID_SCORE, errors.New("Resource Not Found")
}

cpuScore := cpuScore(resource["cpuUsage"].(float64), resource["cpuCount"].(float64), resource["cpuFreq"].(float64))
netScore := netScore(resource["netBandwidth"].(float64))
renderingScore := renderingScore(resource["rtt"].(float64))
return float64(netScore + (cpuScore / 2) + renderingScore), nil
}

func calculateScore(ID string) float64 {
cpuUsage, err := resourceIns.GetResource(resourceutil.CPUUsage)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions src/orchestrationapi/orchestration.go 100644 → 100755
Expand Up @@ -57,6 +57,8 @@ type OrcheInternalAPI interface {
ExecuteAppOnLocal(appInfo map[string]interface{})
HandleNotificationOnLocal(serviceID float64, status string) error
GetScore(target string) (scoreValue float64, err error)
GetScoreWithResource(target map[string]interface{}) (scoreValue float64, err error)
GetResource(target string) (resourceMsg map[string]interface{}, err error)
}

var (
Expand Down Expand Up @@ -217,6 +219,16 @@ func (o orcheImpl) GetScore(devID string) (scoreValue float64, err error) {
return o.scoringIns.GetScore(devID)
}

// GetScoreWithResource gets a resource score of local device for specific app
func (o orcheImpl) GetScoreWithResource(resource map[string]interface{}) (scoreValue float64, err error) {
return o.scoringIns.GetScoreWithResource(resource)
}

// GetResource gets resource values of local device for running apps
func (o orcheImpl) GetResource(devID string) (resourceMsg map[string]interface{}, err error) {
return o.scoringIns.GetResource(devID)
}

// RequestVerifierConf setting up configuration of white list containers
func (o orcheImpl) RequestVerifierConf(containerInfo verifier.RequestVerifierConf) verifier.ResponseVerifierConf {
return o.verifierIns.RequestVerifierConf(containerInfo)
Expand Down
118 changes: 110 additions & 8 deletions src/orchestrationapi/orchestration_api.go 100644 → 100755
Expand Up @@ -56,10 +56,11 @@ type orcheImpl struct {
clientAPI client.Clienter
}

type deviceScore struct {
type deviceInfo struct {
id string
endpoint string
score float64
resource map[string]interface{}
execType string
}

Expand All @@ -73,6 +74,7 @@ type orcheClient struct {
type RequestServiceInfo struct {
ExecutionType string
ExeCmd []string
ExeOption map[string]interface{}
}

type ReqeustService struct {
Expand Down Expand Up @@ -137,8 +139,10 @@ func (orcheEngine *orcheImpl) RequestService(serviceInfo ReqeustService) Respons
go serviceClient.listenNotify()

executionTypes := make([]string, 0)
var scoringType string
for _, info := range serviceInfo.ServiceInfo {
executionTypes = append(executionTypes, info.ExecutionType)
scoringType, _ = info.ExeOption["scoringType"].(string)
}

candidates, err := orcheEngine.getCandidate(serviceInfo.ServiceName, executionTypes)
Expand All @@ -165,7 +169,21 @@ func (orcheEngine *orcheImpl) RequestService(serviceInfo ReqeustService) Respons
RemoteTargetInfo: TargetInfo{},
}

deviceScores := sortByScore(orcheEngine.gatherDevicesScore(candidates, serviceInfo.SelfSelection))
var deviceScores []deviceInfo

if scoringType == "resource" {
deviceResources := orcheEngine.gatherDevicesResource(candidates, serviceInfo.SelfSelection)
if len(deviceResources) <= 0 {
return errorResp
}
for i, dev := range deviceResources {
deviceResources[i].score, err = orcheEngine.GetScoreWithResource(dev.resource)
}
deviceScores = sortByScore(deviceResources)
} else {
deviceScores = sortByScore(orcheEngine.gatherDevicesScore(candidates, serviceInfo.SelfSelection))
}

if len(deviceScores) <= 0 {
return errorResp
} else if deviceScores[0].score == scoringmgr.INVALID_SCORE {
Expand Down Expand Up @@ -245,12 +263,12 @@ func (orcheEngine orcheImpl) getCandidate(appName string, execType []string) (de
return helper.GetDeviceInfoWithService(appName, execType)
}

func (orcheEngine orcheImpl) gatherDevicesScore(candidates []dbhelper.ExecutionCandidate, selfSelection bool) (deviceScores []deviceScore) {
func (orcheEngine orcheImpl) gatherDevicesScore(candidates []dbhelper.ExecutionCandidate, selfSelection bool) (deviceScores []deviceInfo) {
count := len(candidates)
if !selfSelection {
count -= 1
}
scores := make(chan deviceScore, count)
scores := make(chan deviceInfo, count)

info, err := sysDBExecutor.Get(sysDB.ID)
if err != nil {
Expand Down Expand Up @@ -295,7 +313,7 @@ func (orcheEngine orcheImpl) gatherDevicesScore(candidates []dbhelper.ExecutionC

if len(cand.Endpoint) == 0 {
log.Println("[orchestrationapi] cannot getting score, cause by ip list is empty")
scores <- deviceScore{endpoint: "", score: float64(0.0), id: cand.Id}
scores <- deviceInfo{endpoint: "", score: float64(0.0), id: cand.Id}
return
}

Expand All @@ -310,15 +328,15 @@ func (orcheEngine orcheImpl) gatherDevicesScore(candidates []dbhelper.ExecutionC

if err != nil {
log.Println("[orchestrationapi] cannot getting score from :", cand.Endpoint[0], "cause by", err.Error())
scores <- deviceScore{endpoint: cand.Endpoint[0], score: float64(0.0), id: cand.Id}
scores <- deviceInfo{endpoint: cand.Endpoint[0], score: float64(0.0), id: cand.Id}
return
}
log.Printf("[orchestrationapi] deviceScore")
log.Printf("candidate Id : %v", cand.Id)
log.Printf("candidate ExecType : %v", cand.ExecType)
log.Printf("candidate Endpoint : %v", cand.Endpoint[0])
log.Printf("candidate score : %v", score)
scores <- deviceScore{endpoint: cand.Endpoint[0], score: score, id: cand.Id, execType: cand.ExecType}
scores <- deviceInfo{endpoint: cand.Endpoint[0], score: score, id: cand.Id, execType: cand.ExecType}
}(candidate)
}

Expand All @@ -327,6 +345,90 @@ func (orcheEngine orcheImpl) gatherDevicesScore(candidates []dbhelper.ExecutionC
return
}

// gatherDevicesResource gathers resource values from edge devices
func (orcheEngine orcheImpl) gatherDevicesResource(candidates []dbhelper.ExecutionCandidate, selfSelection bool) (deviceResources []deviceInfo) {
count := len(candidates)
if !selfSelection {
count -= 1
}
resources := make(chan deviceInfo, count)

info, err := sysDBExecutor.Get(sysDB.ID)
if err != nil {
log.Println("[orchestrationapi] localhost devid gettering fail")
return
}

timeout := make(chan bool, 1)
go func() {
time.Sleep(3 * time.Second)
timeout <- true
}()

var wait sync.WaitGroup
wait.Add(1)
index := 0
go func() {
defer wait.Done()
for {
select {
case resource := <-resources:
deviceResources = append(deviceResources, resource)
if index++; count == index {
return
}
case <-timeout:
return
}
}
return
}()

localhosts, err := orcheEngine.networkhelper.GetIPs()
if err != nil {
log.Println("[orchestrationapi] localhost ip gettering fail. maybe skipped localhost")
}

for _, candidate := range candidates {
go func(cand dbhelper.ExecutionCandidate) {
var resource map[string]interface{}
var err error

if len(cand.Endpoint) == 0 {
log.Println("[orchestrationapi] cannot getting score, cause by ip list is empty")
resources <- deviceInfo{endpoint: "", resource: resource, id: cand.Id, execType: cand.ExecType}
return
}

if isLocalhost(cand.Endpoint, localhosts) {
if !selfSelection {
return
}
resource, err = orcheEngine.GetResource(info.Value)
} else {
resource, err = orcheEngine.clientAPI.DoGetResourceRemoteDevice(info.Value, cand.Endpoint[0])
}

if err != nil {
log.Println("[orchestrationapi] cannot getting msgs from :", cand.Endpoint[0], "cause by", err.Error())
resources <- deviceInfo{endpoint: cand.Endpoint[0], resource: resource, id: cand.Id, execType: cand.ExecType}
return
}
log.Printf("[orchestrationapi] deviceResource")
log.Printf("candidate Id : %v", cand.Id)
log.Printf("candidate ExecType : %v", cand.ExecType)
log.Printf("candidate Endpoint : %v", cand.Endpoint[0])
log.Printf("candidate resource : %v", resource)
resources <- deviceInfo{endpoint: cand.Endpoint[0], resource: resource, id: cand.Id, execType: cand.ExecType}
}(candidate)
}

wait.Wait()

return
}


func (orcheEngine orcheImpl) executeApp(endpoint, serviceName, requester string, args []string, notiChan chan string) {
ifArgs := make([]interface{}, len(args))
for i, v := range args {
Expand Down Expand Up @@ -363,7 +465,7 @@ func addServiceClient(clientID int, appName string) (client *orcheClient) {
return
}

func sortByScore(deviceScores []deviceScore) []deviceScore {
func sortByScore(deviceScores []deviceInfo) []deviceInfo {
sort.Slice(deviceScores, func(i, j int) bool {
return deviceScores[i].score > deviceScores[j].score
})
Expand Down
1 change: 1 addition & 0 deletions src/restinterface/client/client.go 100644 → 100755
Expand Up @@ -32,6 +32,7 @@ type Clienter interface {

// for scoringmgr
DoGetScoreRemoteDevice(devID string, endpoint string) (scoreValue float64, err error)
DoGetResourceRemoteDevice(devID string, endpoint string) (respMsg map[string]interface{}, err error)
}

// Setter interface
Expand Down
36 changes: 36 additions & 0 deletions src/restinterface/client/restclient/restclient.go 100644 → 100755
Expand Up @@ -152,6 +152,42 @@ func (c restClientImpl) DoGetScoreRemoteDevice(devID string, endpoint string) (s
return
}

// DoGetResourceRemoteDevice sends request to remote orchestration (APIV1ScoringmgrResourceGet) to get resource values
func (c restClientImpl) DoGetResourceRemoteDevice(devID string, endpoint string) (respMsg map[string]interface{}, err error) {
log.Printf("%s DoGetResourceRemoteDevice : endpoint[%v]", logPrefix, endpoint)
if c.IsSetKey == false {
return respMsg, errors.New(logPrefix + " does not set key")
}

restapi := "/api/v1/scoringmgr/resource"

targetURL := c.helper.MakeTargetURL(endpoint, c.internalPort, restapi)

info := make(map[string]interface{})
info["devID"] = devID
encryptBytes, err := c.Key.EncryptJSONToByte(info)
if err != nil {
return respMsg, errors.New(logPrefix + " can not encryption " + err.Error())
}

respBytes, code, err := c.helper.DoGetWithBody(targetURL, encryptBytes)
if err != nil || code != http.StatusOK {
return respMsg, errors.New(logPrefix + " get return error")
}

respMsg, err = c.Key.DecryptByteToJSON(respBytes)
if err != nil {
return respMsg, errors.New(logPrefix + " can not decryption " + err.Error())
}
log.Printf("%s respMsg From [%v] : %v", logPrefix, endpoint, respMsg)

if _, found := respMsg["error"]; found {
err = errors.New("failed")
}

return
}

func (c *restClientImpl) setHelper(helper resthelper.RestHelper) {
c.helper = helper
}
5 changes: 5 additions & 0 deletions src/restinterface/externalhandler/externalhandler.go 100644 → 100755
Expand Up @@ -210,6 +210,11 @@ func (h *Handler) APIV1RequestServicePost(w http.ResponseWriter, r *http.Request
for idy, cmd := range exeCmd {
serviceInfos.ServiceInfo[idx].ExeCmd[idy] = cmd.(string)
}

exeOption, ok := tmp["ExecOption"].(interface{})
if ok {
serviceInfos.ServiceInfo[idx].ExeOption = exeOption.(map[string]interface{})
}
}

resp = h.api.RequestService(serviceInfos)
Expand Down

0 comments on commit 22ce49b

Please sign in to comment.