Skip to content

Commit

Permalink
Merge pull request #115 from aerospike/CLIENT-1176_suppot_background_…
Browse files Browse the repository at this point in the history
…query

[Client-1176] Added support for write operations in background queries
  • Loading branch information
vmsachin committed May 16, 2023
2 parents bd37d8b + 5e61570 commit 8ca27c9
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 33 deletions.
4 changes: 2 additions & 2 deletions aerospike.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Gem::Specification.new do |s|
s.name = "aerospike"
s.version = Aerospike::VERSION
s.platform = Gem::Platform::RUBY
s.authors = [ "Khosrow Afroozeh", "Jan Hecking" ]
s.email = [ "khosrow@aerospike.com", "jhecking@aerospike.com" ]
s.authors = ["Khosrow Afroozeh", "Jan Hecking", "Sachin Venkatesha Murthy"]
s.email = [ "khosrow@aerospike.com", "jhecking@aerospike.com", "smurthy@aerospike.com"]
s.homepage = "http://www.github.com/aerospike/aerospike-client-ruby"
s.summary = "An Aerospike driver for Ruby."
s.description = "Official Aerospike Client for ruby. Access your Aerospike cluster with ease of Ruby."
Expand Down
1 change: 1 addition & 0 deletions lib/aerospike.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
require "aerospike/query/scan_partition_command"
require "aerospike/query/query_executor"
require "aerospike/query/query_partition_command"
require "aerospike/query/server_command"

require "aerospike/exp/exp"
require "aerospike/exp/exp_map"
Expand Down
56 changes: 54 additions & 2 deletions lib/aerospike/client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2014-2020 Aerospike, Inc.
# Copyright 2014-2023 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
Expand Down Expand Up @@ -225,7 +225,6 @@ def truncate(namespace, set_name = nil, before_last_update = nil, options = {})
policy = create_policy(options, Policy, default_info_policy)

node = @cluster.random_node
conn = node.get_connection(policy.timeout)

if set_name && !set_name.to_s.strip.empty?
str_cmd = "truncate:namespace=#{namespace}"
Expand Down Expand Up @@ -731,6 +730,59 @@ def query(statement, options = nil)
query_partitions(Aerospike::PartitionFilter.all, statement, options)
end

#----------------------------------------------------------
# Query/Execute (Supported by Aerospike 3+ servers only)
#----------------------------------------------------------

# QueryExecute applies operations on records that match the statement filter.
# Records are not returned to the client.
# This asynchronous server call will return before the command is complete.
# The user can optionally wait for command completion by using the returned
# ExecuteTask instance.
#
# This method is only supported by Aerospike 3+ servers.
# If the policy is nil, the default relevant policy will be used.
#
# @param statement [Aerospike::Statement] The query or batch read statement.
# @param operations [Array<Aerospike::Operation>] An optional list of operations.
# @param options [Hash] An optional hash of policy options.
# @return [Aerospike::ExecuteTask] An ExecuteTask instance that can be used to wait for command completion.
#
# @raise [Aerospike::Exceptions::Aerospike] if an error occurs during the operation.
def query_execute(statement, operations = [], options = nil)
policy = create_policy(options, WritePolicy, default_write_policy)

if statement.nil?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INVALID_COMMAND, "Query failed of invalid statement.")
end

statement = statement.clone
unless operations.empty?
statement.operations = operations
end

task_id = statement.task_id
nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.")
end

# Use a thread per node
nodes.each do |node|
Thread.new do
Thread.current.abort_on_exception = true
begin
command = ServerCommand.new(@cluster, node, policy, statement, true, task_id)
execute_command(command)
rescue => e
Aerospike.logger.error(e)
raise e
end
end
end
ExecuteTask.new(@cluster, statement)
end

#-------------------------------------------------------
# User administration
#-------------------------------------------------------
Expand Down
255 changes: 251 additions & 4 deletions lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2014-2020 Aerospike, Inc.
# Copyright 2014-2024 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
Expand Down Expand Up @@ -465,6 +465,254 @@ def set_scan(policy, namespace, set_name, bin_names, node_partitions)
end_cmd
end

def set_query(policy, statement, background, node_partitions)
function_arg_buffer = nil
field_count = 0
filter_size = 0

begin_cmd

if statement.namespace
@data_offset += statement.namespace.bytesize + FIELD_HEADER_SIZE
field_count += 1
end

if statement.set_name
@data_offset += statement.set_name.bytesize + FIELD_HEADER_SIZE
field_count += 1
end

