Skip to content

Commit

Permalink
Return results of multiple read operations on same bin as array (#12)
Browse files Browse the repository at this point in the history
* support multiple results on same bin

* add record_bin_multiplicity policy for backwards compatibility

The default value Aerospike::RecordBinMultiplicity::SINGLE maintains the
client behavior pre-2.2 of returning only a single result value for any
record bin; if multiple read operations return values for the same bin,
the last result of the last operation is returned.

Set OperatePolicy#record_bin_multiplicity to
Aerospike::RecordBinMultiplicity::ARRAY to return the results of
multiple read operations on the same record in as an array.

* avoid duplicate key lookup

* fix names for get_by_rank(_range) operations

* update docs re. results for multiple read ops on same bin

* cleanup/reformat policy docs

* document OperatePolicy & RecordBinMultiplicity
  • Loading branch information
jhecking committed Sep 19, 2016
1 parent c4fe3bb commit c1b09ff
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 171 deletions.
29 changes: 20 additions & 9 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ followed by a read operation on the same bin, will return the new value
post-increment.

The results are returned as a hash map with the bin names as keys and the
result of the last operation on that bin as the value. I.e. if multiple
operations on the same bin return a value for that bin, the result of the last
operation will be returned. Note that many write operations return values as
well, e.g. a list append operation returns the new size of the list after the
append operation.
result of the operation as values. If multiple operations return values for the
same bin, by default only the result of the last operation is returned. To
return the values of all operations, set `OperatePolicy#record_bin_multiplicity`
to `Aerospike::RecordBinMultiplicity#ARRAY`:

policy = Aerospike::OperatePolicy.new(record_bin_multiplicity: Aerospike::RecordBinMultiplicity::ARRAY)
return_type = Aerospike::CDT::MapReturnType::VALUE
ops = [
Aerospike::CDT::MapOperation.get_key("mapBin", "keyA", return_type: return_type),
Aerospike::CDT::MapOperation.get_key("mapBin", "keyB", return_type: return_type)
]
results = client.operate(key, ops, policy)
results.bins // => {"mapBin" => [{"keyA" => "valueA"}, {"keyB" => "valueB"}]}

Note that many write operations return values as well, e.g. a list append
operation returns the new size of the list after the append operation.

Operations are grouped by the type of bins that they operate on. Operations are
instantiated via one of the following three classes:
Expand Down Expand Up @@ -530,8 +541,8 @@ Parameters:
- `count` ` - [optional] Number of elements to return.
- `return_type` - [optional] Type of data to return. Default is none.

<a name="map-get_rank"></a>
### Aerospike::CDT::MapOperation#get_rank(bin_name, rank, return_type:)
<a name="map-get_by_rank"></a>
### Aerospike::CDT::MapOperation#get_by_rank(bin_name, rank, return_type:)

Selects map items identified by rank and returns selected data specified by
return_type.
Expand All @@ -542,8 +553,8 @@ Parameters:
- `rank` - Rank of map item to return.
- `return_type` - [optional] Type of data to return. Default is none.

<a name="map-get_rank_range"></a>
### Aerospike::CDT::MapOperation#get_rank_range(bin_name, rank, count, return_type:)
<a name="map-get_by_rank_range"></a>
### Aerospike::CDT::MapOperation#get_by_rank_range(bin_name, rank, count, return_type:)

Server selects "count" map items starting at specified rank and returns
selected data specified by return_type. If "count" is not specified, server
Expand Down
241 changes: 82 additions & 159 deletions docs/policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ operations and the allowed values for some of the policies.
- [`Policy Objects`](#Objects)
- [`Policy Values`](#Values)


<a name="Objects"></a>
## Objects

Expand All @@ -27,200 +26,124 @@ Usage Example:
client.get(key, [], policy.new);
```

<!--
################################################################################
Policy
################################################################################
-->
<a name="Policy"></a>

### Policy Object

A policy effecting the behaviour of read operations.
A policy affecting the behaviour of read operations.

Attributes:

- `priority` – Specifies the behavior for the key.
For values, see [Priority Values](policies.md#priority).
* Default: `Priority.DEFAULT`
- `timeout` – Maximum time to wait for
the operation to complete. If 0 (zero), then the value
means there will be no timeout enforced.
Value should be in seconds.
* Default: `0 * time.Milliseconds` (no timeout)
- `max_retries` – Number of times to try on connection errors.
* Default: `2`
- `sleep_between_retries` – Duration of waiting between retries.
* Default: `0.500` (500ms)


<!--
################################################################################
WritePolicy
################################################################################
-->
<a name="WritePolicy"></a>
* `priority` – Specifies the behavior for the key.
* For values, see [Priority Values](#priority).
* Default: `Priority.DEFAULT`
* `timeout` – Maximum time to wait for the operation to complete.
* If 0 (zero), then the value means there will be no timeout enforced. Value
should be in seconds.
* Default: 0 (no timeout)
* `max_retries` – Number of times to try on connection errors.
* Default: 2
* `sleep_between_retries` – Duration of waiting between retries.
* Default: `0.500` (500ms)

<a name="WritePolicy"></a>
### WritePolicy Object

A policy effecting the behaviour of write operations.

Includes All Policy attributes, plus:
A policy affecting the behaviour of write operations.

Includes all [Policy](#Policy) attributes, plus:

* `send_key` – Qualify whether the server should store the record's primary key, or just use its digest.
* Default: `true`
* `record_exists_action` – Qualify how to handle writes where the record already exists.
* For values, see [RecordExistsAction Values](policies.md#exists).
* Default: `RecordExistsAction.UPDATE`
* `generation_policy` – Qualify how to handle record writes based on record generation.
* For values, see [GenerationPolicy Values](policies.md#gen).
* Default: `GenerationPolicy.NONE` (generation is not used to restrict writes)
* `commit_level` – Desired consistency guarantee when committing a transaction on the server.
* For values, see [CommitLevel Values](policies.md#commit).
* Default: `CommitLevel.COMMIT_ALL` (wait for write confirmation from all replicas)
* `generation` – Expected generation.
* Generation is the number of times a record has been modified (including
creation) on the server. If a write operation is creating a record, the
expected generation would be 0
* Default: 0
* `expiration` – Record expiration. Also known as ttl (time to live).
* Seconds record will live before being removed by the server.
* Expiration values:
* -1: Never expire for Aerospike 2 server versions >= 2.7.2 and Aerospike 3 server versions >= 3.1.4. Do not use -1 for older servers.
* 0: Default to namespace configuration variable "default-ttl" on the server.
* > 0: Actual expiration in seconds.
* Default: 0

<a name="OperatePolicy"></a>
### OperatePolicy Object

A policy affecting the behavior of operate commands.

Includes all [WritePolicy](#WritePolicy) attributes, plus:

* `record_bin_multiplicity` - Specifies how to merge results from multiple operations on the same record bin.
* Allowed values: See [RecordBinMultiplicity Values](#RecordBinMultiplicity)
* Default: `RecordBinMultiplicity::SINGLE`

- `send_key` – Qualify whether the server should store the record's primary key, or just use its digest.
* Default: `true`
- `record_exists_action` – Qualify how to handle writes where the record already exists.
For values, see [RecordExistsAction Values](policies.md#exists).
* Default: `RecordExistsAction.UPDATE`
- `generation_policy` – Qualify how to handle record writes based on record generation.
For values, see [GenerationPolicy Values](policies.md#gen).
* Default: `GenerationPolicy.NONE` (generation is not used to restrict writes)
- `commit_level` – Desired consistency guarantee when committing a transaction on the server.
For values, see [CommitLevel Values](policies.md#commit).
* Default: `CommitLevel.COMMIT_ALL` (wait for write confirmation from all replicas)
- `generation` – Expected generation. Generation is the number of times a record has been modified
(including creation) on the server. If a write operation is creating a record,
the expected generation would be 0
* Default: `0`
- `expiration` – Record expiration. Also known as ttl (time to live). Seconds record will live before being removed by the server.
Expiration values:
* -1: Never expire for Aerospike 2 server versions >= 2.7.2 and Aerospike 3 server versions >= 3.1.4.
Do not use -1 for older servers.
* 0: Default to namespace configuration variable "default-ttl" on the server.
* > 0: Actual expiration in seconds.
* Default: `0`

<!--
################################################################################
QueryPolicy
################################################################################
-->
<a name="QueryPolicy"></a>

### QueryPolicy Object

A policy effecting the behaviour of query and scan operations.
A policy affecting the behaviour of query and scan operations.

Includes All Policy attributes, plus:

- `record_queue_size` - The record set buffers the query results locally.
This attribute controls the size of the buffer (a
`SizedQueue` instance).
* Default: `5000`
* `record_queue_size` - The record set buffers the query results locally.
* This attribute controls the size of the buffer (a `SizedQueue` instance).
* Default: 5000

<a name="Values"></a>
## Values

The following are values allowed for various policies.

<!--
################################################################################
gen
################################################################################
-->
<a name="gen"></a>

### GenerationPolicy Values

- **NONE**

Writes a record, regardless of generation.

- **EXPECT_GEN_EQUAL**

Writes a record, ONLY if generations are equal.
* `NONE` - Writes a record, regardless of generation.
* `EXPECT_GEN_EQUAL` - Writes a record, ONLY if generations are equal.
* `EXPECT_GEN_GT` - Writes a record, ONLY if local generation is greater-than remote generation.
* `DUPLICATE` - Writes a record creating a duplicate, ONLY if the generation collides.

- **EXPECT_GEN_GT**

Writes a record, ONLY if local generation is greater-than remote generation.

- **DUPLICATE**

Writes a record creating a duplicate, ONLY if the generation collides.

<!--
################################################################################
exists
################################################################################
-->
<a name="exists"></a>

### RecordExistsAction Values

- **UPDATE**

Create or update record.

Merge write command bins with existing bins.

- **UPDATE_ONLY**

Update record only. Fail if record does not exist.

Merge write command bins with existing bins.

- **REPLACE**
* `UPDATE` - Create or update record.
* Merge write command bins with existing bins.
* `UPDATE_ONLY` - Update record only. Fail if record does not exist.
* Merge write command bins with existing bins.
* `REPLACE` - Create or replace record.
* Delete existing bins not referenced by write command bins.
* Supported by Aerospike 2 server versions >= 2.7.5 and
Aerospike 3 server versions >= 3.1.6.
* `REPLACE_ONLY` - Replace record only. Fail if record does not exist.
* Delete existing bins not referenced by write command bins.
* Supported by Aerospike 2 server versions >= 2.7.5 and
Aerospike 3 server versions >= 3.1.6.
* `CREATE_ONLY` - Create only. Fail if record exists.

Create or replace record.

Delete existing bins not referenced by write command bins.

Supported by Aerospike 2 server versions >= 2.7.5 and

Aerospike 3 server versions >= 3.1.6.

- **REPLACE_ONLY**

Replace record only. Fail if record does not exist.

Delete existing bins not referenced by write command bins.

Supported by Aerospike 2 server versions >= 2.7.5 and

Aerospike 3 server versions >= 3.1.6.

- **CREATE_ONLY**

Create only. Fail if record exists.

<!--
################################################################################
commit
################################################################################
-->
<a name="commit"></a>

### CommitLevel Values

- **COMMIT_ALL**

Wait until successfully committing master and all replicas.

- **COMMIT_MASTER**
* `COMMIT_ALL` - Wait until successfully committing master and all replicas.
* `COMMIT_MASTER`- Wait until successfully committing master only.

Wait until successfully committing master only.

<!--
################################################################################
priority
################################################################################
-->
<a name="priority"></a>

### Priority Values

- **DEFAULT**

The server defines the priority.

- **LOW**

Run the database operation in a background thread.

- **MEDIUM**

Run the database operation at medium priority.
* `DEFAULT` - The server defines the priority.
* `LOW` - Run the database operation in a background thread.
* `MEDIUM` - Run the database operation at medium priority.
* `HIGH` - Run the database operation at the highest priority.

- **HIGH**
<a name="RecordBinMultiplicity"></a>
### RecordBinMultiplicity Values

Run the database operation at the highest priority.
* `SINGLE` - Returns only the value of the last operation on the bin.
* `ARRAY` - Returns all results of operations on the same bin as an array.
15 changes: 12 additions & 3 deletions lib/aerospike/command/read_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'aerospike/record'

require 'aerospike/command/single_command'
require 'aerospike/policy/operate_policy'
require 'aerospike/utils/epoc'
require 'aerospike/value/value'

Expand All @@ -26,7 +27,7 @@ module Aerospike

class ReadCommand < SingleCommand #:nodoc:

attr_reader :record
attr_reader :record, :policy

def initialize(cluster, policy, key, bin_names)
super(cluster, key)
Expand Down Expand Up @@ -108,6 +109,7 @@ def handle_udf_error(result_code)
def parse_record(op_count, field_count, generation, expiration)
bins = op_count > 0 ? {} : nil
receive_offset = 0
single_bin_value = (!(OperatePolicy === policy) || policy.record_bin_multiplicity == RecordBinMultiplicity::SINGLE)

# There can be fields in the response (setname etc).
# But for now, ignore them. Expose them to the API if needed in the future.
Expand All @@ -129,12 +131,17 @@ def parse_record(op_count, field_count, generation, expiration)
name = @data_buffer.read(receive_offset+8, name_size).force_encoding('utf-8')
receive_offset += 4 + 4 + name_size


particle_bytes_size = op_size - (4 + name_size)
value = Aerospike.bytes_to_particle(particle_type, @data_buffer, receive_offset, particle_bytes_size)
receive_offset += particle_bytes_size

bins[name] = value
if single_bin_value || !bins.has_key?(name)
bins[name] = value
elsif (prev = bins[name]).kind_of?(OpResults)
prev << value
else
bins[name] = OpResults.new << prev << value
end

i = i.succ
end
Expand All @@ -144,4 +151,6 @@ def parse_record(op_count, field_count, generation, expiration)

end # class

class OpResults < Array; end

end # module

0 comments on commit c1b09ff

Please sign in to comment.