Skip to content

Commit

Permalink
Clean up timeouts in the conn and add :max_concurrent_requests_per_co…
Browse files Browse the repository at this point in the history
…nnection (#358)
  • Loading branch information
whatyouhide committed Mar 6, 2024
1 parent e08907a commit bee3984
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 81 deletions.
14 changes: 14 additions & 0 deletions lib/xandra.ex
Expand Up @@ -392,6 +392,20 @@ defmodule Xandra do
*Available since v0.18.0*.
"""
],
max_concurrent_requests_per_connection: [
type: :pos_integer,
default: 100,
doc: """
The maximum number of requests that can be in flight at any given time on a single
connection. Xandra "multiplexes" requests on a single connection, since that is allowed
by the Cassandra protocol (via the use of stream IDs to identify in-flight requests on
a particular connection). Increasing this option means that a single connection will
handle more requests, so you can potentially lower the number of total connections in
your connection pool. However, the more requests are in flight on a single connection,
the more work that connection will have to do to decode and route requests and responses.
*Available since 0.19.0*.
"""
],
name: [
type: :any,
doc: """
Expand Down
181 changes: 100 additions & 81 deletions lib/xandra/connection.ex
Expand Up @@ -18,10 +18,15 @@ defmodule Xandra.Connection do
@behaviour :gen_statem

@forced_transport_options [packet: :raw, mode: :binary, active: false]
@max_concurrent_requests 5000

# How old a timed-out stream ID can be before we flush it.
@max_timed_out_stream_id_age_in_millisec :timer.minutes(5)

# How often to clean up timed-out requests.
@flush_timed_out_stream_id_interval_millisec :timer.seconds(30)

# This is the max stream ID value that we can use (a [short] in Cassandra).
@max_cassandra_stream_id 32_768
@timed_out_stream_id_timeout_minutes 5
@flush_timed_out_stream_id_interval 30 * 1000

# This record is used internally when we check out a "view" of the state of
# the connection. This holds all the necessary info to encode queries and more.
Expand Down Expand Up @@ -91,7 +96,7 @@ defmodule Xandra.Connection do
:telemetry.span([:xandra, :prepare_query], metadata, fn ->
with :ok <- send_prepare_frame(state, prepared, options),
{:ok, %Frame{} = frame} <-
receive_response_frame(conn_pid, req_alias, state, timeout, metadata) do
receive_response_frame(conn_pid, req_alias, state, timeout) do
case protocol_module.decode_response(frame, prepared, options) do
{%Prepared{} = prepared, warnings} ->
Prepared.Cache.insert(prepared_cache, prepared)
Expand Down Expand Up @@ -172,13 +177,7 @@ defmodule Xandra.Connection do
fun = fn ->
with :ok <- Transport.send(transport, payload),
{:ok, %Frame{} = frame} <-
receive_response_frame(
conn_pid,
req_alias,
checked_out_state,
timeout,
telemetry_meta
) do
receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do
case protocol_module.decode_response(frame, query, options) do
{%_{} = response, warnings} ->
maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings)
Expand Down Expand Up @@ -250,8 +249,7 @@ defmodule Xandra.Connection do
conn_pid,
req_alias,
checked_out_state(atom_keys?: atom_keys?, stream_id: stream_id),
timeout,
telemetry_metadata
timeout
) do
receive do
{^req_alias, {:ok, %Frame{} = frame}} ->
Expand All @@ -265,8 +263,7 @@ defmodule Xandra.Connection do
{:error, {:connection_crashed, reason}}
after
timeout ->
:telemetry.execute([:xandra, :client_timeout], %{}, telemetry_metadata)
:gen_statem.cast(conn_pid, {:timed_out_id, stream_id})
:gen_statem.cast(conn_pid, {:request_timed_out_at_caller, stream_id})
{:error, :timeout}
end
end
Expand All @@ -276,6 +273,11 @@ defmodule Xandra.Connection do
:gen_statem.call(conn, :get_transport)
end

# Made public for testing. Only meant to be used in tests.
def trigger_flush_timed_out_stream_ids(conn) do
:gen_statem.call(conn, :flush_timed_out_stream_ids)
end

## Data

