Skip to content

Commit

Permalink
Merge pull request #2047 from max-melentyev/instance-api
Browse files Browse the repository at this point in the history
fix(ec2): Call describe-instance API concurrently
  • Loading branch information
MisterMX committed May 2, 2024
2 parents a49ef55 + 6482b89 commit bddb183
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 52 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -42,6 +42,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
go.uber.org/zap v1.26.0
golang.org/x/sync v0.7.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
k8s.io/api v0.28.3
k8s.io/apiextensions-apiserver v0.28.3
Expand Down Expand Up @@ -125,7 +126,6 @@ require (
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
131 changes: 80 additions & 51 deletions pkg/controller/ec2/instance/controller.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -129,61 +130,14 @@ func (e *external) Observe(ctx context.Context, mgd resource.Managed) (managed.E
}, nil
}

response, err := e.client.DescribeInstances(ctx,
&awsec2.DescribeInstancesInput{
InstanceIds: []string{meta.GetExternalName(cr)},
})

// deleted instances that have not yet been cleaned up from the cluster return a
// 200 OK with a nil response.Reservations slice
if err == nil && len(response.Reservations) == 0 {
return managed.ExternalObservation{}, nil
}

if err != nil {
return managed.ExternalObservation{},
errorutils.Wrap(resource.Ignore(ec2.IsInstanceNotFoundErr, err), errDescribe)
}

// in a successful response, there should be one and only one object
if len(response.Reservations[0].Instances) != 1 {
return managed.ExternalObservation{}, errors.New(errMultipleItems)
instancePtr, o, err := e.describeInstance(ctx, meta.GetExternalName(cr))
if err != nil || instancePtr == nil {
return managed.ExternalObservation{}, err
}

observed := response.Reservations[0].Instances[0]
observed := *instancePtr

// update the CRD spec for any new values from provider
current := cr.Spec.ForProvider.DeepCopy()

o := awsec2.DescribeInstanceAttributeOutput{}

for _, input := range []types.InstanceAttributeName{
types.InstanceAttributeNameDisableApiTermination,
types.InstanceAttributeNameInstanceInitiatedShutdownBehavior,
types.InstanceAttributeNameUserData,
} {
r, err := e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{
InstanceId: aws.String(meta.GetExternalName(cr)),
Attribute: input,
})

if err != nil {
return managed.ExternalObservation{}, errorutils.Wrap(err, errDescribe)
}

if r.DisableApiTermination != nil {
o.DisableApiTermination = r.DisableApiTermination
}

if r.InstanceInitiatedShutdownBehavior != nil {
o.InstanceInitiatedShutdownBehavior = r.InstanceInitiatedShutdownBehavior
}

if r.UserData != nil {
o.UserData = r.UserData
}
}

ec2.LateInitializeInstance(&cr.Spec.ForProvider, &observed, &o)

if !cmp.Equal(current, &cr.Spec.ForProvider) {
Expand Down Expand Up @@ -221,6 +175,81 @@ func (e *external) Observe(ctx context.Context, mgd resource.Managed) (managed.E
}, nil
}

func (e *external) describeInstance(ctx context.Context, instanceId string) (
*types.Instance,
awsec2.DescribeInstanceAttributeOutput,
error,
) {
eg := errgroup.Group{}

var describeOutput *awsec2.DescribeInstancesOutput
var describeError error
eg.Go(func() error {
describeOutput, describeError = e.client.DescribeInstances(ctx, &awsec2.DescribeInstancesInput{
InstanceIds: []string{instanceId},
})
return nil
})

attrs := awsec2.DescribeInstanceAttributeOutput{}
descAttr := func(attr types.InstanceAttributeName) (*awsec2.DescribeInstanceAttributeOutput, error) {
return e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{
InstanceId: &instanceId,
Attribute: attr,
})
}

eg.Go(func() error {
if res, err := descAttr(types.InstanceAttributeNameDisableApiTermination); err != nil {
return errorutils.Wrap(err, "fetching DisableApiTermination")
} else {
attrs.DisableApiTermination = res.DisableApiTermination
return nil
}
})

eg.Go(func() error {
if res, err := descAttr(types.InstanceAttributeNameInstanceInitiatedShutdownBehavior); err != nil {
return errorutils.Wrap(err, "fetching InstanceInitiatedShutdownBehavior")
} else {
attrs.InstanceInitiatedShutdownBehavior = res.InstanceInitiatedShutdownBehavior
return nil
}
})

eg.Go(func() error {
if res, err := descAttr(types.InstanceAttributeNameUserData); err != nil {
return errorutils.Wrap(err, "fetching UserData")
} else {
attrs.UserData = res.UserData
return nil
}
})

attrsErr := eg.Wait()

if describeError != nil {
return nil, attrs,
errorutils.Wrap(resource.Ignore(ec2.IsInstanceNotFoundErr, describeError), errDescribe)
}

// deleted instances that have not yet been cleaned up from the cluster return a
// 200 OK with a nil response.Reservations slice
if len(describeOutput.Reservations) == 0 {
return nil, attrs, nil
}

// in a successful response, there should be one and only one object
if len(describeOutput.Reservations[0].Instances) != 1 {
return nil, attrs, errors.New(errMultipleItems)
}

if attrsErr != nil {
return nil, attrs, errorutils.Wrap(attrsErr, errDescribe)
}
return &describeOutput.Reservations[0].Instances[0], attrs, nil
}

func (e *external) Create(ctx context.Context, mgd resource.Managed) (managed.ExternalCreation, error) {
cr, ok := mgd.(*svcapitypes.Instance)
if !ok {
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/ec2/instance/controller_test.go
Expand Up @@ -166,6 +166,9 @@ func TestObserve(t *testing.T) {
}},
}, nil
},
MockDescribeInstanceAttribute: func(ctx context.Context, input *awsec2.DescribeInstanceAttributeInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstanceAttributeOutput, error) {
return &awsec2.DescribeInstanceAttributeOutput{}, nil
},
},
cr: instance(withSpec(manualv1alpha1.InstanceParameters{
InstanceType: string(types.InstanceTypeM1Small),
Expand All @@ -187,6 +190,9 @@ func TestObserve(t *testing.T) {
MockDescribeInstances: func(ctx context.Context, input *awsec2.DescribeInstancesInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstancesOutput, error) {
return &awsec2.DescribeInstancesOutput{}, errBoom
},
MockDescribeInstanceAttribute: func(ctx context.Context, input *awsec2.DescribeInstanceAttributeInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstanceAttributeOutput, error) {
return &awsec2.DescribeInstanceAttributeOutput{}, nil
},
},
cr: instance(withSpec(manualv1alpha1.InstanceParameters{
InstanceType: string(types.InstanceTypeM1Small),
Expand Down

0 comments on commit bddb183

Please sign in to comment.