Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
discoverd: initial working attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
progrium committed Oct 7, 2013
1 parent 3db4bd7 commit e5471cf
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 31 deletions.
68 changes: 49 additions & 19 deletions discover/backend_etcd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package discover

import (
"encoding/json"
"fmt"
"strings"
"sync"
Expand All @@ -9,10 +10,22 @@ import (
"github.com/coreos/go-etcd/etcd"
)

const (
KeyPrefix = "/discover"
)

type EtcdBackend struct {
Client *etcd.Client
}

func servicePath(name, addr string) string {
if addr != "" {
return fmt.Sprintf("%s/services/%s/%s", KeyPrefix, name, addr)
} else {
return fmt.Sprintf("%s/services/%s", KeyPrefix, name)
}
}

func (b *EtcdBackend) Subscribe(name string) (UpdateStream, error) {
stream := &etcdStream{ch: make(chan *ServiceUpdate), stop: make(chan bool)}
watch := b.getStateChanges(name, stream.stop)
Expand Down Expand Up @@ -46,50 +59,67 @@ func (s *etcdStream) Chan() chan *ServiceUpdate { return s.ch }
func (s *etcdStream) Close() { s.stopOnce.Do(func() { close(s.stop) }) }

func (b *EtcdBackend) responseToUpdate(resp *store.Response) *ServiceUpdate {
respList := strings.SplitN(resp.Key, "/", 4)
if len(respList) < 3 {
// expected key structure: /PREFIX/services/NAME/ADDR
splitKey := strings.SplitN(resp.Key, "/", 5)
if len(splitKey) < 5 {
return nil
}

serviceName := respList[2]
if ("SET" == resp.Action && resp.NewKey) || "GET" == resp.Action {
serviceName := splitKey[3]
serviceAddr := splitKey[4]
if "GET" == resp.Action || ("SET" == resp.Action && (resp.NewKey || resp.Value != resp.PrevValue)) {
// GET is because getCurrentState returns responses of Action GET.
// some SETs are heartbeats, so we ignore SETs where value didn't change.
var serviceAttrs map[string]string
err := json.Unmarshal([]byte(resp.Value), &serviceAttrs)
if err != nil {
return nil
}
return &ServiceUpdate{
Name: serviceName,
Addr: resp.Value,
Addr: serviceAddr,
Online: true,
Attrs: serviceAttrs,
}
} else if "DELETE" == resp.Action {
return &ServiceUpdate{
Name: serviceName,
Addr: resp.PrevValue,
Addr: serviceAddr,
}
} else {
return nil
}
}

func (b *EtcdBackend) getCurrentState(name string) ([]*store.Response, error) {
return b.Client.Get(fmt.Sprintf("/services/%s", name))
return b.Client.Get(servicePath(name, ""))
}

func (b *EtcdBackend) getStateChanges(name string, stop chan bool) chan *store.Response {
watch := make(chan *store.Response)
go b.Client.Watch(fmt.Sprintf("/services/%s", name), 0, watch, stop)
go b.Client.Watch(servicePath(name, ""), 0, watch, stop)
return watch
}

func (b *EtcdBackend) Register(name string, addr string, attrs map[string]string) error {
_, err := b.Client.Set(fmt.Sprintf("/services/%s/%s", name, addr), addr, HeartbeatIntervalSecs+MissedHearbeatTTL)
return err
func (b *EtcdBackend) Register(name, addr string, attrs map[string]string) error {
attrsJson, err1 := json.Marshal(attrs)
if err1 != nil {
return err1
}
_, err2 := b.Client.Set(servicePath(name, addr), string(attrsJson), HeartbeatIntervalSecs+MissedHearbeatTTL)
return err2
}

func (b *EtcdBackend) Unregister(name string, addr string) error {
_, err := b.Client.Delete(fmt.Sprintf("/services/%s/%s", name, addr))
return err
func (b *EtcdBackend) Heartbeat(name, addr string) error {
resp, err1 := b.Client.Get(servicePath(name, addr))
if err1 != nil {
return err1
}
// ignore test failure, it doesn't need a heartbeat if it was just set.
_, _, err2 := b.Client.TestAndSet(servicePath(name, addr), resp[0].Value, resp[0].Value, HeartbeatIntervalSecs+MissedHearbeatTTL)
return err2
}

func (b *EtcdBackend) Heartbeat(name string, addr string) error {
// Heartbeat currently just calls Register because eventually Register will also update attributes
// where Heartbeat will not
return b.Register(name, addr, map[string]string{})
func (b *EtcdBackend) Unregister(name, addr string) error {
_, err := b.Client.Delete(servicePath(name, addr))
return err
}
22 changes: 10 additions & 12 deletions discover/backend_etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"github.com/coreos/go-etcd/etcd"
)

func touchService(client *etcd.Client, service string, addr string) {
client.Set(fmt.Sprintf("/services/%s/%s", service, addr), addr, 0)
}

func deleteService(client *etcd.Client, service string, addr string) {
client.Delete(fmt.Sprintf("/services/%s/%s", service, addr))
}

const (
NoAttrService = "null"
)

func TestEtcdBackend_RegisterAndUnregister(t *testing.T) {

// TODO Create server here itself and connect to it.
Expand All @@ -26,28 +26,26 @@ func TestEtcdBackend_RegisterAndUnregister(t *testing.T) {
serviceAddr := "127.0.0.1"

deleteService(client, serviceName, serviceAddr)
t.Log("Testing Register")
backend.Register(serviceName, serviceAddr, nil)

getUrl := "/services/" + serviceName + "/" + serviceAddr
results, err := client.Get(getUrl)
servicePath := KeyPrefix + "/services/" + serviceName + "/" + serviceAddr
results, err := client.Get(servicePath)
if err != nil {
t.Fatal(err)
}

// Adding the case where the result is checked.
if len(results) < 1 {
t.Fatal("Flynn Error: No Response From Server")
t.Fatal("Error: No Response From Server")
} else {
// Check if the files the returned values are the same.
if (results[0].Key != getUrl) || (results[0].Value != serviceAddr) {
if (results[0].Key != servicePath) || (results[0].Value != NoAttrService) {
t.Fatal("Returned value not equal to sent one")
}
}

t.Log("Testing Unregister of etcd backend")
backend.Unregister("test_register", "127.0.0.1")
_, err = client.Get("/services/test_register/127.0.0.1")
backend.Unregister(serviceName, serviceAddr)
_, err = client.Get(servicePath)
if err == nil {
t.Fatal("Value not deleted after unregister")
}
Expand Down

0 comments on commit e5471cf

Please sign in to comment.