# [short] - a 2-byte integer, which clients can only use as a *positive* integer (so
Expand All @@ -296,8 +298,9 @@ defmodule Xandra.Connection do
current_keyspace: String.t() | nil,
default_consistency: atom(),
disconnection_reason: term(),
max_concurrent_requests: pos_integer(),
in_flight_requests: %{optional(stream_id()) => term()},
timed_out_ids: %{optional(stream_id()) => DateTime.t()},
timed_out_ids: %{optional(stream_id()) => integer()},
options: keyword(),
original_options: keyword(),
peername: {:inet.ip_address(), :inet.port_number()},
Expand All @@ -319,6 +322,7 @@ defmodule Xandra.Connection do
:connection_name,
:default_consistency,
:disconnection_reason,
:max_concurrent_requests,
:options,
:original_options,
:peername,
Expand All @@ -344,7 +348,7 @@ defmodule Xandra.Connection do

actions = [
{:next_event, :internal, :connect},
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}
flush_timed_out_stream_ids_timeout_action()
]

{:ok, :disconnected, data, actions}
Expand All @@ -368,11 +372,22 @@ defmodule Xandra.Connection do
send_reply(req_alias, {:error, :disconnected})
end)

data = put_in(data.in_flight_requests, %{})
# Reset in-flight requests and timed out stream IDs. We just disconnected, so all the
# in-flight requests (including the timed-out ones) are now invalid and the server should
# have killed them anyway.
data = %__MODULE__{data | in_flight_requests: %{}, timed_out_ids: %{}}

if data.backoff do
{backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1)
{:keep_state, data, {{:timeout, :reconnect}, backoff_time, _content = nil}}

# Set a reconnection timer and cancel the timer that flushes timed out stream IDs,
# since we just emptied them.
actions = [
{{:timeout, :reconnect}, backoff_time, _content = nil},
{{:timeout, :flush_timed_out_stream_ids}, :infinity, nil}
]

{:keep_state, data, actions}
else
{:stop, reason}
end
Expand Down Expand Up @@ -408,6 +423,7 @@ defmodule Xandra.Connection do
address: address,
port: port,
connect_timeout: Keyword.fetch!(options, :connect_timeout),
max_concurrent_requests: Keyword.fetch!(options, :max_concurrent_requests_per_connection),
connection_name: Keyword.get(options, :name),
cluster_pid: Keyword.get(options, :cluster_pid),
protocol_version: data.protocol_version || Keyword.get(options, :protocol_version),
Expand Down Expand Up @@ -534,30 +550,11 @@ defmodule Xandra.Connection do
{:keep_state, data}
end

def disconnected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now()))

{:keep_state, data}
end

def disconnected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
now = DateTime.utc_now()

data =
update_in(
data.timed_out_ids,
&Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc ->
if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do
acc
else
Map.put(acc, stream_id, timestamp)
end
end)
)

{:keep_state, data,
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}}
# The caller notified the conn that, on its side, the request timed out. However,
# here we're disconnected so we really don't need to do anything as there are no
# in-flight requests or timed-out requests to clean up.
def disconnected(:cast, {:request_timed_out_at_caller, _stream_id}, %__MODULE__{}) do
:keep_state_and_data
end

## "Connected" state
Expand Down Expand Up @@ -592,13 +589,18 @@ defmodule Xandra.Connection do
end
end

def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{
in_flight_requests: in_flight_requests
})
when map_size(in_flight_requests) == @max_concurrent_requests do
# We reached the max number of in-flight requests, so we don't do anything and just
# return an error to the caller.
def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{} = data)
when map_size(data.in_flight_requests) == data.max_concurrent_requests do
{:keep_state_and_data, {:reply, from, {:error, :too_many_concurrent_requests}}}
end

# When we check out the state to a caller so that that caller can perform a request,
# we don't need to *monitor* that caller. This is because the caller is identified
# by an alias (Process.alias/1). Even if the caller dies, and only *after* C*
# sends us the corresponding response, we can still route that C* response to the
# alias and it'll not go anywhere and not cause any issues.
def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do
stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)

Expand All @@ -622,10 +624,17 @@ defmodule Xandra.Connection do
{:keep_state, data, {:reply, from, {:ok, response}}}
end

# Only used in tests.
def connected({:call, from}, :get_transport, %__MODULE__{transport: transport}) do
{:keep_state_and_data, {:reply, from, {:ok, transport}}}
end

# Only used in tests.
def connected({:call, from}, :flush_timed_out_stream_ids, %__MODULE__{} = data) do
{:keep_state, data, actions} = connected({:timeout, :flush_timed_out_stream_ids}, nil, data)
{:keep_state, data, List.wrap(actions) ++ [{:reply, from, :ok}]}
end

