Skip to content

Commit

Permalink
ambient: fix flake on waypoint (#50213)
Browse files Browse the repository at this point in the history
* ambient: fix flake on waypoint

Example
https://prow.istio.io/view/gs/istio-prow/logs/unit-tests_istio_postsubmit/1775278590686924800

2 issues:
* Race on Dump()
* Failure in the test

Failure comes from not waiting for the waypoint to be applied yet

* Also fix syncing
  • Loading branch information
howardjohn committed Apr 3, 2024
1 parent 0b3e009 commit a40a23d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Index interface {
ServicesForWaypoint(key model.WaypointKey) []model.ServiceInfo
Waypoint(network, address string) []netip.Addr
SyncAll()
HasSynced() bool
model.AmbientIndexes
}

Expand Down Expand Up @@ -468,6 +469,13 @@ func (a *index) SyncAll() {
a.networkUpdateTrigger.TriggerRecomputation()
}

func (a *index) HasSynced() bool {
return a.services.Synced().HasSynced() &&
a.workloads.Synced().HasSynced() &&
a.waypoints.Synced().HasSynced() &&
a.authorizationPolicies.Synced().HasSynced()
}

type LookupNetwork func(endpointIP string, labels labels.Instance) network.ID

func PushXds[T any](xds model.XDSUpdater, f func(T) model.ConfigKey) func(events []krt.Event[T]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,8 @@ func TestWorkloadsForWaypointOrder(t *testing.T) {
assert.Equal(t, wl, expected)
}
s.addWaypoint(t, "10.0.0.1", "waypoint", "", true)

// Wait until waypoint is available
assert.EventuallyEqual(t, func() int { return len(s.waypoints.List("")) }, 1)
// expected order is pod3, pod1, pod2, which is the order of creation
s.addPods(t,
"127.0.0.3",
Expand Down
3 changes: 3 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,9 @@ func (c *Controller) HasSynced() bool {
}

func (c *Controller) informersSynced() bool {
if c.ambientIndex != nil && !c.ambientIndex.HasSynced() {
return false
}
return c.namespaces.HasSynced() &&
c.services.HasSynced() &&
c.endpoints.slices.HasSynced() &&
Expand Down
2 changes: 2 additions & 0 deletions pkg/kube/krt/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (h *manyCollection[I, O]) Synced() Syncer {

// nolint: unused // (not true, its to implement an interface)
func (h *manyCollection[I, O]) dump() {
h.recomputeMu.Lock()
defer h.recomputeMu.Unlock()
h.mu.Lock()
defer h.mu.Unlock()
h.log.Errorf(">>> BEGIN DUMP")
Expand Down
27 changes: 27 additions & 0 deletions pkg/kube/krt/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import "istio.io/istio/pkg/kube"

type Syncer interface {
WaitUntilSynced(stop <-chan struct{}) bool
HasSynced() bool
}

var (
Expand All @@ -34,6 +35,15 @@ func (c channelSyncer) WaitUntilSynced(stop <-chan struct{}) bool {
return waitForCacheSync(c.name, stop, c.synced)
}

func (c channelSyncer) HasSynced() bool {
select {
case <-c.synced:
return true
default:
return false
}
}

type pollSyncer struct {
name string
f func() bool
Expand All @@ -43,12 +53,20 @@ func (c pollSyncer) WaitUntilSynced(stop <-chan struct{}) bool {
return kube.WaitForCacheSync(c.name, stop, c.f)
}

func (c pollSyncer) HasSynced() bool {
return c.f()
}

type alwaysSynced struct{}

func (c alwaysSynced) WaitUntilSynced(stop <-chan struct{}) bool {
return true
}

func (c alwaysSynced) HasSynced() bool {
return true
}

type multiSyncer struct {
syncers []Syncer
}
Expand All @@ -61,3 +79,12 @@ func (c multiSyncer) WaitUntilSynced(stop <-chan struct{}) bool {
}
return true
}

func (c multiSyncer) HasSynced() bool {
for _, s := range c.syncers {
if !s.HasSynced() {
return false
}
}
return true
}

0 comments on commit a40a23d

Please sign in to comment.