Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: is ready Admin logic to AppState #3491

Merged
merged 1 commit into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ module PostgREST.Admin
) where

import qualified Data.Aeson as JSON
import qualified Hasql.Session as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
Expand All @@ -28,28 +27,25 @@ import qualified PostgREST.Config as Config

import Protolude

runAdmin :: AppConfig -> AppState -> Warp.Settings -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings =
runAdmin :: AppState -> Warp.Settings -> IO ()
runAdmin appState settings = do
AppConfig{configAdminServerPort} <- AppState.getConfig appState
whenJust (AppState.getSocketAdmin appState) $ \adminSocket -> do
observer $ AdminStartObs configAdminServerPort
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState conf
adminApp = admin appState
observer = AppState.getObserver appState

-- | PostgREST admin application
admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
admin :: AppState.AppState -> Wai.Application
admin appState req respond = do
isMainAppReachable <- isRight <$> reachMainApp (AppState.getSocketREST appState)
isSchemaCacheLoaded <- AppState.getSchemaCacheLoaded appState
isConnectionUp <-
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState (SQL.sql "SELECT 1")
isLoaded <- AppState.isLoaded appState

case Wai.pathInfo req of
["ready"] ->
respond $ Wai.responseLBS (if isMainAppReachable && isConnectionUp && isSchemaCacheLoaded then HTTP.status200 else HTTP.status503) [] mempty
respond $ Wai.responseLBS (if isMainAppReachable && isLoaded then HTTP.status200 else HTTP.status503) [] mempty
["live"] ->
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status503) [] mempty
["config"] -> do
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ run appState = do
-- reload schema cache + config on NOTIFY
AppState.runListener conf appState

Admin.runAdmin conf appState (serverSettings conf)
Admin.runAdmin appState (serverSettings conf)

let app = postgrest configLogLevel appState (AppState.connectionWorker appState)

Expand Down
110 changes: 60 additions & 50 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ module PostgREST.AppState
, destroy
, getConfig
, getSchemaCache
, getIsListenerOn
, getMainThreadId
, getPgVersion
, getRetryNextIn
, getTime
, getJwtCache
, getSocketREST
, getSocketAdmin
, getSchemaCacheLoaded
, init
, initSockets
, initWithPool
Expand All @@ -28,6 +26,7 @@ module PostgREST.AppState
, connectionWorker
, runListener
, getObserver
, isLoaded
) where

import qualified Data.Aeson as JSON
Expand Down Expand Up @@ -90,14 +89,14 @@ data AppState = AppState
, statePgVersion :: IORef PgVersion
-- | No schema cache at the start. Will be filled in by the connectionWorker
, stateSchemaCache :: IORef (Maybe SchemaCache)
-- | If schema cache is loaded
, stateSchemaCacheLoaded :: IORef Bool
-- | The schema cache status
, stateSCacheStatus :: IORef SchemaCacheStatus
-- | The connection status
, stateConnStatus :: IORef ConnectionStatus
-- | starts the connection worker with a debounce
, debouncedConnectionWorker :: IO ()
-- | Binary semaphore used to sync the listener(NOTIFY reload) with the connectionWorker.
, stateListener :: MVar ()
-- | State of the LISTEN channel, used for the admin server checks
, stateIsListenerOn :: IORef Bool
-- | Config that can change at runtime
, stateConf :: IORef AppConfig
-- | Time used for verifying JWT expiration
Expand All @@ -118,6 +117,20 @@ data AppState = AppState
, stateMetrics :: Metrics.MetricsState
}

-- | Schema cache status
data SchemaCacheStatus
= SCLoaded
| SCPending
| SCFatalFail
deriving Eq

-- | Current database connection status
data ConnectionStatus
= ConnEstablished
| ConnPending
| ConnFatalFail Text
deriving Eq

type AppSockets = (NS.Socket, Maybe NS.Socket)


Expand All @@ -138,10 +151,10 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
<*> newIORef False
<*> newIORef SCPending
<*> newIORef ConnPending
<*> pure (pure ())
<*> newEmptyMVar
<*> newIORef False
<*> newIORef conf
<*> mkAutoUpdate defaultUpdateSettings { updateAction = getCurrentTime }
<*> myThreadId
Expand Down Expand Up @@ -286,29 +299,33 @@ waitListener = takeMVar . stateListener
signalListener :: AppState -> IO ()
signalListener appState = void $ tryPutMVar (stateListener appState) ()

getIsListenerOn :: AppState -> IO Bool
getIsListenerOn = readIORef . stateIsListenerOn
isConnEstablished :: AppState -> IO Bool
isConnEstablished x = do
conf <- getConfig x
if configDbChannelEnabled conf
then do -- if the listener is enabled, we can be sure the connection status is always up to date
st <- readIORef $ stateConnStatus x
return $ st == ConnEstablished
else -- otherwise the only way to check the connection is to make a query
isRight <$> usePool x (SQL.sql "SELECT 1")

putIsListenerOn :: AppState -> Bool -> IO ()
putIsListenerOn = atomicWriteIORef . stateIsListenerOn
isLoaded :: AppState -> IO Bool
isLoaded x = do
scacheStatus <- readIORef $ stateSCacheStatus x
connEstablished <- isConnEstablished x
return $ scacheStatus == SCLoaded && connEstablished

getSchemaCacheLoaded :: AppState -> IO Bool
getSchemaCacheLoaded = readIORef . stateSchemaCacheLoaded
putSCacheStatus :: AppState -> SchemaCacheStatus -> IO ()
putSCacheStatus = atomicWriteIORef . stateSCacheStatus

