Skip to content

Commit

Permalink
some refactoring within Main
Browse files Browse the repository at this point in the history
  • Loading branch information
monacoremo committed Apr 13, 2021
1 parent c2c7bbe commit 5da3cc7
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 143 deletions.
274 changes: 136 additions & 138 deletions main/Main.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}

module Main (main) where

Expand Down Expand Up @@ -36,7 +37,6 @@ import PostgREST.App (postgrest)
import PostgREST.CLI (CLI (..), Command (..),
readCLIShowHelp)
import PostgREST.Config (AppConfig (..), Environment,
configDbPoolTimeout',
dumpAppConfig, readAppConfig,
readDbUriFile, readSecretFile)
import PostgREST.DbStructure (DbStructure, getDbStructure,
Expand Down Expand Up @@ -76,14 +76,7 @@ data SCacheStatus
-- | This is where everything starts.
main :: IO ()
main = do
--
-- LineBuffering: the entire output buffer is flushed whenever a newline is
-- output, the buffer overflows, a hFlush is issued or the handle is closed
--
-- NoBuffering: output is written immediately and never stored in the buffer
hSetBuffering stdout LineBuffering
hSetBuffering stdin LineBuffering
hSetBuffering stderr NoBuffering
setBuffering

-- read PGRST_ env variables
env <- readEnvironment
Expand All @@ -94,138 +87,162 @@ main = do
-- build the 'AppConfig' from the config file path and env vars
pathEnvConf <- either panic identity <$> readAppConfig mempty env cliPath Nothing Nothing

-- FIXME: fullConfigReReader depends on exactly this value, not the one from conf. Why?
let jwtSecret = configJwtSecret pathEnvConf

-- read external files
dbUriFile <- readDbUriFile $ configDbUri pathEnvConf
secretFile <- readSecretFile $ configJwtSecret pathEnvConf

-- add the external files to AppConfig
conf <- either panic identity <$> readAppConfig mempty env cliPath dbUriFile secretFile
conf@AppConfig{..} <- either panic identity <$> readAppConfig mempty env cliPath dbUriFile secretFile

-- These are config values that can't be reloaded at runtime. Reloading some of them would imply restarting the web server.
let
host = configServerHost conf
port = configServerPort conf
maybeSocketAddr = configServerUnixSocket conf
#ifndef mingw32_HOST_OS
socketFileMode = configServerUnixSocketMode conf
#endif
dbUri = toS (configDbUri conf)
(dbChannelEnabled, dbChannel) = (configDbChannelEnabled conf, toS $ configDbChannel conf)
serverSettings =
setHost ((fromString . toS) host) -- Warp settings
. setPort port
. setServerName (toS $ "postgrest/" <> prettyVersion) $
defaultSettings
poolSize = configDbPoolSize conf
poolTimeout = configDbPoolTimeout' conf
logLevel = configLogLevel conf
dbConfigEnabled = configDbConfig conf

-- create connection pool with the provided settings, returns either a 'Connection' or a 'ConnectionError'. Does not throw.
pool <- P.acquire (poolSize, poolTimeout, dbUri)

-- Used to sync the listener(NOTIFY reload) with the connectionWorker. No connection for the listener at first. Only used if dbChannelEnabled=true.
mvarConnectionStatus <- newEmptyMVar

-- No schema cache at the start. Will be filled in by the connectionWorker
refDbStructure <- newIORef Nothing

-- Helper ref to make sure just one connectionWorker can run at a time
refIsWorkerOn <- newIORef False

-- Config that can change at runtime
refConf <- newIORef conf
appState@AppState{..} <- initState conf

let
-- re-reads config file + db config
dbConfigReReader startingUp = when dbConfigEnabled $
reReadConfig startingUp pool dbConfigEnabled env cliPath refConf dbUriFile secretFile
dbConfigReReader startingUp = when configDbConfig $
reReadConfig startingUp statePool configDbConfig env cliPath stateConf dbUriFile secretFile

-- re-reads jwt-secret external file + config file + db config
fullConfigReReader =
reReadConfig False pool dbConfigEnabled env cliPath refConf
reReadConfig False statePool configDbConfig env cliPath stateConf
dbUriFile =<< -- db-uri external file could be re-read, but it doesn't make sense as db-uri is not reloadable
readSecretFile (configJwtSecret pathEnvConf)
readSecretFile jwtSecret

-- Override the config with config options from the db
-- TODO: the same operation is repeated on connectionWorker, ideally this would be done only once, but dump CmdDumpConfig needs it for tests.
-- TODO: the same operation is repeated on connectionWorker, ideally this
-- would be done only once, but dump CmdDumpConfig needs it for tests.
dbConfigReReader True

case cliCommand of
CmdDumpConfig ->
do
dumpedConfig <- dumpAppConfig <$> readIORef refConf
dumpedConfig <- dumpAppConfig <$> readIORef stateConf
putStr dumpedConfig
exitSuccess
CmdDumpSchema ->
do
dumpedSchema <- dumpSchema pool =<< readIORef refConf
dumpedSchema <- dumpSchema statePool =<< readIORef stateConf
putStrLn dumpedSchema
exitSuccess
CmdRun ->
pass

-- This is passed to the connectionWorker method so it can kill the main thread if the PostgreSQL's version is not supported.
mainTid <- myThreadId

let connWorker = connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEnabled, mvarConnectionStatus) $
dbConfigReReader False
let
connWorker =
connectionWorker appState configDbChannelEnabled (dbConfigReReader False) (killThread mainTid)

