Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Question: is there a way to set up automatic performance monitoring transaction for each incoming request for Tonic #636

Open
spencerbart opened this issue Jan 22, 2024 · 4 comments

Comments

@spencerbart
Copy link

Is there a way to do something similar like this

let layer = tower::ServiceBuilder::new()
    .layer(sentry_tower::NewSentryLayer::<Request>::new_from_top())
    .layer(sentry_tower::SentryHttpLayer::with_transaction());

but for Tonic?

The description for sentry_tower::SentryHttpLayer::with_transaction() is Creates a new Layer which starts a new performance monitoring transaction for each incoming request. It would be nice to have this for tonic instead of going down the tracing route.

@Swatinem
Copy link
Member

I think it should be fairly straight forward to copy the existing code for SentryHttpLayer over and adapt it to work with tonic. Apart from starting a transaction, the HttpLayer also does things like add the incoming request details to the scope, and use the incoming headers trace-id to start the transaction.

@spencerbart
Copy link
Author

I am attempting to meld this https://github.com/getsentry/sentry-rust/blob/master/sentry-tower/src/http.rs and this https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs together but it's not working. Not sure why.

use std::convert::TryInto;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use http::{header, uri, StatusCode};
use hyper::{Body, Request, Response};
use sentry::protocol;
use tonic::body::BoxBody;
use tower_layer::Layer;
use tower_service::Service;

/// Tower Layer that logs Http Request Headers.
///
/// The Service created by this Layer can also optionally start a new
/// performance monitoring transaction for each incoming request,
/// continuing the trace based on incoming distributed tracing headers.
///
/// The created transaction will automatically use the request URI as its name.
/// This is sometimes not desirable in case the request URI contains unique IDs
/// or similar. In this case, users should manually override the transaction name
/// in the request handler using the [`Scope::set_transaction`](sentry::Scope::set_transaction)
/// method.
#[derive(Clone, Default)]
pub struct SentryGrpcLayer {
    start_transaction: bool,
}

impl SentryGrpcLayer {
    /// Creates a new Layer that only logs Request Headers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new Layer which starts a new performance monitoring transaction
    /// for each incoming request.
    pub fn with_transaction() -> Self {
        Self {
            start_transaction: true,
        }
    }
}

/// Tower Service that logs Http Request Headers.
///
/// The Service can also optionally start a new performance monitoring transaction
/// for each incoming request, continuing the trace based on incoming
/// distributed tracing headers.
#[derive(Clone)]
pub struct SentryGrpcService<S> {
    service: S,
    start_transaction: bool,
}

impl<S> Layer<S> for SentryGrpcLayer {
    type Service = SentryGrpcService<S>;

    fn layer(&self, service: S) -> Self::Service {
        Self::Service {
            service,
            start_transaction: self.start_transaction,
        }
    }
}

/// The Future returned from [`SentryHttpService`].
#[pin_project::pin_project]
pub struct SentryGrpcFuture<F> {
    on_first_poll: Option<(
        sentry::protocol::Request,
        Option<sentry::TransactionContext>,
    )>,
    transaction: Option<(sentry::TransactionOrSpan, Option<sentry::TransactionOrSpan>)>,
    #[pin]
    future: F,
}

impl<F, BoxBody, Error> Future for SentryGrpcFuture<F>
where
    F: Future<Output = Result<Response<BoxBody>, Error>>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("SentryGrpcFuture::poll");
        let slf = self.project();
        if let Some((sentry_req, trx_ctx)) = slf.on_first_poll.take() {
            sentry::configure_scope(|scope| {
                if let Some(trx_ctx) = trx_ctx {
                    let transaction: sentry::TransactionOrSpan =
                        sentry::start_transaction(trx_ctx).into();
                    transaction.set_request(sentry_req.clone());
                    let parent_span = scope.get_span();
                    scope.set_span(Some(transaction.clone()));
                    *slf.transaction = Some((transaction, parent_span));
                }

                scope.add_event_processor(move |mut event| {
                    if event.request.is_none() {
                        event.request = Some(sentry_req.clone());
                    }
                    Some(event)
                });
            });
        }
        match slf.future.poll(cx) {
            Poll::Ready(res) => {
                if let Some((transaction, parent_span)) = slf.transaction.take() {
                    if transaction.get_status().is_none() {
                        let status = match &res {
                            Ok(res) => map_status(res.status()),
                            Err(_) => protocol::SpanStatus::UnknownError,
                        };
                        transaction.set_status(status);
                    }
                    transaction.finish();
                    sentry::configure_scope(|scope| scope.set_span(parent_span));
                }
                Poll::Ready(res)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<S> Service<Request<Body>> for SentryGrpcService<S>
where
    S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = SentryGrpcFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request<Body>) -> Self::Future {
        println!("SentryGrpcService::call");
        let sentry_req = sentry::protocol::Request {
            method: None,
            url: get_url_from_request(&request),
            headers: request
                .headers()
                .into_iter()
                .map(|(header, value)| {
                    (
                        header.to_string(),
                        value.to_str().unwrap_or_default().into(),
                    )
                })
                .collect(),
            ..Default::default()
        };
        let trx_ctx = if self.start_transaction {
            let headers = request.headers().into_iter().flat_map(|(header, value)| {
                value.to_str().ok().map(|value| (header.as_str(), value))
            });

            let tx_name = format!("{}", path_from_request(&request));
            Some(sentry::TransactionContext::continue_from_headers(
                &tx_name,
                "http.server",
                headers,
            ))
        } else {
            None
        };

        SentryGrpcFuture {
            on_first_poll: Some((sentry_req, trx_ctx)),
            transaction: None,
            future: self.service.call(request),
        }
    }
}

fn path_from_request<B>(request: &Request<B>) -> &str {
    request.uri().path()
}

fn map_status(status: StatusCode) -> protocol::SpanStatus {
    match status {
        StatusCode::UNAUTHORIZED => protocol::SpanStatus::Unauthenticated,
        StatusCode::FORBIDDEN => protocol::SpanStatus::PermissionDenied,
        StatusCode::NOT_FOUND => protocol::SpanStatus::NotFound,
        StatusCode::TOO_MANY_REQUESTS => protocol::SpanStatus::ResourceExhausted,
        status if status.is_client_error() => protocol::SpanStatus::InvalidArgument,
        StatusCode::NOT_IMPLEMENTED => protocol::SpanStatus::Unimplemented,
        StatusCode::SERVICE_UNAVAILABLE => protocol::SpanStatus::Unavailable,
        status if status.is_server_error() => protocol::SpanStatus::InternalError,
        StatusCode::CONFLICT => protocol::SpanStatus::AlreadyExists,
        status if status.is_success() => protocol::SpanStatus::Ok,
        _ => protocol::SpanStatus::UnknownError,
    }
}

fn get_url_from_request<B>(request: &Request<B>) -> Option<url::Url> {
    let uri = request.uri().clone();
    let mut uri_parts = uri.into_parts();
    uri_parts.scheme.get_or_insert(uri::Scheme::HTTP);
    if uri_parts.authority.is_none() {
        let host = request.headers().get(header::HOST)?.as_bytes();
        uri_parts.authority = Some(host.try_into().ok()?);
    }
    let uri = uri::Uri::from_parts(uri_parts).ok()?;
    uri.to_string().parse().ok()
}

@Swatinem
Copy link
Member

impl<S> Service<Request<Body>> for SentryGrpcService<S>

I’m not quite sure what kind of request/body type you need for grpc, but I doubt that it is hyper::Request<hyper::Body>?

@spencerbart
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants