Skip to content
This repository has been archived by the owner on May 27, 2022. It is now read-only.

Commit

Permalink
Merge pull request #21 from gyounes/master
Browse files Browse the repository at this point in the history
Changes to antidote CRDTs (as per M12)

- new module names
- removed experimental CRDTs: integer, rga, map_aw
- code cleanup
  • Loading branch information
peterzeller committed Mar 21, 2018
2 parents b41a44d + 303dc70 commit 656a8c6
Show file tree
Hide file tree
Showing 32 changed files with 291 additions and 1,292 deletions.
13 changes: 1 addition & 12 deletions include/antidote_crdt.hrl
Expand Up @@ -4,19 +4,8 @@
-type value() :: term().
-type reason() :: term().

-type pncounter() :: integer().
-type pncounter_update() :: {increment, integer()} |
{decrement, integer()}.
-type pncounter_effect() :: integer().
-type pncounter_value() :: integer().


-export_type([ crdt/0,
update/0,
effect/0,
value/0,
pncounter/0,
pncounter_update/0,
pncounter_effect/0,
pncounter_value/0
value/0
]).
46 changes: 30 additions & 16 deletions src/antidote_crdt.erl
Expand Up @@ -19,29 +19,43 @@
%% -------------------------------------------------------------------

%% antidote_crdt.erl : behaviour for op-based CRDTs
%% Naming pattern of antidote crdts: <type>_<semantics>
%% if there is only one kind of semantics implemented for a certain type
%% only the type is used in the name e.g. rga
%% counter_pn: PN-Counter aka Posistive Negative Counter
%% counter_b: Bounded Counter
%% counter_fat: Fat Counter
%% integer: Integer (Experimental)
%% flag_ew: Enable Wins Flag aka EW-Flag
%% flag_dw: Disable Wins Flag DW-Flag
%% set_go: Grow Only Set aka G-Set
%% set_aw: Add Wins Set aka AW-Set, previously OR-Set (Observed Remove Set)
%% set_rw: Remove Wins Set aka RW-Set
%% register_lww: Last Writer Wins Register aka LWW-Reg
%% register_mv: MultiValue Register aka MV-Reg
%% map_go: Grow Only Map aka G-Map
%% map_aw: Add Wins Map aka AW-Map (Experimental)
%% map_rr: Recursive Resets Map akak RR-Map
%% rga: Replicated Growable Array (Experimental)



-module(antidote_crdt).

-include("antidote_crdt.hrl").

-define(CRDTS, [antidote_crdt_counter,
antidote_crdt_orset,
antidote_crdt_gset,
antidote_crdt_rga,
antidote_crdt_bcounter,
antidote_crdt_mvreg,
antidote_crdt_map,
antidote_crdt_lwwreg,
antidote_crdt_gmap,
antidote_crdt_set_rw,
antidote_crdt_integer,
antidote_crdt_map_aw,
antidote_crdt_map_rr,
antidote_crdt_fat_counter,
-define(CRDTS, [antidote_crdt_counter_pn,
antidote_crdt_counter_b,
antidote_crdt_counter_fat,
antidote_crdt_flag_ew,
antidote_crdt_flag_dw
]).
antidote_crdt_flag_dw,
antidote_crdt_set_go,
antidote_crdt_set_aw,
antidote_crdt_set_rw,
antidote_crdt_register_lww,
antidote_crdt_register_mv,
antidote_crdt_map_go,
antidote_crdt_map_rr]).

-export([is_type/1
]).
Expand Down
60 changes: 30 additions & 30 deletions src/antidote_crdt_bcounter.erl → src/antidote_crdt_counter_b.erl
@@ -1,7 +1,7 @@
%% -*- coding: utf-8 -*-
%% --------------------------------------------------------------------------
%%
%% crdt_bcounter: A convergent, replicated, operation based bounded counter.
%% antidote_crdt_counter_b: A convergent, replicated, operation-based bounded counter.
%%
%% --------------------------------------------------------------------------

Expand All @@ -12,7 +12,7 @@
%% All operations on this CRDT are monotonic and do not keep extra tombstones.
%% @end

-module(antidote_crdt_bcounter).
-module(antidote_crdt_counter_b).

-behaviour(antidote_crdt).

Expand Down Expand Up @@ -40,23 +40,23 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-export_type([bcounter/0, binary_bcounter/0, bcounter_op/0, id/0]).
-export_type([antidote_crdt_counter_b/0, binary_antidote_crdt_counter_b/0, antidote_crdt_counter_b_op/0, id/0]).