-- Sets the initial refDbStructure
connWorker

#ifndef mingw32_HOST_OS
-- Only for systems with signals:
--
-- releases the connection pool whenever the program is terminated,
-- see https://github.com/PostgREST/postgrest/issues/268
forM_ [sigINT, sigTERM] $ \sig ->
void $ installHandler sig (Catch $ do
P.release pool
throwTo mainTid UserInterrupt
) Nothing

-- The SIGUSR1 signal updates the internal 'DbStructure' by running 'connectionWorker' exactly as before.
void $ installHandler sigUSR1 (
Catch connWorker
) Nothing

-- Re-read the config on SIGUSR2
void $ installHandler sigUSR2 (
Catch fullConfigReReader
) Nothing
let releasePool = P.release statePool >> throwTo mainTid UserInterrupt
installHandlers $ SignalHandlers fullConfigReReader connWorker releasePool
#endif

-- reload schema cache + config on NOTIFY
when dbChannelEnabled $
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker fullConfigReReader
when configDbChannelEnabled $
listener (toS configDbUri) (toS configDbChannel) appState connWorker fullConfigReReader

-- ask for the OS time at most once per second
getTime <- mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}

let postgrestApplication =
postgrest
logLevel
refConf
refDbStructure
pool
getTime
connWorker
let
postgrestApplication =
postgrest configLogLevel stateConf stateDbStructure statePool getTime connWorker

serverSettings =
defaultSettings
& setHost (fromString $ toS configServerHost)
& setPort configServerPort
& setServerName (toS $ "postgrest/" <> prettyVersion)

#ifndef mingw32_HOST_OS
-- run the postgrest application with user defined socket. Only for UNIX systems.
whenJust maybeSocketAddr $
runAppInSocket serverSettings postgrestApplication socketFileMode
case configServerUnixSocket of
Just socket ->
#ifndef mingw32_HOST_OS
runAppInSocket serverSettings postgrestApplication configServerUnixSocketMode socket
#else
pass
#endif
Nothing ->
do
putStrLn $ ("Listening on port " :: Text) <> show configServerPort
runSettings serverSettings postgrestApplication

setBuffering :: IO ()
setBuffering = do
-- LineBuffering: the entire output buffer is flushed whenever a newline is
-- output, the buffer overflows, a hFlush is issued or the handle is closed
hSetBuffering stdout LineBuffering
hSetBuffering stdin LineBuffering
-- NoBuffering: output is written immediately and never stored in the buffer
hSetBuffering stderr NoBuffering


-- SIGNALS

#ifndef mingw32_HOST_OS
data SignalHandlers = SignalHandlers
{ fullConfigReReader :: IO ()
, connWorker :: IO ()
, releasePool :: IO ()
}

-- | Set signal handlers, only for systems with signals
installHandlers :: SignalHandlers -> IO ()
installHandlers SignalHandlers{..} = do
-- Releases the connection pool whenever the program is terminated,
-- see https://github.com/PostgREST/postgrest/issues/268
install sigINT releasePool
install sigTERM releasePool

