diff --git a/mod_multicast/src/ejabberd_c2s.erl b/mod_multicast/src/ejabberd_c2s.erl index 6fdbcb7..ee626c6 100644 --- a/mod_multicast/src/ejabberd_c2s.erl +++ b/mod_multicast/src/ejabberd_c2s.erl @@ -45,6 +45,7 @@ set_aux_field/3, del_aux_field/2, get_subscription/2, + send_filtered/5, broadcast/4, get_subscribed/1, transform_listen_option/2]). @@ -94,12 +95,12 @@ tls_options = [], authenticated = false, jid, - user = "", server = <<"">>, resource = <<"">>, + user = <<"">>, server = <<"">>, resource = <<"">>, sid, pres_t = ?SETS:new(), pres_f = ?SETS:new(), pres_a = ?SETS:new(), - pres_last, pres_pri, + pres_last, pres_timestamp, privacy_list = #userlist{}, conn = unknown, @@ -247,6 +248,9 @@ get_subscription(LFrom, StateData) -> true -> none end. +send_filtered(FsmRef, Feature, From, To, Packet) -> + FsmRef ! {send_filtered, Feature, From, To, Packet}. + broadcast(FsmRef, Type, From, Packet) -> FsmRef ! {broadcast, Type, From, Packet}. @@ -1690,12 +1694,23 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - SentStateData = send_packet(NewState, FixedPacket), - ejabberd_hooks:run(user_receive_packet, - SentStateData#state.server, - [SentStateData#state.jid, From, To, FixedPacket]), + FinalState = + case ejabberd_hooks:run_fold(c2s_filter_packet_in, + NewState#state.server, FixedPacket, + [NewState#state.jid, From, To]) + of + drop -> + NewState; + FinalPacket = #xmlel{} -> + SentState = send_packet(NewState, FinalPacket), + ejabberd_hooks:run(user_receive_packet, + SentState#state.server, + [SentState#state.jid, From, To, + FinalPacket]), + SentState + end, ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, SentStateData); + fsm_next_state(StateName, FinalState); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1738,6 +1753,32 @@ handle_info({force_update_presence, LUser}, StateName, _ -> StateData end, fsm_next_state(StateName, NewStateData); +handle_info({send_filtered, Feature, From, To, Packet}, StateName, StateData) -> + Drop = ejabberd_hooks:run_fold(c2s_filter_packet, StateData#state.server, + true, [StateData#state.server, StateData, + Feature, To, Packet]), + NewStateData = if Drop -> + ?DEBUG("Dropping packet from ~p to ~p", + [jlib:jid_to_string(From), + jlib:jid_to_string(To)]), + StateData; + true -> + FinalPacket = jlib:replace_from_to(From, To, Packet), + case StateData#state.jid of + To -> + case privacy_check_packet(StateData, From, To, + FinalPacket, in) of + deny -> + StateData; + allow -> + send_stanza(StateData, FinalPacket) + end; + _ -> + ejabberd_router:route(From, To, FinalPacket), + StateData + end + end, + fsm_next_state(StateName, NewStateData); handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> Recipients = ejabberd_hooks:run_fold( c2s_broadcast_recipients, StateData#state.server, @@ -2008,13 +2049,9 @@ process_presence_probe(From, To, StateData) -> ?SETS:is_element(LBFrom, StateData#state.pres_f)), if Cond -> - Timestamp = StateData#state.pres_timestamp, - Packet = xml:append_subtags( - StateData#state.pres_last, - %% To is the one sending the presence (the target of the probe) - [jlib:timestamp_to_xml(Timestamp, utc, To, <<"">>), - %% TODO: Delete the next line once XEP-0091 is Obsolete - jlib:timestamp_to_xml(Timestamp)]), + %% To is the one sending the presence (the probe target) + Packet = jlib:add_delay_info(StateData#state.pres_last, To, + StateData#state.pres_timestamp), case privacy_check_packet(StateData, To, From, Packet, out) of deny -> ok; @@ -2066,12 +2103,11 @@ presence_update(From, Packet, StateData) -> OldPresence -> get_priority_from_presence(OldPresence) end, NewPriority = get_priority_from_presence(Packet), - Timestamp = calendar:now_to_universal_time(now()), update_priority(NewPriority, Packet, StateData), FromUnavail = (StateData#state.pres_last == undefined), ?DEBUG("from unavail = ~p~n", [FromUnavail]), NewStateData = StateData#state{pres_last = Packet, - pres_timestamp = Timestamp}, + pres_timestamp = now()}, NewState = if FromUnavail -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, @@ -2757,7 +2793,10 @@ handle_resume(StateData, Attrs) -> {<<"h">>, AttrH}, {<<"previd">>, AttrId}], children = []}), - SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, + SendFun = fun(_F, _T, El, Time) -> + NewEl = add_resent_delay_info(NewState, El, Time), + send_element(NewState, NewEl) + end, handle_unacked_stanzas(NewState, SendFun), send_element(NewState, #xmlel{name = <<"r">>, @@ -2813,13 +2852,13 @@ mgmt_queue_add(StateData, El) -> Num -> Num + 1 end, - NewQueue = queue:in({NewNum, El}, StateData#state.mgmt_queue), + NewQueue = queue:in({NewNum, now(), El}, StateData#state.mgmt_queue), NewState = StateData#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}, check_queue_length(NewState). mgmt_queue_drop(StateData, NumHandled) -> - NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + NewQueue = jlib:queue_drop_while(fun({N, _T, _E}) -> N =< NumHandled end, StateData#state.mgmt_queue), StateData#state{mgmt_queue = NewQueue}. @@ -2847,12 +2886,12 @@ handle_unacked_stanzas(StateData, F) ?INFO_MSG("~B stanzas were not acknowledged by ~s", [N, jlib:jid_to_string(StateData#state.jid)]), lists:foreach( - fun({_, #xmlel{attrs = Attrs} = El}) -> + fun({_, Time, #xmlel{attrs = Attrs} = El}) -> From_s = xml:get_attr_s(<<"from">>, Attrs), From = jlib:string_to_jid(From_s), To_s = xml:get_attr_s(<<"to">>, Attrs), To = jlib:string_to_jid(To_s), - F(From, To, El) + F(From, To, El, Time) end, queue:to_list(Queue)) end; handle_unacked_stanzas(_StateData, _F) -> @@ -2871,16 +2910,19 @@ handle_unacked_stanzas(StateData) end, ReRoute = case ResendOnTimeout of true -> - fun ejabberd_router:route/3; + fun(From, To, El, Time) -> + NewEl = add_resent_delay_info(StateData, El, Time), + ejabberd_router:route(From, To, NewEl) + end; false -> - fun(From, To, El) -> + fun(From, To, El, _Time) -> Err = jlib:make_error_reply(El, ?ERR_SERVICE_UNAVAILABLE), ejabberd_router:route(To, From, Err) end end, - F = fun(From, To, El) -> + F = fun(From, To, El, Time) -> %% We'll drop the stanza if it was by some %% encapsulating protocol as per XEP-0297. One such protocol is %% XEP-0280, which says: "When a receiving server attempts to @@ -2893,7 +2935,7 @@ handle_unacked_stanzas(StateData) ?DEBUG("Dropping forwarded stanza from ~s", [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]); false -> - ReRoute(From, To, El) + ReRoute(From, To, El, Time) end end, handle_unacked_stanzas(StateData, F); @@ -2956,7 +2998,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> pres_f = OldStateData#state.pres_f, pres_a = OldStateData#state.pres_a, pres_last = OldStateData#state.pres_last, - pres_pri = OldStateData#state.pres_pri, pres_timestamp = OldStateData#state.pres_timestamp, privacy_list = OldStateData#state.privacy_list, aux_fields = OldStateData#state.aux_fields, @@ -2985,6 +3026,9 @@ make_resume_id(StateData) -> {Time, _} = StateData#state.sid, jlib:term_to_base64({StateData#state.resource, Time}). +add_resent_delay_info(#state{server = From}, El, Time) -> + jlib:add_delay_info(El, From, Time, <<"Resent">>). + %%%---------------------------------------------------------------------- %%% XEP-0352 %%%---------------------------------------------------------------------- @@ -3007,37 +3051,36 @@ csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData, StateData2#state{csi_state = CsiState} end. -csi_queue_add(#state{csi_queue = Queue, server = Host} = StateData, - #xmlel{children = Els} = Stanza) -> - From = xml:get_tag_attr_s(<<"from">>, Stanza), - Time = calendar:now_to_universal_time(os:timestamp()), - DelayTag = [jlib:timestamp_to_xml(Time, utc, - jlib:make_jid(<<"">>, Host, <<"">>), - <<"Client Inactive">>)], - NewStanza = Stanza#xmlel{children = Els ++ DelayTag}, +csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) -> case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of - true -> csi_queue_add(csi_queue_flush(StateData), NewStanza); + true -> csi_queue_add(csi_queue_flush(StateData), Stanza); false -> - NewQueue = lists:keystore(From, 1, Queue, {From, NewStanza}), + From = xml:get_tag_attr_s(<<"from">>, Stanza), + NewQueue = lists:keystore(From, 1, Queue, {From, now(), Stanza}), StateData#state{csi_queue = NewQueue} end. -csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState} = StateData, - From) -> +csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} = + StateData, From) -> case lists:keytake(From, 1, Queue) of - {value, {From, Stanza}, NewQueue} -> + {value, {From, Time, Stanza}, NewQueue} -> + NewStanza = jlib:add_delay_info(Stanza, Host, Time, + <<"Client Inactive">>), NewStateData = send_stanza(StateData#state{csi_state = active}, - Stanza), + NewStanza), NewStateData#state{csi_queue = NewQueue, csi_state = CsiState}; false -> StateData end. -csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID} = - StateData) -> +csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID, + server = Host} = StateData) -> ?DEBUG("Flushing CSI queue for ~s", [jlib:jid_to_string(JID)]), NewStateData = - lists:foldl(fun({_From, Stanza}, AccState) -> - send_stanza(AccState, Stanza) + lists:foldl(fun({_From, Time, Stanza}, AccState) -> + NewStanza = + jlib:add_delay_info(Stanza, Host, Time, + <<"Client Inactive">>), + send_stanza(AccState, NewStanza) end, StateData#state{csi_state = active}, Queue), NewStateData#state{csi_queue = [], csi_state = CsiState}. diff --git a/mod_multicast/src/mod_muc_room.erl b/mod_multicast/src/mod_muc_room.erl index 3ea7f34..e27549d 100644 --- a/mod_multicast/src/mod_muc_room.erl +++ b/mod_multicast/src/mod_muc_room.erl @@ -174,7 +174,7 @@ normal_state({route, From, <<"">>, Now = now_to_usec(now()), MinMessageInterval = trunc(gen_mod:get_module_opt(StateData#state.server_host, - mod_muc, min_message_interval, fun(MMI) when is_integer(MMI) -> MMI end, 0) + mod_muc, min_message_interval, fun(MMI) when is_number(MMI) -> MMI end, 0) * 1000000), Size = element_size(Packet), {MessageShaper, MessageShaperInterval} = @@ -1509,15 +1509,17 @@ get_user_activity(JID, StateData) -> store_user_activity(JID, UserActivity, StateData) -> MinMessageInterval = - gen_mod:get_module_opt(StateData#state.server_host, - mod_muc, min_message_interval, - fun(I) when is_integer(I), I>=0 -> I end, - 0), + trunc(gen_mod:get_module_opt(StateData#state.server_host, + mod_muc, min_message_interval, + fun(I) when is_number(I), I>=0 -> I end, + 0) + * 1000), MinPresenceInterval = - gen_mod:get_module_opt(StateData#state.server_host, - mod_muc, min_presence_interval, - fun(I) when is_integer(I), I>=0 -> I end, - 0), + trunc(gen_mod:get_module_opt(StateData#state.server_host, + mod_muc, min_presence_interval, + fun(I) when is_number(I), I>=0 -> I end, + 0) + * 1000), Key = jlib:jid_tolower(JID), Now = now_to_usec(now()), Activity1 = clean_treap(StateData#state.activity, @@ -1548,8 +1550,8 @@ store_user_activity(JID, UserActivity, StateData) -> 100000), Delay = lists:max([MessageShaperInterval, PresenceShaperInterval, - MinMessageInterval * 1000, - MinPresenceInterval * 1000]) + MinMessageInterval, + MinPresenceInterval]) * 1000, Priority = {1, -(Now + Delay)}, StateData#state{activity = @@ -2428,24 +2430,21 @@ add_message_to_history(FromNick, FromJID, Packet, StateData) -> false -> false; _ -> true end, - TimeStamp = calendar:now_to_universal_time(now()), + TimeStamp = now(), SenderJid = case (StateData#state.config)#config.anonymous of true -> StateData#state.jid; false -> FromJID end, - TSPacket = xml:append_subtags(Packet, - [jlib:timestamp_to_xml(TimeStamp, utc, - SenderJid, <<"">>), - jlib:timestamp_to_xml(TimeStamp)]), + TSPacket = jlib:add_delay_info(Packet, SenderJid, TimeStamp), SPacket = jlib:replace_from_to(jlib:jid_replace_resource(StateData#state.jid, FromNick), StateData#state.jid, TSPacket), Size = element_size(SPacket), Q1 = lqueue_in({FromNick, TSPacket, HaveSubject, - TimeStamp, Size}, + calendar:now_to_universal_time(TimeStamp), Size}, StateData#state.history), add_to_log(text, {FromNick, Packet}, StateData), StateData#state{history = Q1}.