diff --git a/mod_multicast/src/ejabberd_router_multicast.erl b/mod_multicast/src/ejabberd_router_multicast.erl index 3ea9560..8967105 100644 --- a/mod_multicast/src/ejabberd_router_multicast.erl +++ b/mod_multicast/src/ejabberd_router_multicast.erl @@ -3,7 +3,6 @@ %%% Author : Badlop %%% Purpose : Multicast router %%% Created : 11 Aug 2007 by Badlop -%%% Id : $Id: ejabberd_router_multicast.erl 440 2007-12-06 22:36:21Z badlop $ %%%---------------------------------------------------------------------- -module(ejabberd_router_multicast). @@ -25,8 +24,8 @@ terminate/2, code_change/3]). -include("ejabberd.hrl"). --include("jlib.hrl"). -include("logger.hrl"). +-include("jlib.hrl"). -record(route_multicast, {domain, pid}). -record(state, {}). @@ -71,11 +70,13 @@ unregister_route(Domain) -> LDomain -> Pid = self(), F = fun() -> - case mnesia:match(#route_multicast{domain = LDomain, - pid = Pid}) of + case mnesia:select(route_multicast, + [{#route_multicast{pid = Pid, domain = LDomain, _ = '_'}, + [], + ['$_']}]) of [R] -> mnesia:delete_object(R); _ -> ok - end + end end, mnesia:transaction(F) end. @@ -189,21 +190,48 @@ code_change(_OldVsn, State, _Extra) -> %% Destinations = [#jid] do_route(From, Domain, Destinations, Packet) -> - ?DEBUG("route_multicast~n\tfrom ~p~n\tdomain ~p~n\tdestinations ~p~n\tpacket ~p~n", - [From, Domain, Destinations, Packet]), + ?DEBUG("route_multicast~n\tfrom ~s~n\tdomain ~s~n\tdestinations ~p~n\tpacket ~p~n", + [jlib:jid_to_string(From), + Domain, + [jlib:jid_to_string(To) || To <- Destinations], + Packet]), + + {Groups, Rest} = lists:foldr( + fun(Dest, {Groups1, Rest1}) -> + case ejabberd_sm:get_session_pid(Dest#jid.luser, Dest#jid.lserver, Dest#jid.lresource) of + none -> + {Groups1, [Dest|Rest1]}; + Pid -> + Node = node(Pid), + if Node /= node() -> + {dict:append(Node, Dest, Groups1), Rest1}; + true -> + {Groups1, [Dest|Rest1]} + end + end + end, {dict:new(), []}, Destinations), + + dict:map( + fun(Node, [Single]) -> + ejabberd_cluster:send({ejabberd_sm, Node}, + {route, From, Single, Packet}); + (Node, Dests) -> + ejabberd_cluster:send({ejabberd_sm, Node}, + {route_multiple, From, Dests, Packet}) + end, Groups), %% Try to find an appropriate multicast service case mnesia:dirty_read(route_multicast, Domain) of %% If no multicast service is available in this server, send manually - [] -> do_route_normal(From, Destinations, Packet); + [] -> do_route_normal(From, Rest, Packet); %% If available, send the packet using multicast service [R] -> case R#route_multicast.pid of Pid when is_pid(Pid) -> - Pid ! {route_trusted, From, Destinations, Packet}; - _ -> do_route_normal(From, Destinations, Packet) + Pid ! {route_trusted, From, Rest, Packet}; + _ -> do_route_normal(From, Rest, Packet) end end. diff --git a/mod_multicast/src/mod_multicast.erl b/mod_multicast/src/mod_multicast.erl index dd610d8..03bf9d0 100644 --- a/mod_multicast/src/mod_multicast.erl +++ b/mod_multicast/src/mod_multicast.erl @@ -3,54 +3,57 @@ %%% Author : Badlop %%% Purpose : Extended Stanza Addressing (XEP-0033) support %%% Created : 29 May 2007 by Badlop -%%% Id : $Id: mod_multicast.erl 440 2007-12-06 22:36:21Z badlop $ %%%---------------------------------------------------------------------- -module(mod_multicast). + -author('badlop@ono.com'). -behaviour(gen_server). + -behaviour(gen_mod). %% API -export([start_link/2, start/2, stop/1]). %% gen_server callbacks --export([init/1, - handle_info/2, - handle_call/3, - handle_cast/2, - terminate/2, - code_change/3 - ]). +-export([init/1, handle_info/2, handle_call/3, + handle_cast/2, terminate/2, code_change/3]). --export([ - purge_loop/1 - ]). +-export([purge_loop/1]). -include("ejabberd.hrl"). --include("jlib.hrl"). -include("logger.hrl"). --record(state, {lserver, lservice, access, service_limits}). +-include("jlib.hrl"). + +-record(state, + {lserver, lservice, access, service_limits}). -record(multicastc, {rserver, response, ts}). + %% ts: timestamp (in seconds) when the cache item was last updated -record(dest, {jid_string, jid_jid, type, full_xml}). + %% jid_string = string() %% jid_jid = jid() %% full_xml = xml() --record(group, {server, dests, multicast, others, addresses}). +-record(group, + {server, dests, multicast, others, addresses}). + %% server = string() -%% dests = [string()] +%% dests = [string()] %% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached %% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()} %% others = [xml()] %% packet = xml() --record(waiter, {awaiting, group, renewal=false, sender, packet, aattrs, addresses}). +-record(waiter, + {awaiting, group, renewal = false, sender, packet, + aattrs, addresses}). + %% awaiting = {[Remote_service], Local_service, Type_awaiting} %% Remote_service = Local_service = string() %% Type_awaiting = info | items @@ -61,54 +64,46 @@ %% aattrs = [xml()] -record(limits, {message, presence}). + %% message = presence = integer() | infinite -record(service_limits, {local, remote}). -%% local = remote = limits() %% All the elements are of type value() --define(VERSION_MULTICAST, "$Revision: 440 $ "). +-define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>). + -define(PROCNAME, ejabberd_mod_multicast). --define(PURGE_PROCNAME, ejabberd_mod_multicast_purgeloop). +-define(PURGE_PROCNAME, + ejabberd_mod_multicast_purgeloop). -%% TODO: allow configuration instead of hard-coding -%% Time in seconds -define(MAXTIME_CACHE_POSITIVE, 86400). + -define(MAXTIME_CACHE_NEGATIVE, 86400). -%% Time in miliseconds --define(CACHE_PURGE_TIMER, 86400000). % Purge the cache every 24 hours --define(DISCO_QUERY_TIMEOUT, 10000). % After 10 seconds of delay the server is declared dead +-define(CACHE_PURGE_TIMER, 86400000). + +-define(DISCO_QUERY_TIMEOUT, 10000). + +-define(DEFAULT_LIMIT_LOCAL_MESSAGE, 100). -%% TODO: Put the correct values once XEP33 is updated --define(DEFAULT_LIMIT_LOCAL_MESSAGE, 100). -define(DEFAULT_LIMIT_LOCAL_PRESENCE, 100). + -define(DEFAULT_LIMIT_REMOTE_MESSAGE, 20). --define(DEFAULT_LIMIT_REMOTE_PRESENCE,20). +-define(DEFAULT_LIMIT_REMOTE_PRESENCE, 20). -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- start_link(LServerS, Opts) -> Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, [LServerS, Opts], []). + gen_server:start_link({local, Proc}, ?MODULE, + [LServerS, Opts], []). start(LServerS, Opts) -> Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), - ChildSpec = { - Proc, - {?MODULE, start_link, [LServerS, Opts]}, - temporary, - 1000, - worker, - [?MODULE]}, + ChildSpec = {Proc, + {?MODULE, start_link, [LServerS, Opts]}, temporary, + 1000, worker, [?MODULE]}, supervisor:start_child(ejabberd_sup, ChildSpec). stop(LServerS) -> @@ -117,53 +112,34 @@ stop(LServerS) -> supervisor:terminate_child(ejabberd_sup, Proc), supervisor:delete_child(ejabberd_sup, Proc). - %%==================================================================== %% gen_server callbacks %%==================================================================== -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- init([LServerS, Opts]) -> - LServiceS = gen_mod:get_opt(host, Opts, "multicast." ++ LServerS), - Access = gen_mod:get_opt(access, Opts, all), - SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts, [])), + LServiceS = gen_mod:get_opt_host(LServerS, Opts, + <<"multicast.@HOST@">>), + Access = gen_mod:get_opt(access, Opts, + fun (A) when is_atom(A) -> A end, all), + SLimits = + build_service_limit_record(gen_mod:get_opt(limits, Opts, + fun (A) when is_list(A) -> + A + end, + [])), create_cache(), try_start_loop(), create_pool(), ejabberd_router_multicast:register_route(LServerS), ejabberd_router:register_route(LServiceS), - {ok, #state{lservice = LServiceS, - lserver = LServerS, - access = Access, - service_limits = SLimits}}. + {ok, + #state{lservice = LServiceS, lserver = LServerS, + access = Access, service_limits = SLimits}}. -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- handle_call(stop, _From, State) -> - try_stop_loop(), - {stop, normal, ok, State}. + try_stop_loop(), {stop, normal, ok, State}. -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> {noreply, State}. %%-------------------------------------------------------------------- %% Function: handle_info(Info, State) -> {noreply, State} | @@ -172,258 +148,255 @@ handle_cast(_Msg, State) -> %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- -handle_info({route, From, To, {xmlelement, "iq", Attrs, _Els} = Packet}, State) -> +handle_info({route, From, To, + #xmlel{name = <<"iq">>, attrs = Attrs} = Packet}, + State) -> IQ = jlib:iq_query_info(Packet), case catch process_iq(From, IQ, State) of - Result when is_record(Result, iq) -> - ejabberd_router:route(To, From, jlib:iq_to_xml(Result)); - {'EXIT', Reason} -> - ?ERROR_MSG("Error when processing IQ stanza: ~p", [Reason]), - Err = jlib:make_error_reply(Packet, ?ERR_INTERNAL_SERVER_ERROR), - ejabberd_router:route(To, From, Err); - reply -> - LServiceS = jts(To), - case xml:get_attr_s("type", Attrs) of - "result" -> process_iqreply_result(From, LServiceS, Packet, State); - "error" -> process_iqreply_error(From, LServiceS, Packet) - end + Result when is_record(Result, iq) -> + ejabberd_router:route(To, From, jlib:iq_to_xml(Result)); + {'EXIT', Reason} -> + ?ERROR_MSG("Error when processing IQ stanza: ~p", + [Reason]), + Err = jlib:make_error_reply(Packet, + ?ERR_INTERNAL_SERVER_ERROR), + ejabberd_router:route(To, From, Err); + reply -> + LServiceS = jts(To), + case xml:get_attr_s(<<"type">>, Attrs) of + <<"result">> -> + process_iqreply_result(From, LServiceS, Packet, State); + <<"error">> -> + process_iqreply_error(From, LServiceS, Packet) + end end, {noreply, State}; - %% XEP33 allows only 'message' and 'presence' stanza type -handle_info({route, From, To, {xmlelement, Stanza_type, _, _} = Packet}, - #state{lservice = LServiceS, - lserver = LServerS, - access = Access, - service_limits = SLimits} = State) - when (Stanza_type == "message") or (Stanza_type == "presence") -> - %%io:format("Multicast packet: ~nFrom: ~p~nTo: ~p~nPacket: ~p~n", [From, To, Packet]), - route_untrusted(LServiceS, LServerS, Access, SLimits, From, To, Packet), +handle_info({route, From, To, + #xmlel{name = Stanza_type} = Packet}, + #state{lservice = LServiceS, lserver = LServerS, + access = Access, service_limits = SLimits} = + State) + when (Stanza_type == <<"message">>) or + (Stanza_type == <<"presence">>) -> + route_untrusted(LServiceS, LServerS, Access, SLimits, + From, To, Packet), {noreply, State}; - %% Handle multicast packets sent by trusted local services handle_info({route_trusted, From, Destinations, Packet}, - #state{lservice = LServiceS, - lserver = LServerS} = State) -> - %%io:format("Multicast packet2: ~nFrom: ~p~nDestinations: ~p~nPacket: ~p~n", [From, Destinations, Packet]), - route_trusted(LServiceS, LServerS, From, Destinations, Packet), + #state{lservice = LServiceS, lserver = LServerS} = + State) -> + route_trusted(LServiceS, LServerS, From, Destinations, + Packet), {noreply, State}; - handle_info({get_host, Pid}, State) -> - Pid ! {my_host, State#state.lservice}, - {noreply, State}; + Pid ! {my_host, State#state.lservice}, {noreply, State}; +handle_info(_Info, State) -> {noreply, State}. -handle_info(_Info, State) -> - {noreply, State}. - - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- terminate(_Reason, State) -> ejabberd_router_multicast:unregister_route(State#state.lserver), ejabberd_router:unregister_route(State#state.lservice), ok. -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - +code_change(_OldVsn, State, _Extra) -> {ok, State}. %%==================================================================== %%% Internal functions %%==================================================================== - %%%------------------------ %%% IQ Request Processing %%%------------------------ -%% disco#info request -process_iq(From, #iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} = IQ, State) -> - IQ#iq{type = result, sub_el = - [{xmlelement, "query", [{"xmlns", ?NS_DISCO_INFO}], iq_disco_info(From, Lang, State)}]}; - +process_iq(From, + #iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} = + IQ, + State) -> + IQ#iq{type = result, + sub_el = + [#xmlel{name = <<"query">>, + attrs = [{<<"xmlns">>, ?NS_DISCO_INFO}], + children = iq_disco_info(From, Lang, State)}]}; %% disco#items request -process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) -> - IQ#iq{type = result, sub_el = - [{xmlelement, "query", [{"xmlns", ?NS_DISCO_ITEMS}], []}]}; - +process_iq(_, + #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) -> + IQ#iq{type = result, + sub_el = + [#xmlel{name = <<"query">>, + attrs = [{<<"xmlns">>, ?NS_DISCO_ITEMS}], + children = []}]}; %% vCard request -process_iq(_, #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ, _) -> - IQ#iq{type = result, sub_el = - [{xmlelement, "vCard", [{"xmlns", ?NS_VCARD}], iq_vcard(Lang)}]}; - +process_iq(_, + #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ, + _) -> + IQ#iq{type = result, + sub_el = + [#xmlel{name = <<"vCard">>, + attrs = [{<<"xmlns">>, ?NS_VCARD}], + children = iq_vcard(Lang)}]}; %% version request -process_iq(_, #iq{type = get, xmlns = ?NS_VERSION} = IQ, _) -> - IQ#iq{type = result, sub_el = - [{xmlelement, "query", [{"xmlns", ?NS_VERSION}], iq_version()}]}; - +process_iq(_, #iq{type = get, xmlns = ?NS_VERSION} = IQ, + _) -> + IQ#iq{type = result, + sub_el = + [#xmlel{name = <<"query">>, + attrs = [{<<"xmlns">>, ?NS_VERSION}], + children = iq_version()}]}; %% Unknown "set" or "get" request -process_iq(_, #iq{type=Type, sub_el=SubEl} = IQ, _) when Type==get; Type==set -> - IQ#iq{type = error, sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]}; - +process_iq(_, #iq{type = Type, sub_el = SubEl} = IQ, _) + when Type == get; Type == set -> + IQ#iq{type = error, + sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]}; %% IQ "result" or "error". -process_iq(_, reply, _) -> - reply; - +process_iq(_, reply, _) -> reply; %% IQ "result" or "error". -process_iq(_, _, _) -> - ok. +process_iq(_, _, _) -> ok. --define(FEATURE(Feat), {xmlelement,"feature",[{"var", Feat}],[]}). +-define(FEATURE(Feat), + #xmlel{name = <<"feature">>, + attrs = [{<<"var">>, Feat}], children = []}). iq_disco_info(From, Lang, State) -> - [{xmlelement, "identity", - [{"category", "service"}, - {"type", "multicast"}, - {"name", translate:translate(Lang, "Multicast")}], []}, - ?FEATURE(?NS_DISCO_INFO), - ?FEATURE(?NS_DISCO_ITEMS), - ?FEATURE(?NS_VCARD), - ?FEATURE(?NS_ADDRESS)] ++ - iq_disco_info_extras(From, State). + [#xmlel{name = <<"identity">>, + attrs = + [{<<"category">>, <<"service">>}, + {<<"type">>, <<"multicast">>}, + {<<"name">>, + translate:translate(Lang, <<"Multicast">>)}], + children = []}, + ?FEATURE((?NS_DISCO_INFO)), ?FEATURE((?NS_DISCO_ITEMS)), + ?FEATURE((?NS_VCARD)), ?FEATURE((?NS_ADDRESS))] + ++ iq_disco_info_extras(From, State). iq_vcard(Lang) -> - [{xmlelement, "FN", [], - [{xmlcdata, "ejabberd/mod_multicast"}]}, - {xmlelement, "URL", [], - [{xmlcdata, ?EJABBERD_URI}]}, - {xmlelement, "DESC", [], - [{xmlcdata, translate:translate(Lang, "ejabberd Multicast service\n" - "Copyright (c) 2007 Alexey Shchepin")}]}]. + [#xmlel{name = <<"FN">>, attrs = [], + children = [{xmlcdata, <<"ejabberd/mod_multicast">>}]}, + #xmlel{name = <<"URL">>, attrs = [], + children = [{xmlcdata, ?EJABBERD_URI}]}, + #xmlel{name = <<"DESC">>, attrs = [], + children = + [{xmlcdata, + <<(translate:translate(Lang, + <<"ejabberd Multicast service">>))/binary, + "\nCopyright (c) 2002-2014 ProcessOne">>}]}]. iq_version() -> - [{xmlelement, "name", [], - [{xmlcdata, "mod_multicast"}]}, - {xmlelement, "version", [], - [{xmlcdata, ?VERSION_MULTICAST}]}]. - + [#xmlel{name = <<"name">>, attrs = [], + children = [{xmlcdata, <<"mod_multicast">>}]}, + #xmlel{name = <<"version">>, attrs = [], + children = [{xmlcdata, ?VERSION_MULTICAST}]}]. %%%------------------------- %%% Route %%%------------------------- -%% Destinations = [string()] -route_trusted(LServiceS, LServerS, FromJID, Destinations, Packet) -> - - %% Strip 'addresses' from packet +route_trusted(LServiceS, LServerS, FromJID, + Destinations, Packet) -> Packet_stripped = Packet, - AAttrs = [{"xmlns", ?NS_ADDRESS}], + AAttrs = [{<<"xmlns">>, ?NS_ADDRESS}], Delivereds = [], - - Dests2 = lists:map( - fun(D) -> - DS = jts(D), - XML = {xmlelement, "address", - [{"type", "bcc"}, {"jid", DS}], - []}, - #dest{jid_string = DS, - jid_jid = D, - type = "bcc", - full_xml = XML} - end, - Destinations), - - %% Group Not_delivered by server + Dests2 = lists:map(fun (D) -> + DS = jts(D), + XML = #xmlel{name = <<"address">>, + attrs = + [{<<"type">>, <<"bcc">>}, + {<<"jid">>, DS}], + children = []}, + #dest{jid_string = DS, jid_jid = D, + type = <<"bcc">>, full_xml = XML} + end, + Destinations), Groups = group_dests(Dests2), + route_common(LServerS, LServiceS, FromJID, Groups, + Delivereds, Packet_stripped, AAttrs). - route_common(LServerS, LServiceS, FromJID, Groups, Delivereds, Packet_stripped, AAttrs). - -route_untrusted(LServiceS, LServerS, Access, SLimits, From, To, Packet) -> - try route_untrusted2(LServiceS, LServerS, Access, SLimits, From, Packet) +route_untrusted(LServiceS, LServerS, Access, SLimits, + From, To, Packet) -> + try route_untrusted2(LServiceS, LServerS, Access, + SLimits, From, Packet) catch - throw:adenied -> route_error(To, From, Packet, forbidden, "Access denied by service policy"); - throw:eadsele -> route_error(To, From, Packet, bad_request, "No addresses element found"); - throw:eadeles -> route_error(To, From, Packet, bad_request, "No address elements found"); - throw:ewxmlns -> route_error(To, From, Packet, bad_request, "Wrong xmlns"); - throw:etoorec -> route_error(To, From, Packet, not_acceptable, "Too many receiver fields were specified"); - throw:edrelay -> route_error(To, From, Packet, forbidden, "Packet relay is denied by service policy"); - EType:EReason -> - ?ERROR_MSG("Multicast unknown error: Type: ~p~nReason: ~p", [EType, EReason]), - route_error(To, From, Packet, internal_server_error, "Unknown problem") + adenied -> + route_error(To, From, Packet, forbidden, + <<"Access denied by service policy">>); + eadsele -> + route_error(To, From, Packet, bad_request, + <<"No addresses element found">>); + eadeles -> + route_error(To, From, Packet, bad_request, + <<"No address elements found">>); + ewxmlns -> + route_error(To, From, Packet, bad_request, + <<"Wrong xmlns">>); + etoorec -> + route_error(To, From, Packet, not_acceptable, + <<"Too many receiver fields were specified">>); + edrelay -> + route_error(To, From, Packet, forbidden, + <<"Packet relay is denied by service policy">>); + EType:EReason -> + ?ERROR_MSG("Multicast unknown error: Type: ~p~nReason: ~p", + [EType, EReason]), + route_error(To, From, Packet, internal_server_error, + <<"Unknown problem">>) end. -route_untrusted2(LServiceS, LServerS, Access, SLimits, FromJID, Packet) -> +route_untrusted2(LServiceS, LServerS, Access, SLimits, + FromJID, Packet) -> ok = check_access(LServerS, Access, FromJID), - - %% Strip 'addresses' from packet - {ok, Packet_stripped, AAttrs, Addresses} = strip_addresses_element(Packet), - - %% Split Addresses in To_deliver and Delivered - {To_deliver, Delivereds} = split_addresses_todeliver(Addresses), - - %% Convert XML to record + {ok, Packet_stripped, AAttrs, Addresses} = + strip_addresses_element(Packet), + {To_deliver, Delivereds} = + split_addresses_todeliver(Addresses), Dests = convert_dest_record(To_deliver), - - %% Split the destinations by JID {Dests2, Not_jids} = split_dests_jid(Dests), report_not_jid(FromJID, Packet, Not_jids), - - %% Check limit - ok = check_limit_dests(SLimits, FromJID, Packet, Dests2), - - %% Group Not_delivered by server + ok = check_limit_dests(SLimits, FromJID, Packet, + Dests2), Groups = group_dests(Dests2), - - %% Check relay for each Group ok = check_relay(FromJID#jid.server, LServerS, Groups), + route_common(LServerS, LServiceS, FromJID, Groups, + Delivereds, Packet_stripped, AAttrs). - route_common(LServerS, LServiceS, FromJID, Groups, Delivereds, Packet_stripped, AAttrs). - -route_common(LServerS, LServiceS, FromJID, Groups, Delivereds, Packet_stripped, AAttrs) -> - %% Gather multicast service for each Group +route_common(LServerS, LServiceS, FromJID, Groups, + Delivereds, Packet_stripped, AAttrs) -> Groups2 = look_cached_servers(LServerS, Groups), - - %% Create Delivered XML element for each Group Groups3 = build_others_xml(Groups2), - - %% Add preliminary packet for each group Groups4 = add_addresses(Delivereds, Groups3), - - %% Decide action for each Group AGroups = decide_action_groups(Groups4), + act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, + AGroups). - act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, AGroups). +act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, + AGroups) -> + [perform(FromJID, Packet_stripped, AAttrs, LServiceS, + AGroup) + || AGroup <- AGroups]. -act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, AGroups) -> - [perform(FromJID, Packet_stripped, AAttrs, LServiceS, AGroup) || AGroup <- AGroups]. - -perform(From, Packet, AAttrs, _, {route_single, Group}) -> - [route_packet(From, ToUser, Packet, AAttrs, Group#group.addresses) || ToUser <- Group#group.dests]; - -perform(From, Packet, AAttrs, _, {{route_multicast, JID, RLimits}, Group}) -> - route_packet_multicast(From, JID, Packet, AAttrs, Group#group.dests, Group#group.addresses, RLimits); - -perform(From, Packet, AAttrs, LServiceS, {{ask, Old_service, renewal}, Group}) -> +perform(From, Packet, AAttrs, _, + {route_single, Group}) -> + [route_packet(From, ToUser, Packet, AAttrs, + Group#group.addresses) + || ToUser <- Group#group.dests]; +perform(From, Packet, AAttrs, _, + {{route_multicast, JID, RLimits}, Group}) -> + route_packet_multicast(From, JID, Packet, AAttrs, + Group#group.dests, Group#group.addresses, RLimits); +perform(From, Packet, AAttrs, LServiceS, + {{ask, Old_service, renewal}, Group}) -> send_query_info(Old_service, LServiceS), - add_waiter(#waiter{awaiting = {[Old_service], LServiceS, info}, - group = Group, - renewal = true, - sender = From, - packet = Packet, - aattrs = AAttrs, - addresses = Group#group.addresses - }); - -perform(From, Packet, AAttrs, LServiceS, {{ask, Server, not_renewal}, Group}) -> + add_waiter(#waiter{awaiting = + {[Old_service], LServiceS, info}, + group = Group, renewal = true, sender = From, + packet = Packet, aattrs = AAttrs, + addresses = Group#group.addresses}); +perform(From, Packet, AAttrs, LServiceS, + {{ask, Server, not_renewal}, Group}) -> send_query_info(Server, LServiceS), - add_waiter(#waiter{awaiting = {[Server], LServiceS, info}, - group = Group, - renewal = false, - sender = From, - packet = Packet, - aattrs = AAttrs, - addresses = Group#group.addresses - }). - + add_waiter(#waiter{awaiting = + {[Server], LServiceS, info}, + group = Group, renewal = false, sender = From, + packet = Packet, aattrs = AAttrs, + addresses = Group#group.addresses}). %%%------------------------- %%% Check access permission @@ -431,96 +404,88 @@ perform(From, Packet, AAttrs, LServiceS, {{ask, Server, not_renewal}, Group}) -> check_access(LServerS, Access, From) -> case acl:match_rule(LServerS, Access, From) of - allow -> - ok; - _ -> - throw(adenied) + allow -> ok; + _ -> throw(adenied) end. - %%%------------------------- %%% Strip 'addresses' XML element %%%------------------------- strip_addresses_element(Packet) -> - case xml:get_subtag(Packet, "addresses") of - {xmlelement, "addresses", AAttrs, Addresses} -> - case xml:get_attr_s("xmlns", AAttrs) of - ?NS_ADDRESS -> - {xmlelement, Name, Attrs, Els} = Packet, - Els_stripped = lists:keydelete("addresses", 2, Els), - Packet_stripped = {xmlelement, Name, Attrs, Els_stripped}, - {ok, Packet_stripped, AAttrs, Addresses}; - _ -> throw(ewxmlns) - end; - _ -> throw(eadsele) + case xml:get_subtag(Packet, <<"addresses">>) of + #xmlel{name = <<"addresses">>, attrs = AAttrs, + children = Addresses} -> + case xml:get_attr_s(<<"xmlns">>, AAttrs) of + ?NS_ADDRESS -> + #xmlel{name = Name, attrs = Attrs, children = Els} = + Packet, + Els_stripped = lists:keydelete(<<"addresses">>, 2, Els), + Packet_stripped = #xmlel{name = Name, attrs = Attrs, + children = Els_stripped}, + {ok, Packet_stripped, AAttrs, Addresses}; + _ -> throw(ewxmlns) + end; + _ -> throw(eadsele) end. - %%%------------------------- %%% Split Addresses %%%------------------------- split_addresses_todeliver(Addresses) -> - lists:partition( - fun(XML) -> - case XML of - {xmlelement, "address", Attrs, _El} -> - case xml:get_attr_s("delivered", Attrs) of - "true" -> false; - _ -> - Type = xml:get_attr_s("type", Attrs), - case Type of - "to" -> true; - "cc" -> true; - "bcc" -> true; - _ -> false - end - end; - _ -> false - end - end, - Addresses). - + lists:partition(fun (XML) -> + case XML of + #xmlel{name = <<"address">>, attrs = Attrs} -> + case xml:get_attr_s(<<"delivered">>, Attrs) of + <<"true">> -> false; + _ -> + Type = xml:get_attr_s(<<"type">>, + Attrs), + case Type of + <<"to">> -> true; + <<"cc">> -> true; + <<"bcc">> -> true; + _ -> false + end + end; + _ -> false + end + end, + Addresses). %%%------------------------- %%% Check does not exceed limit of destinations %%%------------------------- -check_limit_dests(SLimits, FromJID, Packet, Addresses) -> +check_limit_dests(SLimits, FromJID, Packet, + Addresses) -> SenderT = sender_type(FromJID), Limits = get_slimit_group(SenderT, SLimits), Type_of_stanza = type_of_stanza(Packet), - {_Type, Limit_number} = get_limit_number(Type_of_stanza, Limits), + {_Type, Limit_number} = get_limit_number(Type_of_stanza, + Limits), case length(Addresses) > Limit_number of - false -> - ok; - true -> - throw(etoorec) + false -> ok; + true -> throw(etoorec) end. - %%%------------------------- %%% Convert Destination XML to record %%%------------------------- convert_dest_record(XMLs) -> - lists:map( - fun(XML) -> - case xml:get_tag_attr_s("jid", XML) of - [] -> - #dest{jid_string = none, full_xml = XML}; - JIDS -> - Type = xml:get_tag_attr_s("type", XML), - JIDJ = stj(JIDS), - #dest{jid_string = JIDS, - jid_jid = JIDJ, - type = Type, - full_xml = XML} - end - end, - XMLs). - + lists:map(fun (XML) -> + case xml:get_tag_attr_s(<<"jid">>, XML) of + <<"">> -> #dest{jid_string = none, full_xml = XML}; + JIDS -> + Type = xml:get_tag_attr_s(<<"type">>, XML), + JIDJ = stj(JIDS), + #dest{jid_string = JIDS, jid_jid = JIDJ, + type = Type, full_xml = XML} + end + end, + XMLs). %%%------------------------- %%% Split destinations by existence of JID @@ -528,39 +493,35 @@ convert_dest_record(XMLs) -> %%%------------------------- split_dests_jid(Dests) -> - lists:partition( - fun(Dest) -> - case Dest#dest.jid_string of - none -> false; - _ -> true - end - end, - Dests). + lists:partition(fun (Dest) -> + case Dest#dest.jid_string of + none -> false; + _ -> true + end + end, + Dests). -%% Sends an error message for each unknown address -%% Currently only 'jid' addresses are acceptable on ejabberd report_not_jid(From, Packet, Dests) -> - Dests2 = [xml:element_to_string(Dest#dest.full_xml) || Dest <- Dests], - [route_error(From, From, Packet, jid_malformed, - "This service can not process the address: " ++ D) + Dests2 = [xml:element_to_binary(Dest#dest.full_xml) + || Dest <- Dests], + [route_error(From, From, Packet, jid_malformed, + <<"This service can not process the address: ", + D/binary>>) || D <- Dests2]. - %%%------------------------- -%%% Group destinations by their servers +%%% Group destinations by their servers %%%------------------------- group_dests(Dests) -> - D = lists:foldl( - fun(Dest, Dict) -> - ServerS = (Dest#dest.jid_jid)#jid.server, - dict:append(ServerS, Dest, Dict) - end, - dict:new(), - Dests), + D = lists:foldl(fun (Dest, Dict) -> + ServerS = (Dest#dest.jid_jid)#jid.server, + dict:append(ServerS, Dest, Dict) + end, + dict:new(), Dests), Keys = dict:fetch_keys(D), - [ #group{server = Key, dests = dict:fetch(Key, D)} || Key <- Keys ]. - + [#group{server = Key, dests = dict:fetch(Key, D)} + || Key <- Keys]. %%%------------------------- %%% Look for cached responses @@ -570,42 +531,39 @@ look_cached_servers(LServerS, Groups) -> [look_cached(LServerS, Group) || Group <- Groups]. look_cached(LServerS, G) -> - Maxtime_positive = ?MAXTIME_CACHE_POSITIVE, - Maxtime_negative = ?MAXTIME_CACHE_NEGATIVE, - - Cached_response = - search_server_on_cache(G#group.server, LServerS, - {Maxtime_positive, Maxtime_negative}), + Maxtime_positive = (?MAXTIME_CACHE_POSITIVE), + Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE), + Cached_response = search_server_on_cache(G#group.server, + LServerS, + {Maxtime_positive, + Maxtime_negative}), G#group{multicast = Cached_response}. - %%%------------------------- %%% Build delivered XML element %%%------------------------- build_others_xml(Groups) -> - [Group#group{others = build_other_xml(Group#group.dests)} || Group <- Groups]. + [Group#group{others = + build_other_xml(Group#group.dests)} + || Group <- Groups]. -%% Add delivered=true -%% and remove addresses which type == bcc build_other_xml(Dests) -> - lists:foldl( - fun(Dest, R) -> - XML = Dest#dest.full_xml, - case Dest#dest.type of - "to" -> [add_delivered(XML) | R]; - "cc" -> [add_delivered(XML) | R]; - "bcc" -> R; - _ -> [XML | R] - end - end, - [], - Dests). - -add_delivered({xmlelement, Name, Attrs, Els}) -> - Attrs2 = [{"delivered", "true"} | Attrs], - {xmlelement, Name, Attrs2, Els}. + lists:foldl(fun (Dest, R) -> + XML = Dest#dest.full_xml, + case Dest#dest.type of + <<"to">> -> [add_delivered(XML) | R]; + <<"cc">> -> [add_delivered(XML) | R]; + <<"bcc">> -> R; + _ -> [XML | R] + end + end, + [], Dests). +add_delivered(#xmlel{name = Name, attrs = Attrs, + children = Els}) -> + Attrs2 = [{<<"delivered">>, <<"true">>} | Attrs], + #xmlel{name = Name, attrs = Attrs2, children = Els}. %%%------------------------- %%% Add preliminary packets @@ -615,338 +573,291 @@ add_addresses(Delivereds, Groups) -> Ps = [Group#group.others || Group <- Groups], add_addresses2(Delivereds, Groups, [], [], Ps). -add_addresses2(_, [], Res, _, []) -> - Res; - -add_addresses2(Delivereds, [Group | Groups], Res, Pa, [Pi | Pz]) -> +add_addresses2(_, [], Res, _, []) -> Res; +add_addresses2(Delivereds, [Group | Groups], Res, Pa, + [Pi | Pz]) -> Addresses = lists:append([Delivereds] ++ Pa ++ Pz), Group2 = Group#group{addresses = Addresses}, - add_addresses2(Delivereds, Groups, [Group2 | Res], [Pi | Pa], Pz). - + add_addresses2(Delivereds, Groups, [Group2 | Res], + [Pi | Pa], Pz). %%%------------------------- %%% Decide action groups %%%------------------------- decide_action_groups(Groups) -> - [{decide_action_group(Group), Group} || Group <- Groups]. + [{decide_action_group(Group), Group} + || Group <- Groups]. decide_action_group(Group) -> Server = Group#group.server, case Group#group.multicast of - - {cached, local_server} -> - %% Send a copy of the packet to each local user on Dests - route_single; - - {cached, not_supported} -> - %% Send a copy of the packet to each remote user on Dests - route_single; - - {cached, {multicast_supported, JID, RLimits}} -> - %% XEP33 is supported by the server, thanks to this service - {route_multicast, JID, RLimits}; - - {obsolete, {multicast_supported, Old_service, _RLimits}} -> - {ask, Old_service, renewal}; - - {obsolete, not_supported} -> - {ask, Server, not_renewal}; - - not_cached -> - {ask, Server, not_renewal} - + {cached, local_server} -> + %% Send a copy of the packet to each local user on Dests + route_single; + {cached, not_supported} -> + %% Send a copy of the packet to each remote user on Dests + route_single; + {cached, {multicast_supported, JID, RLimits}} -> + {route_multicast, JID, RLimits}; + {obsolete, + {multicast_supported, Old_service, _RLimits}} -> + {ask, Old_service, renewal}; + {obsolete, not_supported} -> {ask, Server, not_renewal}; + not_cached -> {ask, Server, not_renewal} end. - %%%------------------------- %%% Route packet %%%------------------------- -%% Build and send packet to this group of destinations -%% From = jid() -%% ToS = string() route_packet(From, ToDest, Packet, AAttrs, Addresses) -> Dests = case ToDest#dest.type of - "bcc" -> []; - _ -> [ToDest] + <<"bcc">> -> []; + _ -> [ToDest] end, - route_packet2(From, ToDest#dest.jid_string, Dests, Packet, AAttrs, Addresses). + route_packet2(From, ToDest#dest.jid_string, Dests, + Packet, AAttrs, Addresses). -route_packet_multicast(From, ToS, Packet, AAttrs, Dests, Addresses, Limits) -> +route_packet_multicast(From, ToS, Packet, AAttrs, Dests, + Addresses, Limits) -> Type_of_stanza = type_of_stanza(Packet), - {_Type, Limit_number} = get_limit_number(Type_of_stanza, Limits), + {_Type, Limit_number} = get_limit_number(Type_of_stanza, + Limits), Fragmented_dests = fragment_dests(Dests, Limit_number), - [route_packet2(From, ToS, DFragment, Packet, AAttrs, Addresses) || DFragment <- Fragmented_dests]. + [route_packet2(From, ToS, DFragment, Packet, AAttrs, + Addresses) + || DFragment <- Fragmented_dests]. -route_packet2(From, ToS, Dests, Packet, AAttrs, Addresses) -> - {xmlelement, T, A, C} = Packet, +route_packet2(From, ToS, Dests, Packet, AAttrs, + Addresses) -> + #xmlel{name = T, attrs = A, children = C} = Packet, C2 = case append_dests(Dests, Addresses) of - [] -> - C; - ACs -> - [{xmlelement, "addresses", AAttrs, ACs} | C] + [] -> C; + ACs -> + [#xmlel{name = <<"addresses">>, attrs = AAttrs, + children = ACs} + | C] end, - - Packet2 = {xmlelement, T, A, C2}, + Packet2 = #xmlel{name = T, attrs = A, children = C2}, ToJID = stj(ToS), ejabberd_router:route(From, ToJID, Packet2). -append_dests([], Addresses) -> - Addresses; +append_dests([], Addresses) -> Addresses; append_dests([Dest | Dests], Addresses) -> append_dests(Dests, [Dest#dest.full_xml | Addresses]). - %%%------------------------- %%% Check relay %%%------------------------- check_relay(RS, LS, Gs) -> case check_relay_required(RS, LS, Gs) of - false -> ok; - true -> throw(edrelay) + false -> ok; + true -> throw(edrelay) end. -%% If the sender is external, and at least one destination is external, -%% then this package requires relaying check_relay_required(RServer, LServerS, Groups) -> - case string:str(RServer, LServerS) > 0 of - true -> false; - false -> check_relay_required(LServerS, Groups) + case str:str(RServer, LServerS) > 0 of + true -> false; + false -> check_relay_required(LServerS, Groups) end. check_relay_required(LServerS, Groups) -> - lists:any( - fun(Group) -> - Group#group.server /= LServerS - end, - Groups). - + lists:any(fun (Group) -> Group#group.server /= LServerS + end, + Groups). %%%------------------------- %%% Check protocol support: Send request %%%------------------------- -%% Ask the server if it supports XEP33 send_query_info(RServerS, LServiceS) -> - %% Don't ask a service which JID is "echo.*", - case string:str(RServerS, "echo.") of - 1 -> false; - _ -> send_query(RServerS, LServiceS, ?NS_DISCO_INFO) + case str:str(RServerS, <<"echo.">>) of + 1 -> false; + _ -> send_query(RServerS, LServiceS, ?NS_DISCO_INFO) end. send_query_items(RServerS, LServiceS) -> send_query(RServerS, LServiceS, ?NS_DISCO_ITEMS). send_query(RServerS, LServiceS, XMLNS) -> - Packet = {xmlelement, "iq", - [{"to", RServerS}, {"type", "get"}], - [{xmlelement, "query", [{"xmlns", XMLNS}], []}]}, - - ejabberd_router:route(stj(LServiceS), stj(RServerS), Packet). - + Packet = #xmlel{name = <<"iq">>, + attrs = [{<<"to">>, RServerS}, {<<"type">>, <<"get">>}], + children = + [#xmlel{name = <<"query">>, + attrs = [{<<"xmlns">>, XMLNS}], + children = []}]}, + ejabberd_router:route(stj(LServiceS), stj(RServerS), + Packet). %%%------------------------- %%% Check protocol support: Receive response: Error %%%------------------------- process_iqreply_error(From, LServiceS, _Packet) -> - %% We don't need to change the TO attribute in the outgoing XMPP packet, - %% since ejabberd will do it - - %% We do not change the FROM attribute in the outgoing XMPP packet, - %% this way the user will know what server reported the error - FromS = jts(From), case search_waiter(FromS, LServiceS, info) of - {found_waiter, Waiter} -> - received_awaiter(FromS, Waiter, LServiceS); - _ -> ok + {found_waiter, Waiter} -> + received_awaiter(FromS, Waiter, LServiceS); + _ -> ok end. - %%%------------------------- %%% Check protocol support: Receive response: Disco %%%------------------------- -process_iqreply_result(From, LServiceS, Packet, State) -> - {xmlelement, "query", Attrs2, Els2} = xml:get_subtag(Packet, "query"), - case xml:get_attr_s("xmlns", Attrs2) of - ?NS_DISCO_INFO -> - process_discoinfo_result(From, LServiceS, Els2, State); - ?NS_DISCO_ITEMS -> - process_discoitems_result(From, LServiceS, Els2) +process_iqreply_result(From, LServiceS, Packet, + State) -> + #xmlel{name = <<"query">>, attrs = Attrs2, + children = Els2} = + xml:get_subtag(Packet, <<"query">>), + case xml:get_attr_s(<<"xmlns">>, Attrs2) of + ?NS_DISCO_INFO -> + process_discoinfo_result(From, LServiceS, Els2, State); + ?NS_DISCO_ITEMS -> + process_discoitems_result(From, LServiceS, Els2) end. - %%%------------------------- %%% Check protocol support: Receive response: Disco Info %%%------------------------- -process_discoinfo_result(From, LServiceS, Els, _State) -> +process_discoinfo_result(From, LServiceS, Els, + _State) -> FromS = jts(From), case search_waiter(FromS, LServiceS, info) of - {found_waiter, Waiter} -> - process_discoinfo_result2(From, FromS, LServiceS, Els, Waiter); - _ -> - ok + {found_waiter, Waiter} -> + process_discoinfo_result2(From, FromS, LServiceS, Els, + Waiter); + _ -> ok end. -process_discoinfo_result2(From, FromS, LServiceS, Els, Waiter) -> - %% Check the response, to see if it includes the XEP33 feature. If support == - Multicast_support = - lists:any( - fun(XML) -> - case XML of - {xmlelement, "feature", Attrs, _} -> - ?NS_ADDRESS == xml:get_attr_s("var", Attrs); - _ -> false - end - end, - Els), - +process_discoinfo_result2(From, FromS, LServiceS, Els, + Waiter) -> + Multicast_support = lists:any(fun (XML) -> + case XML of + #xmlel{name = <<"feature">>, + attrs = Attrs} -> + (?NS_ADDRESS) == + xml:get_attr_s(<<"var">>, + Attrs); + _ -> false + end + end, + Els), Group = Waiter#waiter.group, RServer = Group#group.server, - case Multicast_support of - true -> - %% Inspect the XML of the disco#info response to get the limits of the remote service - SenderT = sender_type(From), - RLimits = get_limits_xml(Els, SenderT), - - %% Store this response on cache - add_response(RServer, {multicast_supported, FromS, RLimits}), - - %% Send XEP33 packet to JID - FromM = Waiter#waiter.sender, - DestsM = Group#group.dests, - PacketM = Waiter#waiter.packet, - AAttrsM = Waiter#waiter.aattrs, - AddressesM = Waiter#waiter.addresses, - RServiceM = FromS, - route_packet_multicast(FromM, RServiceM, PacketM, AAttrsM, DestsM, AddressesM, RLimits), - - %% Remove from Pool - delo_waiter(Waiter); - - false -> - %% So we now know that JID does not support XEP33 - case FromS of - - RServer -> - %% We asked the server, now let's see if any component supports it: - - %% Send disco#items query to JID - send_query_items(FromS, LServiceS), - - %% Store on Pool - delo_waiter(Waiter), - add_waiter(Waiter#waiter{ - awaiting = {[FromS], LServiceS, items}, - renewal = false - }); - - %% We asked a component, and it does not support XEP33 - _ -> - received_awaiter(FromS, Waiter, LServiceS) - - end + true -> + SenderT = sender_type(From), + RLimits = get_limits_xml(Els, SenderT), + add_response(RServer, + {multicast_supported, FromS, RLimits}), + FromM = Waiter#waiter.sender, + DestsM = Group#group.dests, + PacketM = Waiter#waiter.packet, + AAttrsM = Waiter#waiter.aattrs, + AddressesM = Waiter#waiter.addresses, + RServiceM = FromS, + route_packet_multicast(FromM, RServiceM, PacketM, + AAttrsM, DestsM, AddressesM, RLimits), + delo_waiter(Waiter); + false -> + case FromS of + RServer -> + send_query_items(FromS, LServiceS), + delo_waiter(Waiter), + add_waiter(Waiter#waiter{awaiting = + {[FromS], LServiceS, items}, + renewal = false}); + %% We asked a component, and it does not support XEP33 + _ -> received_awaiter(FromS, Waiter, LServiceS) + end end. get_limits_xml(Els, SenderT) -> - %% Get limits reported by the remote service LimitOpts = get_limits_els(Els), - - %% Build the final list of limits - %% For the ones not reported, put default numbers build_remote_limit_record(LimitOpts, SenderT). -%% Look for disco#info extras which may report limits -%% TODO: Check if there are useful functions in xml.erl to clean this code get_limits_els(Els) -> - lists:foldl( - fun(XML, R) -> - case XML of - {xmlelement, "x", Attrs, SubEls} -> - case (?NS_XDATA == xml:get_attr_s("xmlns", Attrs)) and - ("result" == xml:get_attr_s("type", Attrs)) of - true -> get_limits_fields(SubEls) ++ R; - false -> R - end; - _ -> R - end - end, - [], - Els - ). + lists:foldl(fun (XML, R) -> + case XML of + #xmlel{name = <<"x">>, attrs = Attrs, + children = SubEls} -> + case ((?NS_XDATA) == + xml:get_attr_s(<<"xmlns">>, Attrs)) + and + (<<"result">> == + xml:get_attr_s(<<"type">>, Attrs)) + of + true -> get_limits_fields(SubEls) ++ R; + false -> R + end; + _ -> R + end + end, + [], Els). get_limits_fields(Fields) -> - {Head, Tail} = lists:partition( - fun(Field) -> - case Field of - {xmlelement, "field", Attrs, _SubEls} -> - ("FORM_TYPE" == xml:get_attr_s("var", Attrs)) - and ("hidden" == xml:get_attr_s("type", Attrs)); - _ -> false - end - end, - Fields - ), + {Head, Tail} = lists:partition(fun (Field) -> + case Field of + #xmlel{name = <<"field">>, + attrs = Attrs} -> + (<<"FORM_TYPE">> == + xml:get_attr_s(<<"var">>, + Attrs)) + and + (<<"hidden">> == + xml:get_attr_s(<<"type">>, + Attrs)); + _ -> false + end + end, + Fields), case Head of - [] -> []; - _ -> get_limits_values(Tail) + [] -> []; + _ -> get_limits_values(Tail) end. get_limits_values(Values) -> - lists:foldl( - fun(Value, R) -> - case Value of - {xmlelement, "field", Attrs, SubEls} -> - %% TODO: Only one subel is expected here, but there may be several - [{xmlelement, "value", _AttrsV, SubElsV}] = SubEls, - Number = xml:get_cdata(SubElsV), - Name = xml:get_attr_s("var", Attrs), - [{list_to_atom(Name), list_to_integer(Number)} | R]; - _ -> R - end - end, - [], - Values - ). - + lists:foldl(fun (Value, R) -> + case Value of + #xmlel{name = <<"field">>, attrs = Attrs, + children = SubEls} -> + [#xmlel{name = <<"value">>, children = SubElsV}] = + SubEls, + Number = xml:get_cdata(SubElsV), + Name = xml:get_attr_s(<<"var">>, Attrs), + [{jlib:binary_to_atom(Name), + jlib:binary_to_integer(Number)} + | R]; + _ -> R + end + end, + [], Values). %%%------------------------- %%% Check protocol support: Receive response: Disco Items %%%------------------------- process_discoitems_result(From, LServiceS, Els) -> - %% Convert list of xmlelement into list of strings - List = lists:foldl( - fun(XML, Res) -> - %% For each one, if it's "item", look for jid - case XML of - {xmlelement, "item", Attrs, _} -> - Res ++ [xml:get_attr_s("jid", Attrs)]; - _ -> Res - end - end, - [], - Els), - - %% Send disco#info queries to each item + List = lists:foldl(fun (XML, Res) -> + case XML of + #xmlel{name = <<"item">>, attrs = Attrs} -> + Res ++ [xml:get_attr_s(<<"jid">>, Attrs)]; + _ -> Res + end + end, + [], Els), [send_query_info(Item, LServiceS) || Item <- List], - - %% Search who was awaiting a disco#items response from this JID FromS = jts(From), - {found_waiter, Waiter} = search_waiter(FromS, LServiceS, items), - + {found_waiter, Waiter} = search_waiter(FromS, LServiceS, + items), delo_waiter(Waiter), - add_waiter(Waiter#waiter{ - awaiting = {List, LServiceS, info}, - renewal = false - }). - + add_waiter(Waiter#waiter{awaiting = + {List, LServiceS, info}, + renewal = false}). %%%------------------------- %%% Check protocol support: Receive response: Received awaiter @@ -957,191 +868,160 @@ received_awaiter(JID, Waiter, LServiceS) -> delo_waiter(Waiter), Group = Waiter#waiter.group, RServer = Group#group.server, - - %% Remove this awaiter from the list of awaiting JIDs. case lists:delete(JID, JIDs) of - - [] -> - %% We couldn't find any service in this server that supports XEP33 - case Waiter#waiter.renewal of - - false -> - %% Store on cache the response - add_response(RServer, not_supported), - - %% Send a copy of the packet to each remote user on Dests - From = Waiter#waiter.sender, - Packet = Waiter#waiter.packet, - AAttrs = Waiter#waiter.aattrs, - Addresses = Waiter#waiter.addresses, - [route_packet(From, ToUser, Packet, AAttrs, Addresses) - || ToUser <- Group#group.dests]; - - true -> - %% We asked this component because the cache - %% said it would support XEP33, but it doesn't! - send_query_info(RServer, LServiceS), - add_waiter(Waiter#waiter{ - awaiting = {[RServer], LServiceS, info}, - renewal = false - }) - end; - - JIDs2 -> - %% Maybe other component on the server supports XEP33 - add_waiter(Waiter#waiter{ - awaiting = {JIDs2, LServiceS, info}, - renewal = false - }) + [] -> + case Waiter#waiter.renewal of + false -> + add_response(RServer, not_supported), + From = Waiter#waiter.sender, + Packet = Waiter#waiter.packet, + AAttrs = Waiter#waiter.aattrs, + Addresses = Waiter#waiter.addresses, + [route_packet(From, ToUser, Packet, AAttrs, Addresses) + || ToUser <- Group#group.dests]; + true -> + send_query_info(RServer, LServiceS), + add_waiter(Waiter#waiter{awaiting = + {[RServer], LServiceS, info}, + renewal = false}) + end; + JIDs2 -> + add_waiter(Waiter#waiter{awaiting = + {JIDs2, LServiceS, info}, + renewal = false}) end. - %%%------------------------- %%% Cache %%%------------------------- create_cache() -> - mnesia:create_table(multicastc, [{ram_copies, [node()]}, - {attributes, record_info(fields, multicastc)}]). + mnesia:create_table(multicastc, + [{ram_copies, [node()]}, + {attributes, record_info(fields, multicastc)}]). -%% Add this response to the cache. -%% If a previous response still exists, it's overwritten add_response(RServer, Response) -> - Secs = calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + Secs = + calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), mnesia:dirty_write(#multicastc{rserver = RServer, - response = Response, - ts = Secs}). + response = Response, ts = Secs}). -%% Search on the cache if there is a response for the server -%% If there is a response but is obsolete, -%% don't bother removing since it will later be overwritten anyway search_server_on_cache(RServer, LServerS, _Maxmins) - when RServer == LServerS -> + when RServer == LServerS -> {cached, local_server}; - search_server_on_cache(RServer, _LServerS, Maxmins) -> case look_server(RServer) of - not_cached -> - not_cached; - {cached, Response, Ts} -> - Now = calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), - case is_obsolete(Response, Ts, Now, Maxmins) of - false -> {cached, Response}; - true -> {obsolete, Response} - end + not_cached -> not_cached; + {cached, Response, Ts} -> + Now = + calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + case is_obsolete(Response, Ts, Now, Maxmins) of + false -> {cached, Response}; + true -> {obsolete, Response} + end end. look_server(RServer) -> case mnesia:dirty_read(multicastc, RServer) of - [] -> not_cached; - [M] -> {cached, M#multicastc.response, M#multicastc.ts} + [] -> not_cached; + [M] -> {cached, M#multicastc.response, M#multicastc.ts} end. is_obsolete(Response, Ts, Now, {Max_pos, Max_neg}) -> Max = case Response of - multicast_not_supported -> Max_neg; - _ -> Max_pos + multicast_not_supported -> Max_neg; + _ -> Max_pos end, - (Now - Ts) > Max. - + Now - Ts > Max. %%%------------------------- %%% Purge cache %%%------------------------- purge() -> - Maxmins_positive = ?MAXTIME_CACHE_POSITIVE, - Maxmins_negative = ?MAXTIME_CACHE_NEGATIVE, - Now = calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + Maxmins_positive = (?MAXTIME_CACHE_POSITIVE), + Maxmins_negative = (?MAXTIME_CACHE_NEGATIVE), + Now = + calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), purge(Now, {Maxmins_positive, Maxmins_negative}). purge(Now, Maxmins) -> - F = fun() -> - mnesia:foldl( - fun(R, _) -> - #multicastc{response = Response, ts = Ts} = R, - %% If this record is obsolete, delete it - case is_obsolete(Response, Ts, Now, Maxmins) of - true -> mnesia:delete_object(R); - false -> ok - end - end, - none, - multicastc) + F = fun () -> + mnesia:foldl(fun (R, _) -> + #multicastc{response = Response, ts = Ts} = + R, + case is_obsolete(Response, Ts, Now, + Maxmins) + of + true -> mnesia:delete_object(R); + false -> ok + end + end, + none, multicastc) end, mnesia:transaction(F). - %%%------------------------- %%% Purge cache loop %%%------------------------- try_start_loop() -> case lists:member(?PURGE_PROCNAME, registered()) of - true -> ok; - false -> start_loop() + true -> ok; + false -> start_loop() end, - ?PURGE_PROCNAME ! new_module. + (?PURGE_PROCNAME) ! new_module. start_loop() -> - register(?PURGE_PROCNAME, spawn(?MODULE, purge_loop, [0])), - ?PURGE_PROCNAME ! purge_now. + register(?PURGE_PROCNAME, + spawn(?MODULE, purge_loop, [0])), + (?PURGE_PROCNAME) ! purge_now. -try_stop_loop() -> - ?PURGE_PROCNAME ! try_stop. +try_stop_loop() -> (?PURGE_PROCNAME) ! try_stop. -%% NM = number of modules are running on this node purge_loop(NM) -> receive - purge_now -> - purge(), - timer:send_after(?CACHE_PURGE_TIMER, ?PURGE_PROCNAME, purge_now), - purge_loop(NM); - new_module -> - purge_loop(NM + 1); - try_stop when NM > 1 -> - purge_loop(NM - 1); - try_stop -> - purge_loop_finished + purge_now -> + purge(), + timer:send_after(?CACHE_PURGE_TIMER, ?PURGE_PROCNAME, + purge_now), + purge_loop(NM); + new_module -> purge_loop(NM + 1); + try_stop when NM > 1 -> purge_loop(NM - 1); + try_stop -> purge_loop_finished end. - %%%------------------------- %%% Pool %%%------------------------- create_pool() -> - catch ets:new(multicastp, [duplicate_bag, public, named_table, {keypos, 2}]). + catch ets:new(multicastp, + [duplicate_bag, public, named_table, {keypos, 2}]). -%% If a Waiter with the same key exists, it overwrites it add_waiter(Waiter) -> true = ets:insert(multicastp, Waiter). delo_waiter(Waiter) -> true = ets:delete_object(multicastp, Waiter). -%% Search on the Pool who is waiting for this result -%% If there are several matches, pick the first one only search_waiter(JID, LServiceS, Type) -> - Rs = ets:foldl( - fun(W, Res) -> - {JIDs, LServiceS1, Type1} = W#waiter.awaiting, - case lists:member(JID, JIDs) - and (LServiceS == LServiceS1) - and (Type1 == Type) of - true -> Res ++ [W]; - false -> Res - end - end, - [], - multicastp - ), + Rs = ets:foldl(fun (W, Res) -> + {JIDs, LServiceS1, Type1} = W#waiter.awaiting, + case lists:member(JID, JIDs) and + (LServiceS == LServiceS1) + and (Type1 == Type) + of + true -> Res ++ [W]; + false -> Res + end + end, + [], multicastp), case Rs of - [R | _] -> {found_waiter, R}; - [] -> waiter_not_found + [R | _] -> {found_waiter, R}; + [] -> waiter_not_found end. - %%%------------------------- %%% Limits: utils %%%------------------------- @@ -1156,72 +1036,63 @@ search_waiter(JID, LServiceS, Type) -> list_of_limits(local) -> [{message, ?DEFAULT_LIMIT_LOCAL_MESSAGE}, {presence, ?DEFAULT_LIMIT_LOCAL_PRESENCE}]; - list_of_limits(remote) -> [{message, ?DEFAULT_LIMIT_REMOTE_MESSAGE}, {presence, ?DEFAULT_LIMIT_REMOTE_PRESENCE}]. -build_service_limit_record(LimitOpts) -> +build_service_limit_record(LimitOpts) -> LimitOptsL = get_from_limitopts(LimitOpts, local), LimitOptsR = get_from_limitopts(LimitOpts, remote), - {service_limits, - build_limit_record(LimitOptsL, local), - build_limit_record(LimitOptsR, remote) - }. + {service_limits, build_limit_record(LimitOptsL, local), + build_limit_record(LimitOptsR, remote)}. get_from_limitopts(LimitOpts, SenderT) -> - [{StanzaT, Number} - || {SenderT2, StanzaT, Number} <- LimitOpts, + [{StanzaT, Number} + || {SenderT2, StanzaT, Number} <- LimitOpts, SenderT =:= SenderT2]. -%% Build a record of type #limits{} -%% In fact, it builds a list and then converts to tuple -%% It is important to put the elements in the list in -%% the same order than the elements in record #limits build_remote_limit_record(LimitOpts, SenderT) -> build_limit_record(LimitOpts, SenderT). build_limit_record(LimitOpts, SenderT) -> - Limits = [ - get_limit_value(Name, Default, LimitOpts) + Limits = [get_limit_value(Name, Default, LimitOpts) || {Name, Default} <- list_of_limits(SenderT)], list_to_tuple([limits | Limits]). get_limit_value(Name, Default, LimitOpts) -> case lists:keysearch(Name, 1, LimitOpts) of - {value, {Name, Number}} -> - {custom, Number}; - false -> - {default, Default} + {value, {Name, Number}} -> {custom, Number}; + false -> {default, Default} end. -type_of_stanza({xmlelement, "message", _, _}) -> message; -type_of_stanza({xmlelement, "presence", _, _}) -> presence. +type_of_stanza(#xmlel{name = <<"message">>}) -> message; +type_of_stanza(#xmlel{name = <<"presence">>}) -> + presence. -get_limit_number(message, Limits) -> Limits#limits.message; -get_limit_number(presence, Limits) -> Limits#limits.presence. +get_limit_number(message, Limits) -> + Limits#limits.message; +get_limit_number(presence, Limits) -> + Limits#limits.presence. -get_slimit_group(local, SLimits) -> SLimits#service_limits.local; -get_slimit_group(remote, SLimits) -> SLimits#service_limits.remote. +get_slimit_group(local, SLimits) -> + SLimits#service_limits.local; +get_slimit_group(remote, SLimits) -> + SLimits#service_limits.remote. fragment_dests(Dests, Limit_number) -> - {R, _} = lists:foldl( - fun(Dest, {Res, Count}) -> - case Count of - Limit_number -> - Head2 = [Dest], - {[Head2 | Res], 0}; - _ -> - [Head | Tail] = Res, - Head2 = [Dest | Head], - {[Head2 | Tail], Count+1} - end - end, - {[[]], 0}, - Dests), + {R, _} = lists:foldl(fun (Dest, {Res, Count}) -> + case Count of + Limit_number -> + Head2 = [Dest], {[Head2 | Res], 0}; + _ -> + [Head | Tail] = Res, + Head2 = [Dest | Head], + {[Head2 | Tail], Count + 1} + end + end, + {[[]], 0}, Dests), R. - %%%------------------------- %%% Limits: XEP-0128 Service Discovery Extensions %%%------------------------- @@ -1229,58 +1100,62 @@ fragment_dests(Dests, Limit_number) -> %% Some parts of code are borrowed from mod_muc_room.erl -define(RFIELDT(Type, Var, Val), - {xmlelement, "field", [{"var", Var}, {"type", Type}], - [{xmlelement, "value", [], [{xmlcdata, Val}]}]}). + #xmlel{name = <<"field">>, + attrs = [{<<"var">>, Var}, {<<"type">>, Type}], + children = + [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, Val}]}]}). -define(RFIELDV(Var, Val), - {xmlelement, "field", [{"var", Var}], - [{xmlelement, "value", [], [{xmlcdata, Val}]}]}). + #xmlel{name = <<"field">>, attrs = [{<<"var">>, Var}], + children = + [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, Val}]}]}). iq_disco_info_extras(From, State) -> SenderT = sender_type(From), Service_limits = State#state.service_limits, case iq_disco_info_extras2(SenderT, Service_limits) of - [] -> []; - List_limits_xmpp -> - [{xmlelement, "x", [{"xmlns", ?NS_XDATA}, {"type", "result"}], - [?RFIELDT("hidden", "FORM_TYPE", ?NS_ADDRESS)] ++ List_limits_xmpp - }] + [] -> []; + List_limits_xmpp -> + [#xmlel{name = <<"x">>, + attrs = + [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"result">>}], + children = + [?RFIELDT(<<"hidden">>, <<"FORM_TYPE">>, (?NS_ADDRESS))] + ++ List_limits_xmpp}] end. sender_type(From) -> - Local_hosts = ?MYHOSTS, + Local_hosts = (?MYHOSTS), case lists:member(From#jid.lserver, Local_hosts) of - true -> local; - false -> remote + true -> local; + false -> remote end. iq_disco_info_extras2(SenderT, SLimits) -> - %% And report only the limits that are interesting for this sender Limits = get_slimit_group(SenderT, SLimits), Stanza_types = [message, presence], - lists:foldl( - fun(Type_of_stanza, R) -> - %% Report only custom limits - case get_limit_number(Type_of_stanza, Limits) of - {custom, Number} -> - [?RFIELDV(to_string(Type_of_stanza), to_string(Number)) | R]; - {default, _} -> R - end - end, - [], - Stanza_types). - -to_string(A) -> - hd(io_lib:format("~p",[A])). + lists:foldl(fun (Type_of_stanza, R) -> + case get_limit_number(Type_of_stanza, Limits) of + {custom, Number} -> + [?RFIELDV((to_binary(Type_of_stanza)), + (to_binary(Number))) + | R]; + {default, _} -> R + end + end, + [], Stanza_types). +to_binary(A) -> list_to_binary(hd(io_lib:format("~p", [A]))). %%%------------------------- %%% Error report %%%------------------------- route_error(From, To, Packet, ErrType, ErrText) -> - {xmlelement, _Name, Attrs, _Els} = Packet, - Lang = xml:get_attr_s("xml:lang", Attrs), + #xmlel{attrs = Attrs} = Packet, + Lang = xml:get_attr_s(<<"xml:lang">>, Attrs), Reply = make_reply(ErrType, Lang, ErrText), Err = jlib:make_error_reply(Packet, Reply), ejabberd_router:route(From, To, Err). @@ -1297,4 +1172,5 @@ make_reply(forbidden, Lang, ErrText) -> ?ERRT_FORBIDDEN(Lang, ErrText). stj(String) -> jlib:string_to_jid(String). + jts(String) -> jlib:jid_to_string(String).