-opaque bcounter() :: {orddict:orddict(), orddict:orddict()}.
-type binary_bcounter() :: binary().
-type bcounter_op() :: bcounter_anon_op() | bcounter_src_op().
-type bcounter_anon_op() :: {transfer, {pos_integer(), id(), id()}} |
-opaque antidote_crdt_counter_b() :: {orddict:orddict(), orddict:orddict()}.
-type binary_antidote_crdt_counter_b() :: binary().
-type antidote_crdt_counter_b_op() :: antidote_crdt_counter_b_anon_op() | antidote_crdt_counter_b_src_op().
-type antidote_crdt_counter_b_anon_op() :: {transfer, {pos_integer(), id(), id()}} |
{increment, {pos_integer(), id()}} | {decrement, {pos_integer(), id()}}.
-type bcounter_src_op() :: {bcounter_anon_op(), id()}.
-type antidote_crdt_counter_b_src_op() :: {antidote_crdt_counter_b_anon_op(), id()}.
-opaque id() :: term. %% A replica's identifier.

%% @doc Return a new, empty `bcounter()'.
-spec new() -> bcounter().
%% @doc Return a new, empty `antidote_crdt_counter_b()'.
-spec new() -> antidote_crdt_counter_b().
new() ->
{orddict:new(), orddict:new()}.