-- The SIGUSR1 signal updates the internal 'DbStructure' by running
-- 'connectionWorker' exactly as before.
install sigUSR1 connWorker

-- Re-read the config on SIGUSR2
install sigUSR2 fullConfigReReader
where
install signal handler = void $ installHandler signal (Catch handler) Nothing
#endif

-- run the postgrest application
whenNothing maybeSocketAddr $ do
putStrLn $ ("Listening on port " :: Text) <> show port
runSettings serverSettings postgrestApplication

-- APP STATE

data AppState = AppState
{ statePool :: P.Pool
, stateConnectionStatus :: MVar ConnectionStatus
, stateDbStructure :: IORef (Maybe DbStructure)
, stateIsWorkerOn :: IORef Bool
, stateConf :: IORef AppConfig
}

initState :: AppConfig -> IO AppState
initState conf@AppConfig{..} = do
-- create connection pool with the provided settings, returns either a 'Connection' or a 'ConnectionError'. Does not throw.
pool <- P.acquire (configDbPoolSize, fromRational . toRational $ configDbPoolTimeout, toS configDbUri)

-- Used to sync the listener(NOTIFY reload) with the connectionWorker. No connection for the listener at first. Only used if dbChannelEnabled=true.
mvarConnectionStatus <- newEmptyMVar

-- No schema cache at the start. Will be filled in by the connectionWorker
refDbStructure <- newIORef Nothing

-- Helper ref to make sure just one connectionWorker can run at a time
refIsWorkerOn <- newIORef False

-- Config that can change at runtime
refConf <- newIORef conf

return $ AppState pool mvarConnectionStatus refDbStructure refIsWorkerOn refConf

readEnvironment :: IO Environment
readEnvironment = getEnvironment <&> pgrst
where
pgrst env = M.filterWithKey (\k _ -> "PGRST_" `isPrefixOf` k) $ M.map T.pack $ M.fromList env
readEnvironment =
M.map T.pack . M.fromList . filter (isPrefixOf "PGRST_" . fst) <$> getEnvironment

-- Time constants
_32s :: Int
Expand All @@ -249,38 +266,30 @@ _1s = 1000000 :: Int -- 1 second
program.
3. Obtains the dbStructure. If this fails, it goes back to 1.
-}
connectionWorker
:: ThreadId -- ^ Main thread id. Killed if pg version is unsupported
-> P.Pool -- ^ The pg connection pool
-> IORef AppConfig -- ^ mutable reference to AppConfig
-> IORef (Maybe DbStructure) -- ^ mutable reference to 'DbStructure'
-> IORef Bool -- ^ Used as a binary Semaphore
-> (Bool, MVar ConnectionStatus) -- ^ For interacting with the LISTEN channel
-> IO ()
-> IO ()
connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEnabled, mvarConnectionStatus) dbCfReader = do
isWorkerOn <- readIORef refIsWorkerOn
connectionWorker :: AppState -> Bool -> IO () -> IO () -> IO ()
connectionWorker AppState{..} dbChannelEnabled dbCfReader kill = do
isWorkerOn <- readIORef stateIsWorkerOn
unless isWorkerOn $ do -- Prevents multiple workers to be running at the same time. Could happen on too many SIGUSR1s.
atomicWriteIORef refIsWorkerOn True
atomicWriteIORef stateIsWorkerOn True
void $ forkIO work
where
work = do
putStrLn ("Attempting to connect to the database..." :: Text)
connected <- connectionStatus pool
connected <- connectionStatus statePool
when dbChannelEnabled $
void $ tryPutMVar mvarConnectionStatus connected -- tryPutMVar doesn't lock the thread. It should always succeed since the worker is the only mvar producer.
void $ tryPutMVar stateConnectionStatus connected -- tryPutMVar doesn't lock the thread. It should always succeed since the worker is the only mvar producer.
case connected of
FatalConnectionError reason -> hPutStrLn stderr reason >> killThread mainTid -- Fatal error when connecting
NotConnected -> return () -- Unreachable because connectionStatus will keep trying to connect
Connected actualPgVersion -> do -- Procede with initialization
FatalConnectionError reason -> hPutStrLn stderr reason >> kill -- Fatal error when connecting
NotConnected -> return () -- Unreachable because connectionStatus will keep trying to connect
Connected actualPgVersion -> do -- Procede with initialization
putStrLn ("Connection successful" :: Text)
dbCfReader -- this could be fail because the connection drops, but the loadSchemaCache will pick the error and retry again
scStatus <- loadSchemaCache pool actualPgVersion refConf refDbStructure
scStatus <- loadSchemaCache statePool actualPgVersion stateConf stateDbStructure
case scStatus of
SCLoaded -> pure () -- do nothing and proceed if the load was successful
SCOnRetry -> work -- retry
SCFatalFail -> killThread mainTid -- die if our schema cache query has an error
liftIO $ atomicWriteIORef refIsWorkerOn False
SCLoaded -> pure () -- do nothing and proceed if the load was successful
SCOnRetry -> work -- retry
SCFatalFail -> kill -- die if our schema cache query has an error
liftIO $ atomicWriteIORef stateIsWorkerOn False