# Estimate recordsPerSecond field size. This field is used in new servers and not used
# (but harmless to add) in old servers.
if statement.records_per_second > 0
@data_offset += 4 + FIELD_HEADER_SIZE
field_count += 1
end

# Estimate socket timeout field size. This field is used in new servers and not used
# (but harmless to add) in old servers.
@data_offset += 4 + FIELD_HEADER_SIZE
field_count += 1

# Estimate task_id field.
@data_offset += 8 + FIELD_HEADER_SIZE
field_count += 1

filter = statement.filters[0]
bin_names = statement.bin_names
packed_ctx = nil

if filter
col_type = filter.collection_type

# Estimate INDEX_TYPE field.
if col_type > 0
@data_offset += FIELD_HEADER_SIZE + 1
field_count += 1
end

# Estimate INDEX_RANGE field.
@data_offset += FIELD_HEADER_SIZE
filter_size += 1 # num filters
filter_size += filter.estimate_size

@data_offset += filter_size
field_count += 1

packed_ctx = filter.packed_ctx
if packed_ctx
@data_offset += FIELD_HEADER_SIZE + packed_ctx.length
field_count += 1
end
end

statement.set_task_id
predexp = policy.predexp || statement.predexp

if predexp
@data_offset += FIELD_HEADER_SIZE
pred_size = Aerospike::PredExp.estimate_size(predexp)
@data_offset += pred_size
field_count += 1
end

unless policy.filter_exp.nil?
exp_size = estimate_expression_size(policy.filter_exp)
field_count += 1 if exp_size > 0
end

# Estimate aggregation/background function size.
if statement.function_name
@data_offset += FIELD_HEADER_SIZE + 1 # udf type
@data_offset += statement.package_name.bytesize + FIELD_HEADER_SIZE
@data_offset += statement.function_name.bytesize + FIELD_HEADER_SIZE

function_arg_buffer = ""
if statement.function_args && statement.function_args.length > 0
function_arg_buffer = Value.of(statement.function_args).to_bytes
end
@data_offset += FIELD_HEADER_SIZE + function_arg_buffer.bytesize
field_count += 4
end

max_records = 0
parts_full_size = 0
parts_partial_digest_size = 0
parts_partial_bval_size = 0

unless node_partitions.nil?
parts_full_size = node_partitions.parts_full.length * 2
parts_partial_digest_size = node_partitions.parts_partial.length * 20

unless filter.nil?
parts_partial_bval_size = node_partitions.parts_partial.length * 8
end
max_records = node_partitions.record_max
end

if parts_full_size > 0
@data_offset += parts_full_size + FIELD_HEADER_SIZE
field_count += 1
end

if parts_partial_digest_size > 0
@data_offset += parts_partial_digest_size + FIELD_HEADER_SIZE
field_count += 1
end

if parts_partial_bval_size > 0
@data_offset += parts_partial_bval_size + FIELD_HEADER_SIZE
field_count += 1
end

# Estimate max records field size. This field is used in new servers and not used
# (but harmless to add) in old servers.
if max_records > 0
@data_offset += 8 + FIELD_HEADER_SIZE
field_count += 1
end

operations = statement.operations
operation_count = 0

if !operations.empty?

unless background
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR)
end

operations.each do |operation|
estimate_operation_size_for_operation(operation)
end
operation_count = operations.size
elsif !bin_names.empty?
bin_names.each do |bin_name|
estimate_operation_size_for_bin_name(bin_name)
end
operation_count = bin_names.length
# Estimate size for selected bin names (query bin names already handled for old servers).
end

size_buffer

if background
write_header_with_policy(policy, 0, INFO2_WRITE, field_count, operation_count)
else
read_attr = INFO1_READ
read_attr |= INFO1_NOBINDATA unless policy.include_bin_data
read_attr |= INFO1_SHORT_QUERY if policy.short_query
write_header(policy, read_attr, 0, field_count, operation_count)
end

write_field_string(statement.namespace, FieldType::NAMESPACE) if statement.namespace
write_field_string(statement.set_name, FieldType::TABLE) if statement.set_name

# Write records per second.
write_field_int(statement.records_per_second, FieldType::RECORDS_PER_SECOND) if statement.records_per_second > 0

write_filter_exp(policy.filter_exp, exp_size)

# Write socket idle timeout.
write_field_int(policy.socket_timeout, FieldType::SOCKET_TIMEOUT)

# Write task_id field
write_field_int64(statement.task_id, FieldType::TRAN_ID)