%% @doc Return the available permissions of replica `Id' in a `bcounter()'.
-spec localPermissions(id(), bcounter()) -> non_neg_integer().
%% @doc Return the available permissions of replica `Id' in a `antidote_crdt_counter_b()'.
-spec localPermissions(id(), antidote_crdt_counter_b()) -> non_neg_integer().
localPermissions(Id, {P, D}) ->
Received = lists:foldl(
fun(
Expand Down Expand Up @@ -88,8 +88,8 @@ localPermissions(Id, {P, D}) ->
Received - Granted
end.

%% @doc Return the total available permissions in a `bcounter()'.
-spec permissions(bcounter()) -> non_neg_integer().
%% @doc Return the total available permissions in a `antidote_crdt_counter_b()'.
-spec permissions(antidote_crdt_counter_b()) -> non_neg_integer().
permissions({P, D}) ->
TotalIncrements = orddict:fold(
fun
Expand All @@ -105,22 +105,22 @@ permissions({P, D}) ->
end, 0, D),
TotalIncrements - TotalDecrements.

%% @doc Return the read value of a given `bcounter()', itself.
-spec value(bcounter()) -> bcounter().
%% @doc Return the read value of a given `antidote_crdt_counter_b()', itself.
-spec value(antidote_crdt_counter_b()) -> antidote_crdt_counter_b().
value(Counter) -> Counter.

%% @doc Generate a downstream operation.
%% The first parameter is either `{increment, pos_integer()}' or `{decrement, pos_integer()}',
%% which specify the operation and amount, or `{transfer, pos_integer(), id()}'
%% that additionally specifies the target replica.
%% The second parameter is an `actor()' who identifies the source replica,
%% and the third parameter is a `bcounter()' which holds the current snapshot.
%% and the third parameter is a `antidote_crdt_counter_b()' which holds the current snapshot.
%%
%% Return a tuple containing the operation and source replica.
%% This operation fails and returns `{error, no_permissions}'
%% if it tries to consume resources unavailable to the source replica
%% (which prevents logging of forbidden attempts).
-spec downstream(bcounter_op(), bcounter()) -> {ok, term()} | {error, no_permissions}.
-spec downstream(antidote_crdt_counter_b_op(), antidote_crdt_counter_b()) -> {ok, term()} | {error, no_permissions}.
downstream({increment, {V, Actor}}, _Counter) when is_integer(V), V > 0 ->
{ok, {{increment, V}, Actor}};
downstream({decrement, {V, Actor}}, Counter) when is_integer(V), V > 0 ->
Expand All @@ -134,11 +134,11 @@ generate_downstream_check(Op, Actor, Counter, V) ->
Available < V -> {error, no_permissions}
end.

%% @doc Update a `bcounter()' with a downstream operation,
%% @doc Update a `antidote_crdt_counter_b()' with a downstream operation,
%% usually created with `generate_downstream'.
%%
%% Return the resulting `bcounter()' after applying the operation.
-spec update(term(), bcounter()) -> {ok, bcounter()}.
%% Return the resulting `antidote_crdt_counter_b()' after applying the operation.
-spec update(term(), antidote_crdt_counter_b()) -> {ok, antidote_crdt_counter_b()}.
update({{increment, V}, Id}, Counter) ->
increment(Id, V, Counter);
update({{decrement, V}, Id}, Counter) ->
Expand All @@ -158,12 +158,12 @@ decrement(Id, V, {P, D}) ->
transfer(From, To, V, {P, D}) ->
{ok, {orddict:update_counter({From, To}, V, P), D}}.

%% doc Return the binary representation of a `bcounter()'.
-spec to_binary(bcounter()) -> binary().
%% doc Return the binary representation of a `antidote_crdt_counter_b()'.
-spec to_binary(antidote_crdt_counter_b()) -> binary().
to_binary(C) -> term_to_binary(C).

%% doc Return a `bcounter()' from its binary representation.
-spec from_binary(binary()) -> {ok, bcounter()}.
%% doc Return a `antidote_crdt_counter_b()' from its binary representation.
-spec from_binary(binary()) -> {ok, antidote_crdt_counter_b()}.
from_binary(<<B/binary>>) -> {ok, binary_to_term(B)}.

%% @doc The following operation verifies
Expand Down Expand Up @@ -199,7 +199,7 @@ apply_op(Op, Counter) ->
{ok, NewCounter} = update(OP_DS, Counter),
NewCounter.

%% Tests creating a new `bcounter()'.
%% Tests creating a new `antidote_crdt_counter_b()'.
new_test() ->
?assertEqual({[], []}, new()).

Expand Down Expand Up @@ -271,22 +271,22 @@ transfer_test() ->

%% Tests the function `value()'.
value_test() ->
%% Test on `bcounter()' resulting from applying all kinds of operation.
%% Test on `antidote_crdt_counter_b()' resulting from applying all kinds of operation.
Counter0 = new(),
Counter1 = apply_op({increment, {10, r1}}, Counter0),
Counter2 = apply_op({decrement, {6, r1}}, Counter1),
Counter3 = apply_op({transfer, {2, r2, r1}}, Counter2),
%% Assert `value()' returns `bcounter()' itself.
%% Assert `value()' returns `antidote_crdt_counter_b()' itself.
?assertEqual(Counter3, value(Counter3)).

%% Tests serialization functions `to_binary()' and `from_binary()'.
binary_test() ->
%% Test on `bcounter()' resulting from applying all kinds of operation.
%% Test on `antidote_crdt_counter_b()' resulting from applying all kinds of operation.
Counter0 = new(),
Counter1 = apply_op({increment, {10, r1}}, Counter0),
Counter2 = apply_op({decrement, {6, r1}}, Counter1),
Counter3 = apply_op({transfer, {2, r2, r1}}, Counter2),
%% Assert marshaling and unmarshaling holds the same `bcounter()'.
%% Assert marshaling and unmarshaling holds the same `antidote_crdt_counter_b()'.
B = to_binary(Counter3),
?assertEqual({ok, Counter3}, from_binary(B)).

Expand Down
Expand Up @@ -18,16 +18,17 @@
%%
%% -------------------------------------------------------------------

%% antidote_crdt_fat_counter: A convergent, replicated, operation based Fat Counter
%% antidote_crdt_counter_fat: A convergent, replicated, operation based Fat Counter
%% The state of this fat counter is list of pairs where each pair is an integer
%% and a related token.
%% Basically when the counter recieves {increment, N} or {decrement, N} it generates
%% a pair {N, NewToken}.
%% On update, all seen tokens are removed and the new pair is then added to the state.
%% This token keeps growing ("Fat" Counter) but it useful as it allows the reset
%% functionaility, On reset(), all seen tokens are removed.
%% link to paper: http://haslab.uminho.pt/cbm/files/a3-younes.pdf

-module(antidote_crdt_fat_counter).
-module(antidote_crdt_counter_fat).

-behaviour(antidote_crdt).

Expand Down
28 changes: 16 additions & 12 deletions src/antidote_crdt_counter.erl → src/antidote_crdt_counter_pn.erl
Expand Up @@ -18,15 +18,13 @@
%%
%% -------------------------------------------------------------------

%% antidote_crdt_pncounter: A convergent, replicated, operation
%% antidote_crdt_counter_pn: A convergent, replicated, operation
%% based PN-Counter

-module(antidote_crdt_counter).
-module(antidote_crdt_counter_pn).

-behaviour(antidote_crdt).

-include("antidote_crdt.hrl").

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Expand All @@ -43,27 +41,33 @@
require_state_downstream/1
]).

%% @doc Create a new, empty 'pncounter()'

-type state() :: integer().
-type op() :: {increment, integer()} |
{decrement, integer()}.
-type effect() :: integer().

%% @doc Create a new, empty 'antidote_crdt_counter_pn'
new() ->
0.

%% @doc Create 'pncounter()' with initial value
-spec new(integer()) -> pncounter().
%% @doc Create 'antidote_crdt_counter_pn' with initial value
-spec new(integer()) -> state().
new(Value) when is_integer(Value) ->
Value;
new(_) ->
new().

