Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use container ports in tasks SRV records #519

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion config.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
"srvRecordDefaultWeight": 1,
"IPSources": ["mesos", "host"],
"EnforceRFC952": false,
"EnumerationOn": true
"EnumerationOn": true,
"SRVPreferContainerPorts": false
}
6 changes: 5 additions & 1 deletion docs/docs/configuration-parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ The configuration file should include the following fields:
"SOARetry": 600,
"SOAExpire": 86400,
"SOAMinttl": 60,
"IPSources": ["netinfo", "mesos", "host"]
"IPSources": ["netinfo", "mesos", "host"],
"SRVPreferContainerPorts": false
}
```

Expand Down Expand Up @@ -114,3 +115,6 @@ sorted by priority. If you use **Docker**, and enable the `netinfo` IPSource, it
- `mesos`: Mesos containerizer IP. **DEPRECATED**
- `docker`: Docker containerizer IP. **DEPRECATED**
- `netinfo`: Mesos 0.25 NetworkInfo.

`SRVPreferContainerPorts` is a boolean field that controls whether Mesos-DNS use the container ports from the containerinfos port mapping definitions. It return the container port only if a match is found, the host port otherwise. This behavioir requires the `netinfo` IPSource.

59 changes: 32 additions & 27 deletions records/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,38 +108,42 @@ type Config struct {
httpConfigMap httpcli.ConfigMap

MesosAuthentication httpcli.AuthMechanism

// Use container ports from portmapping definitions.
SRVPreferContainerPorts bool
}

// NewConfig return the default config of the resolver
func NewConfig() Config {
return Config{
ZkDetectionTimeout: 30,
RefreshSeconds: 60,
TTL: 60,
SRVRecordDefaultWeight: 1,
Domain: "mesos",
Port: 53,
Timeout: 5,
StateTimeoutSeconds: 300,
SOARname: "root.ns1.mesos",
SOAMname: "ns1.mesos",
SOARefresh: 60,
SOARetry: 600,
SOAExpire: 86400,
SOAMinttl: 60,
ZoneResolvers: map[string][]string{},
Resolvers: []string{"8.8.8.8"},
Listener: "0.0.0.0",
HTTPListener: "0.0.0.0",
HTTPPort: 8123,
DNSOn: true,
HTTPOn: true,
ExternalOn: true,
SetTruncateBit: true,
RecurseOn: true,
IPSources: []string{"netinfo", "mesos", "host"},
EnumerationOn: true,
MesosAuthentication: httpcli.AuthNone,
ZkDetectionTimeout: 30,
RefreshSeconds: 60,
TTL: 60,
SRVRecordDefaultWeight: 1,
Domain: "mesos",
Port: 53,
Timeout: 5,
StateTimeoutSeconds: 300,
SOARname: "root.ns1.mesos",
SOAMname: "ns1.mesos",
SOARefresh: 60,
SOARetry: 600,
SOAExpire: 86400,
SOAMinttl: 60,
ZoneResolvers: map[string][]string{},
Resolvers: []string{"8.8.8.8"},
Listener: "0.0.0.0",
HTTPListener: "0.0.0.0",
HTTPPort: 8123,
DNSOn: true,
HTTPOn: true,
ExternalOn: true,
SetTruncateBit: true,
RecurseOn: true,
IPSources: []string{"netinfo", "mesos", "host"},
EnumerationOn: true,
MesosAuthentication: httpcli.AuthNone,
SRVPreferContainerPorts: false,
}
}

Expand Down Expand Up @@ -309,6 +313,7 @@ func (c Config) log() {
"MesosAuthentication is set to none. This is probably not intentional")
}
}
logging.Verbose.Println(" - SRVPreferContainerPorts: ", c.SRVPreferContainerPorts)
}

func readCACertFile(caCertFile string) (caPool *x509.CertPool, err error) {
Expand Down
30 changes: 18 additions & 12 deletions records/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (rg *RecordGenerator) ParseState(c Config, masters ...string) error {
hostSpec = labels.RFC952
}

return rg.InsertState(sj, c.Domain, c.SOAMname, c.Listener, masters, c.IPSources, hostSpec)
return rg.InsertState(sj, c.Domain, c.SOAMname, c.Listener, masters, c.IPSources, c.SRVPreferContainerPorts, hostSpec)
}

// hashes a given name using a truncated sha1 hash
Expand All @@ -202,7 +202,7 @@ func hashString(s string) string {
}

// InsertState transforms a StateJSON into RecordGenerator RRs
func (rg *RecordGenerator) InsertState(sj state.State, domain, ns, listener string, masters, ipSources []string, spec labels.Func) error {
func (rg *RecordGenerator) InsertState(sj state.State, domain, ns, listener string, masters, ipSources []string, srvPreferContainerPorts bool, spec labels.Func) error {
rg.SlaveIPs = map[string][]string{}
rg.SRVs = rrs{}
rg.As = rrs{}
Expand All @@ -211,7 +211,7 @@ func (rg *RecordGenerator) InsertState(sj state.State, domain, ns, listener stri
rg.slaveRecords(sj, domain, spec)
rg.listenerRecord(listener, ns)
rg.masterRecord(domain, masters, sj.Leader)
rg.taskRecords(sj, domain, spec, ipSources)
rg.taskRecords(sj, domain, spec, ipSources, srvPreferContainerPorts)

return nil
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func (rg *RecordGenerator) listenerRecord(listener string, ns string) {
}
}

func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec labels.Func, ipSources []string) {
func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec labels.Func, ipSources []string, srvPreferContainerPorts bool) {
for _, f := range sj.Frameworks {
enumerableFramework := &EnumerableFramework{
Name: f.Name,
Expand All @@ -382,21 +382,22 @@ func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec label

// only do running and discoverable tasks
if ok && (task.State == "TASK_RUNNING") {
rg.taskRecord(task, f, domain, spec, ipSources, enumerableFramework)
rg.taskRecord(task, f, domain, spec, ipSources, srvPreferContainerPorts, enumerableFramework)
}
}
}
}

type context struct {
taskName string
taskID string
slaveID string
taskIPs []net.IP
slaveIPs []string
taskName string
taskID string
slaveID string
taskIPs []net.IP
slaveIPs []string
srvPreferContainerPorts bool
}

func (rg *RecordGenerator) taskRecord(task state.Task, f state.Framework, domain string, spec labels.Func, ipSources []string, enumFW *EnumerableFramework) {
func (rg *RecordGenerator) taskRecord(task state.Task, f state.Framework, domain string, spec labels.Func, ipSources []string, srvPreferContainerPorts bool, enumFW *EnumerableFramework) {

newTask := &EnumerableTask{ID: task.ID, Name: task.Name}

Expand All @@ -409,6 +410,7 @@ func (rg *RecordGenerator) taskRecord(task state.Task, f state.Framework, domain
slaveIDTail(task.SlaveID),
task.IPs(ipSources...),
task.SlaveIPs,
srvPreferContainerPorts,
}

// use DiscoveryInfo name if defined instead of task name
Expand Down Expand Up @@ -488,7 +490,11 @@ func (rg *RecordGenerator) taskContextRecord(ctx context, task state.Task, f sta
}

for _, port := range task.DiscoveryInfo.Ports.DiscoveryPorts {
target := canonical + tail + ":" + strconv.Itoa(port.Number)
p := port.Number
if ctx.srvPreferContainerPorts {
p = state.MapPort(task, p)
}
target := canonical + tail + ":" + strconv.Itoa(p)
recordName(withProtocol(port.Protocol, fname, spec,
withNamedPort(port.Name, spec, asSRV(target))))
}
Expand Down
18 changes: 10 additions & 8 deletions records/generator_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ func BenchmarkTaskRecord_withoutDiscoveryInfo(b *testing.B) {
taskCount = 1000
)
type params struct {
task state.Task
f state.Framework
domain string
spec labels.Func
ipSources []string
enumFW EnumerableFramework
rg RecordGenerator
task state.Task
f state.Framework
domain string
spec labels.Func
ipSources []string
enumFW EnumerableFramework
rg RecordGenerator
srvPreferContainerPorts bool
}
var (
initialState = params{
Expand All @@ -67,6 +68,7 @@ func BenchmarkTaskRecord_withoutDiscoveryInfo(b *testing.B) {
As: rrs{},
SRVs: rrs{},
},
srvPreferContainerPorts: false,
}
slaves = make([]string, clusterSize)
tasks = make([]string, taskCount)
Expand All @@ -87,6 +89,6 @@ func BenchmarkTaskRecord_withoutDiscoveryInfo(b *testing.B) {
tt.task.Name = tasks[ti]
tt.task.SlaveIPs = []string{slaves[si]}
tt.task.SlaveID = "ID-" + slaves[si]
tt.rg.taskRecord(tt.task, tt.f, tt.domain, tt.spec, tt.ipSources, &tt.enumFW)
tt.rg.taskRecord(tt.task, tt.f, tt.domain, tt.spec, tt.ipSources, tt.srvPreferContainerPorts, &tt.enumFW)
}
}
20 changes: 13 additions & 7 deletions records/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func expectRecords(rg *RecordGenerator, expect []expectedRR) (eA, eAAAA, eSRV rr
return
}

func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string) RecordGenerator {
func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string, srvPreferContainerPorts bool) RecordGenerator {
var sj state.State

b, err := ioutil.ReadFile("../factories/fake.json")
Expand All @@ -240,7 +240,7 @@ func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string) Rec
masters := []string{"144.76.157.37:5050"}

var rg RecordGenerator
if err := rg.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", masters, ipSources, spec); err != nil {
if err := rg.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", masters, ipSources, srvPreferContainerPorts, spec); err != nil {
t.Fatal(err)
}

Expand All @@ -249,11 +249,12 @@ func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string) Rec

// ensure we are parsing what we think we are
func TestInsertState(t *testing.T) {
rg := testRecordGenerator(t, labels.RFC952, []string{"netinfo", "docker", "mesos", "host"})
rgDocker := testRecordGenerator(t, labels.RFC952, []string{"docker", "host"})
rgMesos := testRecordGenerator(t, labels.RFC952, []string{"mesos", "host"})
rgSlave := testRecordGenerator(t, labels.RFC952, []string{"host"})
rgNetinfo := testRecordGenerator(t, labels.RFC952, []string{"netinfo"})
rg := testRecordGenerator(t, labels.RFC952, []string{"netinfo", "docker", "mesos", "host"}, false)
rgDocker := testRecordGenerator(t, labels.RFC952, []string{"docker", "host"}, false)
rgMesos := testRecordGenerator(t, labels.RFC952, []string{"mesos", "host"}, false)
rgSlave := testRecordGenerator(t, labels.RFC952, []string{"host"}, false)
rgNetinfo := testRecordGenerator(t, labels.RFC952, []string{"netinfo"}, false)
rgContainerPorts := testRecordGenerator(t, labels.RFC952, []string{"netinfo"}, true)

for i, tt := range []struct {
rrs rrs
Expand Down Expand Up @@ -333,6 +334,11 @@ func TestInsertState(t *testing.T) {
{rgNetinfo.AAAAs, "toy-store.ipv6-framework.mesos.", []string{"fd01:b::1:8000:2"}},
{rgNetinfo.AAAAs, "toy-store.ipv6-framework.slave.mesos.", []string{"2001:db8::1"}},

{rgContainerPorts.As, "toy-store.ipv6-framework.mesos.", []string{"12.0.1.2"}},

{rgContainerPorts.AAAAs, "toy-store.ipv6-framework.mesos.", []string{"fd01:b::1:8000:2"}},
{rgContainerPorts.AAAAs, "toy-store.ipv6-framework.slave.mesos.", []string{"2001:db8::1"}},

{rgDocker.As, "liquor-store.marathon.mesos.", []string{"10.3.0.1", "10.3.0.2"}},
{rgDocker.As, "liquor-store.marathon.slave.mesos.", []string{"1.2.3.11", "1.2.3.12"}},
{rgDocker.As, "nginx.marathon.mesos.", []string{"1.2.3.11"}},
Expand Down
2 changes: 1 addition & 1 deletion records/state/client/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func LoadMasterStateFailover(initialMasterIP string, stateLoader func(ip string)
}
return sj, nil
}
err = errors.New("fetched state does not contain leader information")
err = errors.New("Fetched state does not contain leader information")
return sj, err
}

Expand Down
24 changes: 23 additions & 1 deletion records/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type ContainerStatus struct {
// NetworkInfo holds the network configuration for a single interface
// as defined in the /state Mesos HTTP endpoint.
type NetworkInfo struct {
IPAddresses []IPAddress `json:"ip_addresses,omitempty"`
IPAddresses []IPAddress `json:"ip_addresses,omitempty"`
PortMappings []PortMapping `json:"port_mappings,omitempty"`
// back-compat with 0.25 IPAddress format
IPAddress string `json:"ip_address,omitempty"`
}
Expand All @@ -82,6 +83,13 @@ type IPAddress struct {
IPAddress string `json:"ip_address,omitempty"`
}

// PortMapping holds a port for a task defined in the /state Mesos HTTP endpoint.
type PortMapping struct {
Protocol string `json:"protocol,omitempty"`
HostPort int `json:"host_port"`
ContainerPort int `json:"container_port"`
}

// Task holds a task as defined in the /state Mesos HTTP endpoint.
type Task struct {
FrameworkID string `json:"framework_id"`
Expand Down Expand Up @@ -200,6 +208,20 @@ func statusIPs(st []Status, src func(*Status) []string) []string {
return nil
}

// MapPort returns the mapped port if available, or the host port
// listening on.
func MapPort(t Task, hostport int) int {
ni := t.Statuses[0].ContainerStatus.NetworkInfos
for n := range ni {
for m := range ni[n].PortMappings {
if ni[n].PortMappings[m].HostPort == hostport && ni[n].PortMappings[m].ContainerPort > 0 {
return ni[n].PortMappings[m].ContainerPort
vixns marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return hostport
}

// labels returns all given Status.[]Labels' values whose keys are equal
// to the given key
func labels(key string) func(*Status) []string {
Expand Down
2 changes: 1 addition & 1 deletion resolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func fakeDNS() (*Resolver, error) {
}

spec := labels.RFC952
err = res.rs.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", res.config.Masters, res.config.IPSources, spec)
err = res.rs.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", res.config.Masters, res.config.IPSources, res.config.SRVPreferContainerPorts, spec)
if err != nil {
return nil, err
}
Expand Down