Skip to content

Commit

Permalink
chore(electric): Load and clear client reconnection info per client (#…
Browse files Browse the repository at this point in the history
…1188)

This is a follow-up to
#1116, specifically
addressing [Ilia's
request](#1116 (comment)).

Client reconnection info is now reloaded from the database at client
connection time. This way we can ensure consistency between the data
cached in ETS and data stored in the database.
  • Loading branch information
alco committed May 6, 2024
1 parent b2031c4 commit f60ff0a
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 115 deletions.
21 changes: 17 additions & 4 deletions components/electric/lib/electric/postgres/repo/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Electric.Postgres.Repo.Client do
alias Electric.Replication.Connectors

@type row :: [term]
@type query_result :: {[String.t()], [row]}

@doc """
Execute the given function using a pooled DB connection.
Expand Down Expand Up @@ -51,11 +52,23 @@ defmodule Electric.Postgres.Repo.Client do
queries/statements on a single DB connection by wrapping them in an anonymous function and
passing it to `checkout_from_pool/2` or `pooled_transaction/2`.
"""
@spec query!(String.t(), [term]) :: {[String.t()], [row]}
def query!(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
@spec query(String.t(), [term]) :: {:ok, query_result} | {:error, Exception.t()}
def query(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
true = Repo.checked_out?()

%Postgrex.Result{columns: columns, rows: rows} = Repo.query!(query_str, params)
{columns, rows}
with {:ok, %Postgrex.Result{columns: columns, rows: rows}} <- Repo.query(query_str, params) do
{:ok, {columns, rows}}
end
end

@doc """
Same as `query/2` but raises on invalid queries.
"""
@spec query!(String.t(), [term]) :: query_result
def query!(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
case query(query_str, params) do
{:ok, result} -> result
{:error, exception} -> raise exception
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Electric.Replication.Postgres.Client do
@spec connect(Connectors.connection_opts()) ::
{:ok, connection :: pid()} | {:error, reason :: :epgsql.connect_error()}
def connect(conn_opts) do
Logger.debug("#{inspect(__MODULE__)}.connect(#{inspect(sanitize_conn_opts(conn_opts))})")
Logger.debug("Postgres.Client.connect(#{inspect(sanitize_conn_opts(conn_opts))})")

{%{ip_addr: ip_addr}, %{username: username, password: password} = epgsql_conn_opts} =
Connectors.pop_extraneous_conn_opts(conn_opts)
Expand All @@ -42,7 +42,7 @@ defmodule Electric.Replication.Postgres.Client do
end
end

Logger.info("#{inspect(__MODULE__)}.with_conn(#{inspect(sanitize_conn_opts(conn_opts))})")
Logger.info("Postgres.Client.with_conn(#{inspect(sanitize_conn_opts(conn_opts))})")

{:ok, conn} = :epgsql_sock.start_link()

Expand Down Expand Up @@ -149,7 +149,7 @@ defmodule Electric.Replication.Postgres.Client do
end

defp squery(conn, query) do
Logger.debug("#{__MODULE__}: #{query}")
Logger.debug("Postgres.Client: #{query}")
:epgsql.squery(conn, query)
end

Expand Down

0 comments on commit f60ff0a

Please sign in to comment.