unless predexp.nil?
write_field_header(pred_size, Aerospike::FieldType::PREDEXP)
@data_offset = Aerospike::PredExp.write(
predexp, @data_buffer, @data_offset
)
end

if filter
type = filter.collection_type

if type > 0
write_field_header(1, FieldType::INDEX_TYPE)
@data_offset += @data_buffer.write_byte(type, @data_offset)
end

write_field_header(filter_size, FieldType::INDEX_RANGE)
@data_offset += @data_buffer.write_byte(1, @data_offset)
@data_offset = filter.write(@data_buffer, @data_offset)

if packed_ctx
write_field_header(packed_ctx.length, FieldType::INDEX_CONTEXT)
@data_offset += @data_buffer.write_binary(packed_ctx, @data_offset)
end
end

if statement.function_name
write_field_header(1, FieldType::UDF_OP)
@data_offset += @data_buffer.write_byte(1, @data_offset)
write_field_string(statement.package_name, FieldType::UDF_PACKAGE_NAME)
write_field_string(statement.function_name, FieldType::UDF_FUNCTION)
write_field_string(function_arg_buffer, FieldType::UDF_ARGLIST)
end

if parts_full_size > 0
write_field_header(parts_full_size, FieldType::PID_ARRAY)
node_partitions.parts_full.each do |part|
@data_offset += @data_buffer.write_uint16_little_endian(part.id, @data_offset)
end
end

if parts_partial_digest_size > 0
write_field_header(parts_partial_digest_size, FieldType::DIGEST_ARRAY)
node_partitions.parts_partial.each do |part|
@data_offset += @data_buffer.write_binary(part.digest, @data_offset)
end
end

if parts_partial_bval_size > 0
write_field_header(parts_partial_bval_size, FieldType::BVAL_ARRAY)
@node_partitions.parts_partial.each do |part|
@data_offset += @data_buffer.write_uint64_little_endian(part.bval, @data_offset)
end
end

if max_records > 0
write_field(max_records, FieldType::MAX_RECORDS)
end

if operations.empty?
if bin_names.empty?
bin_names.each do |bin_name|
write_operation_for_bin_name(bin_name, Operation::READ)
end
end
else
operations.each do |operation|
write_operation_for_operation(operation)
end
end

end_cmd

nil
end

def execute
iterations = 0

Expand Down Expand Up @@ -537,7 +785,7 @@ def execute
parse_result
rescue => e
case e
# do not log the following exceptions
# do not log the following exceptions
when Aerospike::Exceptions::ScanTerminated
when Aerospike::Exceptions::QueryTerminated
else
Expand Down Expand Up @@ -703,9 +951,8 @@ def write_header_with_policy(policy, read_attr, write_attr, field_count, operati
read_attr |= INFO1_CONSISTENCY_ALL if policy.consistency_level == Aerospike::ConsistencyLevel::CONSISTENCY_ALL
write_attr |= INFO2_DURABLE_DELETE if policy.durable_delete
read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression

# Write all header data except total size which must be written last.
@data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message heade.length.
@data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length.
@data_buffer.write_byte(read_attr, 9)
@data_buffer.write_byte(write_attr, 10)
@data_buffer.write_byte(info_attr, 11)
Expand Down
12 changes: 11 additions & 1 deletion lib/aerospike/policy/policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Aerospike
# Container object for client policy command.
class Policy
attr_accessor :filter_exp, :priority, :timeout, :max_retries, :sleep_between_retries, :consistency_level,
:predexp, :fail_on_filtered_out, :replica, :use_compression
:predexp, :fail_on_filtered_out, :replica, :use_compression, :socket_timeout

alias total_timeout timeout
alias total_timeout= timeout=
Expand Down Expand Up @@ -133,6 +133,16 @@ def initialize(opt = {})
# Duration to sleep between retries if a transaction fails and the
# timeout was not exceeded. Enter zero to skip sleep.
@sleep_between_retries = opt[:sleep_between_retries] || 0.5

# Determines network timeout for each attempt.
#
# If socket_timeout is not zero and socket_timeout is reached before an attempt completes,
# the Timeout above is checked. If Timeout is not exceeded, the transaction
# is retried. If both socket_timeout and Timeout are non-zero, socket_timeout must be less
# than or equal to Timeout, otherwise Timeout will also be used for socket_timeout.
#
# Default: 30s
@socket_timeout = opt[:socket_timeout] || 30000
end
end # class
end # module

0 comments on commit 8ca27c9

Please sign in to comment.