%% @doc The single, total value of a `pncounter()'
-spec value(pncounter()) -> integer().
-spec value(state()) -> integer().
value(PNCnt) when is_integer(PNCnt) ->
PNCnt.

%% @doc Generate a downstream operation.
%% The first parameter is either `increment' or `decrement' or the two tuples
%% `{increment, pos_integer()}' or `{decrement, pos_integer()}'. The second parameter
%% is the pncounter (this parameter is not actually used).
-spec downstream(pncounter_update(), pncounter()) -> {ok, pncounter_effect()}.
-spec downstream(op(), state()) -> {ok, effect()}.
downstream(increment, _PNCnt) ->
{ok, 1};
downstream(decrement, _PNCnt) ->
Expand All @@ -81,17 +85,17 @@ downstream({decrement, By}, _PNCnt) when is_integer(By) ->
%% The 2nd argument is the `pncounter()' to update.
%%
%% returns the updated `pncounter()'
-spec update(pncounter_effect(), pncounter()) -> {ok, pncounter()}.
-spec update(effect(), state()) -> {ok, state()}.
update(N, PNCnt) ->
{ok, PNCnt + N}.

%% @doc Compare if two `pncounter()' are equal. Only returns `true()' if both
%% of their positive and negative entries are equal.
-spec equal(pncounter(), pncounter()) -> boolean().
-spec equal(state(), state()) -> boolean().
equal(PNCnt1, PNCnt2) ->
PNCnt1 =:= PNCnt2.

-spec to_binary(pncounter()) -> binary().
-spec to_binary(state()) -> binary().
to_binary(PNCounter) ->
term_to_binary(PNCounter).

Expand Down
16 changes: 8 additions & 8 deletions src/antidote_crdt_flag_dw.erl
Expand Up @@ -47,10 +47,10 @@
-define(V1_VERS, 1).

-export_type([flag_dw/0]).
-opaque flag_dw() :: {antidote_crdt_flag:tokens(), antidote_crdt_flag:tokens()}.
-opaque flag_dw() :: {antidote_crdt_flag_helper:tokens(), antidote_crdt_flag_helper:tokens()}.

%% SeenTokens, NewEnableTokens, NewDisableTokens
-type downstream_op() :: {antidote_crdt_flag:tokens(), antidote_crdt_flag:tokens(), antidote_crdt_flag:tokens()}.
-type downstream_op() :: {antidote_crdt_flag_helper:tokens(), antidote_crdt_flag_helper:tokens(), antidote_crdt_flag_helper:tokens()}.

-spec new() -> flag_dw().
new() ->
Expand All @@ -60,11 +60,11 @@ new() ->
value({EnableTokens, DisableTokens}) ->
DisableTokens == [] andalso EnableTokens =/= [].

-spec downstream(antidote_crdt_flag:op(), flag_dw()) -> {ok, downstream_op()}.
-spec downstream(antidote_crdt_flag_helper:op(), flag_dw()) -> {ok, downstream_op()}.
downstream({disable, {}}, {EnableTokens, DisableTokens}) ->
{ok, {EnableTokens ++ DisableTokens, [], [antidote_crdt_flag:unique()]}};
{ok, {EnableTokens ++ DisableTokens, [], [antidote_crdt_flag_helper:unique()]}};
downstream({enable, {}}, {EnableTokens, DisableTokens}) ->
{ok, {EnableTokens ++ DisableTokens, [antidote_crdt_flag:unique()], []}};
{ok, {EnableTokens ++ DisableTokens, [antidote_crdt_flag_helper:unique()], []}};
downstream({reset, {}}, {EnableTokens, DisableTokens}) ->
{ok, {EnableTokens ++ DisableTokens, [], []}}.

Expand All @@ -78,7 +78,7 @@ downstream({reset, {}}, {EnableTokens, DisableTokens}) ->
equal(Flag1, Flag2) ->
Flag1 == Flag2.

-spec to_binary(flag_dw()) -> antidote_crdt_flag:binary_flag().
-spec to_binary(flag_dw()) -> antidote_crdt_flag_helper:binary_flag().
to_binary(Flag) ->
%% @TODO something smarter
<<?TAG:8/integer, ?V1_VERS:8/integer, (term_to_binary(Flag))/binary>>.
Expand All @@ -87,12 +87,12 @@ from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, Bin/binary>>) ->
%% @TODO something smarter
{ok, binary_to_term(Bin)}.

is_operation(A) -> antidote_crdt_flag:is_operation(A).
is_operation(A) -> antidote_crdt_flag_helper:is_operation(A).

is_bottom(Flag) ->
Flag == new().

require_state_downstream(A) -> antidote_crdt_flag:require_state_downstream(A).
require_state_downstream(A) -> antidote_crdt_flag_helper:require_state_downstream(A).


%% ===================================================================
Expand Down

0 comments on commit 656a8c6

Please sign in to comment.