Skip to content

Commit

Permalink
feat: force read-write for listener connection
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-chavez committed May 20, 2024
1 parent 3cf5656 commit 7e61c9d
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- #3478, Media Types are parsed case insensitively - @develop7
- #3533, #3536, Fix listener silently failing on read replica - @steve-chavez
+ If the LISTEN connection fails, it's retried with exponential backoff
- #3414, Force listener to connect to read-write instances using `target_session_attrs` - @steve-chavez

### Deprecated

Expand Down
3 changes: 2 additions & 1 deletion src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import Data.Time.Clock (UTCTime, getCurrentTime)

import PostgREST.Config (AppConfig (..),
addFallbackAppName,
addTargetSessionAttrs,
readAppConfig)
import PostgREST.Config.Database (queryDbSettings,
queryPgVersion,
Expand Down Expand Up @@ -560,7 +561,7 @@ retryingListen appState@AppState{stateObserver=observer, stateMainThreadId=mainT
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs in
observer $ DBListenRetry delay

connection <- acquire $ toUtf8 (addFallbackAppName prettyVersion configDbUri)
connection <- acquire $ toUtf8 (addTargetSessionAttrs $ addFallbackAppName prettyVersion configDbUri)
case connection of
Right conn -> do

Expand Down
58 changes: 41 additions & 17 deletions src/PostgREST/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module PostgREST.Config
, toURI
, parseSecret
, addFallbackAppName
, addTargetSessionAttrs
) where

import qualified Crypto.JOSE.Types as JOSE
Expand Down Expand Up @@ -480,6 +481,18 @@ readPGRSTEnvironment :: IO Environment
readPGRSTEnvironment =
M.map T.pack . M.fromList . filter (isPrefixOf "PGRST_" . fst) <$> getEnvironment

data PGConnString = PGURI | PGKeyVal

-- Uses same logic as libpq recognized_connection_string
-- https://github.com/postgres/postgres/blob/5eafacd2797dc0b04a0bde25fbf26bf79903e7c2/src/interfaces/libpq/fe-connect.c#L5923-L5936
pgConnString :: Text -> Maybe PGConnString
pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.isPrefixOf` conn = Just PGURI
| "=" `T.isInfixOf` conn = Just PGKeyVal
| otherwise = Nothing
where
uriDesignator = "postgresql://"
shortUriDesignator = "postgres://"

-- | Adds a `fallback_application_name` value to the connection string. This allows querying the PostgREST version on pg_stat_activity.
--
-- >>> let ver = "11.1.0 (5a04ec7)"::ByteString
Expand Down Expand Up @@ -519,7 +532,32 @@ readPGRSTEnvironment =
-- addFallbackAppName ver "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass"
-- "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass&fallback_application_name=PostgREST%2011.1.0%20%285a04ec7%29"
addFallbackAppName :: ByteString -> Text -> Text
addFallbackAppName version dbUri = dbUri <>
addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_application_name" pgrstVer
where
pgrstVer = "PostgREST " <> T.decodeUtf8 version

-- | Adds `target_session_attrs=read-write` to the connection string. This allows using PostgREST listener when multiple hosts are specified in the connection string.
--
-- >>> addTargetSessionAttrs "postgres:///postgres?host=/dir/0kN/socket_replica_24378,/dir/0kN/socket"
-- "postgres:///postgres?host=/dir/0kN/socket_replica_24378,/dir/0kN/socket&target_session_attrs=read-write"
--
-- >>> addTargetSessionAttrs "postgresql://host1:123,host2:456/somedb"
-- "postgresql://host1:123,host2:456/somedb?target_session_attrs=read-write"
--
-- >>> addTargetSessionAttrs "postgresql://host1:123,host2:456/somedb?fallback_application_name=foo"
-- "postgresql://host1:123,host2:456/somedb?fallback_application_name=foo&target_session_attrs=read-write"
--
-- adds target_session_attrs despite one existing
-- >>> addTargetSessionAttrs "postgresql://host1:123,host2:456/somedb?target_session_attrs=read-only"
-- "postgresql://host1:123,host2:456/somedb?target_session_attrs=read-only&target_session_attrs=read-write"
--
-- >>> addTargetSessionAttrs "host=localhost port=5432 dbname=postgres"
-- "host=localhost port=5432 dbname=postgres target_session_attrs='read-write'"
addTargetSessionAttrs :: Text -> Text
addTargetSessionAttrs dbUri = addConnStringOption dbUri "target_session_attrs" "read-write"

addConnStringOption :: Text -> Text -> Text -> Text
addConnStringOption dbUri key val = dbUri <>
case pgConnString dbUri of
Nothing -> mempty
Just PGKeyVal -> " " <> keyValFmt
Expand All @@ -528,20 +566,6 @@ addFallbackAppName version dbUri = dbUri <>
(_, "?") -> uriFmt
(_, _) -> "&" <> uriFmt
where
uriFmt = pKeyWord <> toS (escapeURIString isUnescapedInURIComponent $ toS pgrstVer)
keyValFmt = pKeyWord <> "'" <> T.replace "'" "\\'" pgrstVer <> "'"
pKeyWord = "fallback_application_name="
pgrstVer = "PostgREST " <> T.decodeUtf8 version
uriFmt = key <> "=" <> toS (escapeURIString isUnescapedInURIComponent $ toS val)
keyValFmt = key <> "=" <> "'" <> T.replace "'" "\\'" val <> "'"
lookAtOptions x = T.breakOn "?" . snd $ T.breakOnEnd "@" x -- start from after `@` to not mess passwords that include `?`, see https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS

data PGConnString = PGURI | PGKeyVal

-- Uses same logic as libpq recognized_connection_string
-- https://github.com/postgres/postgres/blob/5eafacd2797dc0b04a0bde25fbf26bf79903e7c2/src/interfaces/libpq/fe-connect.c#L5923-L5936
pgConnString :: Text -> Maybe PGConnString
pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.isPrefixOf` conn = Just PGURI
| "=" `T.isInfixOf` conn = Just PGKeyVal
| otherwise = Nothing
where
uriDesignator = "postgresql://"
shortUriDesignator = "postgres://"
2 changes: 1 addition & 1 deletion test/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def replicaenv(defaultenv):
"replica": {
**defaultenv,
**conf,
"PGHOST": os.environ["PGREPLICAHOST"],
"PGHOST": os.environ["PGREPLICAHOST"] + "," + os.environ["PGHOST"],
"PGREPLICASLOT": os.environ["PGREPLICASLOT"],
},
}
Expand Down
7 changes: 1 addition & 6 deletions test/io/test_replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ def test_sanity_replica(replicaenv):
response = postgrest.session.get("/items?select=count")
assert response.text == '[{"count":10}]'

working_replica_env = {
**replicaenv["replica"],
"PGRST_DB_CHANNEL_ENABLED": "false", # LISTEN doesn't work on read replicas
}

with run(env=working_replica_env) as postgrest:
with run(env=replicaenv["replica"]) as postgrest:
response = postgrest.session.get("/rpc/is_replica")
assert response.text == "true"

Expand Down

0 comments on commit 7e61c9d

Please sign in to comment.