Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client, consumer] Move client features to consumerconnection #9996

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/client-move-to-consumer.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: collector/client

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecates collector/client. Use `consumerconnection` instead.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
note: Deprecates collector/client. Use `consumerconnection` instead.
note: Deprecates collector/client. Use `consumerconn` instead.


# One or more tracking issues or pull requests related to the change
issues: [9996]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
13 changes: 13 additions & 0 deletions client/client.go
Expand Up @@ -86,6 +86,8 @@ import (
type ctxKey struct{}

// Info contains data related to the clients connecting to receivers.
//
// Deprecated: [v0.99.0] Use consumerconn.Info instead
type Info struct {
// Addr for the client connecting to this collector. Available in a
// best-effort basis, and generally reliable for receivers making use of
Expand All @@ -103,6 +105,8 @@ type Info struct {

// AuthData represents the authentication data as seen by authenticators tied to
// the receivers.
//
// Deprecated: [v0.99.0] Use consumerconn.AuthData instead
type AuthData interface {
// GetAttribute returns the value for the given attribute. Authenticator
// implementations might define different data types for different
Expand All @@ -114,16 +118,21 @@ type AuthData interface {
GetAttributeNames() []string
}

// Deprecated: [v0.99.0] Use consumerconn.MetadataHostName instead
const MetadataHostName = "Host"

// NewContext takes an existing context and derives a new context with the
// client.Info value stored on it.
//
// Deprecated: [v0.99.0] Use consumerconnection.NewContextWithConnection instead
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Deprecated: [v0.99.0] Use consumerconnection.NewContextWithConnection instead
// Deprecated: [v0.99.0] Use consumerconn.NewContextWithConnection instead

func NewContext(ctx context.Context, c Info) context.Context {
return context.WithValue(ctx, ctxKey{}, c)
}

// FromContext takes a context and returns a ClientInfo from it.
// When a ClientInfo isn't present, a new empty one is returned.
//
// Deprecated: [v0.99.0] Use consumerconn.InfoFromContext instead
func FromContext(ctx context.Context) Info {
c, ok := ctx.Value(ctxKey{}).(Info)
if !ok {
Expand All @@ -133,11 +142,15 @@ func FromContext(ctx context.Context) Info {
}

// Metadata is an immutable map, meant to contain request metadata.
//
// Deprecated: [v0.99.0] Use consumerconn.Metadata instead
type Metadata struct {
data map[string][]string
}

// NewMetadata creates a new Metadata object to use in Info.
//
// Deprecated: [v0.99.0] Use consumerconn.NewMetadata instead
func NewMetadata(md map[string][]string) Metadata {
c := make(map[string][]string, len(md))
for k, v := range md {
Expand Down
18 changes: 9 additions & 9 deletions config/configgrpc/configgrpc.go
Expand Up @@ -24,7 +24,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
Expand All @@ -33,6 +32,7 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/internal"
"go.opentelemetry.io/collector/consumer/consumerconn"
"go.opentelemetry.io/collector/extension/auth"
)

Expand Down Expand Up @@ -390,7 +390,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
}

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
// a consumerconn.Info, potentially with the peer's address.
func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return handler(contextWithClient(ctx, includeMetadata), req)
Expand All @@ -403,23 +403,23 @@ func enhanceStreamWithClientInformation(includeMetadata bool) func(srv any, ss g
}
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
// contextWithClient attempts to add the peer address to the consumerconn.Info from the context. When no
// consumerconn.Info exists in the context, one is created.
func contextWithClient(ctx context.Context, includeMetadata bool) context.Context {
cl := client.FromContext(ctx)
cl := consumerconn.InfoFromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
if includeMetadata {
if md, ok := metadata.FromIncomingContext(ctx); ok {
copiedMD := md.Copy()
if len(md[client.MetadataHostName]) == 0 && len(md[":authority"]) > 0 {
copiedMD[client.MetadataHostName] = md[":authority"]
if len(md[consumerconn.MetadataHostName]) == 0 && len(md[":authority"]) > 0 {
copiedMD[consumerconn.MetadataHostName] = md[":authority"]
}
cl.Metadata = client.NewMetadata(copiedMD)
cl.Metadata = consumerconn.NewMetadata(copiedMD)
}
}
return client.NewContext(ctx, cl)
return consumerconn.NewContextWithInfo(ctx, cl)
}

func authUnaryServerInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, server auth.Server) (any, error) {
Expand Down
54 changes: 27 additions & 27 deletions config/configgrpc/configgrpc_test.go
Expand Up @@ -22,14 +22,14 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumerconn"
"go.opentelemetry.io/collector/extension/auth"
"go.opentelemetry.io/collector/extension/auth/authtest"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
Expand Down Expand Up @@ -696,21 +696,21 @@ func TestContextWithClient(t *testing.T) {
desc string
input context.Context
doMetadata bool
expected client.Info
expected consumerconn.Info
}{
{
desc: "no peer information, empty client",
input: context.Background(),
expected: client.Info{},
expected: consumerconn.Info{},
},
{
desc: "existing client with IP, no peer information",
input: client.NewContext(context.Background(), client.Info{
input: consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
}),
expected: client.Info{
expected: consumerconn.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
Expand All @@ -723,15 +723,15 @@ func TestContextWithClient(t *testing.T) {
IP: net.IPv4(1, 2, 3, 4),
},
}),
expected: client.Info{
expected: consumerconn.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
},
},
{
desc: "existing client, existing IP gets overridden with peer information",
input: peer.NewContext(client.NewContext(context.Background(), client.Info{
input: peer.NewContext(consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
Expand All @@ -740,56 +740,56 @@ func TestContextWithClient(t *testing.T) {
IP: net.IPv4(1, 2, 3, 5),
},
}),
expected: client.Info{
expected: consumerconn.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 5),
},
},
},
{
desc: "existing client with metadata",
input: client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
input: consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{
Metadata: consumerconn.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
}),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
expected: consumerconn.Info{
Metadata: consumerconn.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
expected: consumerconn.Info{
Metadata: consumerconn.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context, no metadata processing",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
expected: client.Info{},
expected: consumerconn.Info{},
},
{
desc: "existing client with Host and metadata",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{}),
metadata.Pairs("test-metadata-key", "test-value", ":authority", "localhost:55443"),
),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}, ":authority": {"localhost:55443"}, "Host": {"localhost:55443"}}),
expected: consumerconn.Info{
Metadata: consumerconn.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}, ":authority": {"localhost:55443"}, "Host": {"localhost:55443"}}),
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cl := client.FromContext(contextWithClient(tC.input, tC.doMetadata))
cl := consumerconn.InfoFromContext(contextWithClient(tC.input, tC.doMetadata))
assert.Equal(t, tC.expected, cl)
})
}
Expand Down Expand Up @@ -817,7 +817,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) {
// verify
assert.NoError(t, err)

cl := client.FromContext(outContext)
cl := consumerconn.InfoFromContext(outContext)
assert.Equal(t, "1.1.1.1", cl.Addr.String())
}

