Skip to content

Commit

Permalink
Update natchez-core, natchez-jaeger to 0.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
47erbot committed Feb 18, 2023
1 parent c6e0b87 commit cafb3aa
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 12 deletions.
Expand Up @@ -17,7 +17,7 @@
package integrationtest.protobuf

import cats.effect.{IO, Resource}
import fs2._
import fs2.Stream
import higherkindness.mu.rpc._
import higherkindness.mu.rpc.protocol.Empty
import integrationtest._
Expand Down
Expand Up @@ -23,7 +23,7 @@ import _root_.fs2._
import _root_.fs2.concurrent._
import _root_.grpc.health.v1.health._
import _root_.grpc.health.v1.health.HealthCheckResponse.ServingStatus
import io.grpc.{Status, StatusException}
import _root_.io.grpc.{Status, StatusException}

trait HealthService[F[_]] extends Health[F] {

Expand Down
Expand Up @@ -22,6 +22,7 @@ import higherkindness.mu.rpc.internal.context.{ClientContext, ClientContextMetaD
import io.grpc.Metadata.{ASCII_STRING_MARSHALLER, BINARY_HEADER_SUFFIX, Key}
import io.grpc.{CallOptions, Channel, Metadata, MethodDescriptor}
import natchez.{EntryPoint, Kernel, Span}
import org.typelevel.ci.CIString

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -55,7 +56,7 @@ object implicits {
def tracingKernelToHeaders(kernel: Kernel): Metadata = {
val headers = new Metadata()
kernel.toHeaders.foreach { case (k, v) =>
headers.put(Key.of(k, ASCII_STRING_MARSHALLER), v)
headers.put(Key.of(k.toString, ASCII_STRING_MARSHALLER), v)
}
headers
}
Expand All @@ -66,7 +67,7 @@ object implicits {
.asScala
.collect {
case k if !k.endsWith(BINARY_HEADER_SUFFIX) =>
k -> headers.get(Key.of(k, ASCII_STRING_MARSHALLER))
CIString(k) -> headers.get(Key.of(k, ASCII_STRING_MARSHALLER))
}
.toMap
Kernel(asciiHeaders)
Expand Down
Expand Up @@ -16,7 +16,7 @@

package higherkindness.mu.tests.rpc

import _root_.fs2._
import fs2.Stream
import cats.effect.{IO, Resource}
import higherkindness.mu.rpc.ChannelForAddress
import higherkindness.mu.rpc.protocol.{Gzip, Identity}
Expand Down
Expand Up @@ -22,6 +22,7 @@ import natchez._

import java.net.URI
import cats.effect.Ref
import org.typelevel.ci.CIString

/*
* A minimal Natchez tracing implementation that accumulates
Expand All @@ -47,9 +48,9 @@ object Tracing {
IO.unit // not implemented

def kernel: IO[Kernel] =
IO.pure(Kernel(Map("span-id" -> id.toString)))
IO.pure(Kernel(Map(CIString("span-id") -> id.toString)))

def span(name: String): Resource[IO, Span[IO]] =
override def span(name: String, options: Span.Options): Resource[IO, Span[IO]] =
Resource.make(
for {
spanId <- ref.modify(_.incrementNextSpanId)
Expand All @@ -63,12 +64,21 @@ object Tracing {
override def traceUri: IO[Option[URI]] = IO.pure(None)

override def spanId: IO[Option[String]] = IO.pure(Some(id.toString))

override def log(fields: (String, TraceValue)*): IO[Unit] =
IO.unit // not implemented

override def log(event: String): IO[Unit] =
IO.unit // not implemented

override def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] =
IO.unit // not implemented
}

def entrypoint(ref: Ref[IO, TracingData]): EntryPoint[IO] =
new EntryPoint[IO] {

def root(name: String): Resource[IO, Span[IO]] =
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] =
Resource.make(
for {
spanId <- ref.modify(_.incrementNextSpanId)
Expand All @@ -77,12 +87,16 @@ object Tracing {
} yield span
)(span => ref.update(_.append(s"End $span")))

def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
override def continue(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] =
Resource.make(
for {
parentSpanId <- IO {
kernel.toHeaders
.get("span-id")
.get(CIString("span-id"))
.map(_.toInt)
.getOrElse(throw new Exception("Required trace header not found!"))
}
Expand All @@ -92,7 +106,11 @@ object Tracing {
} yield span
)(span => ref.update(_.append(s"End $span")))

def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] =
continue(name, kernel).recoverWith { case _: Exception =>
root(name)
}
Expand Down
2 changes: 1 addition & 1 deletion project/ProjectPlugin.scala
Expand Up @@ -32,7 +32,7 @@ object ProjectPlugin extends AutoPlugin {
val logback: String = "1.4.5"
val munit: String = "0.7.29"
val munitCE: String = "1.0.7"
val natchez: String = "0.1.6"
val natchez: String = "0.3.1"
val nettySSL: String = "2.0.54.Final"
val paradise: String = "2.1.1"
val pbdirect: String = "0.7.0"
Expand Down

0 comments on commit cafb3aa

Please sign in to comment.