Skip to content

Commit

Permalink
scratch
Browse files Browse the repository at this point in the history
rewrite over

feat: OTel rewrite w/o unliftio
  • Loading branch information
develop7 committed Feb 12, 2024
1 parent b22bb74 commit 586e7a1
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 71 deletions.
3 changes: 3 additions & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ library
PostgREST.Query.QueryBuilder
PostgREST.Query.SqlFragment
PostgREST.Query.Statements
PostgREST.OpenTelemetry
PostgREST.Plan
PostgREST.Plan.CallPlan
PostgREST.Plan.MutatePlan
Expand Down Expand Up @@ -102,6 +103,8 @@ library
, hasql-transaction >= 1.0.1 && < 1.1
, heredoc >= 0.2 && < 0.3
, http-types >= 0.12.2 && < 0.13
, hs-opentelemetry-sdk >= 0.0.3.6 && < 0.0.4
, hs-opentelemetry-instrumentation-wai
, insert-ordered-containers >= 0.2.2 && < 0.3
, interpolatedstring-perl6 >= 1 && < 1.1
, jose >= 0.8.5.1 && < 0.12
Expand Down
116 changes: 62 additions & 54 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,27 @@ import qualified PostgREST.Unix as Unix (installSignalHandlers)

import PostgREST.ApiRequest (Action (..), ApiRequest (..),
Mutation (..), Target (..))
import PostgREST.AppState (AppState)
import PostgREST.AppState (AppState, getOTelTracer)
import PostgREST.Auth (AuthResult (..))
import PostgREST.Config (AppConfig (..))
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.Error (Error (..))
import PostgREST.Query (DbHandler)
import PostgREST.Response.Performance (ServerTiming (..),
serverTimingHeader)
import PostgREST.SchemaCache (SchemaCache (..))
import PostgREST.SchemaCache.Routine (Routine (..))
import PostgREST.Version (docsVersion, prettyVersion)

import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import Protolude hiding (Handler)
import System.TimeIt (timeItT)
import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import OpenTelemetry.Instrumentation.Wai (newOpenTelemetryWaiMiddleware)
import OpenTelemetry.Trace (defaultSpanArguments)
import PostgREST.OpenTelemetry (inSpan)
import Protolude hiding (Handler)
import System.TimeIt (timeItT)

type Handler = ExceptT Error

Expand All @@ -87,7 +90,9 @@ run appState = do
pure $ "port " <> show port
AppState.logWithZTime appState $ "Listening on " <> what

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) app
oTelMWare <- newOpenTelemetryWaiMiddleware

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) (oTelMWare app)

serverSettings :: AppConfig -> Warp.Settings
serverSettings AppConfig{..} =
Expand All @@ -105,27 +110,28 @@ postgrest conf appState connWorker =
Logger.middleware (configLogLevel conf) $
-- fromJust can be used, because the auth middleware will **always** add
-- some AuthResult to the vault.
\req respond -> case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
maybeSchemaCache <- AppState.getSchemaCache appState
pgVer <- AppState.getPgVersion appState

let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
-- function can respond successfully (with a stale schema cache) before
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getRetryNextIn appState
return $ addRetryHint delay response
respond resp
\req respond -> inSpan (getOTelTracer appState) "PostgREST.postgrest" defaultSpanArguments $
case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
maybeSchemaCache <- AppState.getSchemaCache appState
pgVer <- AppState.getPgVersion appState

let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
-- function can respond successfully (with a stale schema cache) before
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getRetryNextIn appState
return $ addRetryHint delay response
respond resp