def connected(:info, message, data) when is_data_message(data.transport, message) do
:ok = Transport.setopts(data.transport, active: :once)
{_mod, _socket, bytes} = message
Expand All @@ -651,30 +660,31 @@ defmodule Xandra.Connection do
{:keep_state, data}
end

def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
# The caller is notifying the connection that the request (on stream_id) timed
# out on its side (that is, the caller reached its "after" in the receive block).
# We need to remove the stream ID from the in-flight requests but still keep
# track of it, because C* might still send us a response for that query (and when it
# does, we will throw it away and free the timed-out stream ID).
#
# C* does not support CANCELING queries, by the way, otherwise that's what we'd do here.
def connected(:cast, {:request_timed_out_at_caller, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now()))

data = put_in(data.timed_out_ids[stream_id], System.system_time(:millisecond))
{:keep_state, data}
end

def connected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
now = DateTime.utc_now()
def connected({:timeout, :flush_timed_out_stream_ids}, _content, %__MODULE__{} = data) do
now = System.system_time(:millisecond)

data =
update_in(
data.timed_out_ids,
&Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc ->
if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do
acc
else
Map.put(acc, stream_id, timestamp)
end
end)
)
new_timed_out_ids =
for {id, ts} <- data.timed_out_ids,
now - ts < @max_timed_out_stream_id_age_in_millisec,
into: %{},
do: {id, ts}

data = %__MODULE__{data | timed_out_ids: new_timed_out_ids}

{:keep_state, data,
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}}
{:keep_state, data, flush_timed_out_stream_ids_timeout_action()}
end

## Helpers
Expand Down Expand Up @@ -757,25 +767,30 @@ defmodule Xandra.Connection do
end
end

defp handle_frame(%__MODULE__{} = data, %Frame{stream_id: stream_id} = frame) do
defp handle_frame(
%__MODULE__{timed_out_ids: timed_out_ids} = data,
%Frame{stream_id: stream_id} = frame
) do
case pop_in(data.in_flight_requests[stream_id]) do
{nil, data} ->
if Map.has_key?(data.timed_out_ids, stream_id) do
:telemetry.execute(
[:xandra, :timed_out_response],
telemetry_meta(data, %{stream_id: stream_id})
)
# There is no in-flight req for this response frame, BUT there is a request
# for it that timed out on the caller's side. Let's just emit a
{nil, data} when is_map_key(timed_out_ids, stream_id) ->
:telemetry.execute(
[:xandra, :debug, :received_timed_out_response],
%{},
telemetry_meta(data, %{stream_id: stream_id})
)

update_in(data.timed_out_ids, &Map.delete(&1, stream_id))
else
raise """
internal error in Xandra connection, we received a frame from the server with \
stream ID #{stream_id}, but there was no in-flight request for this stream ID. \
The frame is:
%__MODULE__{data | timed_out_ids: Map.delete(timed_out_ids, stream_id)}

#{inspect(frame)}
"""
end
{nil, _data} ->
raise """
internal error in Xandra connection, we received a frame from the server with \
stream ID #{stream_id}, but there was no in-flight request for this stream ID. \
The frame is:
#{inspect(frame)}
"""

{req_alias, data} ->
send_reply(req_alias, {:ok, frame})
Expand Down Expand Up @@ -901,4 +916,8 @@ defmodule Xandra.Connection do
random_id
end
end

defp flush_timed_out_stream_ids_timeout_action do
{{:timeout, :flush_timed_out_stream_ids}, @flush_timed_out_stream_id_interval_millisec, nil}
end
end
13 changes: 13 additions & 0 deletions lib/xandra/connection_error.ex
Expand Up @@ -75,6 +75,19 @@ defmodule Xandra.ConnectionError do
"request timeout"
end

defp format_reason(:too_many_concurrent_requests) do
"""
this connection has too many requests in flight; to fix this, consider:
1. increasing the size of the connection pool so that you'll have more
connections available and you'll be able to spread the request load
over more connections
2. increasing the maximum number of allowed in-flight requests per connection,
which can be configured through the :max_concurrent_requests_per_connection
option when starting connections (defaults to 100)
"""
end

defp format_reason({:cluster, :not_connected}) do
"not connected to any of the nodes"
end
Expand Down

0 comments on commit bee3984

Please sign in to comment.