Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: force listener to work on primary cluster #3462

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- #3478, Media Types are parsed case insensitively - @develop7
- #3533, #3536, Fix listener silently failing on read replica - @steve-chavez
+ If the LISTEN connection fails, it's retried with exponential backoff
- #3414, Force listener to connect to read-write instances using `target_session_attrs` - @steve-chavez

### Deprecated

Expand Down
3 changes: 2 additions & 1 deletion src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import Data.Time.Clock (UTCTime, getCurrentTime)

import PostgREST.Config (AppConfig (..),
addFallbackAppName,
addTargetSessionAttrs,
readAppConfig)
import PostgREST.Config.Database (queryDbSettings,
queryPgVersion,
Expand Down Expand Up @@ -560,7 +561,7 @@ retryingListen appState@AppState{stateObserver=observer, stateMainThreadId=mainT
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs in
observer $ DBListenRetry delay

connection <- acquire $ toUtf8 (addFallbackAppName prettyVersion configDbUri)
connection <- acquire $ toUtf8 (addTargetSessionAttrs $ addFallbackAppName prettyVersion configDbUri)
case connection of
Right conn -> do

Expand Down
58 changes: 41 additions & 17 deletions src/PostgREST/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module PostgREST.Config
, toURI
, parseSecret
, addFallbackAppName
, addTargetSessionAttrs
) where

import qualified Crypto.JOSE.Types as JOSE
Expand Down Expand Up @@ -480,6 +481,18 @@ readPGRSTEnvironment :: IO Environment
readPGRSTEnvironment =
M.map T.pack . M.fromList . filter (isPrefixOf "PGRST_" . fst) <$> getEnvironment

data PGConnString = PGURI | PGKeyVal

-- Uses same logic as libpq recognized_connection_string
-- https://github.com/postgres/postgres/blob/5eafacd2797dc0b04a0bde25fbf26bf79903e7c2/src/interfaces/libpq/fe-connect.c#L5923-L5936
pgConnString :: Text -> Maybe PGConnString
pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.isPrefixOf` conn = Just PGURI
| "=" `T.isInfixOf` conn = Just PGKeyVal
| otherwise = Nothing
where
uriDesignator = "postgresql://"
shortUriDesignator = "postgres://"

-- | Adds a `fallback_application_name` value to the connection string. This allows querying the PostgREST version on pg_stat_activity.
--
-- >>> let ver = "11.1.0 (5a04ec7)"::ByteString
Expand Down Expand Up @@ -519,7 +532,32 @@ readPGRSTEnvironment =
-- addFallbackAppName ver "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass"
-- "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass&fallback_application_name=PostgREST%2011.1.0%20%285a04ec7%29"
addFallbackAppName :: ByteString -> Text -> Text
addFallbackAppName version dbUri = dbUri <>
addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_application_name" pgrstVer
where
pgrstVer = "PostgREST " <> T.decodeUtf8 version

-- | Adds `target_session_attrs=read-write` to the connection string. This allows using PostgREST listener when multiple hosts are specified in the connection string.
--
-- >>> addTargetSessionAttrs "postgres:///postgres?host=/dir/0kN/socket_replica_24378,/dir/0kN/socket"
-- "postgres:///postgres?host=/dir/0kN/socket_replica_24378,/dir/0kN/socket&target_session_attrs=read-write"
--
-- >>> addTargetSessionAttrs "postgresql://host1:123,host2:456/somedb"
-- "postgresql://host1:123,host2:456/somedb?target_session_attrs=read-write"
--
-- >>> addTargetSessionAttrs "postgresql://host1:123,host2:456/somedb?fallback_application_name=foo"
-- "postgresql://host1:123,host2:456/somedb?fallback_application_name=foo&target_session_attrs=read-write"
--
-- adds target_session_attrs despite one existing
-- >>> addTargetSessionAttrs "postgresql://host1:123,host2:456/somedb?target_session_attrs=read-only"
-- "postgresql://host1:123,host2:456/somedb?target_session_attrs=read-only&target_session_attrs=read-write"
--
-- >>> addTargetSessionAttrs "host=localhost port=5432 dbname=postgres"
-- "host=localhost port=5432 dbname=postgres target_session_attrs='read-write'"
addTargetSessionAttrs :: Text -> Text
addTargetSessionAttrs dbUri = addConnStringOption dbUri "target_session_attrs" "read-write"

addConnStringOption :: Text -> Text -> Text -> Text
addConnStringOption dbUri key val = dbUri <>
case pgConnString dbUri of
Nothing -> mempty
Just PGKeyVal -> " " <> keyValFmt
Expand All @@ -528,20 +566,6 @@ addFallbackAppName version dbUri = dbUri <>
(_, "?") -> uriFmt
(_, _) -> "&" <> uriFmt
where
uriFmt = pKeyWord <> toS (escapeURIString isUnescapedInURIComponent $ toS pgrstVer)
keyValFmt = pKeyWord <> "'" <> T.replace "'" "\\'" pgrstVer <> "'"
pKeyWord = "fallback_application_name="
pgrstVer = "PostgREST " <> T.decodeUtf8 version
uriFmt = key <> "=" <> toS (escapeURIString isUnescapedInURIComponent $ toS val)
keyValFmt = key <> "=" <> "'" <> T.replace "'" "\\'" val <> "'"
lookAtOptions x = T.breakOn "?" . snd $ T.breakOnEnd "@" x -- start from after `@` to not mess passwords that include `?`, see https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS

data PGConnString = PGURI | PGKeyVal

-- Uses same logic as libpq recognized_connection_string
-- https://github.com/postgres/postgres/blob/5eafacd2797dc0b04a0bde25fbf26bf79903e7c2/src/interfaces/libpq/fe-connect.c#L5923-L5936
pgConnString :: Text -> Maybe PGConnString
pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.isPrefixOf` conn = Just PGURI
| "=" `T.isInfixOf` conn = Just PGKeyVal
| otherwise = Nothing
where
uriDesignator = "postgresql://"
shortUriDesignator = "postgres://"
2 changes: 1 addition & 1 deletion test/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def replicaenv(defaultenv):
"replica": {
**defaultenv,
**conf,
"PGHOST": os.environ["PGREPLICAHOST"],
"PGHOST": os.environ["PGREPLICAHOST"] + "," + os.environ["PGHOST"],
"PGREPLICASLOT": os.environ["PGREPLICASLOT"],
},
}
Expand Down
7 changes: 1 addition & 6 deletions test/io/test_replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ def test_sanity_replica(replicaenv):
response = postgrest.session.get("/items?select=count")
assert response.text == '[{"count":10}]'

working_replica_env = {
**replicaenv["replica"],
"PGRST_DB_CHANNEL_ENABLED": "false", # LISTEN doesn't work on read replicas
}

with run(env=working_replica_env) as postgrest:
with run(env=replicaenv["replica"]) as postgrest:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LISTEN now works and this test proves it

response = postgrest.session.get("/rpc/is_replica")
assert response.text == "true"

Expand Down
Loading