Skip to content

Commit

Permalink
revert: back to existing version of go-control-plane
Browse files Browse the repository at this point in the history
Signed-off-by: Lance Austin <laustin@datawire.io>
  • Loading branch information
Lance Austin authored and LanceEa committed Aug 28, 2023
1 parent ad2a346 commit 64bae85
Show file tree
Hide file tree
Showing 29 changed files with 264 additions and 743 deletions.
2 changes: 1 addition & 1 deletion _cxx/envoy.mk
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ RSYNC_EXTRAS ?=
# which commits are ancestors, I added `make guess-envoy-go-control-plane-commit` to do that in an
# automated way! Still look at the commit yourself to make sure it seems sane; blindly trusting
# machines is bad, mmkay?
ENVOY_GO_CONTROL_PLANE_COMMIT = b501c94cb61e3235b9156629377fba229d9571d8
ENVOY_GO_CONTROL_PLANE_COMMIT = 7f2a3030ef40e773a8413fa0f2f03dfe26226593

# Set ENVOY_DOCKER_REPO to the list of mirrors that we should
# sanity-check that things get pushed to.
Expand Down
4 changes: 1 addition & 3 deletions pkg/envoy-control-plane/OWNERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,5 @@ right place.
* Yangmin Zhu ([yangminzhu](https://github.com/yangminzhu)) (ymzhu@google.com)
* Snow Pettersen ([snowp](https://github.com/snowp)) (snowp@lyft.com)
* Alec Holmes ([alecholmez](https://github.com/alecholmez)) (alec.holmes@greymatter.io)
* James Peach ([jpeach](https://github.com/jpeach)) (jpeach@apache.org)
* James Peach ([jpeach](https://github.com/jpeach]) (jpeach@apache.org)
* Sunjay Bhatia ([sunjayBhatia](https://github.com/sunjayBhatia))(sunjayb@vmware.com)
* Valerian Roche ([valerian-roche](https://github.com/valerian-roche))(valerian.roche@datadoghq.com)
* Ryan Northey ([phlax](https://github.com/phlax)) (ryan@synca.io)
9 changes: 2 additions & 7 deletions pkg/envoy-control-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ This repository contains a Go-based implementation of an API server that
implements the discovery service APIs defined in
[data-plane-api](https://github.com/envoyproxy/data-plane-api).

## Proto files

The Go proto files are synced from the upstream Envoy repository (https://github.com/envoyproxy/envoy) on every upstream commit.

Synchronization is triggered using the `envoy-sync.yaml` workflow.

## Scope

Expand Down Expand Up @@ -66,8 +61,8 @@ The Envoy xDS APIs follow a well defined [versioning scheme](https://www.envoypr

### Deprecated

`V2` control-plane code has been removed and will no longer be supported. For previous conversations on support for various xDS versions, see here:
- [here](https://docs.google.com/document/d/1ZkHpz6DwEUmAlG0kb2Mgu4iaeQC2Bbb0egMbECoNNKY/edit?ts=5e602993#heading=h.15nsmgmjaaml)
`V2` control-plane code has been removed and will no longer be supported. For previous conversations on support for various xDS versions, see here:
- [here](https://docs.google.com/document/d/1ZkHpz6DwEUmAlG0kb2Mgu4iaeQC2Bbb0egMbECoNNKY/edit?ts=5e602993#heading=h.15nsmgmjaaml)
- [here](https://envoyproxy.slack.com/archives/C7LDJTM6Z/p1582925082005300)

*Note*: It is recommended to use a previous SHA if there is still a need for `V2`.
Expand Down
6 changes: 1 addition & 5 deletions pkg/envoy-control-plane/cache/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,11 @@ func (e SkipFetchError) Error() string {
// ResponseType enumeration of supported response types
type ResponseType int

// NOTE: The order of this enum MATTERS!
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#aggregated-discovery-service
// ADS expects things to be returned in a specific order.
// See the following issue for details: https://github.com/envoyproxy/go-control-plane/issues/526
const (
Endpoint ResponseType = iota
Cluster
ScopedRoute
Route
ScopedRoute
VirtualHost
Listener
Secret
Expand Down
13 changes: 3 additions & 10 deletions pkg/envoy-control-plane/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ type DeltaRequest = discovery.DeltaDiscoveryRequest
// ConfigWatcher implementation must be thread-safe.
type ConfigWatcher interface {
// CreateWatch returns a new open watch from a non-empty request.
// This is the entrypoint to propagate configuration changes the
// provided Response channel. State from the gRPC server is utilized
// to make sure consuming cache implementations can see what the server has sent to clients.
//
// An individual consumer normally issues a single open watch by each type URL.
//
// The provided channel produces requested resources as responses, once they are available.
Expand All @@ -57,9 +53,6 @@ type ConfigWatcher interface {
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())

// CreateDeltaWatch returns a new open incremental xDS watch.
// This is the entrypoint to propagate configuration changes the
// provided DeltaResponse channel. State from the gRPC server is utilized
// to make sure consuming cache implementations can see what the server has sent to clients.
//
// The provided channel produces requested resources as responses, or spontaneous updates in accordance
// with the incremental xDS specification.
Expand Down Expand Up @@ -318,12 +311,12 @@ func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (ty
}

if !r.Heartbeat {
rsrc, err := anypb.New(resource.Resource)
any, err := anypb.New(resource.Resource)
if err != nil {
return nil, "", err
}
rsrc.TypeUrl = r.Request.TypeUrl
wrappedResource.Resource = rsrc
any.TypeUrl = r.Request.TypeUrl
wrappedResource.Resource = any
}

return wrappedResource, deltaResourceTypeURL, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/envoy-control-plane/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
return resources
}

func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() {
func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() {
if request.TypeUrl != cache.typeURL {
value <- nil
return nil
Expand Down Expand Up @@ -450,7 +450,7 @@ func (cache *LinearCache) nextDeltaWatchID() int64 {
return atomic.AddInt64(&cache.deltaWatchCount, 1)
}

func (cache *LinearCache) Fetch(context.Context, *Request) (Response, error) {
func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
return nil, errors.New("not implemented")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/envoy-control-plane/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.Stream
return cache.CreateDeltaWatch(request, state, value)
}

func (mux *MuxCache) Fetch(context.Context, *Request) (Response, error) {
func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error) {
return nil, errors.New("not implemented")
}
24 changes: 0 additions & 24 deletions pkg/envoy-control-plane/cache/v3/order.go

This file was deleted.

72 changes: 0 additions & 72 deletions pkg/envoy-control-plane/cache/v3/order_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/envoy-control-plane/cache/v3/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Resources struct {

// IndexResourcesByName creates a map from the resource name to the resource.
func IndexResourcesByName(items []types.ResourceWithTTL) map[string]types.ResourceWithTTL {
indexed := make(map[string]types.ResourceWithTTL, len(items))
indexed := make(map[string]types.ResourceWithTTL)
for _, item := range items {
indexed[GetResourceName(item.Resource)] = item
}
Expand All @@ -22,7 +22,7 @@ func IndexResourcesByName(items []types.ResourceWithTTL) map[string]types.Resour

// IndexRawResourcesByName creates a map from the resource name to the resource.
func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource {
indexed := make(map[string]types.Resource, len(items))
indexed := make(map[string]types.Resource)
for _, item := range items {
indexed[GetResourceName(item)] = item
}
Expand All @@ -31,7 +31,7 @@ func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource {

// NewResources creates a new resource group.
func NewResources(version string, items []types.Resource) Resources {
itemsWithTTL := make([]types.ResourceWithTTL, 0, len(items))
itemsWithTTL := []types.ResourceWithTTL{}
for _, item := range items {
itemsWithTTL = append(itemsWithTTL, types.ResourceWithTTL{Resource: item})
}
Expand Down
46 changes: 11 additions & 35 deletions pkg/envoy-control-plane/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
}
}

// SetSnapshotCache updates a snapshot for a node.
// SetSnapshotCacheContext updates a snapshot for a node.
func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
Expand All @@ -232,41 +232,20 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
for id, watch := range info.watches {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)

resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}

// discard the watch
delete(info.watches, id)
}
return nil
}

// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
}

// We only calculate version hashes when using delta. We don't
Expand All @@ -279,8 +258,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
}
}

// this won't run if there are no delta watches
// to process.
// process our delta watches
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
ctx,
Expand All @@ -303,7 +281,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
return nil
}

// GetSnapshot gets the snapshot for a node, and returns an error if not found.
// GetSnapshots gets the snapshot for a node, and returns an error if not found.
func (cache *snapshotCache) GetSnapshot(node string) (ResourceSnapshot, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
Expand Down Expand Up @@ -343,8 +321,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL)
return nil
}

// CreateWatch returns a watch for an xDS request. A nil function may be
// returned if an error occurs.
// CreateWatch returns a watch for an xDS request.
func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() {
nodeID := cache.hash.ID(request.Node)

Expand All @@ -363,6 +340,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
info.mu.Unlock()

var version string

snapshot, exists := cache.snapshots[nodeID]
if exists {
version = snapshot.GetVersion(request.TypeUrl)
Expand All @@ -387,9 +365,8 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
request.ResourceNames, nodeID, err)
return nil
}
return func() {}
return nil
}
}
}
Expand All @@ -410,10 +387,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
request.ResourceNames, nodeID, err)
return nil
}

return func() {}
return nil
}

func (cache *snapshotCache) nextWatchID() int64 {
Expand Down Expand Up @@ -441,7 +417,7 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value
// if they do not, then the watch is never responded, and it is expected that envoy makes another request
if len(request.ResourceNames) != 0 && cache.ads {
if err := superset(nameSet(request.ResourceNames), resources); err != nil {
cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.TypeUrl, request.ResourceNames, err)
cache.log.Warnf("ADS mode: not responding to request: %v", err)
return nil
}
}
Expand Down

0 comments on commit 64bae85

Please sign in to comment.