Skip to content

Commit

Permalink
Merge pull request #117 from aerospike/CLIENT-1432_min_conns
Browse files Browse the repository at this point in the history
[Client-1432] creates minimum connections on client startup
  • Loading branch information
vmsachin committed Jun 22, 2023
2 parents 258ce59 + b5c1804 commit 68b17fa
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 17 deletions.
17 changes: 9 additions & 8 deletions lib/aerospike/cluster.rb
Expand Up @@ -23,13 +23,8 @@

module Aerospike
class Cluster
attr_reader :connection_timeout, :connection_queue_size, :user, :password
attr_reader :features, :tls_options
attr_reader :cluster_id, :aliases
attr_reader :cluster_name
attr_reader :client_policy
attr_accessor :rack_aware, :rack_id
attr_accessor :session_token, :session_expiration
attr_reader :connection_timeout, :connection_queue_size, :user, :password, :features, :tls_options, :cluster_id, :aliases, :cluster_name, :client_policy
attr_accessor :rack_aware, :rack_id, :session_token, :session_expiration

def initialize(policy, hosts)
@client_policy = policy
Expand Down Expand Up @@ -63,6 +58,10 @@ def initialize(policy, hosts)
end

initialize_tls_host_names(hosts) if tls_enabled?

if policy.min_connections_per_node > policy.max_connections_per_node
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR, "Invalid policy configuration: Minimum connections per node cannot be greater than maximum connections per node.")
end
end

def connect
Expand Down Expand Up @@ -584,7 +583,9 @@ def remove_alias(aliass)
end

def create_node(nv)
::Aerospike::Node.new(self, nv)
node = ::Aerospike::Node.new(self, nv)
node.fill_connection_pool_up_to(@client_policy.min_connections_per_node)
node
end

def create_connection(host)
Expand Down
13 changes: 12 additions & 1 deletion lib/aerospike/node.rb
Expand Up @@ -22,7 +22,7 @@
module Aerospike
class Node

attr_reader :reference_count, :responded, :name, :features, :cluster_name, :partition_generation, :rebalance_generation, :peers_generation, :failures, :cluster, :peers_count, :host
attr_reader :reference_count, :responded, :name, :features, :cluster_name, :partition_generation, :rebalance_generation, :peers_generation, :failures, :cluster, :peers_count, :host, :connections

PARTITIONS = 4096
FULL_HEALTH = 100
Expand Down Expand Up @@ -69,6 +69,17 @@ def has_rack(ns, rack_id)
racks[ns] == rack_id
end

def fill_connection_pool_up_to(min_connection_size)
current_number_of_connections = @connections.length
if min_connection_size > 0
while current_number_of_connections < min_connection_size
conn = @connections.create
@connections.offer(conn)
current_number_of_connections += 1
end
end
end

# Get a connection to the node. If no cached connection is not available,
# a new connection will be created
def get_connection(timeout)
Expand Down
22 changes: 21 additions & 1 deletion lib/aerospike/policy/client_policy.rb
Expand Up @@ -23,7 +23,7 @@ module Aerospike
class ClientPolicy

attr_accessor :user, :password, :auth_mode
attr_accessor :timeout, :connection_queue_size, :fail_if_not_connected, :tend_interval
attr_accessor :timeout, :connection_queue_size, :fail_if_not_connected, :tend_interval, :max_connections_per_node, :min_connections_per_node
attr_accessor :cluster_name
attr_accessor :tls
attr_accessor :policies
Expand Down Expand Up @@ -74,6 +74,26 @@ def initialize(opt={})
# ClientPolicy#rack_aware, Replica#PREFER_RACK and server rack
# configuration must also be set to enable this functionality.
@rack_id = opt[:rack_id] || 0

# Maximum number of synchronous connections allowed per server node. Transactions will go
# through retry logic and potentially fail with "ResultCode.NO_MORE_CONNECTIONS" if the maximum
# number of connections would be exceeded.
# The number of connections used per node depends on concurrent commands in progress
# plus sub-commands used for parallel multi-node commands (batch, scan, and query).
# One connection will be used for each command.
# Default: 100
@max_connections_per_node = opt[:max_connections_per_node] || 100

# MinConnectionsPerNode specifies the minimum number of synchronous connections allowed per server node.
# Preallocate min connections on client node creation.
# The client will periodically allocate new connections if count falls below min connections.
#
# Server proto-fd-idle-ms may also need to be increased substantially if min connections are defined.
# The proto-fd-idle-ms default directs the server to close connections that are idle for 60 seconds
# which can defeat the purpose of keeping connections in reserve for a future burst of activity.
#
# Default: 0
@min_connections_per_node = opt[:min_connections_per_node] || 0
end

