diff --git a/apps/epl/src/epl_app.erl b/apps/epl/src/epl_app.erl index cc059cf..3b29568 100644 --- a/apps/epl/src/epl_app.erl +++ b/apps/epl/src/epl_app.erl @@ -348,7 +348,7 @@ set_log_level(Verbose) when is_integer(Verbose) -> true = ets:insert(epl_priv, {log_level, LogLevel}). start_distributed(Args) -> - application:set_env(kernel, dist_auto_connect, never), + %% application:set_env(kernel, dist_auto_connect, never), %% By default we start as -name erlangpl@127.0.0.1 %% if --sname or --name flag was passed, we start accordingly diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl index 993d6ac..d540f4d 100644 --- a/apps/epl/src/epl_traffic.erl +++ b/apps/epl/src/epl_traffic.erl @@ -26,7 +26,8 @@ -record(state, {subscribers = [], traffic = [], - msg_pass = #{}}). + msg_pass = #{}, + cluster_traffic = []}). %%%=================================================================== %%% API functions @@ -63,14 +64,22 @@ handle_cast({unsubscribe, Pid}, State = #state{subscribers = Subs}) -> handle_cast(Request, _State) -> exit({not_implemented, Request}). -handle_info({data, {Node, _Timestamp}, Proplist}, +handle_info({data, {Node, _Timestamp}, _Proplist}, State = #state{subscribers = Subs, traffic = OldTraffic, - msg_pass = OldMsgPass}) -> + msg_pass = OldMsgPass, + cluster_traffic = OldClusterTraffic}) -> {Viz1, NewTraffic} = update_traffic_graph(Node, OldTraffic, epl_viz_map:new(Node)), + NewClusterTraffic = get_cluster_traffic_counters(nodes()), + TrafficDelta = compute_delta(NewClusterTraffic, OldClusterTraffic), + + + Proplist = [{send, TrafficDelta}], + io:format("Old Traffic ~p~n~n", [OldClusterTraffic]), + io:format("New Delta ~p~n~n", [TrafficDelta]), %% We're starting from observed node which is our graph entry point Viz2 = get_message_passing_counters(Node, Proplist, @@ -80,10 +89,58 @@ handle_info({data, {Node, _Timestamp}, Proplist}, JSON = epl_json:encode(Viz2, <<"traffic-info">>), [Pid ! {data, JSON} || Pid <- Subs], - {noreply, State#state{traffic = NewTraffic}}; + {noreply, State#state{traffic = NewTraffic, + cluster_traffic = NewClusterTraffic}}; handle_info(Request, _State) -> exit({not_implemented, Request}). +compute_delta(NewClusterTraffic, OldClusterTraffic) -> + lists:map( + fun({Key, In, Out}) -> + case lists:keyfind(Key, 1, OldClusterTraffic) of + {_, OldIn, OldOut} -> {Key, In-OldIn, Out-OldOut}; + false -> {Key, 0, 0} + end + end, + NewClusterTraffic). + +get_cluster_traffic_counters(Nodes) -> + NodesInfo = [{Node, element(2, rpc:call(Node, net_kernel, nodes_info, []))} + || Node <- Nodes], + + %% Example of NodesInfo: + %% {'michal@127.0.0.1', + %% [{'bartosz@127.0.0.1', + %% [{owner,<13641.75.0>}, + %% {state,up}, + %% {address, + %% {net_address, + %% {{127,0,0,1},49358}, + %% "127.0.0.1",tcp,inet}}, + %% {type,normal}, + %% {in,38}, + %% {out,38}]}, + %% {'erlangpl@127.0.0.1', + %% [{owner,<13641.144.0>}, + %% {state,up}, + %% {address, + %% {net_address, + %% {{127,0,0,1},49404}, + %% "127.0.0.1",tcp,inet}}, + %% {type,normal}, + %% {in,32}, + %% {out,22}]}]} + DataFlowInfo = lists:flatten( + lists:map(fun({NodeA, NeighboursInfo}) -> + [{{NodeA, NodeB}, NodeIn, NodeOut} || + {NodeB, [_, _, _, _,{in,NodeIn}, {out,NodeOut}]} + <- NeighboursInfo] + end, NodesInfo)), + Self = node(), + lists:filter(fun({{NodeA, NodeB}, _, _}) + when NodeA =/= Self andalso NodeB =/= Self -> true; + (_) -> false + end, DataFlowInfo). terminate(_Reason, _State) -> ok. @@ -99,7 +156,7 @@ get_message_passing_counters(Node, Proplist, Vizceral, OldMsgPass) -> fun({send, Send}, V) -> %% Examples of send trace: %% {{global_name_server,<13104.13.0>},0,1} - %% {#Port<13104.431>,<13104.28.0>},0,72} + %% {{#Port<13104.431>,<13104.28.0>},0,72} %% {{<13104.12.0>,{alias,'erlangpl@127.0.0.1'}},2,0} update_message_passing_graph(Node, Send, V, OldMsgPass); (_, V) ->