Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Make work without 2nd token cache but instead with 2nd credentials fu…
Browse files Browse the repository at this point in the history
…ture
  • Loading branch information
Fabio Grätz committed Sep 19, 2023
1 parent ead52f3 commit b07d100
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 156 deletions.
67 changes: 34 additions & 33 deletions clients/go/admin/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package admin

import (
"context"
"errors"
"fmt"
"net/http"

Expand All @@ -11,7 +12,6 @@ import (
"golang.org/x/oauth2"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"google.golang.org/grpc"
Expand All @@ -21,8 +21,8 @@ const ProxyAuthorizationHeader = "proxy-authorization"

// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server.
// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values.
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error {
authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyTokenCache)
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err)
}
Expand Down Expand Up @@ -51,8 +51,8 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T
return nil
}

func GetProxyTokenSource(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (oauth2.TokenSource, error) {
tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand, proxyTokenCache)
func GetProxyTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) {
tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand)
if err != nil {
return nil, fmt.Errorf("failed to initialized proxy authorization token source provider. Err: %w", err)
}
Expand All @@ -63,18 +63,14 @@ func GetProxyTokenSource(ctx context.Context, cfg *Config, proxyTokenCache cache
return proxyTokenSource, nil
}

func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (context.Context, error) {
proxyTokenSource, err := GetProxyTokenSource(ctx, cfg, proxyTokenCache)
func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (context.Context, error) {
proxyTokenSource, err := GetProxyTokenSource(ctx, cfg)
if err != nil {
return nil, err
}

token, err := proxyTokenSource.Token()
if err != nil {
return nil, err
}
md := metadata.Pairs(ProxyAuthorizationHeader, "Bearer "+token.AccessToken)
ctx = metadata.NewOutgoingContext(ctx, md)
wrappedTokenSource := NewCustomHeaderTokenSource(proxyTokenSource, cfg.UseInsecureConnection, ProxyAuthorizationHeader)
proxyCredentialsFuture.Store(wrappedTokenSource)

return ctx, nil
}
Expand All @@ -84,21 +80,27 @@ func shouldAttemptToAuthenticate(errorCode codes.Code) bool {
}

type proxyAuthTransport struct {
transport http.RoundTripper
tokenSource oauth2.TokenSource
transport http.RoundTripper
proxyCredentialsFuture *PerRPCCredentialsFuture
}

func (c *proxyAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
token, err := c.tokenSource.Token()
// check if the proxy credentials future is initialized
if !c.proxyCredentialsFuture.IsInitialized() {
return nil, errors.New("proxy credentials future is not initialized")
}

metadata, err := c.proxyCredentialsFuture.GetRequestMetadata(context.Background(), "")
if err != nil {
return nil, err
}
req.Header.Add(ProxyAuthorizationHeader, "Bearer "+token.AccessToken)
token := metadata[ProxyAuthorizationHeader]
req.Header.Add(ProxyAuthorizationHeader, token)
return c.transport.RoundTrip(req)
}

// Set up http client used in oauth2
func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (context.Context, error) {
func setHTTPClientContext(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (context.Context, error) {
httpClient := &http.Client{}
transport := &http.Transport{}

Expand All @@ -108,14 +110,9 @@ func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cach
}

if cfg.ProxyCommand != nil {
proxyTokenSource, err := GetProxyTokenSource(ctx, cfg, proxyTokenCache)
if err != nil {
return nil, err
}

httpClient.Transport = &proxyAuthTransport{
transport: transport,
tokenSource: proxyTokenSource,
transport: transport,
proxyCredentialsFuture: proxyCredentialsFuture,
}
} else {
httpClient.Transport = transport
Expand All @@ -134,13 +131,12 @@ func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cach
// more. It'll fail hard if it couldn't do so (i.e. it will no longer attempt to send an unauthenticated request). Once
// a token source has been created, it'll invoke the grpc pipeline again, this time the grpc.PerRPCCredentials should
// be able to find and acquire a valid AccessToken to annotate the request with.
func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor {
func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, err := setHTTPClientContext(ctx, cfg, proxyTokenCache)
ctx, err := setHTTPClientContext(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return err
}

err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err)
Expand All @@ -149,7 +145,7 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCach
// If the error we receive from executing the request expects
if shouldAttemptToAuthenticate(st.Code()) {
logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, proxyTokenCache, credentialsFuture)
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr)
}
Expand All @@ -163,12 +159,17 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCach
}
}

func NewProxyAuthInterceptor(cfg *Config, proxyTokenCache cache.TokenCache) grpc.UnaryClientInterceptor {
func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, err := MaterializeProxyAuthCredentials(ctx, cfg, proxyTokenCache)

err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
return fmt.Errorf("proxy authorization error! Original Error: %v", err)
ctx, err := MaterializeProxyAuthCredentials(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("proxy authorization error! Original Error: %v", err)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
return invoker(ctx, method, req, reply, cc, opts...)
return err
}
}
21 changes: 11 additions & 10 deletions clients/go/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,17 @@ func getAuthenticationDialOption(ctx context.Context, cfg *Config, tokenSourcePr
}

