Skip to content

Commit

Permalink
otel: add opentelemetry traces
Browse files Browse the repository at this point in the history
  • Loading branch information
develop7 committed Mar 11, 2024
1 parent 58999f5 commit 94d2b9b
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 72 deletions.
5 changes: 5 additions & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ library
PostgREST.Query.QueryBuilder
PostgREST.Query.SqlFragment
PostgREST.Query.Statements
PostgREST.OpenTelemetry
PostgREST.Plan
PostgREST.Plan.CallPlan
PostgREST.Plan.MutatePlan
Expand Down Expand Up @@ -113,6 +114,9 @@ library
, hasql-transaction >= 1.0.1 && < 1.1
, heredoc >= 0.2 && < 0.3
, http-types >= 0.12.2 && < 0.13
, hs-opentelemetry-sdk >= 0.0.3.6 && < 0.0.4
, hs-opentelemetry-instrumentation-wai
, hs-opentelemetry-utils-exceptions
, insert-ordered-containers >= 0.2.2 && < 0.3
, interpolatedstring-perl6 >= 1 && < 1.1
, jose >= 0.8.5.1 && < 0.12
Expand Down Expand Up @@ -257,6 +261,7 @@ test-suite spec
, hasql-pool >= 0.10 && < 0.11
, hasql-transaction >= 1.0.1 && < 1.1
, heredoc >= 0.2 && < 0.3
, hs-opentelemetry-sdk >= 0.0.3.6 && < 0.0.4
, hspec >= 2.3 && < 2.12
, hspec-wai >= 0.10 && < 0.12
, hspec-wai-json >= 0.10 && < 0.12
Expand Down
118 changes: 63 additions & 55 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ import qualified PostgREST.Unix as Unix (installSignalHandlers)

import PostgREST.ApiRequest (Action (..), ApiRequest (..),
Mutation (..), Target (..))
import PostgREST.AppState (AppState)
import PostgREST.AppState (AppState, getOTelTracer)
import PostgREST.Auth (AuthResult (..))
import PostgREST.Config (AppConfig (..))
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.Error (Error (..))
import PostgREST.Observation (Observation (..))
import PostgREST.Query (DbHandler)
import PostgREST.Response.Performance (ServerTiming (..),
Expand All @@ -58,12 +58,15 @@ import PostgREST.SchemaCache (SchemaCache (..))
import PostgREST.SchemaCache.Routine (Routine (..))
import PostgREST.Version (docsVersion, prettyVersion)

import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import Protolude hiding (Handler)
import System.TimeIt (timeItT)
import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import OpenTelemetry.Instrumentation.Wai (newOpenTelemetryWaiMiddleware)
import OpenTelemetry.Trace (defaultSpanArguments)
import OpenTelemetry.Utils.Exceptions (inSpanM)
import Protolude hiding (Handler)
import System.TimeIt (timeItT)

type Handler = ExceptT Error

Expand All @@ -88,7 +91,9 @@ run appState observer = do
port <- NS.socketPort $ AppState.getSocketREST appState
observer $ AppServerPortObs port

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) app
oTelMWare <- newOpenTelemetryWaiMiddleware

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) (oTelMWare app)

serverSettings :: AppConfig -> Warp.Settings
serverSettings AppConfig{..} =
Expand All @@ -106,27 +111,28 @@ postgrest conf appState connWorker observer =
Logger.middleware (configLogLevel conf) $
-- fromJust can be used, because the auth middleware will **always** add
-- some AuthResult to the vault.
\req respond -> case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
maybeSchemaCache <- AppState.getSchemaCache appState
pgVer <- AppState.getPgVersion appState

let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
-- function can respond successfully (with a stale schema cache) before
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getRetryNextIn appState
return $ addRetryHint delay response
respond resp
\req respond -> inSpanM (getOTelTracer appState) "respond" defaultSpanArguments $
case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
maybeSchemaCache <- AppState.getSchemaCache appState
pgVer <- AppState.getPgVersion appState

let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse = inSpanM (getOTelTracer appState) "eitherResponse" defaultSpanArguments $
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
-- function can respond successfully (with a stale schema cache) before
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getRetryNextIn appState
return $ addRetryHint delay response
respond resp

