Skip to content

Commit

Permalink
loadbalancer/experimental: Add experimental API for load-balancing
Browse files Browse the repository at this point in the history
Add the experimental Services API for managing load-balancing frontends
and backends. This is added as a new experimental package to avoid
confusing it with the production implementation.

When the hidden "--enable-new-services" flag is set the reconciler is started to
reconcile Table[Frontend] and Table[Backend] with the ServiceManager.

There is no code yet to insert into the tables, so this does nothing
by itself yet.

The tables can be inspected with "cilium-dbg statedb dump".

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki committed May 8, 2024
1 parent d5f7202 commit ea09ba4
Show file tree
Hide file tree
Showing 9 changed files with 1,422 additions and 0 deletions.
4 changes: 4 additions & 0 deletions daemon/cmd/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
k8sSynced "github.com/cilium/cilium/pkg/k8s/synced"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/l2announcer"
loadbalancer_experimental "github.com/cilium/cilium/pkg/loadbalancer/experimental"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/metricsmap"
"github.com/cilium/cilium/pkg/metrics"
Expand Down Expand Up @@ -160,6 +161,9 @@ var (
// daemonCell wraps the legacy daemon initialization and provides Promise[*Daemon].
daemonCell,

// Experimental control-plane for configuring service load-balancing.
loadbalancer_experimental.Cell,

// Service is a datapath service handler. Its main responsibility is to reflect
// service-related changes into BPF maps used by datapath BPF programs.
service.Cell,
Expand Down
222 changes: 222 additions & 0 deletions pkg/loadbalancer/experimental/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package experimental_test

import (
"encoding/binary"
"fmt"
"testing"

"github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/loadbalancer/experimental"
"github.com/cilium/cilium/pkg/source"
)

func BenchmarkInsertService(b *testing.B) {
p := servicesFixture(b)

b.ResetTimer()

numObjects := 1000

// Add 'numObjects' existing objects to the table.
wtxn := p.Services.WriteTxn()
for i := 0; i < numObjects; i++ {
name := loadbalancer.ServiceName{Namespace: "test-existing", Name: fmt.Sprintf("svc-%d", i)}
var addr1 [4]byte
binary.BigEndian.PutUint32(addr1[:], 0x02000000+uint32(i))
addrCluster, _ := types.AddrClusterFromIP(addr1[:])
p.Services.UpsertFrontend(
wtxn,
&experimental.FrontendParams{
Name: name,
Address: *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster, 12345, loadbalancer.ScopeExternal),
Type: loadbalancer.SVCTypeClusterIP,
Source: source.Kubernetes,
},
)
}
wtxn.Commit()

// Benchmark the speed at which a new service is upserted. 'numObjects' are inserted in one
// WriteTxn to amortize the cost of WriteTxn&Commit.
for n := 0; n < b.N; n++ {
wtxn := p.Services.WriteTxn()
for i := 0; i < numObjects; i++ {
name := loadbalancer.ServiceName{Namespace: "test-new", Name: fmt.Sprintf("svc-%d", i)}
var addr1 [4]byte
binary.BigEndian.PutUint32(addr1[:], 0x01000000+uint32(i))
addrCluster, _ := types.AddrClusterFromIP(addr1[:])
p.Services.UpsertFrontend(
wtxn,
&experimental.FrontendParams{
Name: name,
Address: *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster, 12345, loadbalancer.ScopeExternal),
Type: loadbalancer.SVCTypeClusterIP,
Source: source.Kubernetes,
},
)
}
wtxn.Abort()
}

b.StopTimer()
b.ReportMetric(float64(b.N*numObjects)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkInsertBackend(b *testing.B) {
p := servicesFixture(b)

b.ResetTimer()

addrCluster1 := types.MustParseAddrCluster("1.0.0.1")
addrCluster2 := types.MustParseAddrCluster("2.0.0.2")

name := loadbalancer.ServiceName{Namespace: "test", Name: "svc"}
wtxn := p.Services.WriteTxn()
p.Services.UpsertFrontend(
wtxn,
&experimental.FrontendParams{
Name: name,
Address: *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster1, 12345, loadbalancer.ScopeExternal),
Type: loadbalancer.SVCTypeClusterIP,
Source: source.Kubernetes,
},
)
wtxn.Commit()

numObjects := 1000

// Add 'numObjects' existing objects to the table.
wtxn = p.Services.WriteTxn()
for i := 0; i < numObjects; i++ {
beAddr := *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster1, uint16(i), loadbalancer.ScopeExternal)
p.Services.UpsertBackends(
wtxn,
name,
source.Kubernetes,
&loadbalancer.Backend{
L3n4Addr: beAddr,
State: loadbalancer.BackendStateActive,
},
)
}
wtxn.Abort()

// Benchmark the speed at which a new backend is upserted. 'numObjects' are inserted in one
// WriteTxn to amortize the cost of WriteTxn&Commit.
for n := 0; n < b.N; n++ {
wtxn = p.Services.WriteTxn()
for i := 0; i < numObjects; i++ {
beAddr := *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster2, uint16(i), loadbalancer.ScopeExternal)
p.Services.UpsertBackends(
wtxn,
name,
source.Kubernetes,
&loadbalancer.Backend{
L3n4Addr: beAddr,
State: loadbalancer.BackendStateActive,
},
)
}
// Don't commit the changes so we actually test the cost of Insert() of new object.
wtxn.Abort()
}

b.StopTimer()
b.ReportMetric(float64(b.N*numObjects)/b.Elapsed().Seconds(), "objects/sec")
}
func BenchmarkReplaceBackend(b *testing.B) {
p := servicesFixture(b)

b.ResetTimer()

addrCluster1 := types.MustParseAddrCluster("1.0.0.1")
addrCluster2 := types.MustParseAddrCluster("2.0.0.2")

name := loadbalancer.ServiceName{Namespace: "test", Name: "svc"}
wtxn := p.Services.WriteTxn()
p.Services.UpsertFrontend(
wtxn,
&experimental.FrontendParams{
Name: name,
Address: *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster1, 12345, loadbalancer.ScopeExternal),
Type: loadbalancer.SVCTypeClusterIP,
Source: source.Kubernetes,
},
)

beAddr := *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster2, uint16(1234), loadbalancer.ScopeExternal)
p.Services.UpsertBackends(
wtxn,
name,
source.Kubernetes,
&loadbalancer.Backend{
L3n4Addr: beAddr,
State: loadbalancer.BackendStateActive,
},
)
wtxn.Commit()

wtxn = p.Services.WriteTxn()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p.Services.UpsertBackends(
wtxn,
name,
source.Kubernetes,
&loadbalancer.Backend{
L3n4Addr: beAddr,
State: loadbalancer.BackendStateActive,
},
)
}
wtxn.Abort()

b.StopTimer()
b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkReplaceService(b *testing.B) {
p := servicesFixture(b)

b.ResetTimer()

addrCluster := types.MustParseAddrCluster("1.0.0.1")
l3n4Addr := *loadbalancer.NewL3n4Addr(loadbalancer.TCP, addrCluster, 12345, loadbalancer.ScopeExternal)

name := loadbalancer.ServiceName{Namespace: "test", Name: "svc"}
wtxn := p.Services.WriteTxn()
p.Services.UpsertFrontend(
wtxn,
&experimental.FrontendParams{
Name: name,
Address: l3n4Addr,
Type: loadbalancer.SVCTypeClusterIP,
Source: source.Kubernetes,
},
)
wtxn.Commit()

b.ResetTimer()

// Replace the service b.N times
wtxn = p.Services.WriteTxn()
for i := 0; i < b.N; i++ {
p.Services.UpsertFrontend(
wtxn,
&experimental.FrontendParams{
Name: name,
Address: l3n4Addr,
Type: loadbalancer.SVCTypeClusterIP,
Source: source.Kubernetes,
},
)
}
wtxn.Abort()

b.StopTimer()
b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
}
35 changes: 35 additions & 0 deletions pkg/loadbalancer/experimental/cell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package experimental

import (
"github.com/cilium/cilium/pkg/statedb"
"github.com/cilium/hive/cell"
)

// Cell provides the [Services] API for configuring load-balancing and the
// Table[Frontend] and Table[Backend] for read-only access to service frontends
// and backends.
var Cell = cell.Module(
"loadbalancer",
"Service load-balancing structures and tables",

cell.Config(DefaultConfig),

// Provide the RWTable[Frontend] and RWTable[Backend] privately to this
// module so that the tables are only modified via the Services API.
cell.ProvidePrivate(
NewFrontendsTable,
NewBackendsTable,
),

cell.Provide(
NewServices,

// Provide Table[Frontend] and Table[Backend] to the outside for
// read access.
statedb.RWTable[*Frontend].ToTable,
statedb.RWTable[*Backend].ToTable,
),
)
19 changes: 19 additions & 0 deletions pkg/loadbalancer/experimental/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package experimental

import "github.com/spf13/pflag"

type Config struct {
EnableNewServices bool
}

func (def Config) Flags(flags *pflag.FlagSet) {
flags.Bool("enable-new-services", def.EnableNewServices, "Enable use of the new service load-balancing API")
flags.MarkHidden("enable-new-services")
}

var DefaultConfig = Config{
EnableNewServices: false,
}

0 comments on commit ea09ba4

Please sign in to comment.