Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(electric): Load and clear client reconnection info per client #1188

Merged
merged 10 commits into from
May 6, 2024
21 changes: 17 additions & 4 deletions components/electric/lib/electric/postgres/repo/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Electric.Postgres.Repo.Client do
alias Electric.Replication.Connectors

@type row :: [term]
@type query_result :: {[String.t()], [row]}

@doc """
Execute the given function using a pooled DB connection.
Expand Down Expand Up @@ -51,11 +52,23 @@ defmodule Electric.Postgres.Repo.Client do
queries/statements on a single DB connection by wrapping them in an anonymous function and
passing it to `checkout_from_pool/2` or `pooled_transaction/2`.
"""
@spec query!(String.t(), [term]) :: {[String.t()], [row]}
def query!(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
@spec query(String.t(), [term]) :: {:ok, query_result} | {:error, Exception.t()}
def query(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
true = Repo.checked_out?()

%Postgrex.Result{columns: columns, rows: rows} = Repo.query!(query_str, params)
{columns, rows}
with {:ok, %Postgrex.Result{columns: columns, rows: rows}} <- Repo.query(query_str, params) do
{:ok, {columns, rows}}
end
end

@doc """
Same as `query/2` but raises on invalid queries.
"""
@spec query!(String.t(), [term]) :: query_result
def query!(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
case query(query_str, params) do
{:ok, result} -> result
{:error, exception} -> raise exception
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Electric.Replication.Postgres.Client do
@spec connect(Connectors.connection_opts()) ::
{:ok, connection :: pid()} | {:error, reason :: :epgsql.connect_error()}
def connect(conn_opts) do
Logger.debug("#{inspect(__MODULE__)}.connect(#{inspect(sanitize_conn_opts(conn_opts))})")
Logger.debug("Postgres.Client.connect(#{inspect(sanitize_conn_opts(conn_opts))})")

{%{ip_addr: ip_addr}, %{username: username, password: password} = epgsql_conn_opts} =
Connectors.pop_extraneous_conn_opts(conn_opts)
Expand All @@ -42,7 +42,7 @@ defmodule Electric.Replication.Postgres.Client do
end
end

Logger.info("#{inspect(__MODULE__)}.with_conn(#{inspect(sanitize_conn_opts(conn_opts))})")
Logger.info("Postgres.Client.with_conn(#{inspect(sanitize_conn_opts(conn_opts))})")

{:ok, conn} = :epgsql_sock.start_link()

Expand Down Expand Up @@ -149,7 +149,7 @@ defmodule Electric.Replication.Postgres.Client do
end

defp squery(conn, query) do
Logger.debug("#{__MODULE__}: #{query}")
Logger.debug("Postgres.Client: #{query}")
:epgsql.squery(conn, query)
end

Expand Down