Skip to content

Commit

Permalink
Implement pipeline errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-volkov committed Apr 22, 2024
1 parent 5e8147b commit 8f5d6f7
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 72 deletions.
2 changes: 1 addition & 1 deletion hasql.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ library
text >=1 && <3,
text-builder >=0.6.7 && <0.7,
time >=1.9 && <2,
transformers >=0.3 && <0.7,
transformers >=0.6 && <0.7,
uuid >=1.3 && <2,
vector >=0.10 && <0.14,

Expand Down
16 changes: 8 additions & 8 deletions library/Hasql/Decoders/Results.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import Hasql.Prelude hiding (many, maybe)
import Hasql.Prelude qualified as Prelude

newtype Results a
= Results (ReaderT (Bool, LibPQ.Connection) (ExceptT QueryError IO) a)
= Results (ReaderT (Bool, LibPQ.Connection) (ExceptT CommandError IO) a)
deriving (Functor, Applicative, Monad)

{-# INLINE run #-}
run :: Results a -> LibPQ.Connection -> Bool -> IO (Either QueryError a)
run :: Results a -> LibPQ.Connection -> Bool -> IO (Either CommandError a)
run (Results stack) conn idt =
runExceptT (runReaderT stack (idt, conn))

Expand All @@ -32,7 +32,7 @@ clientError =
$ ReaderT
$ \(_, connection) ->
ExceptT
$ fmap (Left . ClientQueryError) (LibPQ.errorMessage connection)
$ fmap (Left . ClientCommandError) (LibPQ.errorMessage connection)

-- |
-- Parse a single result.
Expand All @@ -45,9 +45,9 @@ single resultDec =
resultMaybe <- LibPQ.getResult connection
case resultMaybe of
Just result ->
mapLeft ResultQueryError <$> Result.run resultDec integerDatetimes result
mapLeft ResultCommandError <$> Result.run resultDec integerDatetimes result
Nothing ->
fmap (Left . ClientQueryError) (LibPQ.errorMessage connection)
fmap (Left . ClientCommandError) (LibPQ.errorMessage connection)

-- |
-- Fetch a single result.
Expand All @@ -60,7 +60,7 @@ getResult =
resultMaybe <- LibPQ.getResult connection
case resultMaybe of
Just result -> pure (Right result)
Nothing -> fmap (Left . ClientQueryError) (LibPQ.errorMessage connection)
Nothing -> fmap (Left . ClientCommandError) (LibPQ.errorMessage connection)

-- |
-- Fetch a single result.
Expand All @@ -84,11 +84,11 @@ dropRemainders =
loop integerDatetimes connection <* checkErrors
where
checkErrors =
ExceptT $ fmap (mapLeft ResultQueryError) $ Result.run Result.noResult integerDatetimes result
ExceptT $ fmap (mapLeft ResultCommandError) $ Result.run Result.noResult integerDatetimes result

refine :: (a -> Either Text b) -> Results a -> Results b
refine refiner (Results stack) = Results
$ ReaderT
$ \env -> ExceptT $ do
resultEither <- runExceptT $ runReaderT stack env
return $ resultEither >>= mapLeft (ResultQueryError . UnexpectedResultError) . refiner
return $ resultEither >>= mapLeft (ResultCommandError . UnexpectedResultError) . refiner
56 changes: 32 additions & 24 deletions library/Hasql/Errors.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@ data SessionError
ByteString
-- | Parameters rendered as human-readable SQL literals.
[Text]
-- | Error details
QueryError
-- | Error details.
CommandError
| -- | Error during the execution of a pipeline.
PipelineSessionError
-- | Error details.
CommandError
deriving (Show, Eq, Typeable)

instance Exception SessionError where
displayException = \case
QuerySessionError query params commandError ->
let queryContext :: Maybe (ByteString, Int)
queryContext = case commandError of
ClientQueryError _ -> Nothing
ResultQueryError resultError -> case resultError of
ClientCommandError _ -> Nothing
ResultCommandError resultError -> case resultError of
ServerResultError _ message _ _ (Just position) -> Just (message, position)
_ -> Nothing

Expand Down Expand Up @@ -63,35 +67,39 @@ instance Exception SessionError where
<> "\n"
<> "\n Params: "
<> show params
<> "\n Error: "
<> case commandError of
ClientQueryError (Just message) -> "Client error: " <> show message
ClientQueryError Nothing -> "Client error without details"
ResultQueryError resultError -> case resultError of
ServerResultError code message details hint position ->
"Server error "
<> BC.unpack code
<> ": "
<> BC.unpack message
<> maybe "" (\d -> "\n Details: " <> BC.unpack d) details
<> maybe "" (\h -> "\n Hint: " <> BC.unpack h) hint
UnexpectedResultError message -> "Unexpected result: " <> show message
RowResultError row (ColumnRowError column rowError) ->
"Row error: " <> show row <> ":" <> show column <> " " <> show rowError
UnexpectedAmountOfRowsResultError amount ->
"Unexpected amount of rows: " <> show amount
<> "\n Reason: "
<> renderCommandErrorAsReason commandError
PipelineSessionError commandError ->
"PipelineSessionError!\n Reason: " <> renderCommandErrorAsReason commandError
where
renderCommandErrorAsReason = \case
ClientCommandError (Just message) -> "Client error: " <> show message
ClientCommandError Nothing -> "Client error without details"
ResultCommandError resultError -> case resultError of
ServerResultError code message details hint position ->
"Server error "
<> BC.unpack code
<> ": "
<> BC.unpack message
<> maybe "" (\d -> "\n Details: " <> BC.unpack d) details
<> maybe "" (\h -> "\n Hint: " <> BC.unpack h) hint
UnexpectedResultError message -> "Unexpected result: " <> show message
RowResultError row (ColumnRowError column rowError) ->
"Row error: " <> show row <> ":" <> show column <> " " <> show rowError
UnexpectedAmountOfRowsResultError amount ->
"Unexpected amount of rows: " <> show amount

-- |
-- An error of some command in the session.
data QueryError
data CommandError
= -- |
-- An error on the client-side,
-- with a message generated by the \"libpq\" driver.
-- Usually indicates problems with connection.
ClientQueryError (Maybe ByteString)
ClientCommandError (Maybe ByteString)
| -- |
-- Some error with a command result.
ResultQueryError ResultError
ResultCommandError ResultError
deriving (Show, Eq)

-- |
Expand Down
16 changes: 8 additions & 8 deletions library/Hasql/IO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ initConnection c =
void $ LibPQ.exec c (Commands.asBytes (Commands.setEncodersToUTF8 <> Commands.setMinClientMessagesToWarning))

{-# INLINE getResults #-}
getResults :: LibPQ.Connection -> Bool -> ResultsDecoders.Results a -> IO (Either QueryError a)
getResults :: LibPQ.Connection -> Bool -> ResultsDecoders.Results a -> IO (Either CommandError a)
getResults connection integerDatetimes decoder =
{-# SCC "getResults" #-}
(<*) <$> get <*> dropRemainders
Expand All @@ -72,7 +72,7 @@ getPreparedStatementKey ::
PreparedStatementRegistry.PreparedStatementRegistry ->
ByteString ->
[LibPQ.Oid] ->
IO (Either QueryError ByteString)
IO (Either CommandError ByteString)
getPreparedStatementKey connection registry template oidList =
{-# SCC "getPreparedStatementKey" #-}
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
Expand All @@ -96,10 +96,10 @@ getPreparedStatementKey connection registry template oidList =
pure (pure key)

{-# INLINE checkedSend #-}
checkedSend :: LibPQ.Connection -> IO Bool -> IO (Either QueryError ())
checkedSend :: LibPQ.Connection -> IO Bool -> IO (Either CommandError ())
checkedSend connection send =
send >>= \case
False -> fmap (Left . ClientQueryError) $ LibPQ.errorMessage connection
False -> fmap (Left . ClientCommandError) $ LibPQ.errorMessage connection
True -> pure (Right ())

{-# INLINE sendPreparedParametricStatement #-}
Expand All @@ -110,7 +110,7 @@ sendPreparedParametricStatement ::
ByteString ->
ParamsEncoders.Params a ->
a ->
IO (Either QueryError ())
IO (Either CommandError ())
sendPreparedParametricStatement connection registry integerDatetimes template encoder input =
runExceptT $ do
key <- ExceptT $ getPreparedStatementKey connection registry template oidList
Expand All @@ -126,7 +126,7 @@ sendUnpreparedParametricStatement ::
ByteString ->
ParamsEncoders.Params a ->
a ->
IO (Either QueryError ())
IO (Either CommandError ())
sendUnpreparedParametricStatement connection integerDatetimes template encoder input =
checkedSend connection
$ LibPQ.sendQueryParams
Expand All @@ -144,14 +144,14 @@ sendParametricStatement ::
ParamsEncoders.Params a ->
Bool ->
a ->
IO (Either QueryError ())
IO (Either CommandError ())
sendParametricStatement connection integerDatetimes registry template encoder prepared params =
{-# SCC "sendParametricStatement" #-}
if prepared
then sendPreparedParametricStatement connection registry integerDatetimes template encoder params
else sendUnpreparedParametricStatement connection integerDatetimes template encoder params

{-# INLINE sendNonparametricStatement #-}
sendNonparametricStatement :: LibPQ.Connection -> ByteString -> IO (Either QueryError ())
sendNonparametricStatement :: LibPQ.Connection -> ByteString -> IO (Either CommandError ())
sendNonparametricStatement connection sql =
checkedSend connection $ LibPQ.sendQuery connection sql
67 changes: 40 additions & 27 deletions library/Hasql/Pipeline/Core.hs
Original file line number Diff line number Diff line change
@@ -1,41 +1,54 @@
{-# OPTIONS_GHC -Wno-unused-imports -Wno-unused-binds #-}

module Hasql.Pipeline.Core where

import Database.PostgreSQL.LibPQ qualified as Pq
import Hasql.Connection.Core qualified as Connection
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Result qualified as Decoders.Result
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
import Hasql.IO qualified as IO
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
import Hasql.Statement qualified as Statement

run :: Pipeline a -> Connection.Connection -> IO (Either SessionError a)
run (Pipeline send) (Connection.Connection pqConnectionRef integerDatetimes registry) =
withMVar pqConnectionRef \pqConnection -> do
runCommandFailing pqConnection $ Pq.enterPipelineMode pqConnection
sendResult <- send pqConnection registry integerDatetimes
case sendResult of
Left err -> do
pure (Left err)
Right recv -> do
runCommandFailing pqConnection $ Pq.pipelineSync pqConnection
recvResult <- recv
handleEither =<< Decoders.Results.run (Decoders.Results.single Decoders.Result.pipelineSync) pqConnection integerDatetimes
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
pure recvResult
run :: forall a. Pipeline a -> Pq.Connection -> PreparedStatementRegistry.PreparedStatementRegistry -> Bool -> IO (Either SessionError a)
run (Pipeline sendQueriesInIO) connection registry integerDatetimes = do
runExceptT do
enterPipelineMode
flip finallyE exitPipelineMode do
recvQueries <- sendQueries
pipelineSync
queriesResult <- recvQueries
recvPipelineSync
pure queriesResult
where
runCommandFailing :: Pq.Connection -> IO Bool -> IO ()
runCommandFailing pqConn runCmd =
IO.checkedSend pqConn runCmd >>= handleEither
handleEither = \case
Right a -> pure a
Left err -> fail $ show err
enterPipelineMode :: ExceptT SessionError IO ()
enterPipelineMode =
runCommand $ Pq.enterPipelineMode connection

exitPipelineMode :: ExceptT SessionError IO ()
exitPipelineMode =
runCommand $ Pq.exitPipelineMode connection

sendQueries :: ExceptT SessionError IO (ExceptT SessionError IO a)
sendQueries =
fmap ExceptT $ ExceptT $ sendQueriesInIO connection registry integerDatetimes

pipelineSync :: ExceptT SessionError IO ()
pipelineSync =
runCommand $ Pq.pipelineSync connection

recvPipelineSync :: ExceptT SessionError IO ()
recvPipelineSync =
ExceptT
$ fmap (mapLeft PipelineSessionError)
$ Decoders.Results.run (Decoders.Results.single Decoders.Result.pipelineSync) connection integerDatetimes

runCommand :: IO Bool -> ExceptT SessionError IO ()
runCommand action =
lift action >>= \case
True -> pure ()
False -> ExceptT (Left . PipelineSessionError . ClientCommandError <$> Pq.errorMessage connection)

newtype Pipeline a
= Pipeline
Expand Down Expand Up @@ -89,7 +102,7 @@ statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Re
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
if sent
then pure (True, Right (key, recv))
else (False,) . Left . commandToSessionError . ClientQueryError <$> Pq.errorMessage connection
else (False,) . Left . commandToSessionError . ClientCommandError <$> Pq.errorMessage connection
where
recv =
fmap (mapLeft commandToSessionError)
Expand All @@ -101,7 +114,7 @@ statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Re

sendQuery key =
Pq.sendQueryPrepared connection key valueAndFormatList Pq.Binary >>= \case
False -> Left . commandToSessionError . ClientQueryError <$> Pq.errorMessage connection
False -> Left . commandToSessionError . ClientCommandError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
Expand All @@ -112,7 +125,7 @@ statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Re

runUnprepared =
Pq.sendQueryParams connection sql (Encoders.Params.compileUnpreparedStatementData encoder integerDatetimes params) Pq.Binary >>= \case
False -> Left . commandToSessionError . ClientQueryError <$> Pq.errorMessage connection
False -> Left . commandToSessionError . ClientCommandError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
Expand Down
2 changes: 1 addition & 1 deletion library/Hasql/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import Control.Monad.Reader.Class as Exports (MonadReader (..))
import Control.Monad.ST as Exports
import Control.Monad.Trans.Class as Exports
import Control.Monad.Trans.Cont as Exports hiding (callCC, shift)
import Control.Monad.Trans.Except as Exports (Except, ExceptT (ExceptT), catchE, except, mapExcept, mapExceptT, runExcept, runExceptT, throwE, withExcept, withExceptT)
import Control.Monad.Trans.Except as Exports (Except, ExceptT (ExceptT), catchE, except, finallyE, mapExcept, mapExceptT, runExcept, runExceptT, throwE, withExcept, withExceptT)
import Control.Monad.Trans.Maybe as Exports
import Control.Monad.Trans.Reader as Exports (Reader, ReaderT (ReaderT), mapReader, mapReaderT, runReader, runReaderT, withReader, withReaderT)
import Control.Monad.Trans.State.Strict as Exports (State, StateT (StateT), evalState, evalStateT, execState, execStateT, mapState, mapStateT, runState, runStateT, withState, withStateT)
Expand Down
4 changes: 3 additions & 1 deletion library/Hasql/Session/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ statement input (Statement.Statement template (Encoders.Params paramsEncoder) (D

pipeline :: Pipeline.Pipeline result -> Session result
pipeline pipeline =
Session $ ReaderT $ ExceptT . Pipeline.run pipeline
Session $ ReaderT \(Connection.Connection pqConnectionRef integerDatetimes registry) ->
ExceptT $ withMVar pqConnectionRef \pqConnection ->
Pipeline.run pipeline pqConnection registry integerDatetimes
2 changes: 1 addition & 1 deletion tasty/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ tree =
where
resultTest =
\case
Right (Left (Session.QuerySessionError _ _ (Session.ResultQueryError (Session.ServerResultError "26000" _ _ _ _)))) -> False
Right (Left (Session.QuerySessionError _ _ (Session.ResultCommandError (Session.ServerResultError "26000" _ _ _ _)))) -> False
_ -> True
session =
catchError session (const (pure ())) *> session
Expand Down
2 changes: 1 addition & 1 deletion testing-utils/Hasql/TestingUtils/TestingDsl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Hasql.TestingUtils.TestingDsl
( Session.Session,
Error (..),
Session.SessionError (..),
Session.QueryError (..),
Session.CommandError (..),
Pipeline.Pipeline,
Statement.Statement (..),
runSessionOnLocalDb,
Expand Down

0 comments on commit 8f5d6f7

Please sign in to comment.