diff --git a/src/riakc_set.erl b/src/riakc_set.erl index a10e9642..69de0ecf 100644 --- a/src/riakc_set.erl +++ b/src/riakc_set.erl @@ -65,7 +65,8 @@ del_element/2]). %% Query functions --export([size/1, +-export([context/1, + size/1, is_element/2, fold/3]). @@ -140,6 +141,10 @@ del_element(_Bin, #set{context=undefined}) -> del_element(Bin, #set{removes=R0}=Set) when is_binary(Bin) -> Set#set{removes=ordsets:add_element(Bin, R0)}. +%% @doc Returns the original context of the set. +-spec context(riakc_set()) -> riakc_datatype:context(). +context(#set{context=C}) -> C. + %% @doc Returns the cardinality (size) of the set. Note: this only %% operates on the original value as retrieved from Riak. -spec size(riakc_set()) -> pos_integer(). diff --git a/test/riakc_pb_socket_tests.erl b/test/riakc_pb_socket_tests.erl index d7ea3b76..510f7378 100644 --- a/test/riakc_pb_socket_tests.erl +++ b/test/riakc_pb_socket_tests.erl @@ -1401,9 +1401,126 @@ integration_tests() -> Rsp -> ?debugFmt("gsets bucket is not present, skipping (~p)", [Rsp]) end + end)}, + {"delete with vclock set with context", + ?_test(begin + riakc_test_utils:reset_riak(), + {ok, Pid} = riakc_test_utils:start_link(), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))), + {ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assert(riakc_set:is_element(<<"X">>, S0)), + ?assertEqual(1, riakc_set:size(S0)), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))), + {ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertNot(riakc_set:is_element(<<"X">>, S1)), + ?assertEqual(0, riakc_set:size(S1)), + SC1 = riakc_set:context(S1), + ?debugFmt("The set with context ~1000p is empty - attempt once to delete the corresponding object. In order to do so, on the server: fetch the latest object by key (mapreduce), assert it has no siblings, assert it stores a CRDT, extract object vclock and CRDT context, finally return to the client. If successfully returned object vclock and CRDT context, on the client: if the CRDT context is the expected one, delete the object specifying the vclock detected as corresponding to the CRDT context.~n", [SC1]), + S = map_strfun_returning_object_vclock_and_crdt_context(), + R = riakc_pb_socket:mapred( + Pid, + [{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}], + [{map, {strfun, S}, none, true}]), + {ok, [{_, [{{object_vclock, OV2}, + {crdt_context, SC2}}]}]} = R, + ?assertEqual(SC1, SC2), + ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>) + end)}, + {"delete with vclock set with context - case determination of vclock corresponding to context failed for concurrent write", + ?_test(begin + riakc_test_utils:reset_riak(), + {ok, Pid} = riakc_test_utils:start_link(), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))), + {ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assert(riakc_set:is_element(<<"X">>, S0)), + ?assertEqual(1, riakc_set:size(S0)), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))), + {ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertNot(riakc_set:is_element(<<"X">>, S1)), + ?assertEqual(0, riakc_set:size(S1)), + %% Set is empty hence worth deleting. But + %% concurrent actor modifies object... + {ok, PidConcurrent} = riakc_test_utils:start_link(), + {ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(0, riakc_set:size(S0Concurrent)), + ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))), + {ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(1, riakc_set:size(S1Concurrent)), + %% ... before managing to determine object vclock + %% corresponding to CRDT context... + S = map_strfun_returning_object_vclock_and_crdt_context(), + R = riakc_pb_socket:mapred( + Pid, + [{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}], + [{map, {strfun, S}, none, true}]), + {ok, [{_, [{{object_vclock, _OV2}, + {crdt_context, SC2}}]}]} = R, + ?assertNotEqual(riakc_set:context(S1), SC2) + %% ... hence decision can be taken on what to do + %% e.g. not calling + %% `riakc_pb_socket:delete_vclock/3`. + end)}, + {"delete with vclock set with context - case delete failed for concurrent write", + ?_test(begin + riakc_test_utils:reset_riak(), + {ok, Pid} = riakc_test_utils:start_link(), + {error, {notfound, set}} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"X">>, riakc_set:new()))), + {ok, S0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assert(riakc_set:is_element(<<"X">>, S0)), + ?assertEqual(1, riakc_set:size(S0)), + ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:del_element(<<"X">>, S0))), + {ok, S1} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertNot(riakc_set:is_element(<<"X">>, S1)), + ?assertEqual(0, riakc_set:size(S1)), + %% Set is empty hence worth deleting. Determine + %% object vclock corresponding to CRDT context. + S = map_strfun_returning_object_vclock_and_crdt_context(), + R = riakc_pb_socket:mapred( + Pid, + [{{{<<"sets">>, <<"b">>}, <<"k">>}, undefined}], + [{map, {strfun, S}, none, true}]), + {ok, [{_, [{{object_vclock, OV2}, + {crdt_context, SC2}}]}]} = R, + ?assertEqual(riakc_set:context(S1), SC2), + %% But concurrent actor modifies object... + {ok, PidConcurrent} = riakc_test_utils:start_link(), + {ok, S0Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(0, riakc_set:size(S0Concurrent)), + ok = riakc_pb_socket:update_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>, riakc_set:to_op(riakc_set:add_element(<<"Y">>, S0Concurrent))), + {ok, S1Concurrent} = riakc_pb_socket:fetch_type(PidConcurrent, {<<"sets">>, <<"b">>}, <<"k">>), + ?assertEqual(1, riakc_set:size(S1Concurrent)), + %% ... hence attempt to delete object at specific + %% vclock does not delete the object. + ok = riakc_pb_socket:delete_vclock(Pid, {<<"sets">>, <<"b">>}, <<"k">>, OV2), + {ok, S1Concurrent} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"b">>}, <<"k">>) end)} ]. +map_strfun_returning_object_vclock_and_crdt_context() -> + "fun(O, undefined, none) -> + %% TODO Tombstone in mapreduce %% Ref http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/advanced-mapreduce/#map-phase + %% Assert no siblings. + 1 = riak_object:value_count(O), + %% riak_kv_crdt:value/1 infers type + %% but discards CRDT context: follow + %% its implementation but keep CRDT + %% context. + B = riak_object:bucket(O), + BProps = [_|_] = riak_core_bucket:get_bucket(B), + {{Ctx, _V}, _Stats} = + riak_kv_crdt:value( + O, riak_kv_crdt:to_mod( + proplists:get_value( + datatype, BProps))), + [{{object_vclock, riak_object:encode_vclock( + riak_object:vclock(O))}, + {crdt_context, Ctx}}] + end.". + integration_test_() -> SetupFun = fun() -> %% Grab the riakclient_pb.proto file diff --git a/tools b/tools index 72939314..97bf3671 160000 --- a/tools +++ b/tools @@ -1 +1 @@ -Subproject commit 72939314ab3151db776fcc01c92c26f6ee3dc499 +Subproject commit 97bf3671b73478394279ecbc45a4a4d0cc7fed9a