postgrestResponse
:: AppState.AppState
Expand Down Expand Up @@ -172,58 +178,58 @@ handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool ->
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime observer =
case (iAction, iTarget) of
(ActionRead headersOnly, TargetIdent identifier) -> do
(planTime', wrPlan) <- withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
(respTime', pgrst) <- withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
(planTime', wrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationCreate, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationUpdate, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationSingleUpsert, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationDelete, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInvoke invMethod, TargetProc identifier _) -> do
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
(txTime', resultSet) <- withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdFuncSettings $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
(respTime', pgrst) <- withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdFuncSettings $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInspect headersOnly, TargetDefaultSpec tSchema) -> do
(planTime', iPlan) <- withTiming $ liftEither $ Plan.inspectPlan apiReq
(txTime', oaiResult) <- withTiming $ runQuery roleIsoLvl mempty (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
(respTime', pgrst) <- withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
(planTime', iPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.inspectPlan apiReq
(txTime', oaiResult) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInfo, TargetIdent identifier) -> do
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
return $ pgrstResponse (ServerTiming jwtTime parseTime Nothing Nothing respTime') pgrst

(ActionInfo, TargetProc identifier _) -> do
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' Nothing respTime') pgrst

(ActionInfo, TargetDefaultSpec _) -> do
(respTime', pgrst) <- withTiming $ liftEither Response.infoRootResponse
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither Response.infoRootResponse
return $ pgrstResponse (ServerTiming jwtTime parseTime Nothing Nothing respTime') pgrst

_ ->
Expand All @@ -244,6 +250,8 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A

withTiming = calcTiming $ configServerTimingEnabled conf

withOTel label = inSpanM (getOTelTracer appState) label defaultSpanArguments

calcTiming :: Bool -> Handler IO a -> Handler IO (Maybe Double, a)
calcTiming timingEnabled f = if timingEnabled
then do
Expand Down
20 changes: 14 additions & 6 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module PostgREST.AppState
, getJwtCache
, getSocketREST
, getSocketAdmin
, getOTelTracer
, init
, initSockets
, initWithPool
Expand Down Expand Up @@ -71,6 +72,7 @@ import PostgREST.Unix (createAndBindDomainSocket)

import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
import Data.String (IsString (..))
import OpenTelemetry.Trace (Tracer)
import Protolude

data AuthResult = AuthResult
Expand Down Expand Up @@ -107,19 +109,21 @@ data AppState = AppState
, stateSocketREST :: NS.Socket
-- | Network socket for the admin UI
, stateSocketAdmin :: Maybe NS.Socket
-- | OpenTelemetry tracer
, oTelTracer :: Tracer
}

type AppSockets = (NS.Socket, Maybe NS.Socket)

init :: AppConfig -> (Observation -> IO ()) -> IO AppState
init conf observer = do
init :: AppConfig -> Tracer -> (Observation -> IO ()) -> IO AppState
init conf tracer observer = do
pool <- initPool conf
(sock, adminSock) <- initSockets conf
state' <- initWithPool (sock, adminSock) pool conf observer
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock }
state' <- initWithPool (sock, adminSock) pool tracer conf observer
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}

initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> (Observation -> IO() ) -> IO AppState
initWithPool (sock, adminSock) pool conf observer = do
initWithPool :: AppSockets -> SQL.Pool -> Tracer -> AppConfig -> (Observation -> IO() ) -> IO AppState
initWithPool (sock, adminSock) pool tracer conf observer = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
Expand All @@ -134,6 +138,7 @@ initWithPool (sock, adminSock) pool conf observer = do
<*> C.newCache Nothing
<*> pure sock
<*> pure adminSock
<*> pure tracer


debPoolTimeout <-
Expand Down Expand Up @@ -263,6 +268,9 @@ getSocketREST = stateSocketREST
getSocketAdmin :: AppState -> Maybe NS.Socket
getSocketAdmin = stateSocketAdmin

getOTelTracer :: AppState -> Tracer
getOTelTracer = oTelTracer

getMainThreadId :: AppState -> ThreadId
getMainThreadId = stateMainThreadId

Expand Down
14 changes: 7 additions & 7 deletions src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import qualified Options.Applicative as O
import Data.Text.IO (hPutStrLn)
import Text.Heredoc (str)

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.SchemaCache (querySchemaCache)
import PostgREST.Version (prettyVersion)
import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.OpenTelemetry (withTracer)
import PostgREST.SchemaCache (querySchemaCache)
import PostgREST.Version (prettyVersion)

import qualified PostgREST.App as App
import qualified PostgREST.AppState as AppState
Expand All @@ -29,9 +30,8 @@ import qualified PostgREST.Logger as Logger

import Protolude hiding (hPutStrLn)


main :: CLI -> IO ()
main CLI{cliCommand, cliPath} = do
main CLI{cliCommand, cliPath} = withTracer "PostgREST" $ \tracer -> do
conf@AppConfig{..} <-
either panic identity <$> Config.readAppConfig mempty cliPath Nothing mempty mempty

Expand All @@ -40,7 +40,7 @@ main CLI{cliCommand, cliPath} = do
-- explicitly close the connections to PostgreSQL on shutdown.
-- 'AppState.destroy' takes care of that.
bracket
(AppState.init conf $ Logger.logObservation loggerState)
(AppState.init conf tracer $ Logger.logObservation loggerState)
AppState.destroy
(\appState -> case cliCommand of
CmdDumpConfig -> do
Expand Down
16 changes: 16 additions & 0 deletions src/PostgREST/OpenTelemetry.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module PostgREST.OpenTelemetry (withTracer) where

import OpenTelemetry.Trace (InstrumentationLibrary (..), Tracer,
initializeGlobalTracerProvider,
makeTracer, shutdownTracerProvider,
tracerOptions)
import PostgREST.Version (prettyVersion)
import Protolude

withTracer :: Text -> (Tracer -> IO c) -> IO c
withTracer label f = bracket
initializeGlobalTracerProvider
shutdownTracerProvider
(\tracerProvider -> f $ makeTracer tracerProvider instrumentationLibrary tracerOptions)
where
instrumentationLibrary = InstrumentationLibrary {libraryName = label, libraryVersion = decodeUtf8 prettyVersion}

0 comments on commit 94d2b9b

Please sign in to comment.