putSchemaCacheLoaded :: AppState -> Bool -> IO ()
putSchemaCacheLoaded = atomicWriteIORef . stateSchemaCacheLoaded
putConnStatus :: AppState -> ConnectionStatus -> IO ()
putConnStatus = atomicWriteIORef . stateConnStatus

getObserver :: AppState -> ObservationHandler
getObserver = stateObserver

-- | Schema cache status
data SCacheStatus
= SCLoaded
| SCOnRetry
| SCFatalFail

-- | Load the SchemaCache by using a connection from the pool.
loadSchemaCache :: AppState -> IO SCacheStatus
loadSchemaCache :: AppState -> IO SchemaCacheStatus
loadSchemaCache appState@AppState{stateObserver=observer} = do
conf@AppConfig{..} <- getConfig appState
(resultTime, result) <-
Expand All @@ -323,24 +340,17 @@ loadSchemaCache appState@AppState{stateObserver=observer} = do
Nothing -> do
putSchemaCache appState Nothing
observer $ SchemaCacheNormalErrorObs e
putSchemaCacheLoaded appState False
return SCOnRetry
putSCacheStatus appState SCPending
return SCPending

Right sCache -> do
putSchemaCache appState $ Just sCache
observer $ SchemaCacheQueriedObs resultTime
(t, _) <- timeItT $ observer $ SchemaCacheSummaryObs $ showSummary sCache
observer $ SchemaCacheLoadedObs t
putSchemaCacheLoaded appState True
putSCacheStatus appState SCLoaded
return SCLoaded

-- | Current database connection status data ConnectionStatus
data ConnectionStatus
= NotConnected
| Connected PgVersion
| FatalConnectionError Text
deriving (Eq)

-- | The purpose of this worker is to obtain a healthy connection to pg and an
-- up-to-date schema cache(SchemaCache). This method is meant to be called
-- multiple times by the same thread, but does nothing if the previous
Expand All @@ -358,20 +368,19 @@ internalConnectionWorker appState@AppState{stateObserver=observer} = work
work = do
AppConfig{..} <- getConfig appState
observer DBConnectAttemptObs
connected <- establishConnection appState
case connected of
FatalConnectionError reason ->
connStatus <- establishConnection appState
case connStatus of
ConnFatalFail reason ->
-- Fatal error when connecting
observer (ExitFatalObs reason) >> killThread (getMainThreadId appState)
NotConnected ->
-- Unreachable because establishConnection will keep trying to connect, unless disable-recovery is turned on
ConnPending ->
unless configDbPoolAutomaticRecovery
$ observer ExitDBNoRecoveryObs >> killThread (getMainThreadId appState)
Connected actualPgVersion -> do
ConnEstablished -> do
-- Procede with initialization
putPgVersion appState actualPgVersion
when configDbChannelEnabled $
signalListener appState
actualPgVersion <- getPgVersion appState
observer (DBConnectedObs $ pgvFullName actualPgVersion)
-- this could be fail because the connection drops, but the loadSchemaCache will pick the error and retry again
-- We cannot retry after it fails immediately, because db-pre-config could have user errors. We just log the error and continue.
Expand All @@ -381,7 +390,7 @@ internalConnectionWorker appState@AppState{stateObserver=observer} = work
SCLoaded ->
-- do nothing and proceed if the load was successful
return ()
SCOnRetry ->
SCPending ->
-- retry reloading the schema cache
work
SCFatalFail ->
Expand Down Expand Up @@ -415,23 +424,26 @@ establishConnection appState@AppState{stateObserver=observer} =
observer $ ConnectionPgVersionErrorObs e
case checkIsFatal e of
Just reason ->
return $ FatalConnectionError reason
Nothing ->
return NotConnected
return $ ConnFatalFail reason
Nothing -> do
putConnStatus appState ConnPending
return ConnPending
Right version ->
if version < minimumPgVersion then
return . FatalConnectionError $
return . ConnFatalFail $
"Cannot run in this PostgreSQL version, PostgREST needs at least "
<> pgvName minimumPgVersion
else
return . Connected $ version
else do
putConnStatus appState ConnEstablished
putPgVersion appState version
return ConnEstablished

shouldRetry :: RetryStatus -> ConnectionStatus -> IO Bool
shouldRetry rs isConnSucc = do
AppConfig{..} <- getConfig appState
let
delay = fromMaybe 0 (rsPreviousDelay rs) `div` backoffMicroseconds
itShould = NotConnected == isConnSucc && configDbPoolAutomaticRecovery
itShould = ConnPending == isConnSucc && configDbPoolAutomaticRecovery
when itShould $ observer $ ConnectionRetryObs delay
when itShould $ putRetryNextIn appState delay
return itShould
Expand Down Expand Up @@ -503,7 +515,6 @@ listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
case dbOrError of
Right db -> do
observer $ DBListenerStart dbChannel
putIsListenerOn appState True
SQL.listen db $ SQL.toPgIdentifier dbChannel
SQL.waitForNotifications handleNotification db

Expand All @@ -517,7 +528,6 @@ listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
handleFinally dbChannel True err = do
-- if the thread dies, we try to recover
observer $ DBListenerFailRecoverObs True dbChannel err
putIsListenerOn appState False
-- assume the pool connection was also lost, call the connection worker
connectionWorker appState
-- retry the listener
Expand Down
Loading