diff --git a/include/riakc.hrl b/include/riakc.hrl
index 66bc9adb..eb6a983c 100644
--- a/include/riakc.hrl
+++ b/include/riakc.hrl
@@ -24,7 +24,7 @@
-define(PROTO_MAJOR, 1).
-define(PROTO_MINOR, 0).
-define(DEFAULT_PB_TIMEOUT, 60000).
--define(FIRST_RECONNECT_INTERVAL, 100).
+-define(FIRST_RECONNECT_INTERVAL, 10).
-define(MAX_RECONNECT_INTERVAL, 30000).
-type client_option() :: queue_if_disconnected |
diff --git a/src/riakc_obj.erl b/src/riakc_obj.erl
index 14d063cf..832616ef 100644
--- a/src/riakc_obj.erl
+++ b/src/riakc_obj.erl
@@ -67,7 +67,8 @@
clear_links/1,
delete_links/2,
set_link/2,
- add_link/2
+ add_link/2,
+ is_riakc_obj/1
]).
%% Internal library use only
-export([new_obj/4,index_id_to_bin/1]).
@@ -575,6 +576,9 @@ add_link(MD, [{T, IdList} | Rest]) ->
add_link(MD2, Rest)
end.
+is_riakc_obj(#riakc_obj{}) -> true;
+is_riakc_obj(_) -> false.
+
%% @doc INTERNAL USE ONLY. Set the contents of riakc_obj to the
%% {Metadata, Value} pairs in MVs. Normal clients should use the
%% set_update_[value|metadata]() + apply_updates() method for changing
diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl
index c8f21d50..4281c7d9 100644
--- a/src/riakc_pb_socket.erl
+++ b/src/riakc_pb_socket.erl
@@ -42,6 +42,7 @@
set_options/2, set_options/3,
is_connected/1, is_connected/2,
ping/1, ping/2,
+ queue_len/1,
get_client_id/1, get_client_id/2,
set_client_id/2, set_client_id/3,
get_server_info/1, get_server_info/2,
@@ -95,6 +96,7 @@
-deprecated({get_index,'_', eventually}).
+-type timeout2() :: timeout() | {timeout(), timeout()}.
-type ctx() :: any().
-type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple().
-type rpb_resp() :: atom() | tuple().
@@ -141,6 +143,7 @@
transport = gen_tcp :: 'gen_tcp' | 'ssl',
active :: #request{} | undefined, % active request
queue :: queue() | undefined, % queue of pending requests
+ queue_len=0 :: non_neg_integer(), % queue size
connects=0 :: non_neg_integer(), % number of successful connects
failed=[] :: [connection_failure()], % breakdown of failed connects
connect_timeout=infinity :: timeout(), % timeout of TCP connection
@@ -231,6 +234,9 @@ ping(Pid) ->
ping(Pid, Timeout) ->
call_infinity(Pid, {req, rpbpingreq, Timeout}).
+queue_len(Pid) ->
+ call_infinity(Pid, {check, queue_len}).
+
%% @doc Get the client id for this connection
%% @equiv get_client_id(Pid, default_timeout(get_client_id_timeout))
-spec get_client_id(pid()) -> {ok, client_id()} | {error, term()}.
@@ -276,7 +282,7 @@ get(Pid, Bucket, Key) ->
%% @doc Get bucket/key from the server specifying timeout.
%% Will return {error, notfound} if the key is not on the server.
%% @equiv get(Pid, Bucket, Key, Options, Timeout)
--spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout() | get_options()) ->
+-spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | get_options()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
get(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
get(Pid, Bucket, Key, [], Timeout);
@@ -287,7 +293,7 @@ get(Pid, Bucket, Key, Options) ->
%% unchanged will be returned when the
%% {if_modified, Vclock} option is specified and the
%% object is unchanged.
--spec get(pid(), bucket(), key(), get_options(), timeout()) ->
+-spec get(pid(), bucket(), key(), get_options(), timeout2()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
get(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
@@ -305,7 +311,7 @@ put(Pid, Obj) ->
%% @doc Put the metadata/value in the object under bucket/key with options or timeout.
%% @equiv put(Pid, Obj, Options, Timeout)
%% @see put/4
--spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout() | put_options()) ->
+-spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout2() | put_options()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
put(Pid, Obj, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
put(Pid, Obj, [], Timeout);
@@ -323,7 +329,7 @@ put(Pid, Obj, Options) ->
%% `return_body' was specified.
%% @throws siblings
%% @end
--spec put(pid(), riakc_obj(), put_options(), timeout()) ->
+-spec put(pid(), riakc_obj(), put_options(), timeout2()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
put(Pid, Obj, Options, Timeout) ->
Content = riak_pb_kv_codec:encode_content({riakc_obj:get_update_metadata(Obj),
@@ -344,7 +350,7 @@ delete(Pid, Bucket, Key) ->
%% @doc Delete the key/value specifying timeout or options. Note that the rw quorum is deprecated, use r and w.
%% @equiv delete(Pid, Bucket, Key, Options, Timeout)
--spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout() | delete_options()) ->
+-spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | delete_options()) ->
ok | {error, term()}.
delete(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
delete(Pid, Bucket, Key, [], Timeout);
@@ -352,7 +358,7 @@ delete(Pid, Bucket, Key, Options) ->
delete(Pid, Bucket, Key, Options, default_timeout(delete_timeout)).
%% @doc Delete the key/value with options and timeout. Note that the rw quorum is deprecated, use r and w.
--spec delete(pid(), bucket(), key(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete(pid(), bucket(), key(), delete_options(), timeout2()) -> ok | {error, term()}.
delete(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
Req = delete_options(Options, #rpbdelreq{type = T, bucket = B, key = Key}),
@@ -406,7 +412,7 @@ delete_obj(Pid, Obj, Options) ->
%% @doc Delete the riak object with options and timeout.
%% @equiv delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj), riakc_obj:vclock(Obj), Options, Timeout)
%% @see delete_vclock/6
--spec delete_obj(pid(), riakc_obj(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete_obj(pid(), riakc_obj(), delete_options(), timeout2()) -> ok | {error, term()}.
delete_obj(Pid, Obj, Options, Timeout) ->
delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj),
riakc_obj:vclock(Obj), Options, Timeout).
@@ -1100,7 +1106,7 @@ cs_bucket_fold(Pid, Bucket, Opts) when is_pid(Pid), (is_binary(Bucket) orelse
%% @doc Return the default timeout for an operation if none is provided.
%% Falls back to the default timeout.
--spec default_timeout(timeout_name()) -> timeout().
+-spec default_timeout(timeout_name()) -> timeout2().
default_timeout(OpTimeout) ->
case application:get_env(riakc, OpTimeout) of
{ok, EnvTimeout} ->
@@ -1219,6 +1225,7 @@ modify_type(Pid, Fun, BucketAndType, Key, Options) ->
init([Address, Port, Options]) ->
%% Schedule a reconnect as the first action. If the server is up then
%% the handle_info(reconnect) will run before any requests can be sent.
+ process_flag(trap_exit,true),
State = parse_options(Options, #state{address = Address,
port = Port,
queue = queue:new()}),
@@ -1228,8 +1235,8 @@ init([Address, Port, Options]) ->
{ok, State};
false ->
case connect(State) of
- {error, Reason} ->
- {stop, {tcp, Reason}};
+ {error, _Reason} ->
+ {stop, normal};
Ok ->
Ok
end
@@ -1267,26 +1274,30 @@ handle_call(is_connected, _From, State) ->
end;
handle_call({set_options, Options}, _From, State) ->
{reply, ok, parse_options(Options, State)};
+handle_call({check, queue_len}, _From, #state{queue_len = QueueLen} = State) ->
+ {reply, QueueLen, State};
handle_call(stop, _From, State) ->
- _ = disconnect(State),
- {stop, normal, ok, State}.
+ disconnect(State, false),
+ {stop, normal, ok, State};
+handle_call(get_state, _From, State) ->
+ {reply, State, State}.
%% @private
handle_info({tcp_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client TCP error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({tcp_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client SSL error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
%% Make sure the two Sock's match. If a request timed out, but there was
%% a response queued up behind it we do not want to process it. Instead
@@ -1311,7 +1322,7 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
%% Send reply and get ready for the next request - send the next request
%% if one is queued up
cancel_req_timer(Active#request.tref),
- _ = send_caller(Response, NewState0#state.active),
+ send_caller(Response, NewState0#state.active),
dequeue_request(NewState0#state{active = undefined});
{pending, NewState0} -> %% Request is still pending - do not queue up a new one
NewActive = restart_req_timer(Active),
@@ -1325,19 +1336,13 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
ok = ssl:setopts(Sock, [{active, once}])
end,
{noreply, NewState};
-handle_info({req_timeout, Ref}, State) ->
- case State#state.active of %%
- undefined ->
- {noreply, remove_queued_request(Ref, State)};
- Active ->
- case Ref == Active#request.ref of
- true -> %% Matches the current operation
- NewState = maybe_reply(on_timeout(State#state.active, State)),
- disconnect(NewState#state{active = undefined});
- false ->
- {noreply, remove_queued_request(Ref, State)}
- end
- end;
+handle_info({TimeoutTag, Ref}, #state{active = #request{ref = Ref}} = State)
+ when TimeoutTag == op_timeout; TimeoutTag == req_timeout ->
+ NewState = maybe_reply(on_timeout(State#state.active, State)),
+ disconnect(NewState#state{active = undefined}, false);
+handle_info({TimeoutTag, Ref}, State)
+ when TimeoutTag == q_timeout; TimeoutTag == req_timeout ->
+ {noreply, remove_queued_request(Ref, State)};
handle_info(reconnect, State) ->
case connect(State) of
{ok, NewState} ->
@@ -1345,7 +1350,7 @@ handle_info(reconnect, State) ->
{error, Reason} ->
%% Update the failed count and reschedule a reconnection
NewState = State#state{failed = orddict:update_counter(Reason, 1, State#state.failed)},
- disconnect(NewState)
+ disconnect(NewState, true)
end;
handle_info(_, State) ->
{noreply, State}.
@@ -1634,7 +1639,7 @@ process_response(#request{msg = #rpbdelreq{}},
process_response(#request{msg = #rpblistbucketsreq{}}=Request,
#rpblistbucketsresp{buckets = Buckets, done = undefined},
State) ->
- _ = send_caller({buckets, Buckets}, Request),
+ send_caller({buckets, Buckets}, Request),
{pending, State};
process_response(#request{msg = #rpblistbucketsreq{}},
@@ -1644,13 +1649,13 @@ process_response(#request{msg = #rpblistbucketsreq{}},
process_response(#request{msg = #rpblistkeysreq{}}=Request,
#rpblistkeysresp{done = Done, keys = Keys}, State) ->
- _ = case Keys of
- undefined ->
- ok;
- _ ->
- %% Have to directly use send_caller as may want to reply with done below.
- send_caller({keys, Keys}, Request)
- end,
+ case Keys of
+ undefined ->
+ ok;
+ _ ->
+ %% Have to directly use send_caller as may want to reply with done below.
+ send_caller({keys, Keys}, Request)
+ end,
case Done of
true ->
{reply, done, State};
@@ -1678,13 +1683,13 @@ process_response(#request{msg = #rpbsetbuckettypereq{}},
process_response(#request{msg = #rpbmapredreq{content_type = ContentType}}=Request,
#rpbmapredresp{done = Done, phase=PhaseId, response=Data}, State) ->
- _ = case Data of
- undefined ->
- ok;
- _ ->
- Response = decode_mapred_resp(Data, ContentType),
- send_caller({mapred, PhaseId, Response}, Request)
- end,
+ case Data of
+ undefined ->
+ ok;
+ _ ->
+ Response = decode_mapred_resp(Data, ContentType),
+ send_caller({mapred, PhaseId, Response}, Request)
+ end,
case Done of
true ->
{reply, done, State};
@@ -1698,7 +1703,7 @@ process_response(#request{msg = #rpbindexreq{}}, rpbindexresp, State) ->
process_response(#request{msg = #rpbindexreq{stream=true, return_terms=Terms}}=Request,
#rpbindexresp{results=Results, keys=Keys, done=Done, continuation=Cont}, State) ->
ToSend = process_index_response(Terms, Keys, Results),
- _ = send_caller(ToSend, Request),
+ send_caller(ToSend, Request),
DoneResponse = {reply, {done, Cont}, State},
case Done of
true -> DoneResponse;
@@ -1725,7 +1730,7 @@ process_response(#request{msg = #rpbcsbucketreq{bucket=Bucket}}=Request, #rpbcsb
Objects),
{ok, CObjects}
end,
- _ = send_caller(ToSend, Request),
+ send_caller(ToSend, Request),
DoneResponse = {reply, {done, Cont}, State},
case Done of
true -> DoneResponse;
@@ -1925,6 +1930,8 @@ create_req_timer(infinity, _Ref) ->
undefined;
create_req_timer(undefined, _Ref) ->
undefined;
+create_req_timer({Msecs,_}, Ref) ->
+ erlang:send_after(Msecs, self(), {q_timeout, Ref});
create_req_timer(Msecs, Ref) ->
erlang:send_after(Msecs, self(), {req_timeout, Ref}).
@@ -1933,7 +1940,7 @@ create_req_timer(Msecs, Ref) ->
cancel_req_timer(undefined) ->
ok;
cancel_req_timer(Tref) ->
- _ = erlang:cancel_timer(Tref),
+ erlang:cancel_timer(Tref),
ok.
%% @private
@@ -2032,14 +2039,14 @@ start_auth(State=#state{credentials={User,Pass}, sock=Sock}) ->
%% @private
%% Disconnect socket if connected
-disconnect(State) ->
+disconnect(State, DelayReconnect) ->
%% Tell any pending requests we've disconnected
- _ = case State#state.active of
- undefined ->
- ok;
- Request ->
- send_caller({error, disconnected}, Request)
- end,
+ case State#state.active of
+ undefined ->
+ ok;
+ Request ->
+ send_caller({error, disconnected}, Request)
+ end,
%% Make sure the connection is really closed
case State#state.sock of
@@ -2052,13 +2059,16 @@ disconnect(State) ->
%% Decide whether to reconnect or exit
NewState = State#state{sock = undefined, active = undefined},
- case State#state.auto_reconnect of
- true ->
+ case {State#state.auto_reconnect, DelayReconnect} of
+ {true, true} ->
%% Schedule the reconnect message and return state
erlang:send_after(State#state.reconnect_interval, self(), reconnect),
{noreply, increase_reconnect_interval(NewState)};
- false ->
- {stop, disconnected, NewState}
+ {true, false} ->
+ self() ! reconnect,
+ {noreply, NewState};
+ {false, _} ->
+ {stop, normal, NewState}
end.
%% Double the reconnect interval up to the maximum
@@ -2073,16 +2083,26 @@ increase_reconnect_interval(State) ->
%% Send a request to the server and prepare the state for the response
%% @private
-send_request(Request0, State) when State#state.active =:= undefined ->
- {Request, Pkt} = encode_request_message(Request0),
+send_request(#request{ref = Ref,
+ tref = TRef,
+ timeout = Timeout} = Request0, State)
+ when State#state.active =:= undefined ->
+ {Request1, Pkt} = encode_request_message(Request0),
Transport = State#state.transport,
case Transport:send(State#state.sock, Pkt) of
ok ->
- maybe_reply(after_send(Request, State#state{active = Request}));
+ case Timeout of
+ {_,Msecs} ->
+ cancel_req_timer(TRef),
+ Request2 = Request1#request{tref = erlang:send_after(Msecs, self(), {op_timeout, Ref})},
+ maybe_reply(after_send(Request2, State#state{active = Request2}));
+ _ ->
+ maybe_reply(after_send(Request1, State#state{active = Request1}))
+ end;
{error, Reason} ->
error_logger:warning_msg("Socket error while sending riakc request: ~p.", [Reason]),
Transport:close(State#state.sock),
- maybe_enqueue_and_reconnect(Request, State#state{sock=undefined})
+ maybe_enqueue_and_reconnect(Request1, State#state{sock=undefined})
end.
%% Already encoded (for tunneled messages), but must provide Message Id
@@ -2107,37 +2127,44 @@ maybe_reconnect(_) -> ok.
%% If we can queue while disconnected, do so, otherwise tell the
%% caller that the socket was disconnected.
enqueue_or_reply_error(Request, #state{queue_if_disconnected=true}=State) ->
- queue_request(Request, State);
+ case Request#request.timeout of
+ {_,_} -> send_caller({error, timeout}, Request); % we've already used part of the op timeout
+ _ -> queue_request_head(Request, State)
+ end;
enqueue_or_reply_error(Request, State) ->
- _ = send_caller({error, disconnected}, Request),
+ send_caller({error, disconnected}, Request),
State.
%% Queue up a request if one is pending
%% @private
-queue_request(Request, State) ->
- State#state{queue = queue:in(Request, State#state.queue)}.
+queue_request(Request, State) -> queue_request(Request, State, in).
+queue_request_head(Request, State) -> queue_request(Request, State, in_r).
+queue_request(Request, #state{queue_len = QLen, queue = Q} = State, Infunc) ->
+ State#state{queue_len = QLen + 1, queue = queue:Infunc(Request, Q)}.
%% Try and dequeue request and send onto the server if one is waiting
%% @private
-dequeue_request(State) ->
+dequeue_request(#state{queue_len = QLen} = State) ->
case queue:out(State#state.queue) of
{empty, _} ->
- State;
+ State#state{active = undefined};
{{value, Request}, Q2} ->
- send_request(Request, State#state{queue = Q2})
+ send_request(Request, State#state{active = undefined,
+ queue_len = QLen - 1,
+ queue = Q2})
end.
%% Remove a queued request by reference - returns same queue if ref not present
%% @private
-remove_queued_request(Ref, State) ->
- L = queue:to_list(State#state.queue),
- case lists:keytake(Ref, #request.ref, L) of
+remove_queued_request(Ref, #state{queue_len = QLen, queue = Q} = State) ->
+ case lists:keytake(Ref, #request.ref, queue:to_list(Q)) of
false -> % Ref not queued up
- State;
+ State;
{value, Req, L2} ->
{reply, Reply, NewState} = on_timeout(Req, State),
- _ = send_caller(Reply, Req),
- NewState#state{queue = queue:from_list(L2)}
+ send_caller(Reply, Req),
+ NewState#state{queue_len = QLen - 1,
+ queue = queue:from_list(L2)}
end.
%% @private
@@ -2478,7 +2505,7 @@ maybe_start_network() ->
bad_connect_test() ->
%% Start with an unlikely port number
- ?assertEqual({error, {tcp, econnrefused}}, start({127,0,0,1}, 65535)).
+ ?assertEqual({error, normal}, start({127,0,0,1}, 65535)).
queue_disconnected_test() ->
%% Start with an unlikely port number
@@ -2898,7 +2925,7 @@ live_node_tests() ->
%% Would really like this in a nested {setup, blah} structure
%% but eunit does not allow
{ok, Pid} = start_link(test_ip(), test_port()),
- Pid ! {req_timeout, make_ref()},
+ Pid ! {q_timeout, make_ref()},
?assertEqual(pong, ping(Pid))
end)},
@@ -3812,4 +3839,195 @@ live_node_tests() ->
end)}
].
+timeout_no_conn_test() ->
+ {ok, Pid} = start_link(test_ip(), 65225, [auto_reconnect, queue_if_disconnected]),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {150,10}), timer:sleep(1),
+ P02 = REQ(get, {100,10}), timer:sleep(1),
+ P03 = REQ(get, {150,10}), timer:sleep(1),
+ P04 = REQ(get, {100,10}), timer:sleep(1),
+ P05 = REQ(get, {150,10}), timer:sleep(1),
+ P06 = REQ(get, {100,10}), timer:sleep(1),
+ P07 = REQ(get, {150,10}), timer:sleep(1),
+ P08 = REQ(get, {100,10}), timer:sleep(1),
+ P09 = REQ(get, {150,10}), timer:sleep(1),
+ P10 = REQ(get, 20), timer:sleep(1),
+ P11 = REQ(get, 40), timer:sleep(1),
+ P12 = REQ(get, 60), timer:sleep(1),
+ P13 = REQ(get, 80), timer:sleep(1),
+ P14 = REQ(get, 20), timer:sleep(1),
+ P15 = REQ(get, 100), timer:sleep(250), 0 = queue_len(Pid),
+ P16 = REQ(get, {20,100}), timer:sleep(1),
+ P17 = REQ(get, {20,100}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+
+ io:format(user, "~n150 TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]),
+ io:format(user, "~n100 TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]),
+ lists:foreach(fun(T) -> true = T > 145, true = T < 155 end, [T01,T03,T05,T07,T09]),
+ lists:foreach(fun(T) -> true = T > 95, true = T < 105 end, [T02,T04,T06,T08]),
+ io:format(user, "~nTIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]),
+ true = T10 > 19, true = T10 < 24,
+ true = T11 > 39, true = T11 < 44,
+ true = T12 > 59, true = T12 < 64,
+ true = T13 > 79, true = T13 < 84,
+ true = T14 > 19, true = T14 < 24,
+ true = T15 > 99, true = T15 < 104,
+ true = T16 > 19, true = T16 < 24,
+ true = T17 > 19, true = T17 < 24,
+
+ stop(Pid).
+
+timeout_conn_test() ->
+ %% Set up a dummy socket to send requests on
+ {ok, DummyServerPid, Port} = dummy_server(),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]),
+ erlang:monitor(process, DummyServerPid),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {100, 20}), timer:sleep(1),
+ P02 = REQ(get, {100, 20}), timer:sleep(1),
+ P03 = REQ(get, {100, 20}), timer:sleep(1),
+ P04 = REQ(get, {100, 20}), timer:sleep(1),
+ P05 = REQ(get, {100, 20}), timer:sleep(1),
+ P06 = REQ(get, {100, 20}), timer:sleep(1),
+ P07 = REQ(get, {100, 20}), timer:sleep(1),
+ P08 = REQ(get, {100, 20}), timer:sleep(1),
+ P09 = REQ(get, {100, 20}), timer:sleep(1),
+ P10 = REQ(get, {100, 20}), timer:sleep(1),
+ P11 = REQ(get, 20), timer:sleep(1),
+ P12 = REQ(get, 40), timer:sleep(1),
+ P13 = REQ(get, 60), timer:sleep(1),
+ P14 = REQ(get, 80), timer:sleep(1),
+ P15 = REQ(get, 20), timer:sleep(1),
+ P16 = REQ(get, 100), timer:sleep(200), 0 = queue_len(Pid),
+ P17 = REQ(get, {20, 100}), timer:sleep(1),
+ P18 = REQ(get, {20, 100}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+ {T18, {error, timeout}} = RES(P18),
+
+ io:format(user, "~nTIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n",
+ [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]),
+ true = T01 > 19, true = T01 < 24,
+ true = T02 > 39, true = T02 < 47,
+ true = T03 > 59, true = T03 < 70,
+ true = T04 > 79, true = T04 < 93,
+ true = T05 > 99, true = T05 < 117,
+ lists:foreach(fun(T) -> true = T > 99, true = T < 125 end, [T06,T07,T08,T09,T10]),
+ true = T11 > 19, true = T11 < 24,
+ true = T12 > 39, true = T12 < 44,
+ true = T13 > 59, true = T13 < 64,
+ true = T14 > 79, true = T14 < 84,
+ true = T15 > 19, true = T15 < 24,
+ true = T16 > 99, true = T16 < 104,
+ true = T17 > 99, true = T17 < 104,
+ true = T18 > 19, true = T18 < 24,
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok % io:format(user, "~nMSG: ~p", [_Msg])
+ after 1 -> ok % io:format(user, "~nNO MSG: ~p", [process_info(DummyServerPid, messages)])
+ end,
+
+ stop(Pid).
+
+dummy_server() ->
+ {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, true}]),
+ {ok, Port} = inet:port(Listen),
+ Pid = spawn(?MODULE, dummy_server, [{Listen, no_conn}]),
+ {ok, Pid, Port}.
+
+dummy_server({Listen, no_conn}) ->
+ {ok, Sock} = gen_tcp:accept(Listen),
+ dummy_server({Listen, Sock});
+dummy_server({Listen, Sock}) ->
+ receive
+ stop -> ok;
+ {tcp_closed, Sock} -> dummy_server({Listen, no_conn});
+ _Data -> dummy_server({Listen, Sock}) % ignore requests, let them timeout
+ end.
+
+all_tests() ->
+ % acer(),
+ lists:foreach(
+ fun(TestFun) -> ok = apply(?MODULE, TestFun, []) end,
+ [bad_connect_test,
+ queue_disconnected_test,
+ auto_reconnect_bad_connect_test,
+ server_closes_socket_test,
+ auto_reconnect_server_closes_socket_test,
+ dead_socket_pid_returns_to_caller_test,
+ increase_reconnect_interval_test,
+ timeout_conn_test,
+ timeout_no_conn_test
+ ]).
+
-endif.