postgrestResponse
:: AppState.AppState
Expand Down Expand Up @@ -169,54 +175,54 @@ handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool ->
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime =
case (iAction, iTarget) of
(ActionRead headersOnly, TargetIdent identifier) -> do
(planTime', wrPlan) <- withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
(respTime', pgrst) <- withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
(planTime', wrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationCreate, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationUpdate, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationSingleUpsert, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationDelete, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInvoke invMethod, TargetProc identifier _) -> do
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
(txTime', resultSet) <- withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdFuncSettings $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
(respTime', pgrst) <- withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdTimeout $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInspect headersOnly, TargetDefaultSpec tSchema) -> do
(planTime', iPlan) <- withTiming $ liftEither $ Plan.inspectPlan apiReq
(txTime', oaiResult) <- withTiming $ runQuery roleIsoLvl mempty (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
(respTime', pgrst) <- withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
(planTime', iPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.inspectPlan apiReq
(txTime', oaiResult) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInfo, TargetIdent identifier) -> do
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
return $ pgrstResponse (ServerTiming jwtTime parseTime Nothing Nothing respTime') pgrst

(ActionInfo, TargetProc identifier _) -> do
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' Nothing respTime') pgrst

(ActionInfo, TargetDefaultSpec _) -> do
Expand All @@ -241,6 +247,8 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A

withTiming = calcTiming $ configServerTimingEnabled conf

withOTel label = inSpan (getOTelTracer appState) label defaultSpanArguments

calcTiming :: Bool -> Handler IO a -> Handler IO (Maybe Double, a)
calcTiming timingEnabled f = if timingEnabled
then do
Expand Down
20 changes: 14 additions & 6 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module PostgREST.AppState
, getJwtCache
, getSocketREST
, getSocketAdmin
, getOTelTracer
, init
, initSockets
, initWithPool
Expand Down Expand Up @@ -78,6 +79,7 @@ import PostgREST.Unix (createAndBindDomainSocket)

import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
import Data.String (IsString (..))
import OpenTelemetry.Trace (Tracer)
import Protolude

data AuthResult = AuthResult
Expand Down Expand Up @@ -116,19 +118,21 @@ data AppState = AppState
, stateSocketREST :: NS.Socket
-- | Network socket for the admin UI
, stateSocketAdmin :: Maybe NS.Socket
-- | OpenTelemetry tracer
, oTelTracer :: Tracer
}

type AppSockets = (NS.Socket, Maybe NS.Socket)

init :: AppConfig -> IO AppState
init conf = do
init :: AppConfig -> Tracer -> IO AppState
init conf tracer = do
pool <- initPool conf
(sock, adminSock) <- initSockets conf
state' <- initWithPool (sock, adminSock) pool conf
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock }
state' <- initWithPool (sock, adminSock) pool tracer conf
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}

initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> IO AppState
initWithPool (sock, adminSock) pool conf = do
initWithPool :: AppSockets -> SQL.Pool -> Tracer -> AppConfig -> IO AppState
initWithPool (sock, adminSock) pool tracer conf = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
Expand All @@ -144,6 +148,7 @@ initWithPool (sock, adminSock) pool conf = do
<*> C.newCache Nothing
<*> pure sock
<*> pure adminSock
<*> pure tracer


debLogTimeout <-
Expand Down Expand Up @@ -272,6 +277,9 @@ getSocketREST = stateSocketREST
getSocketAdmin :: AppState -> Maybe NS.Socket
getSocketAdmin = stateSocketAdmin

getOTelTracer :: AppState -> Tracer
getOTelTracer = oTelTracer

-- | Log to stderr with local time
logWithZTime :: AppState -> Text -> IO ()
logWithZTime appState txt = do
Expand Down
14 changes: 7 additions & 7 deletions src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@ import qualified Options.Applicative as O
import Data.Text.IO (hPutStrLn)
import Text.Heredoc (str)

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.SchemaCache (querySchemaCache)
import PostgREST.Version (prettyVersion)
import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.OpenTelemetry (withTracer)
import PostgREST.SchemaCache (querySchemaCache)
import PostgREST.Version (prettyVersion)

import qualified PostgREST.App as App
import qualified PostgREST.AppState as AppState
import qualified PostgREST.Config as Config

import Protolude hiding (hPutStrLn)


main :: CLI -> IO ()
main CLI{cliCommand, cliPath} = do
main CLI{cliCommand, cliPath} = withTracer "PostgREST" $ \tracer -> do
conf@AppConfig{..} <-
either panic identity <$> Config.readAppConfig mempty cliPath Nothing mempty mempty

-- Per https://github.com/PostgREST/postgrest/issues/268, we want to
-- explicitly close the connections to PostgreSQL on shutdown.
-- 'AppState.destroy' takes care of that.
bracket
(AppState.init conf)
(AppState.init conf tracer)
AppState.destroy
(\appState -> case cliCommand of
CmdDumpConfig -> do
Expand Down

0 comments on commit 586e7a1

Please sign in to comment.