Update mod_multicast to work with ejabberd 14.12

This commit is contained in:
Badlop 2015-03-12 13:41:44 +01:00 committed by Christophe Romain
parent 416e203e66
commit f4fb0d7c7a
2 changed files with 103 additions and 61 deletions

View File

@ -45,6 +45,7 @@
set_aux_field/3, set_aux_field/3,
del_aux_field/2, del_aux_field/2,
get_subscription/2, get_subscription/2,
send_filtered/5,
broadcast/4, broadcast/4,
get_subscribed/1, get_subscribed/1,
transform_listen_option/2]). transform_listen_option/2]).
@ -94,12 +95,12 @@
tls_options = [], tls_options = [],
authenticated = false, authenticated = false,
jid, jid,
user = "", server = <<"">>, resource = <<"">>, user = <<"">>, server = <<"">>, resource = <<"">>,
sid, sid,
pres_t = ?SETS:new(), pres_t = ?SETS:new(),
pres_f = ?SETS:new(), pres_f = ?SETS:new(),
pres_a = ?SETS:new(), pres_a = ?SETS:new(),
pres_last, pres_pri, pres_last,
pres_timestamp, pres_timestamp,
privacy_list = #userlist{}, privacy_list = #userlist{},
conn = unknown, conn = unknown,
@ -247,6 +248,9 @@ get_subscription(LFrom, StateData) ->
true -> none true -> none
end. end.
send_filtered(FsmRef, Feature, From, To, Packet) ->
FsmRef ! {send_filtered, Feature, From, To, Packet}.
broadcast(FsmRef, Type, From, Packet) -> broadcast(FsmRef, Type, From, Packet) ->
FsmRef ! {broadcast, Type, From, Packet}. FsmRef ! {broadcast, Type, From, Packet}.
@ -1690,12 +1694,23 @@ handle_info({route, From, To,
jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs), jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
SentStateData = send_packet(NewState, FixedPacket), FinalState =
case ejabberd_hooks:run_fold(c2s_filter_packet_in,
NewState#state.server, FixedPacket,
[NewState#state.jid, From, To])
of
drop ->
NewState;
FinalPacket = #xmlel{} ->
SentState = send_packet(NewState, FinalPacket),
ejabberd_hooks:run(user_receive_packet, ejabberd_hooks:run(user_receive_packet,
SentStateData#state.server, SentState#state.server,
[SentStateData#state.jid, From, To, FixedPacket]), [SentState#state.jid, From, To,
FinalPacket]),
SentState
end,
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, SentStateData); fsm_next_state(StateName, FinalState);
true -> true ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState) fsm_next_state(StateName, NewState)
@ -1738,6 +1753,32 @@ handle_info({force_update_presence, LUser}, StateName,
_ -> StateData _ -> StateData
end, end,
fsm_next_state(StateName, NewStateData); fsm_next_state(StateName, NewStateData);
handle_info({send_filtered, Feature, From, To, Packet}, StateName, StateData) ->
Drop = ejabberd_hooks:run_fold(c2s_filter_packet, StateData#state.server,
true, [StateData#state.server, StateData,
Feature, To, Packet]),
NewStateData = if Drop ->
?DEBUG("Dropping packet from ~p to ~p",
[jlib:jid_to_string(From),
jlib:jid_to_string(To)]),
StateData;
true ->
FinalPacket = jlib:replace_from_to(From, To, Packet),
case StateData#state.jid of
To ->
case privacy_check_packet(StateData, From, To,
FinalPacket, in) of
deny ->
StateData;
allow ->
send_stanza(StateData, FinalPacket)
end;
_ ->
ejabberd_router:route(From, To, FinalPacket),
StateData
end
end,
fsm_next_state(StateName, NewStateData);
handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
Recipients = ejabberd_hooks:run_fold( Recipients = ejabberd_hooks:run_fold(
c2s_broadcast_recipients, StateData#state.server, c2s_broadcast_recipients, StateData#state.server,
@ -2008,13 +2049,9 @@ process_presence_probe(From, To, StateData) ->
?SETS:is_element(LBFrom, StateData#state.pres_f)), ?SETS:is_element(LBFrom, StateData#state.pres_f)),
if if
Cond -> Cond ->
Timestamp = StateData#state.pres_timestamp, %% To is the one sending the presence (the probe target)
Packet = xml:append_subtags( Packet = jlib:add_delay_info(StateData#state.pres_last, To,
StateData#state.pres_last, StateData#state.pres_timestamp),
%% To is the one sending the presence (the target of the probe)
[jlib:timestamp_to_xml(Timestamp, utc, To, <<"">>),
%% TODO: Delete the next line once XEP-0091 is Obsolete
jlib:timestamp_to_xml(Timestamp)]),
case privacy_check_packet(StateData, To, From, Packet, out) of case privacy_check_packet(StateData, To, From, Packet, out) of
deny -> deny ->
ok; ok;
@ -2066,12 +2103,11 @@ presence_update(From, Packet, StateData) ->
OldPresence -> get_priority_from_presence(OldPresence) OldPresence -> get_priority_from_presence(OldPresence)
end, end,
NewPriority = get_priority_from_presence(Packet), NewPriority = get_priority_from_presence(Packet),
Timestamp = calendar:now_to_universal_time(now()),
update_priority(NewPriority, Packet, StateData), update_priority(NewPriority, Packet, StateData),
FromUnavail = (StateData#state.pres_last == undefined), FromUnavail = (StateData#state.pres_last == undefined),
?DEBUG("from unavail = ~p~n", [FromUnavail]), ?DEBUG("from unavail = ~p~n", [FromUnavail]),
NewStateData = StateData#state{pres_last = Packet, NewStateData = StateData#state{pres_last = Packet,
pres_timestamp = Timestamp}, pres_timestamp = now()},
NewState = if FromUnavail -> NewState = if FromUnavail ->
ejabberd_hooks:run(user_available_hook, ejabberd_hooks:run(user_available_hook,
NewStateData#state.server, NewStateData#state.server,
@ -2757,7 +2793,10 @@ handle_resume(StateData, Attrs) ->
{<<"h">>, AttrH}, {<<"h">>, AttrH},
{<<"previd">>, AttrId}], {<<"previd">>, AttrId}],
children = []}), children = []}),
SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, SendFun = fun(_F, _T, El, Time) ->
NewEl = add_resent_delay_info(NewState, El, Time),
send_element(NewState, NewEl)
end,
handle_unacked_stanzas(NewState, SendFun), handle_unacked_stanzas(NewState, SendFun),
send_element(NewState, send_element(NewState,
#xmlel{name = <<"r">>, #xmlel{name = <<"r">>,
@ -2813,13 +2852,13 @@ mgmt_queue_add(StateData, El) ->
Num -> Num ->
Num + 1 Num + 1
end, end,
NewQueue = queue:in({NewNum, El}, StateData#state.mgmt_queue), NewQueue = queue:in({NewNum, now(), El}, StateData#state.mgmt_queue),
NewState = StateData#state{mgmt_queue = NewQueue, NewState = StateData#state{mgmt_queue = NewQueue,
mgmt_stanzas_out = NewNum}, mgmt_stanzas_out = NewNum},
check_queue_length(NewState). check_queue_length(NewState).
mgmt_queue_drop(StateData, NumHandled) -> mgmt_queue_drop(StateData, NumHandled) ->
NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, NewQueue = jlib:queue_drop_while(fun({N, _T, _E}) -> N =< NumHandled end,
StateData#state.mgmt_queue), StateData#state.mgmt_queue),
StateData#state{mgmt_queue = NewQueue}. StateData#state{mgmt_queue = NewQueue}.
@ -2847,12 +2886,12 @@ handle_unacked_stanzas(StateData, F)
?INFO_MSG("~B stanzas were not acknowledged by ~s", ?INFO_MSG("~B stanzas were not acknowledged by ~s",
[N, jlib:jid_to_string(StateData#state.jid)]), [N, jlib:jid_to_string(StateData#state.jid)]),
lists:foreach( lists:foreach(
fun({_, #xmlel{attrs = Attrs} = El}) -> fun({_, Time, #xmlel{attrs = Attrs} = El}) ->
From_s = xml:get_attr_s(<<"from">>, Attrs), From_s = xml:get_attr_s(<<"from">>, Attrs),
From = jlib:string_to_jid(From_s), From = jlib:string_to_jid(From_s),
To_s = xml:get_attr_s(<<"to">>, Attrs), To_s = xml:get_attr_s(<<"to">>, Attrs),
To = jlib:string_to_jid(To_s), To = jlib:string_to_jid(To_s),
F(From, To, El) F(From, To, El, Time)
end, queue:to_list(Queue)) end, queue:to_list(Queue))
end; end;
handle_unacked_stanzas(_StateData, _F) -> handle_unacked_stanzas(_StateData, _F) ->
@ -2871,16 +2910,19 @@ handle_unacked_stanzas(StateData)
end, end,
ReRoute = case ResendOnTimeout of ReRoute = case ResendOnTimeout of
true -> true ->
fun ejabberd_router:route/3; fun(From, To, El, Time) ->
NewEl = add_resent_delay_info(StateData, El, Time),
ejabberd_router:route(From, To, NewEl)
end;
false -> false ->
fun(From, To, El) -> fun(From, To, El, _Time) ->
Err = Err =
jlib:make_error_reply(El, jlib:make_error_reply(El,
?ERR_SERVICE_UNAVAILABLE), ?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err) ejabberd_router:route(To, From, Err)
end end
end, end,
F = fun(From, To, El) -> F = fun(From, To, El, Time) ->
%% We'll drop the stanza if it was <forwarded/> by some %% We'll drop the stanza if it was <forwarded/> by some
%% encapsulating protocol as per XEP-0297. One such protocol is %% encapsulating protocol as per XEP-0297. One such protocol is
%% XEP-0280, which says: "When a receiving server attempts to %% XEP-0280, which says: "When a receiving server attempts to
@ -2893,7 +2935,7 @@ handle_unacked_stanzas(StateData)
?DEBUG("Dropping forwarded stanza from ~s", ?DEBUG("Dropping forwarded stanza from ~s",
[xml:get_attr_s(<<"from">>, El#xmlel.attrs)]); [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]);
false -> false ->
ReRoute(From, To, El) ReRoute(From, To, El, Time)
end end
end, end,
handle_unacked_stanzas(StateData, F); handle_unacked_stanzas(StateData, F);
@ -2956,7 +2998,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
pres_f = OldStateData#state.pres_f, pres_f = OldStateData#state.pres_f,
pres_a = OldStateData#state.pres_a, pres_a = OldStateData#state.pres_a,
pres_last = OldStateData#state.pres_last, pres_last = OldStateData#state.pres_last,
pres_pri = OldStateData#state.pres_pri,
pres_timestamp = OldStateData#state.pres_timestamp, pres_timestamp = OldStateData#state.pres_timestamp,
privacy_list = OldStateData#state.privacy_list, privacy_list = OldStateData#state.privacy_list,
aux_fields = OldStateData#state.aux_fields, aux_fields = OldStateData#state.aux_fields,
@ -2985,6 +3026,9 @@ make_resume_id(StateData) ->
{Time, _} = StateData#state.sid, {Time, _} = StateData#state.sid,
jlib:term_to_base64({StateData#state.resource, Time}). jlib:term_to_base64({StateData#state.resource, Time}).
add_resent_delay_info(#state{server = From}, El, Time) ->
jlib:add_delay_info(El, From, Time, <<"Resent">>).
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% XEP-0352 %%% XEP-0352
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
@ -3007,37 +3051,36 @@ csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData,
StateData2#state{csi_state = CsiState} StateData2#state{csi_state = CsiState}
end. end.
csi_queue_add(#state{csi_queue = Queue, server = Host} = StateData, csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) ->
#xmlel{children = Els} = Stanza) ->
From = xml:get_tag_attr_s(<<"from">>, Stanza),
Time = calendar:now_to_universal_time(os:timestamp()),
DelayTag = [jlib:timestamp_to_xml(Time, utc,
jlib:make_jid(<<"">>, Host, <<"">>),
<<"Client Inactive">>)],
NewStanza = Stanza#xmlel{children = Els ++ DelayTag},
case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of
true -> csi_queue_add(csi_queue_flush(StateData), NewStanza); true -> csi_queue_add(csi_queue_flush(StateData), Stanza);
false -> false ->
NewQueue = lists:keystore(From, 1, Queue, {From, NewStanza}), From = xml:get_tag_attr_s(<<"from">>, Stanza),
NewQueue = lists:keystore(From, 1, Queue, {From, now(), Stanza}),
StateData#state{csi_queue = NewQueue} StateData#state{csi_queue = NewQueue}
end. end.
csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState} = StateData, csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} =
From) -> StateData, From) ->
case lists:keytake(From, 1, Queue) of case lists:keytake(From, 1, Queue) of
{value, {From, Stanza}, NewQueue} -> {value, {From, Time, Stanza}, NewQueue} ->
NewStanza = jlib:add_delay_info(Stanza, Host, Time,
<<"Client Inactive">>),
NewStateData = send_stanza(StateData#state{csi_state = active}, NewStateData = send_stanza(StateData#state{csi_state = active},
Stanza), NewStanza),
NewStateData#state{csi_queue = NewQueue, csi_state = CsiState}; NewStateData#state{csi_queue = NewQueue, csi_state = CsiState};
false -> StateData false -> StateData
end. end.
csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID} = csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID,
StateData) -> server = Host} = StateData) ->
?DEBUG("Flushing CSI queue for ~s", [jlib:jid_to_string(JID)]), ?DEBUG("Flushing CSI queue for ~s", [jlib:jid_to_string(JID)]),
NewStateData = NewStateData =
lists:foldl(fun({_From, Stanza}, AccState) -> lists:foldl(fun({_From, Time, Stanza}, AccState) ->
send_stanza(AccState, Stanza) NewStanza =
jlib:add_delay_info(Stanza, Host, Time,
<<"Client Inactive">>),
send_stanza(AccState, NewStanza)
end, StateData#state{csi_state = active}, Queue), end, StateData#state{csi_state = active}, Queue),
NewStateData#state{csi_queue = [], csi_state = CsiState}. NewStateData#state{csi_queue = [], csi_state = CsiState}.

View File

@ -174,7 +174,7 @@ normal_state({route, From, <<"">>,
Now = now_to_usec(now()), Now = now_to_usec(now()),
MinMessageInterval = MinMessageInterval =
trunc(gen_mod:get_module_opt(StateData#state.server_host, trunc(gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, min_message_interval, fun(MMI) when is_integer(MMI) -> MMI end, 0) mod_muc, min_message_interval, fun(MMI) when is_number(MMI) -> MMI end, 0)
* 1000000), * 1000000),
Size = element_size(Packet), Size = element_size(Packet),
{MessageShaper, MessageShaperInterval} = {MessageShaper, MessageShaperInterval} =
@ -1509,15 +1509,17 @@ get_user_activity(JID, StateData) ->
store_user_activity(JID, UserActivity, StateData) -> store_user_activity(JID, UserActivity, StateData) ->
MinMessageInterval = MinMessageInterval =
gen_mod:get_module_opt(StateData#state.server_host, trunc(gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, min_message_interval, mod_muc, min_message_interval,
fun(I) when is_integer(I), I>=0 -> I end, fun(I) when is_number(I), I>=0 -> I end,
0), 0)
* 1000),
MinPresenceInterval = MinPresenceInterval =
gen_mod:get_module_opt(StateData#state.server_host, trunc(gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, min_presence_interval, mod_muc, min_presence_interval,
fun(I) when is_integer(I), I>=0 -> I end, fun(I) when is_number(I), I>=0 -> I end,
0), 0)
* 1000),
Key = jlib:jid_tolower(JID), Key = jlib:jid_tolower(JID),
Now = now_to_usec(now()), Now = now_to_usec(now()),
Activity1 = clean_treap(StateData#state.activity, Activity1 = clean_treap(StateData#state.activity,
@ -1548,8 +1550,8 @@ store_user_activity(JID, UserActivity, StateData) ->
100000), 100000),
Delay = lists:max([MessageShaperInterval, Delay = lists:max([MessageShaperInterval,
PresenceShaperInterval, PresenceShaperInterval,
MinMessageInterval * 1000, MinMessageInterval,
MinPresenceInterval * 1000]) MinPresenceInterval])
* 1000, * 1000,
Priority = {1, -(Now + Delay)}, Priority = {1, -(Now + Delay)},
StateData#state{activity = StateData#state{activity =
@ -2428,24 +2430,21 @@ add_message_to_history(FromNick, FromJID, Packet, StateData) ->
false -> false; false -> false;
_ -> true _ -> true
end, end,
TimeStamp = calendar:now_to_universal_time(now()), TimeStamp = now(),
SenderJid = case SenderJid = case
(StateData#state.config)#config.anonymous (StateData#state.config)#config.anonymous
of of
true -> StateData#state.jid; true -> StateData#state.jid;
false -> FromJID false -> FromJID
end, end,
TSPacket = xml:append_subtags(Packet, TSPacket = jlib:add_delay_info(Packet, SenderJid, TimeStamp),
[jlib:timestamp_to_xml(TimeStamp, utc,
SenderJid, <<"">>),
jlib:timestamp_to_xml(TimeStamp)]),
SPacket = SPacket =
jlib:replace_from_to(jlib:jid_replace_resource(StateData#state.jid, jlib:replace_from_to(jlib:jid_replace_resource(StateData#state.jid,
FromNick), FromNick),
StateData#state.jid, TSPacket), StateData#state.jid, TSPacket),
Size = element_size(SPacket), Size = element_size(SPacket),
Q1 = lqueue_in({FromNick, TSPacket, HaveSubject, Q1 = lqueue_in({FromNick, TSPacket, HaveSubject,
TimeStamp, Size}, calendar:now_to_universal_time(TimeStamp), Size},
StateData#state.history), StateData#state.history),
add_to_log(text, {FromNick, Packet}, StateData), add_to_log(text, {FromNick, Packet}, StateData),
StateData#state{history = Q1}. StateData#state{history = Q1}.