Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add observation module #3232

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ library
PostgREST.Error
PostgREST.Logger
PostgREST.MediaType
PostgREST.Observation
PostgREST.Query
PostgREST.Query.QueryBuilder
PostgREST.Query.SqlFragment
Expand Down
20 changes: 10 additions & 10 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,32 @@ import qualified Data.ByteString.Lazy as LBS
import Network.Socket
import Network.Socket.ByteString

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.Observation (Observation (..))

import qualified PostgREST.AppState as AppState
import qualified PostgREST.Config as Config

import Protolude
import Protolude.Partial (fromJust)

runAdmin :: AppConfig -> AppState -> Warp.Settings -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings =
runAdmin :: AppConfig -> AppState -> Warp.Settings -> (Observation -> IO ()) -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings observer =
whenJust (AppState.getSocketAdmin appState) $ \adminSocket -> do
AppState.logWithZTime appState $ "Admin server listening on port " <> show (fromIntegral (fromJust configAdminServerPort) :: Integer)
observer $ AdminStartObs configAdminServerPort
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState conf
adminApp = admin appState conf observer

-- | PostgREST admin application
admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
admin :: AppState.AppState -> AppConfig -> (Observation -> IO ()) -> Wai.Application
admin appState appConfig observer req respond = do
isMainAppReachable <- isRight <$> reachMainApp (AppState.getSocketREST appState)
isSchemaCacheLoaded <- isJust <$> AppState.getSchemaCache appState
isConnectionUp <-
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState appConfig (SQL.sql "SELECT 1")
else isRight <$> AppState.usePool appState appConfig (SQL.sql "SELECT 1") observer

case Wai.pathInfo req of
["ready"] ->
Expand Down
47 changes: 25 additions & 22 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import PostgREST.Auth (AuthResult (..))
import PostgREST.Config (AppConfig (..))
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.Observation (Observation (..))
import PostgREST.Query (DbHandler)
import PostgREST.Response.Performance (ServerTiming (..),
serverTimingHeader)
Expand All @@ -66,26 +67,26 @@ import System.TimeIt (timeItT)

type Handler = ExceptT Error

run :: AppState -> IO ()
run appState = do
AppState.logWithZTime appState $ "Starting PostgREST " <> T.decodeUtf8 prettyVersion <> "..."
run :: AppState -> (Observation -> IO ()) -> IO ()
run appState observer = do
observer $ AppStartObs prettyVersion

conf@AppConfig{..} <- AppState.getConfig appState
AppState.connectionWorker appState -- Loads the initial SchemaCache
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.connectionWorker appState) (AppState.reReadConfig False appState)
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.connectionWorker appState) (AppState.reReadConfig False appState observer)
-- reload schema cache + config on NOTIFY
AppState.runListener conf appState
AppState.runListener conf appState observer

Admin.runAdmin conf appState $ serverSettings conf
Admin.runAdmin conf appState (serverSettings conf) observer

let app = postgrest conf appState (AppState.connectionWorker appState)
let app = postgrest conf appState (AppState.connectionWorker appState) observer

what <- case configServerUnixSocket of
Just path -> pure $ "unix socket " <> show path
case configServerUnixSocket of
Just path -> do
observer $ AppServerUnixObs path
Nothing -> do
port <- NS.socketPort $ AppState.getSocketREST appState
pure $ "port " <> show port
AppState.logWithZTime appState $ "Listening on " <> what
observer $ AppServerPortObs port

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) app

Expand All @@ -97,8 +98,8 @@ serverSettings AppConfig{..} =
& setServerName ("postgrest/" <> prettyVersion)

-- | PostgREST application
postgrest :: AppConfig -> AppState.AppState -> IO () -> Wai.Application
postgrest conf appState connWorker =
postgrest :: AppConfig -> AppState.AppState -> IO () -> (Observation -> IO ()) -> Wai.Application
postgrest conf appState connWorker observer =
traceHeaderMiddleware conf .
Cors.middleware (configServerCorsAllowedOrigins conf) .
Auth.middleware appState .
Expand All @@ -115,7 +116,7 @@ postgrest conf appState connWorker =
let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
Expand All @@ -134,8 +135,9 @@ postgrestResponse
-> PgVersion
-> AuthResult
-> Wai.Request
-> (Observation -> IO ())
-> Handler IO Wai.Response
postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@AuthResult{..} req = do
postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@AuthResult{..} req observer = do
sCache <-
case maybeSchemaCache of
Just sCache ->
Expand All @@ -151,22 +153,23 @@ postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@
ApiRequest.userApiRequest conf req body sCache

let jwtTime = if configServerTimingEnabled then Auth.getJwtDur req else Nothing
handleRequest authResult conf appState (Just authRole /= configDbAnonRole) configDbPreparedStatements pgVer apiRequest sCache jwtTime parseTime
handleRequest authResult conf appState (Just authRole /= configDbAnonRole) configDbPreparedStatements pgVer apiRequest sCache jwtTime parseTime observer

runDbHandler :: AppState.AppState -> AppConfig -> SQL.IsolationLevel -> SQL.Mode -> Bool -> Bool -> DbHandler b -> Handler IO b
runDbHandler appState config isoLvl mode authenticated prepared handler = do
runDbHandler :: AppState.AppState -> AppConfig -> SQL.IsolationLevel -> SQL.Mode -> Bool -> Bool -> (Observation -> IO ()) -> DbHandler b -> Handler IO b
runDbHandler appState config isoLvl mode authenticated prepared observer handler = do
dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
AppState.usePool appState config . transaction isoLvl mode $ runExceptT handler
AppState.usePool appState config (transaction isoLvl mode $ runExceptT handler) observer

resp <-
liftEither . mapLeft Error.PgErr $
mapLeft (Error.PgError authenticated) dbResp

liftEither resp

handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool -> PgVersion -> ApiRequest -> SchemaCache -> Maybe Double -> Maybe Double -> Handler IO Wai.Response
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime =
handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool -> PgVersion -> ApiRequest -> SchemaCache ->
Maybe Double -> Maybe Double -> (Observation -> IO ()) -> Handler IO Wai.Response
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime observer =
case (iAction, iTarget) of
(ActionRead headersOnly, TargetIdent identifier) -> do
(planTime', wrPlan) <- withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
Expand Down Expand Up @@ -231,7 +234,7 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A
roleSettings = fromMaybe mempty (HM.lookup authRole $ configRoleSettings conf)
roleIsoLvl = HM.findWithDefault SQL.ReadCommitted authRole $ configRoleIsoLvl conf
runQuery isoLvl funcSets mode query =
runDbHandler appState conf isoLvl mode authenticated prepared $ do
runDbHandler appState conf isoLvl mode authenticated prepared observer $ do
Query.setPgLocals conf authClaims authRole (HM.toList roleSettings) funcSets apiReq
Query.runPreReq conf
query
Expand Down
Loading
Loading