[Client-618] max connections enforced #114

Open: wants to merge 6 commits into base: stage
12 changes: 6 additions & 6 deletions .github/workflows/development.yml
@@ -6,19 +6,19 @@ jobs:
test:
runs-on: ${{matrix.os}}-latest
continue-on-error: ${{matrix.experimental}}

strategy:
matrix:
os:
- ubuntu

ruby:
- 2.6
- 2.7

experimental: [false]
env: [""]

include:
- os: ubuntu
ruby: head
@@ -30,14 +30,14 @@ jobs:
with:
ruby-version: ${{matrix.ruby}}
bundler-cache: true

- name: Start server
timeout-minutes: 5
env:
TERM: dumb
run:
.github/workflows/start_cluster.sh 2

- name: Run tests
timeout-minutes: 30
env:
2 changes: 1 addition & 1 deletion aerospike.gemspec
@@ -8,7 +8,7 @@ Gem::Specification.new do |s|
s.name = "aerospike"
s.version = Aerospike::VERSION
s.platform = Gem::Platform::RUBY
s.authors = [ "Khosrow Afroozeh", "Jan Hecking" ]
s.authors = ["Khosrow Afroozeh", "Jan Hecking", "Sachin Venkatesha Murthy"]
s.email = [ "khosrow@aerospike.com", "jhecking@aerospike.com" ]
s.homepage = "http://www.github.com/aerospike/aerospike-client-ruby"
s.summary = "An Aerospike driver for Ruby."
6 changes: 6 additions & 0 deletions lib/aerospike/aerospike_exception.rb
@@ -96,5 +96,11 @@ def initialize(msg=nil)
super(ResultCode::INVALID_NAMESPACE, msg)
end
end

class MaxConnectionsExceeded < Aerospike
def initialize(msg = nil)
super(ResultCode::MAX_CONNECTION_EXCEEDED, msg)
end
end
end
end
1 change: 0 additions & 1 deletion lib/aerospike/client.rb
@@ -225,7 +225,6 @@ def truncate(namespace, set_name = nil, before_last_update = nil, options = {})
policy = create_policy(options, Policy, default_info_policy)

node = @cluster.random_node
conn = node.get_connection(policy.timeout)

if set_name && !set_name.to_s.strip.empty?
str_cmd = "truncate:namespace=#{namespace}"
14 changes: 12 additions & 2 deletions lib/aerospike/cluster.rb
@@ -321,14 +321,24 @@ def update_partitions(parser)

def request_info(policy, *commands)
node = random_node
conn = node.get_connection(policy.timeout)
begin
conn = node.get_connection(policy.timeout)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
raise e
end
Info.request(conn, *commands).tap do
node.put_connection(conn)
end
end

def request_node_info(node, policy, *commands)
conn = node.get_connection(policy.timeout)
begin
conn = node.get_connection(policy.timeout)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
raise e
end
Info.request(conn, *commands).tap do
node.put_connection(conn)
end
15 changes: 6 additions & 9 deletions lib/aerospike/command/admin_command.rb
@@ -345,14 +345,13 @@ def execute_command(cluster, policy)
timeout = 1
timeout = policy.timeout if policy && policy.timeout > 0

conn = node.get_connection(timeout)

begin
conn = node.get_connection(timeout)
conn.write(@data_buffer, @data_offset)
conn.read(@data_buffer, HEADER_SIZE)
node.put_connection(conn)
rescue => e
conn.close if conn
node.close_connection(conn) if conn
Review comment (Collaborator):

A better design would be to add a node attribute to the connection and put the cleanup code there, to run only if the node is set (the node should be set after a connection to the database node is successfully established). Then add a self.finalize to the Connection class and set it up in the initializer. That way, even if a user takes a connection from the pool and forgets to put it back, the code will still work.
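
For illustration only, the finalizer idea in this comment might look roughly like the sketch below. It is not code from this PR or from the gem: `SlotCounter` and `TrackedConnection` are hypothetical names, a plain counter stands in for the node attribute, and thread safety is left out. The one real API used is `ObjectSpace.define_finalizer`; the finalizer proc is built in a class method so that it does not capture the connection object itself.

```ruby
# Hypothetical stand-in for the per-node connection count (not thread-safe).
class SlotCounter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment
    @count += 1
  end

  def decrement
    @count -= 1
  end
end

# Hypothetical connection that gives its slot back exactly once,
# either through an explicit close or through the finalizer.
class TrackedConnection
  # Built in a class method so the proc closes over `counter` and `state`
  # only. A finalizer that references its own object would keep the object
  # reachable, and it would never be collected.
  def self.finalizer(counter, state)
    proc do
      unless state[:released]
        state[:released] = true
        counter.decrement
      end
    end
  end

  def initialize(counter)
    @counter = counter
    @state = { released: false }
    @counter.increment
    ObjectSpace.define_finalizer(self, self.class.finalizer(@counter, @state))
  end

  # Explicit close releases the slot; the finalizer then becomes a no-op.
  def close
    return if @state[:released]

    @state[:released] = true
    @counter.decrement
  end

  def closed?
    @state[:released]
  end
end
```

With this shape, a connection that is taken from the pool and never returned would still give its slot back once it is garbage collected, although Ruby gives no guarantee about when that happens.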

raise e
end

@@ -377,13 +376,11 @@ def read_users(cluster, policy)
status, list = read_user_blocks(conn)
node.put_connection(conn)
rescue => e
conn.close if conn
node.close_connection(conn) if conn
raise e
end

raise Exceptions::Aerospike.new(status) if status > 0

return list
list
end

def read_user_blocks(conn)
@@ -512,13 +509,13 @@ def read_roles(cluster, policy)
status, list = read_role_blocks(conn)
node.put_connection(conn)
rescue => e
conn.close if conn
node.close_connection(conn) if conn
raise e
end

raise Exceptions::Aerospike.new(status) if status > 0

return list
list
end

def read_role_blocks(conn)
9 changes: 6 additions & 3 deletions lib/aerospike/command/command.rb
@@ -487,6 +487,9 @@ def execute
@node = get_node
@conn = @node.get_connection(@policy.timeout)
rescue => e
if e.is_a?(Aerospike::Exceptions::MaxConnectionsExceeded)
Aerospike.logger.error("Maximum connections established. No new connection can be created. #{e}")
Review comment (Collaborator):

Same issue. Logging without re-raising the exception.

Review comment (Collaborator):

Still not addressed.

Reply (Contributor Author):

I have not re-raised the exception here because this code is inside a retry loop.
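
One way to reconcile the two positions in this thread, sketched here under assumptions rather than taken from the PR: log each failed attempt inside the retry loop, and re-raise only once the retries are exhausted. `with_retries` and the local `MaxConnectionsExceeded` class are hypothetical stand-ins, not the gem's API.

```ruby
# Hypothetical stand-in for Aerospike::Exceptions::MaxConnectionsExceeded.
class MaxConnectionsExceeded < StandardError; end

# Runs the block, retrying up to `max_retries` times after the first attempt.
# Each failure is logged; the last failure is re-raised to the caller.
def with_retries(max_retries:, logger: nil)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue MaxConnectionsExceeded => e
    logger&.error("Attempt #{attempts} failed: #{e.message}")
    retry if attempts <= max_retries
    raise
  end
end
```

Here the log line never swallows the error: the caller either gets the block's result or the original exception.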

end
if @node
# Socket connection error has occurred. Decrease health and retry.
@node.decrease_health
@@ -510,7 +513,7 @@ def execute

# All runtime exceptions are considered fatal. Do not retry.
# Close socket to flush out possible garbage. Do not put back in pool.
@conn.close if @conn
@node.close_connection(@conn) if @conn
raise e
end

@@ -523,7 +526,7 @@ def execute
rescue => e
# IO errors are considered temporary anomalies. Retry.
# Close socket to flush out possible garbage. Do not put back in pool.
@conn.close if @conn
@node.close_connection(@conn) if @conn

Aerospike.logger.error("Node #{@node.to_s}: #{e}")
# IO error means connection to server @node is unhealthy.
@@ -548,7 +551,7 @@ def execute
# cancelling/closing the batch/multi commands will return an error, which will
# close the connection to throw away its data and signal the server about the
# situation. We will not put back the connection in the buffer.
@conn.close if @conn
@node.close_connection(@conn) if @conn
raise e
end

4 changes: 2 additions & 2 deletions lib/aerospike/connection/authenticate.rb
@@ -31,10 +31,11 @@ def call(conn, user, hashed_pass)
end
end
end

module AuthenticateNew
class << self
INVALID_SESSION_ERR = [ResultCode::INVALID_CREDENTIAL,
ResultCode::EXPIRED_SESSION]
ResultCode::EXPIRED_SESSION]

def call(conn, cluster)
command = LoginCommand.new
@@ -65,4 +66,3 @@ def call(conn, cluster)
end
end
end

16 changes: 10 additions & 6 deletions lib/aerospike/node.rb
@@ -84,14 +84,14 @@ def get_connection(timeout)
# Put back a connection to the cache. If cache is full, the connection will be
# closed and discarded
def put_connection(conn)
conn.close if !active?
@connections.cleanup(conn) unless active?
@connections.offer(conn)
end

# Separate connection for refreshing
def tend_connection
if @tend_connection.nil? || @tend_connection.closed?
@tend_connection = Cluster::CreateConnection.(cluster, host)
@tend_connection = @connections.create
end
@tend_connection
end
@@ -177,7 +177,7 @@ def aliases
@aliases.value
end

# Marks node as inactice and closes all cached connections
# Marks node as inactive and closes all cached connections
def close
inactive!
close_connections
@@ -224,14 +224,18 @@ def refresh_reset
Node::Refresh::Reset.(self)
end

def close_connection(conn)
Review comment (Collaborator):

Not needed if the suggested changes are applied.

@connections.cleanup(conn)
end

private

def close_connections
@tend_connection.close if @tend_connection
@connections.cleanup(@tend_connection) if @tend_connection
# drain connections and close all of them
# non-blocking, does not call create_block when passed false
while conn = @connections.poll(false)
conn.close if conn
while (conn = @connections.poll(create_new: false))
@connections.cleanup(conn)
end
end
end # class Node
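
The pool class behind `@connections` is not part of the diff shown here, so the sketch below is only a guess at why the calls to `conn.close` are being replaced with `@connections.cleanup(conn)`: if the pool counts every connection it creates in order to enforce a maximum, a connection closed behind its back would keep occupying a slot. `BoundedPool` and `FakeConnection` are hypothetical; the method names `create`, `poll(create_new:)`, `offer`, and `cleanup` mirror the calls in the diff, but their behavior here is assumed.

```ruby
# Hypothetical stand-in for Aerospike::Exceptions::MaxConnectionsExceeded.
class MaxConnectionsExceeded < StandardError; end

# Minimal fake connection, just enough to track open/closed state.
class FakeConnection
  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Hypothetical pool that caps the number of live connections.
class BoundedPool
  attr_reader :total

  def initialize(max_size)
    @max_size = max_size
    @idle = []
    @total = 0 # connections created and not yet cleaned up
    @mutex = Mutex.new
  end

  # Creates a connection, or raises if the cap has been reached.
  def create
    @mutex.synchronize do
      raise MaxConnectionsExceeded, "limit of #{@max_size} reached" if @total >= @max_size

      @total += 1
    end
    FakeConnection.new
  end

  # Returns an idle connection; creates one only when create_new is true.
  def poll(create_new: true)
    conn = @mutex.synchronize { @idle.pop }
    return conn if conn

    create_new ? create : nil
  end

  # Puts a connection back for reuse.
  def offer(conn)
    @mutex.synchronize { @idle.push(conn) }
  end

  # Closes the connection and frees its slot, at most once per connection.
  def cleanup(conn)
    @mutex.synchronize do
      return if conn.closed?

      conn.close
      @total -= 1
    end
  end
end
```

In this model, calling `conn.close` directly would leave `total` unchanged, so the pool would eventually refuse new connections even though none are in use.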
2 changes: 1 addition & 1 deletion lib/aerospike/node/refresh/info.rb
@@ -51,7 +51,7 @@ def call(node, peers)
peers.refresh_count += 1
node.reset_failures!
rescue ::Aerospike::Exceptions::Aerospike => e
conn.close if conn
node.close_connection(conn) if conn
node.decrease_health
peers.generation_changed = true if peers.use_peers?
Refresh::Failed.(node, e)
2 changes: 1 addition & 1 deletion lib/aerospike/node/refresh/partitions.rb
@@ -30,7 +30,7 @@ def call(node, peers)
parser = PartitionParser.new(node, conn)
node.cluster.update_partitions(parser)
rescue ::Aerospike::Exceptions::Aerospike => e
conn.close
node.close_connection(conn)
Refresh::Failed.(node, e)
end

2 changes: 1 addition & 1 deletion lib/aerospike/node/refresh/racks.rb
@@ -30,7 +30,7 @@ def call(node)
parser = RackParser.new(node, conn)
node.update_racks(parser)
rescue ::Aerospike::Exceptions::Aerospike => e
conn.close
node.close_connection(conn)
Refresh::Failed.(node, e)
end

2 changes: 1 addition & 1 deletion lib/aerospike/node_validator.rb
@@ -101,4 +101,4 @@ def is_ip?(hostname)
end

end # class
end # module
end # module
7 changes: 7 additions & 0 deletions lib/aerospike/result_code.rb
@@ -20,6 +20,9 @@ module ResultCode

attr_reader :code

# Maximum connections established. New connections cannot be created.
MAX_CONNECTION_EXCEEDED = -21

# One or more keys failed in a batch.
BATCH_FAILED = -20

@@ -299,6 +302,7 @@ module ResultCode
# Internal error.
QUERY_DUPLICATE = 215


def self.message(code)
case code
when BATCH_FAILED
@@ -577,6 +581,9 @@ def self.message(code)
when QUERY_DUPLICATE
"Internal query error"

when MAX_CONNECTION_EXCEEDED
"Maximum new connections exceeded."

else
"ResultCode #{code} unknown in the client. Please file a github issue."
end # case
8 changes: 6 additions & 2 deletions lib/aerospike/task/execute_task.rb
@@ -45,8 +45,12 @@ def all_nodes_done?
elsif node.supports_feature?(Aerospike::Features::QUERY_SHOW)
command = cmd2
end

conn = node.get_connection(0)
begin
conn = node.get_connection(0)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
raise e
end
responseMap, _ = Info.request(conn, command)
node.put_connection(conn)

7 changes: 6 additions & 1 deletion lib/aerospike/task/index_task.rb
@@ -42,7 +42,12 @@ def all_nodes_done?
nodes = @cluster.nodes

nodes.each do |node|
conn = node.get_connection(1)
begin
conn = node.get_connection(1)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
raise e
end
response_map = Info.request(conn, command)
_, response = response_map.first
match = response.to_s.match(MATCHER)
9 changes: 7 additions & 2 deletions lib/aerospike/task/udf_register_task.rb
@@ -39,15 +39,20 @@ def all_nodes_done?
nodes = @cluster.nodes

nodes.each do |node|
conn = node.get_connection(1)
begin
conn = node.get_connection(1)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
raise e
end
response_map = Info.request(conn, command)
_, response = response_map.first
index = response.to_s.index("filename=#{@package_name}")

return false if index.nil?
end

return true
true
end

end # class
7 changes: 6 additions & 1 deletion lib/aerospike/task/udf_remove_task.rb
@@ -39,7 +39,12 @@ def all_nodes_done?
nodes = @cluster.nodes

nodes.each do |node|
conn = node.get_connection(1)
begin
conn = node.get_connection(1)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
raise e
end
response_map = Info.request(conn, command)
_, response = response_map.first
index = response.to_s.index("filename=#{@package_name}")