Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Update opentelemetry-collector-contrib
Browse files Browse the repository at this point in the history
This version includes the changes to the Jaeger translator that add support
for multiple ChildOf references by keeping the reference type on the links.

open-telemetry/opentelemetry-collector-contrib#14463

We used to do this ourselves, before translating to OTEL and after translating
from OTEL, in order to keep the reference type information and pass the Jaeger
certification tests.
  • Loading branch information
alejandrodnm committed Jan 11, 2023
1 parent ca19e2f commit d93fde7
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 1,884 deletions.
19 changes: 10 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ require (
github.com/jackc/pgx/v5 v5.2.0
github.com/jaegertracing/jaeger v1.38.2-0.20221006002917-5bf8a28fe06d
github.com/mitchellh/mapstructure v1.5.0
github.com/jackc/pgproto3/v2 v2.3.1
github.com/oklog/run v1.1.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.61.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.68.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/peterbourgon/ff/v3 v3.1.2
github.com/pkg/errors v0.9.1
Expand All @@ -41,14 +42,14 @@ require (
github.com/testcontainers/testcontainers-go v0.13.0
github.com/thanos-io/thanos v0.28.1
github.com/walle/targz v0.0.0-20140417120357-57fe4206da5a
go.opentelemetry.io/collector/pdata v0.61.0
go.opentelemetry.io/collector/semconv v0.61.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/exporters/jaeger v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0
go.opentelemetry.io/otel/sdk v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
go.opentelemetry.io/collector/pdata v1.0.0-rc2
go.opentelemetry.io/collector/semconv v1.0.0-rc2
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/jaeger v1.11.1
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1
go.opentelemetry.io/otel/sdk v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
go.uber.org/atomic v1.10.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/goleak v1.2.0
Expand Down
1,712 changes: 0 additions & 1,712 deletions go.sum

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ type tracesServer struct {
ingestor ingestor.DBInserter
}

func (t *tracesServer) Export(ctx context.Context, tr ptraceotlp.Request) (ptraceotlp.Response, error) {
return ptraceotlp.NewResponse(), t.ingestor.IngestTraces(ctx, tr.Traces())
func (t *tracesServer) Export(ctx context.Context, tr ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
return ptraceotlp.NewExportResponse(), t.ingestor.IngestTraces(ctx, tr.Traces())
}
34 changes: 27 additions & 7 deletions pkg/jaeger/store/trace_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func ScanRow(row pgxconn.PgxRows, traces *ptrace.Traces) error {
return nil
}

func newMapFromRaw(m map[string]interface{}) pcommon.Map {
func newMapFromRaw(m map[string]interface{}) (pcommon.Map, error) {
pm := pcommon.NewMap()
pm.FromRaw(m)
return pm
err := pm.FromRaw(m)
return pm, err
}

func populateSpan(
Expand All @@ -122,7 +122,12 @@ func populateSpan(
if err != nil {
return fmt.Errorf("making resource tags: %w", err)
}
newMapFromRaw(attr).CopyTo(resourceSpan.Resource().Attributes())

m, err := newMapFromRaw(attr)
if err != nil {
return fmt.Errorf("making resource tags map: %w", err)
}
m.CopyTo(resourceSpan.Resource().Attributes())

instrumentationLibSpan := resourceSpan.ScopeSpans().AppendEmpty()
if dbResult.instLibSchemaUrl != nil {
Expand Down Expand Up @@ -186,7 +191,12 @@ func populateSpan(
if err != nil {
return fmt.Errorf("making span tags: %w", err)
}
newMapFromRaw(attr).CopyTo(ref.Attributes())

m, err = newMapFromRaw(attr)
if err != nil {
return fmt.Errorf("making resource tags map: %w", err)
}
m.CopyTo(ref.Attributes())

if dbResult.eventNames != nil {
if err := populateEvents(ref.Events(), dbResult); err != nil {
Expand Down Expand Up @@ -226,7 +236,12 @@ func populateEvents(
if err != nil {
return fmt.Errorf("making event tags: %w", err)
}
newMapFromRaw(attr).CopyTo(event.Attributes())

m, err := newMapFromRaw(attr)
if err != nil {
return fmt.Errorf("making resource tags map: %w", err)
}
m.CopyTo(event.Attributes())
}
return nil
}
Expand Down Expand Up @@ -256,7 +271,12 @@ func populateLinks(
if err != nil {
return fmt.Errorf("making link tags: %w", err)
}
newMapFromRaw(attr).CopyTo(link.Attributes())

m, err := newMapFromRaw(attr)
if err != nil {
return fmt.Errorf("making resource tags map: %w", err)
}
m.CopyTo(link.Attributes())
}
return nil
}
Expand Down
154 changes: 13 additions & 141 deletions pkg/jaeger/store/translation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
)

var (
Expand All @@ -31,6 +30,15 @@ var (
"producer": ptrace.SpanKindProducer,
"unspecified": ptrace.SpanKindUnspecified,
}

spanKindToInternalValue = map[string]string{
"SPAN_KIND_UNSPECIFIED": "unspecified",
"SPAN_KIND_INTERNAL": "internal",
"SPAN_KIND_SERVER": "server",
"SPAN_KIND_CLIENT": "client",
"SPAN_KIND_PRODUCER": "producer",
"SPAN_KIND_CONSUMER": "consumer",
}
// Map of Jaeger span kind tag strings to internal DB values.
// This is kept explicit even though the values are identical.
jSpanKindToInternalValue = map[string]string{
Expand Down Expand Up @@ -88,12 +96,11 @@ func internalToSpanKind(s string) ptrace.SpanKind {
}

func spanKindStringToInternal(kind string) (string, error) {
for k, v := range spanKindInternalValue {
if v.String() == kind {
return k, nil
}
v, ok := spanKindToInternalValue[kind]
if !ok {
return "", fmt.Errorf("unknown span kind: %s", kind)
}
return "", fmt.Errorf("unknown span kind: %s", kind)
return v, nil
}

// JSpanKindToInternal translates Jaeger span kind tag value to internal enum value used for storage.
Expand All @@ -118,161 +125,26 @@ func ProtoToTraces(span *model.Span) (ptrace.Traces, error) {
return ptrace.NewTraces(), err
}

// TODO: There's an open PR against the Jaeger translator that adds support
// for keeping the RefType. Once the PR is merged we can remove the following
// if condition and the addRefTypeAttributeToLinks function.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
if len(span.References) > 1 {
links := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Links()
addRefTypeAttributeToLinks(span, links)
}

return traces, nil
}

// TODO: This function can be deleted once the open PR against the Jaeger
// translator, which adds support for keeping the RefType, is merged.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
//
// addRefTypeAttributeToLinks rebuilds `links` from `span.References` so that
// every link carries the RefType of its Jaeger reference as an attribute.
// The `links` argument must be the OTEL representation of the given
// `span.References`; its content is overwritten.
//
// The attributes follow the OpenTracing to OTEL semantic convention
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#opentracing
func addRefTypeAttributeToLinks(span *model.Span, links ptrace.SpanLinkSlice) {
	// The parent span is kept as an attribute of the Span itself, not as a
	// Link, so its ChildOf reference must not produce a link.
	parentID := span.ParentSpanID()

	// Mirrors how the translator builds the SpanLinkSlice from the references.
	// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/c9e646b99610615730a845acc278b30bf49aa35f/pkg/translator/jaeger/jaegerproto_to_traces.go#L411-L427
	rebuilt := ptrace.NewSpanLinkSlice()
	rebuilt.EnsureCapacity(len(span.References))
	for i := range span.References {
		ref := span.References[i]
		isParentRef := ref.RefType == model.ChildOf && ref.SpanID == parentID
		if !isParentRef {
			jSpanRefToInternal(ref, rebuilt.AppendEmpty())
		}
	}
	rebuilt.CopyTo(links)
}

// jSpanRefToInternal copies the trace ID and span ID of the Jaeger reference
// `ref` into `link` and records the reference type as the OpenTracing
// ref-type attribute of the link.
func jSpanRefToInternal(ref model.SpanRef, link ptrace.SpanLink) {
	link.SetTraceID(uInt64ToTraceID(ref.TraceID.High, ref.TraceID.Low))
	link.SetSpanID(uInt64ToSpanID(uint64(ref.SpanID)))

	// There are only two reference types, ChildOf and FollowsFrom, so only
	// ChildOf is checked explicitly; anything else is stored as FollowsFrom.
	refType := conventions.AttributeOpentracingRefTypeFollowsFrom
	if ref.RefType == model.ChildOf {
		refType = conventions.AttributeOpentracingRefTypeChildOf
	}
	link.Attributes().PutString(conventions.AttributeOpentracingRefType, refType)
}

// ProtoFromTraces translates OTEL traces into Jaeger batches using the Jaeger
// translator and then post-processes every batch: the process tags and each
// span are handed to the binary-tag decoders, and references recorded by
// getOtherParents are marked as ChildOf.
//
// It returns a wrapped error when the translator fails.
func ProtoFromTraces(traces ptrace.Traces) ([]*model.Batch, error) {
	batches, err := jaegertranslator.ProtoFromTraces(traces)
	if err != nil {
		return nil, fmt.Errorf("internal-traces-to-jaeger-proto: %w", err)
	}

	extraParents := getOtherParents(traces)
	for _, batch := range batches {
		// A nil batch has no process tags and no spans to work on.
		if batch == nil {
			continue
		}
		decodeBinaryTags(batch.Process.Tags)

		for _, span := range batch.GetSpans() {
			decodeSpanBinaryTags(span)

			// TODO: There's an open PR against the Jaeger translator that adds
			// support for keeping the RefType. Once the PR is merged we can remove
			// the following loop and the getOtherParents function.
			//
			// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
			//
			// Spans without an entry yield a nil slice, so the loop is skipped.
			refs := span.GetReferences()
			for _, idx := range extraParents[span.SpanID] {
				refs[idx].RefType = model.ChildOf
			}
		}
	}
	return batches, nil
}

// getOtherParents returns a map keyed by the ID of every Span that has links
// marked as parents. Each value lists the positions in Span.References where
// those additional parent references are located.
//
// A parent is a link that has the `child_of` attribute defined in the semantic
// convention for opentracing:
//
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#opentracing
//
// Positions are tracked instead of SpanIDs because there might be multiple
// links to the same SpanID with different RefTypes.
func getOtherParents(traces ptrace.Traces) map[model.SpanID][]int {
	parents := make(map[model.SpanID][]int)

	resources := traces.ResourceSpans()
	for ri := 0; ri < resources.Len(); ri++ {
		scopes := resources.At(ri).ScopeSpans()
		for si := 0; si < scopes.Len(); si++ {
			spans := scopes.At(si).Spans()
			for pi := 0; pi < spans.Len(); pi++ {
				span := spans.At(pi)

				// When the span has a ParentSpanID, that parent becomes the first
				// reference when translating from OTEL to Jaeger, so every link
				// position is shifted by one. A span without a ParentSpanID
				// arguably shouldn't have other parents, but to be extra safe the
				// link attributes are inspected in that case too.
				shift := 0
				if !span.ParentSpanID().IsEmpty() {
					shift = 1
				}

				links := span.Links()
				for li := 0; li < links.Len(); li++ {
					refType, found := links.At(li).Attributes().Get(conventions.AttributeOpentracingRefType)
					if found && refType.Str() == conventions.AttributeOpentracingRefTypeChildOf {
						id := spanIDToJaegerProto(span.SpanID())
						parents[id] = append(parents[id], li+shift)
					}
				}
			}
		}
	}
	return parents
}

// uInt64ToTraceID converts the pair of uint64 values that represent a TraceID
// into a pcommon.TraceID. The high half fills the first 8 bytes and the low
// half the last 8 bytes, both in big-endian order.
func uInt64ToTraceID(high, low uint64) pcommon.TraceID {
	var id pcommon.TraceID
	binary.BigEndian.PutUint64(id[:8], high)
	binary.BigEndian.PutUint64(id[8:], low)
	return id
}

func uInt64ToSpanID(id uint64) pcommon.SpanID {
spanID := pcommon.SpanID{}
binary.BigEndian.PutUint64(spanID[:], id)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jaeger/store/translation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestSpanMultipleParentProtoFromTraces(t *testing.T) {
otherParentSpanID := pcommon.SpanID{'1', '2', '3', '4', '5', '6', '7', '9'}
childOfLink.SetSpanID(otherParentSpanID)
childOfLink.SetTraceID(traceID)
childOfLink.Attributes().PutString(
childOfLink.Attributes().PutStr(
conventions.AttributeOpentracingRefType,
conventions.AttributeOpentracingRefTypeChildOf,
)
Expand Down
10 changes: 3 additions & 7 deletions pkg/pgmodel/ingestor/trace/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var (
traceLinkLabel = prometheus.Labels{"type": "trace", "kind": "link"}
)

var tracesMarshaller = ptrace.NewProtoMarshaler()
var tracesMarshaller = &ptrace.ProtoMarshaler{}

func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces ptrace.Traces) error {
startIngest := time.Now() // Time taken for complete ingestion => Processing + DB insert.
Expand Down Expand Up @@ -357,12 +357,8 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces ptrace.Traces
// Only report telemetry if ingestion successful.
tput.ReportSpansProcessed(timestamp.FromTime(time.Now()), traces.SpanCount())

// since otel is making Protobufs internal this is our only chance to get the size of the message
tracesSizer, ok := tracesMarshaller.(ptrace.Sizer)
if ok {
size := tracesSizer.TracesSize(traces)
metrics.IngestorBytes.With(prometheus.Labels{"type": "trace"}).Add(float64(size))
}
size := tracesMarshaller.TracesSize(traces)
metrics.IngestorBytes.With(prometheus.Labels{"type": "trace"}).Add(float64(size))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func Run(cfg *Config) error {
options = append(options, grpc.Creds(creds))
}
grpcServer := grpc.NewServer(options...)
ptraceotlp.RegisterServer(grpcServer, api.NewTraceServer(client))
ptraceotlp.RegisterGRPCServer(grpcServer, api.NewTraceServer(client))

queryPlugin := shared.StorageGRPCPlugin{
Impl: jaegerStore,
Expand Down
4 changes: 3 additions & 1 deletion pkg/tests/end_to_end_tests/ingest_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ func getTracesFixtures() (tracesFixtures, error) {

// We recreate the batches to have a unique copy that can be
// modified without altering trace1 and trace2
batches, err = tracesFixturesToBatches(traces.Clone())
newTraces := ptrace.NewTraces()
traces.CopyTo(newTraces)
batches, err = tracesFixturesToBatches(newTraces)
if err != nil {
return tracesFixtures{}, err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/tests/testdata/trace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (

func newMapFromRaw(m map[string]interface{}) pcommon.Map {
pm := pcommon.NewMap()
pm.FromRaw(m)
_ = pm.FromRaw(m)
return pm
}

Expand Down Expand Up @@ -261,7 +261,9 @@ func fillSpanThree(span ptrace.Span, parentSpanID pcommon.SpanID) {

// deep copy the traces since we mutate them.
func CopyTraces(traces ptrace.Traces) ptrace.Traces {
return traces.Clone()
newTraces := ptrace.NewTraces()
traces.CopyTo(newTraces)
return newTraces
}

func readTraces(t testing.TB, count int) []ptrace.Traces {
Expand Down Expand Up @@ -297,7 +299,7 @@ func readTraces(t testing.TB, count int) []ptrace.Traces {
var (
buf []byte
tr ptrace.Traces
tracesUnmarshaler = ptrace.NewProtoUnmarshaler()
tracesUnmarshaler = ptrace.ProtoUnmarshaler{}
i = 0
result = make([]ptrace.Traces, 0, count)
)
Expand Down

0 comments on commit d93fde7

Please sign in to comment.