Expand Down Expand Up @@ -901,7 +901,7 @@ func TestClientInfoInterceptors(t *testing.T) {
}

// verify
cl := client.FromContext(mock.recordedContext)
cl := consumerconn.InfoFromContext(mock.recordedContext)

// the client address is something like 127.0.0.1:41086
assert.Contains(t, cl.Addr.String(), "127.0.0.1")
Expand All @@ -915,15 +915,15 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
ctx := client.NewContext(context.Background(), client.Info{
ctx := consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})

return ctx, nil
}
handler := func(ctx context.Context, _ any) (any, error) {
handlerCalled = true
cl := client.FromContext(ctx)
cl := consumerconn.InfoFromContext(ctx)
assert.Equal(t, "1.2.3.4", cl.Addr.String())
return nil, nil
}
Expand Down Expand Up @@ -987,14 +987,14 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
ctx := client.NewContext(context.Background(), client.Info{
ctx := consumerconn.NewContextWithInfo(context.Background(), consumerconn.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})
return ctx, nil
}
handler := func(_ any, stream grpc.ServerStream) error {
// ensure that the client information is propagated down to the underlying stream
cl := client.FromContext(stream.Context())
cl := consumerconn.InfoFromContext(stream.Context())
assert.Equal(t, "1.2.3.4", cl.Addr.String())
handlerCalled = true
return nil
Expand Down
3 changes: 2 additions & 1 deletion config/configgrpc/go.mod
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
github.com/mostynb/go-grpc-compression v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.98.0
go.opentelemetry.io/collector/component v0.98.0
go.opentelemetry.io/collector/config/configauth v0.98.0
go.opentelemetry.io/collector/config/configcompression v1.5.0
Expand All @@ -14,6 +13,7 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.98.0
go.opentelemetry.io/collector/config/configtls v0.98.0
go.opentelemetry.io/collector/config/internal v0.98.0
go.opentelemetry.io/collector/consumer v0.98.0
go.opentelemetry.io/collector/extension/auth v0.98.0
go.opentelemetry.io/collector/pdata v1.5.0
go.opentelemetry.io/collector/pdata/testdata v0.98.0
Expand Down Expand Up @@ -49,6 +49,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.3 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector v0.98.0 // indirect
go.opentelemetry.io/collector/confmap v0.98.0 // indirect
go.opentelemetry.io/collector/extension v0.98.0 // indirect
go.opentelemetry.io/collector/featuregate v1.5.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions config/confighttp/clientinfohandler.go
Expand Up @@ -8,10 +8,10 @@ import (
"net"
"net/http"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer/consumerconn"
)

// clientInfoHandler is an http.Handler that enhances the incoming request context with client.Info.
// clientInfoHandler is an http.Handler that enhances the incoming request context with consumerconn.Info.
type clientInfoHandler struct {
next http.Handler

Expand All @@ -20,16 +20,16 @@ type clientInfoHandler struct {
}

// ServeHTTP intercepts incoming HTTP requests, replacing the request's context with one that contains
// a client.Info containing the client's IP address.
// a consumerconn.Info containing the client's IP address.
func (h *clientInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(contextWithClient(req, h.includeMetadata))
h.next.ServeHTTP(w, req)
}

// contextWithClient attempts to add the client IP address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
// contextWithClient attempts to add the client IP address to the consumerconn.Info from the context. When no
// consumerconn.Info exists in the context, one is created.
func contextWithClient(req *http.Request, includeMetadata bool) context.Context {
cl := client.FromContext(req.Context())
cl := consumerconn.InfoFromContext(req.Context())

ip := parseIP(req.RemoteAddr)
if ip != nil {
Expand All @@ -38,14 +38,14 @@ func contextWithClient(req *http.Request, includeMetadata bool) context.Context

if includeMetadata {
md := req.Header.Clone()
if len(md.Get(client.MetadataHostName)) == 0 && req.Host != "" {
md.Add(client.MetadataHostName, req.Host)
if len(md.Get(consumerconn.MetadataHostName)) == 0 && req.Host != "" {
md.Add(consumerconn.MetadataHostName, req.Host)
}

cl.Metadata = client.NewMetadata(md)
cl.Metadata = consumerconn.NewMetadata(md)
}

ctx := client.NewContext(req.Context(), cl)
ctx := consumerconn.NewContextWithInfo(req.Context(), cl)
return ctx
}

Expand Down