{-|
Check if a connection from the pool allows access to the PostgreSQL database.
Expand Down Expand Up @@ -323,8 +332,10 @@ connectionStatus pool =
-- | Load the DbStructure by using a connection from the pool.
loadSchemaCache :: P.Pool -> PgVersion -> IORef AppConfig -> IORef (Maybe DbStructure) -> IO SCacheStatus
loadSchemaCache pool actualPgVersion refConf refDbStructure = do
conf <- readIORef refConf
result <- P.use pool $ HT.transaction HT.ReadCommitted HT.Read $ getDbStructure (toList $ configDbSchemas conf) (configDbExtraSearchPath conf) actualPgVersion (configDbPreparedStatements conf)
AppConfig{..} <- readIORef refConf
result <-
P.use pool . HT.transaction HT.ReadCommitted HT.Read $
getDbStructure (toList configDbSchemas) configDbExtraSearchPath actualPgVersion configDbPreparedStatements
case result of
Left e -> do
let err = PgError False e
Expand All @@ -349,19 +360,19 @@ loadSchemaCache pool actualPgVersion refConf refDbStructure = do
When a NOTIFY <db-channel> - with an empty payload - is done, it refills the schema cache.
It uses the connectionWorker in case the LISTEN connection dies.
-}
listener :: ByteString -> Text -> P.Pool -> IORef AppConfig -> IORef (Maybe DbStructure) -> MVar ConnectionStatus -> IO () -> IO () -> IO ()
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker configLoader = start
listener :: ByteString -> Text -> AppState -> IO () -> IO () -> IO ()
listener dbUri dbChannel AppState{..} connWorker configLoader = start
where
start = do
connStatus <- takeMVar mvarConnectionStatus -- takeMVar makes the thread wait if the MVar is empty(until there's a connection).
connStatus <- takeMVar stateConnectionStatus -- takeMVar makes the thread wait if the MVar is empty(until there's a connection).
case connStatus of
Connected actualPgVersion -> void $ forkFinally (do -- forkFinally allows to detect if the thread dies
dbOrError <- C.acquire dbUri
case dbOrError of
Right db -> do
putStrLn $ "Listening for notifications on the " <> dbChannel <> " channel"
let channelToListen = N.toPgIdentifier dbChannel
scLoader = void $ loadSchemaCache pool actualPgVersion refConf refDbStructure -- It's not necessary to check the loadSchemaCache success here. If the connection drops, the thread will die and proceed to recover below.
scLoader = void $ loadSchemaCache statePool actualPgVersion stateConf stateDbStructure -- It's not necessary to check the loadSchemaCache success here. If the connection drops, the thread will die and proceed to recover below.
N.listen db channelToListen
N.waitForNotifications (\_ msg ->
if | BS.null msg -> scLoader -- reload the schema cache
Expand Down Expand Up @@ -437,19 +448,6 @@ timeToStderr fmtString a =
hPrintf stderr (fmtString ++ "\n") duration
return result


-- | 10^12 picoseconds per second
picoseconds :: Double
picoseconds = 1000000000000


-- Utility functions.
#ifndef mingw32_HOST_OS
whenJust :: Applicative f => Maybe a -> (a -> f ()) -> f ()
whenJust (Just x) f = f x
whenJust Nothing _ = pass
#endif

whenNothing :: Applicative f => Maybe a -> f () -> f ()
whenNothing Nothing f = f
whenNothing _ _ = pass
Loading

0 comments on commit 5da3cc7

Please sign in to comment.