def requires_authentication
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/utils/pool.rb
Expand Up @@ -18,7 +18,7 @@ module Aerospike

class Pool #:nodoc:

attr_accessor :create_proc, :cleanup_proc, :check_proc
attr_accessor :create_proc, :cleanup_proc, :check_proc, :max_size

def initialize(max_size = 256, &block)
@create_proc = block
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/version.rb
@@ -1,4 +1,4 @@
# encoding: utf-8
module Aerospike
VERSION = "2.27.0"
VERSION = "2.28.0"
end
27 changes: 23 additions & 4 deletions spec/aerospike/cluster_spec.rb
Expand Up @@ -18,21 +18,39 @@
# the License.

RSpec.describe Aerospike::Cluster do
let(:policy) { spy(min_connections_per_node: 10, max_connections_per_node: 20) }
let(:instance) { described_class.new(policy, hosts) }
let(:policy) { spy }
let(:hosts) { [] }

describe '#create_node' do
let(:nv) { double('nv') }
let(:node) { instance_double(Aerospike::Node) }

before do
allow(Aerospike::Node).to receive(:new).with(instance, nv).and_return(node)
allow(node).to receive(:fill_connection_pool_up_to).with(policy)
end

it 'creates a new node and calls create_min_connections' do
expect(Aerospike::Node).to receive(:new).with(instance, nv).and_return(node)
expect(node).to receive(:fill_connection_pool_up_to)
new_node = instance.create_node(nv)
expect(new_node).to eq(node)
end

end

describe '#refresh_nodes' do
subject(:refresh_nodes) { instance.refresh_nodes }
let!(:peers) { ::Aerospike::Peers.new }
let!(:peers) { Aerospike::Peers.new }
let(:node) { spy }
let(:node_generation_changed) { false }
let(:generation_changed) { false }
let(:nodes_to_remove) { [] }
let(:peer_nodes) { {} }

before do
allow(::Aerospike::Peers).to receive(:new).and_return(peers)
allow(Aerospike::Peers).to receive(:new).and_return(peers)
allow(instance).to receive(:nodes).and_return(nodes)
allow(instance).to receive(:add_nodes)
allow(instance).to receive(:remove_nodes)
Expand All @@ -42,6 +60,7 @@
allow(node).to receive(:refresh_reset)
allow(node).to receive(:refresh_peers)
allow(node).to receive(:refresh_partitions)
allow(node).to receive(:create_min_connection)
allow(node.partition_generation).to receive(:changed?).and_return(node_generation_changed)
allow(peers).to receive(:generation_changed?).and_return(generation_changed)
allow(peers).to receive(:reset_refresh_count!)
Expand Down Expand Up @@ -76,7 +95,7 @@

it { expect(node).to have_received(:refresh_info).twice.with(peers) }
it { expect(node).to have_received(:refresh_peers).twice.with(peers) }
it { expect(node).not_to have_received(:refresh_partitions)}
it { expect(node).not_to have_received(:refresh_partitions) }
it { expect(instance).to have_received(:find_nodes_to_remove).with(peers.refresh_count) }
it { expect(peers).to have_received(:reset_refresh_count!) }
end
Expand Down
33 changes: 32 additions & 1 deletion spec/aerospike/node_spec.rb
Expand Up @@ -19,10 +19,41 @@

RSpec.describe Aerospike::Node do
let(:cluster) { spy }
let(:instance) { described_class.new(cluster, nv) }
let(:nv) { spy }
let(:instance) { described_class.new(cluster, nv) }
let(:peers) { double }

describe '#create_min_connections' do
subject(:connections) { instance.connections }
before do
allow(cluster).to receive(:min_connections_per_node).and_return(10)
allow(connections).to receive(:length).and_return(current_number_of_connections)
allow(connections).to receive(:create)
allow(connections).to receive(:offer)
allow(instance).to receive(:connections).and_return(connections)
end

context 'when current connections is less than minimum connections' do
let(:current_number_of_connections) { 5 }

it 'creates the expected number of minimum connections' do
instance.fill_connection_pool_up_to(10)
expect(connections).to have_received(:create).exactly(5).times
expect(connections).to have_received(:offer).exactly(5).times
end
end

context 'when current connections is equal to minimum connections' do
let(:current_number_of_connections) { 10 }

it 'does not create any additional connections' do
expect(connections).not_to have_received(:create)
expect(connections).not_to have_received(:offer)
end
end
end


describe '#failed!' do
subject(:failed!) { instance.failed! }

Expand Down

0 comments on commit 68b17fa

Please sign in to comment.