// InitializeAuthMetadataClient creates a new anonymously Auth Metadata Service client.
func InitializeAuthMetadataClient(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (client service.AuthMetadataServiceClient, err error) {
func InitializeAuthMetadataClient(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (client service.AuthMetadataServiceClient, err error) {
// Create an unauthenticated connection to fetch AuthMetadata
authMetadataConnection, err := NewAdminConnection(ctx, cfg, proxyTokenCache)
authMetadataConnection, err := NewAdminConnection(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return nil, fmt.Errorf("failed to initialized admin connection. Error: %w", err)
}

return service.NewAuthMetadataServiceClient(authMetadataConnection), nil
}

func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
func NewAdminConnection(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
if opts == nil {
// Initialize opts list to the potential number of options we will add. Initialization optimizes memory
// allocation.
Expand Down Expand Up @@ -153,9 +153,9 @@ func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache.

opts = append(opts, GetAdditionalAdminClientConfigOptions(cfg)...)

// Ensure proxy auth interceptor is invoked prior to auth interceptor
if cfg.ProxyCommand != nil {
opts = append([]grpc.DialOption{grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, proxyTokenCache))}, opts...)
opts = append(opts, grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, proxyCredentialsFuture)))
opts = append(opts, grpc.WithPerRPCCredentials(proxyCredentialsFuture))
}

return grpc.Dial(cfg.Endpoint.String(), opts...)
Expand All @@ -164,7 +164,7 @@ func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache.
// InitializeAdminClient creates an AdminClient with a shared Admin connection for the process
// Deprecated: Please use initializeClients instead.
func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) service.AdminServiceClient {
set, err := initializeClients(ctx, cfg, nil, nil, opts...)
set, err := initializeClients(ctx, cfg, nil, opts...)
if err != nil {
logger.Panicf(ctx, "Failed to initialized client. Error: %v", err)
return nil
Expand All @@ -175,18 +175,19 @@ func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOp

// initializeClients creates an AdminClient, AuthServiceClient and IdentityServiceClient with a shared Admin connection
// for the process. Note that if called with different cfg/dialoptions, it will not refresh the connection.
func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) {
func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) {
credentialsFuture := NewPerRPCCredentialsFuture()
proxyCredentialsFuture := NewPerRPCCredentialsFuture()

opts = append(opts,
grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, proxyTokenCache, credentialsFuture)),
grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)),
grpc.WithPerRPCCredentials(credentialsFuture))

if cfg.DefaultServiceConfig != "" {
opts = append(opts, grpc.WithDefaultServiceConfig(cfg.DefaultServiceConfig))
}

adminConnection, err := NewAdminConnection(ctx, cfg, proxyTokenCache, opts...)
adminConnection, err := NewAdminConnection(ctx, cfg, proxyCredentialsFuture, opts...)
if err != nil {
logger.Panicf(ctx, "failed to initialized Admin connection. Err: %s", err.Error())
}
Expand All @@ -204,7 +205,7 @@ func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenC

// Deprecated: Please use NewClientsetBuilder() instead.
func InitializeAdminClientFromConfig(ctx context.Context, tokenCache cache.TokenCache, opts ...grpc.DialOption) (service.AdminServiceClient, error) {
clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, nil, opts...)
clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, opts...)
if err != nil {
return nil, err
}
Expand Down
19 changes: 4 additions & 15 deletions clients/go/admin/client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (

// ClientsetBuilder is used to build the clientset. This allows custom token cache implementations to be plugged in.
type ClientsetBuilder struct {
config *Config
tokenCache cache.TokenCache
proxyTokenCache cache.TokenCache
opts []grpc.DialOption
config *Config
tokenCache cache.TokenCache
opts []grpc.DialOption
}

// ClientSetBuilder is constructor function to be used by the clients in interacting with the builder
Expand All @@ -33,13 +32,6 @@ func (cb *ClientsetBuilder) WithTokenCache(tokenCache cache.TokenCache) *Clients
return cb
}

// TokenCache is designed to cache a single token. When clients choose to send `"proxy-authorization`"
// headers, we, thus, employ a separate token cache.
func (cb *ClientsetBuilder) WithProxyTokenCache(tokenCache cache.TokenCache) *ClientsetBuilder {
cb.proxyTokenCache = tokenCache
return cb
}

func (cb *ClientsetBuilder) WithDialOptions(opts ...grpc.DialOption) *ClientsetBuilder {
cb.opts = opts
return cb
Expand All @@ -50,15 +42,12 @@ func (cb *ClientsetBuilder) Build(ctx context.Context) (*Clientset, error) {
if cb.tokenCache == nil {
cb.tokenCache = &cache.TokenCacheInMemoryProvider{}
}
if cb.proxyTokenCache == nil {
cb.proxyTokenCache = &cache.TokenCacheInMemoryProvider{}
}

if cb.config == nil {
cb.config = GetConfig(ctx)
}

return initializeClients(ctx, cb.config, cb.tokenCache, cb.proxyTokenCache, cb.opts...)
return initializeClients(ctx, cb.config, cb.tokenCache, cb.opts...)
}

func NewClientsetBuilder() *ClientsetBuilder {
Expand Down
37 changes: 0 additions & 37 deletions clients/go/admin/externalprocess/token.go

This file was deleted.

24 changes: 0 additions & 24 deletions clients/go/admin/externalprocess/token_test.go

This file was deleted.

0 comments on commit b07d100

Please sign in to comment.