From d3b4bc817dadb3fb6ad8c669b0037156a0733f96 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Fri, 6 Feb 2015 14:15:30 +0100 Subject: [PATCH] Initial import of mod_mam Import an implementation of XEP-0313: Message Archive Management, version 0.3. --- mod_mam/COPYING | 342 +++++++++++ mod_mam/Emakefile | 4 + mod_mam/README.txt | 54 ++ mod_mam/TODO.txt | 6 + mod_mam/build.bat | 1 + mod_mam/build.sh | 2 + mod_mam/ebin/.keepme | 0 mod_mam/src/mod_mam.erl | 1259 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 1668 insertions(+) create mode 100644 mod_mam/COPYING create mode 100644 mod_mam/Emakefile create mode 100644 mod_mam/README.txt create mode 100644 mod_mam/TODO.txt create mode 100644 mod_mam/build.bat create mode 100755 mod_mam/build.sh create mode 100644 mod_mam/ebin/.keepme create mode 100644 mod_mam/src/mod_mam.erl diff --git a/mod_mam/COPYING b/mod_mam/COPYING new file mode 100644 index 0000000..cc498bd --- /dev/null +++ b/mod_mam/COPYING @@ -0,0 +1,342 @@ +As a special exception, the authors give permission to link this program +with the OpenSSL library and distribute the resulting binary. + + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Lesser General Public License instead.) 
You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. 
+ + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. 
You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. 
+ +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. 
However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. 
+You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. 
If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. 
EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + , 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. 
diff --git a/mod_mam/Emakefile b/mod_mam/Emakefile new file mode 100644 index 0000000..fe1263d --- /dev/null +++ b/mod_mam/Emakefile @@ -0,0 +1,4 @@ +{'../ejabberd-dev/src/gen_mod', [{outdir, "../ejabberd-dev/ebin"}, {i, "../ejabberd-dev/include"}]}. +{'../ejabberd-dev/src/lager_transform', [{outdir, "../ejabberd-dev/ebin"}, {i, "../ejabberd-dev/include"}]}. +{'../ejabberd-dev/src/lager_util', [{outdir, "../ejabberd-dev/ebin"}, {i, "../ejabberd-dev/include"}]}. +{'src/mod_mam', [{outdir, "ebin"}, {i, "../ejabberd-dev/include"}, {d, 'LAGER'}]}. diff --git a/mod_mam/README.txt b/mod_mam/README.txt new file mode 100644 index 0000000..ae42b21 --- /dev/null +++ b/mod_mam/README.txt @@ -0,0 +1,54 @@ + + mod_mam - Message Archive Management (XEP-0313) + + Author: Holger Weiss + Requirements: ejabberd 14.12 or newer + + + DESCRIPTION + ----------- + +This module implements Message Archive Management (XEP-0313). + + + CONFIGURATION + ------------- + +In order to use this module with the default settings, add the following +line to the modules section of your ejabberd.yml file: + + mod_mam: {} + +The configurable options are: + +- access_max_user_messages (default: 'max_user_mam_messages') + + This option defines which access rule is used to limit the maximum number + of MAM messages a user can have. That rule should either yield 0, which + disables MAM storage for the corresponding user(s), or a positive number, + or 'infinity' (which is the default). + +- default_page_size (default: 25) + + If a large set of messages is requested using MAM, the response is split + into smaller chunks ("pages"). The client can optionally specify the + number of messages it would like to receive per page. This option + specifies the number of messages returned if the client doesn't do that. + +- max_page_size (default: 100) + + This option specifies the upper limit of messages transmitted per chunk. + If the client requests larger pages, 'max_page_size' is used instead. 
+ +- request_activates_archiving (default: 'true') + + By default, message archiving for a given user won't be enabled until one + of their clients has issued a MAM request. If message archiving should instead + be activated for all users immediately, set 'request_activates_archiving' + to 'false'. + +- iqdisc (default: 'parallel') + + The module also supports the 'iqdisc' option, as documented here: + + http://www.process-one.net/docs/ejabberd/guide_en.html#modiqdiscoption diff --git a/mod_mam/TODO.txt b/mod_mam/TODO.txt new file mode 100644 index 0000000..c08c01f --- /dev/null +++ b/mod_mam/TODO.txt @@ -0,0 +1,6 @@ + + Feature requests for mod_mam + +- Add ODBC support. +- Support archiving preferences (XEP-0313, section 6). +- Add form element that allows for matching contents. diff --git a/mod_mam/build.bat b/mod_mam/build.bat new file mode 100644 index 0000000..57e634e --- /dev/null +++ b/mod_mam/build.bat @@ -0,0 +1 @@ +erl -pa ../ejabberd-dev/ebin -pz ebin -make diff --git a/mod_mam/build.sh b/mod_mam/build.sh new file mode 100755 index 0000000..f0a0cfa --- /dev/null +++ b/mod_mam/build.sh @@ -0,0 +1,2 @@ +#!/bin/sh +erl -pa ../ejabberd-dev/ebin -pz ebin -make diff --git a/mod_mam/ebin/.keepme b/mod_mam/ebin/.keepme new file mode 100644 index 0000000..e69de29 diff --git a/mod_mam/src/mod_mam.erl b/mod_mam/src/mod_mam.erl new file mode 100644 index 0000000..e90724f --- /dev/null +++ b/mod_mam/src/mod_mam.erl @@ -0,0 +1,1259 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_mam.erl +%%% Author : Holger Weiss +%%% Purpose : Message Archive Management (XEP-0313) +%%% Created : 25 Jan 2015 by Holger Weiss +%%% +%%% +%%% ejabberd, Copyright (C) 2015 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. 
+%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(mod_mam). +-author('holger@zedat.fu-berlin.de'). + +-define(NS_MAM, <<"urn:xmpp:mam:0">>). +-define(PROCNAME, ?MODULE). +-define(GEN_SERVER, gen_server). +-define(DEFAULT_MAX_MESSAGES, infinity). +-define(DEFAULT_PAGE_SIZE, 25). +-define(MAX_PAGE_SIZE, 100). + +-behaviour(?GEN_SERVER). +-behaviour(gen_mod). + +%% gen_mod/supervisor callbacks. +-export([start_link/2, + start/2, + stop/1]). + +%% gen_server callbacks. +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +%% ejabberd_hooks callbacks. +-export([receive_stanza/4, + send_stanza/3, + remove_user/2]). + +%% gen_iq_handler callback. +-export([handle_iq/3]). + +%% Spawned processes. +-export([maybe_change_mnesia_fragment_count/2]). + +-include("jlib.hrl"). +-include("logger.hrl"). + +-record(mam_msg, + {key :: mam_msg_key(), + time :: erlang:timestamp(), + route :: route(), + from :: ljid(), + to :: ljid(), + stanza :: xmlel()}). + +-record(mam_meta, + {mam_jid :: mam_jid(), + mam_type :: mam_type(), + first_id :: mam_msg_id() | undefined, + last_id :: mam_msg_id() | undefined, + arch_always = [] :: [jid()], + arch_never = [] :: [jid()], + arch_default = always :: mam_behavior()}). + +-record(mam_filter, + {start :: erlang:timestamp() | error, + fin :: erlang:timestamp() | error, + with :: ljid() | error}). 
+ +-record(mam_query, + {mam_jid :: mam_jid(), + direction :: direction(), + query_id :: binary() | undefined, + id :: mam_msg_id() | undefined | error, + index :: non_neg_integer() | undefined | error, + max :: non_neg_integer() | undefined | error, + filter :: mam_filter()}). + +-record(mam_query_state, + {messages = [] :: [mam_msg()], + first :: mam_msg_id() | undefined, + last :: mam_msg_id() | undefined, + current :: mam_msg_id() | undefined, + n_remaining :: non_neg_integer()}). + +-record(mam_result, + {messages = [] :: [mam_msg()], + count :: non_neg_integer() | undefined, + index :: non_neg_integer() | undefined, + first :: mam_msg_id() | undefined, + last :: mam_msg_id() | undefined, + is_complete :: boolean()}). + +-record(mam_page_size_conf, + {default :: pos_integer(), + max :: pos_integer()}). + +-record(state, + {host :: binary(), + access_max_messages :: atom(), + request_activates_archiving :: boolean()}). + +-type state() :: #state{}. +-type mam_msg() :: #mam_msg{}. +-type mam_meta() :: #mam_meta{}. +-type mam_filter() :: #mam_filter{}. +-type mam_query() :: #mam_query{}. +-type mam_query_state() :: #mam_query_state{}. +-type mam_result() :: #mam_result{}. +-type mam_page_size_conf() :: #mam_page_size_conf{}. +-type mam_jid() :: {binary(), binary()}. +-type mam_msg_id() :: non_neg_integer(). +-type mam_msg_key() :: {mam_jid(), mam_msg_id()}. +-type mam_max_msgs() :: non_neg_integer() | infinity. +-type mam_query_id() :: binary() | undefined. +-type mam_type() :: user | muc | pubsub. +-type mam_filter_type() :: start | fin | with. +-type mam_behavior() :: always | never | roster. +-type route() :: incoming | outgoing. +-type direction() :: before | aft. +-type db_type() :: odbc | mnesia | riak. % But we currently only support Mnesia. + +%%-------------------------------------------------------------------- +%% gen_mod/supervisor callbacks. 
+%%--------------------------------------------------------------------
+
+%% Start the per-host archiving gen_server and register it locally under
+%% the process name gen_mod derives from Host and this module.
+-spec start_link(binary(), gen_mod:opts()) -> {ok, pid()} | ignore | {error, _}.
+
+start_link(Host, Opts) ->
+    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+    ?GEN_SERVER:start_link({local, Proc}, ?MODULE, {Host, Opts}, []).
+
+%% gen_mod start callback: register the MAM IQ handler and disco feature
+%% for Host, then add the storage process as a child of ejabberd_sup.
+%% Returns whatever supervisor:start_child/2 returns.
+-spec start(binary(), gen_mod:opts()) -> {ok, _} | {ok, _, _} | {error, _}.
+
+start(Host, Opts) ->
+    %% Set up processing of MAM requests.
+    IQDisc = gen_mod:get_opt(iqdisc, Opts,
+                             fun gen_iq_handler:check_type/1,
+                             parallel),
+    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MAM, ?MODULE,
+                                  handle_iq, IQDisc),
+    mod_disco:register_feature(Host, ?NS_MAM),
+    %% Set up message storage process.
+    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+    Spec = {Proc, {?MODULE, start_link, [Host, Opts]}, permanent, 3000, worker,
+            [?MODULE]},
+    supervisor:start_child(ejabberd_sup, Spec).
+
+%% gen_mod stop callback: undo everything start/2 set up.  The IQ handler
+%% and the disco feature are removed first so that no new MAM requests are
+%% accepted (or advertised) while the storage process is being shut down.
+-spec stop(binary()) -> ok.
+
+stop(Host) ->
+    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM),
+    mod_disco:unregister_feature(Host, ?NS_MAM),
+    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+    ok = supervisor:terminate_child(ejabberd_sup, Proc),
+    ok = supervisor:delete_child(ejabberd_sup, Proc).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks.
+%%--------------------------------------------------------------------
+
+%% Create the Mnesia tables (when the configured DB type is mnesia), attach
+%% the stanza and user-removal hooks for Host, and build the server state
+%% from the module options.
+-spec init({binary(), gen_mod:opts()}) -> {ok, state()}.
+
+init({Host, Opts}) ->
+    %% Trap exits so that terminate/2 runs and 'EXIT' messages of linked
+    %% processes arrive in handle_info/2.
+    process_flag(trap_exit, true),
+    case gen_mod:db_type(Opts) of
+        mnesia ->
+            %% We start with two fragments; and by default, each fragment is
+            %% replicated to all nodes (this node plus all connected ones).
+            FragProperties = [{n_fragments, 2},
+                              {n_disc_only_copies, length([node() | nodes()])}],
+            %% The results are deliberately ignored: the tables may already
+            %% exist from an earlier start.
+            mnesia:create_table(mam_msg,
+                                [{frag_properties, FragProperties},
+                                 {attributes, record_info(fields, mam_msg)}]),
+            mnesia:create_table(mam_meta,
+                                [{disc_copies, [node()]},
+                                 {attributes, record_info(fields, mam_meta)}]);
+        _ ->
+            ok
+    end,
+    ejabberd_hooks:add(user_receive_packet, Host, ?MODULE,
+                       receive_stanza, 50),
+    ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
+                       send_stanza, 50),
+    ejabberd_hooks:add(remove_user, Host, ?MODULE,
+                       remove_user, 50),
+    ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE,
+                       remove_user, 50),
+    AccessMaxMsgs =
+        gen_mod:get_opt(access_max_user_messages, Opts,
+                        fun(A) when is_atom(A) -> A end, max_user_mam_messages),
+    RequestActivatesArchiving =
+        gen_mod:get_opt(request_activates_archiving, Opts,
+                        fun(B) when is_boolean(B) -> B end, true),
+    {ok, #state{host = Host,
+                access_max_messages = AccessMaxMsgs,
+                request_activates_archiving = RequestActivatesArchiving}}.
+
+%% No synchronous requests are supported.  Unknown calls are logged and
+%% answered with an error right away, so the caller does not block until
+%% its gen_server:call timeout expires.
+-spec handle_call(_, {pid(), _}, state()) -> {reply, {error, badarg}, state()}.
+
+handle_call(Request, From, State) ->
+    ?ERROR_MSG("Got unexpected request from ~p: ~p", [From, Request]),
+    {reply, {error, badarg}, State}.
+
+%% {store, US, Msg} asks the server to archive Msg for the user US; any
+%% other cast is logged and dropped.
+-spec handle_cast(_, state()) -> {noreply, state()}.
+
+handle_cast({store, US, Msg}, State) ->
+    maybe_store_message(State, US, Msg),
+    {noreply, State};
+handle_cast(Request, State) ->
+    ?ERROR_MSG("Got unexpected request: ~p", [Request]),
+    {noreply, State}.
+
+%% Normal exits of linked processes are expected (exits are trapped in
+%% init/1) and only logged at debug level; anything else is an error.
+-spec handle_info(timeout | _, state()) -> {noreply, state()}.
+
+handle_info({'EXIT', Pid, normal}, State) ->
+    ?DEBUG("Got info: PID ~w exited normally", [Pid]),
+    {noreply, State};
+handle_info(Info, State) ->
+    ?ERROR_MSG("Got unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+-spec terminate(normal | shutdown | {shutdown, _} | _, _) -> ok.
+
+%% gen_server terminate callback: log the reason, then detach this module
+%% from each of the four hooks listed below.
+terminate(Reason, #state{host = Host}) ->
+    ?DEBUG("Stopping MAM archiving for ~s: ~p", [Host, Reason]),
+    Hooks = [{user_receive_packet, receive_stanza},
+             {user_send_packet, send_stanza},
+             {remove_user, remove_user},
+             {anonymous_purge_hook, remove_user}],
+    lists:foreach(fun({Hook, Callback}) ->
+                          ejabberd_hooks:delete(Hook, Host, ?MODULE,
+                                                Callback, 50)
+                  end, Hooks).
+
+-spec code_change({down, _} | _, state(), _) -> {ok, state()}.
+
+%% Code upgrades need no state conversion; the state is passed through
+%% unchanged after a debug log entry.
+code_change(_OldVsn, #state{host = Host} = State, _Extra) ->
+    ?DEBUG("Updating MAM archiving process for ~s", [Host]),
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% ejabberd_hooks callbacks.
+%%--------------------------------------------------------------------
+
+-spec receive_stanza(jid(), jid(), jid(), xmlel()) -> ok.
+
+%% Hook callback for stanzas delivered to a local user.  Message stanzas
+%% that pass is_desired/4 are handed to the archiving process of the
+%% user's host via an asynchronous cast; everything else is ignored.
+receive_stanza(#jid{luser = LUser, lserver = LServer} = JID, From, To,
+               #xmlel{name = <<"message">>} = Stanza) ->
+    case is_desired(incoming, JID, To, Stanza) of
+        false ->
+            ?DEBUG("Won't archive undesired incoming stanza for ~s",
+                   [jlib:jid_to_string(To)]),
+            ok;
+        true ->
+            Archived = #mam_msg{route = incoming,
+                                from = jlib:jid_tolower(From),
+                                to = jlib:jid_tolower(To),
+                                stanza = Stanza},
+            ?GEN_SERVER:cast(gen_mod:get_module_proc(LServer, ?PROCNAME),
+                             {store, {LUser, LServer}, Archived})
+    end;
+receive_stanza(_JID, _From, _To, _Stanza) ->
+    ok.
+
+-spec send_stanza(jid(), jid(), xmlel()) -> ok.
+
+%% Hook callback for stanzas sent by a local user.  Message stanzas that
+%% pass is_desired/4 are cast to the archiving process of the sender's
+%% host, with the stanza's 'from' replaced by the sender's JID.
+send_stanza(#jid{luser = LUser, lserver = LServer} = From, To,
+            #xmlel{name = <<"message">>} = Stanza) ->
+    case is_desired(outgoing, From, To, Stanza) of
+        false ->
+            ?DEBUG("Won't archive undesired outgoing stanza from ~s",
+                   [jlib:jid_to_string(From)]),
+            ok;
+        true ->
+            Archived = #mam_msg{route = outgoing,
+                                from = jlib:jid_tolower(From),
+                                to = jlib:jid_tolower(To),
+                                stanza = jlib:replace_from(From, Stanza)},
+            ?GEN_SERVER:cast(gen_mod:get_module_proc(LServer, ?PROCNAME),
+                             {store, {LUser, LServer}, Archived})
+    end;
+send_stanza(_From, _To, _Stanza) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Check whether stanza should be stored.
+%%--------------------------------------------------------------------
+
+-spec is_desired(route(), jid(), jid(), xmlel()) -> boolean().
+
+%% A stanza is archived only if every check below holds.  The checks run
+%% in the listed order and evaluation stops at the first one that fails.
+is_desired(Route, JID, To, Message) ->
+    Checks = [fun() -> is_chat_or_normal_message(Message) end,
+              fun() -> has_non_empty_body(Message) end,
+              fun() -> not has_no_storage_hint(Message) end,
+              fun() -> not is_bare_copy(Route, JID, To) end,
+              fun() -> not is_resent(Message) end],
+    lists:all(fun(Check) -> Check() end, Checks).
+
+-spec is_chat_or_normal_message(xmlel()) -> boolean().
+
+%% True for <message/> elements whose type is "chat" or "normal".
+is_chat_or_normal_message(#xmlel{name = <<"message">>} = Message) ->
+    lists:member(message_type(Message), [<<"chat">>, <<"normal">>]);
+is_chat_or_normal_message(_Message) ->
+    false.
+
+-spec message_type(xmlel()) -> binary().
+
+%% Value of the 'type' attribute; a missing attribute means "normal".
+message_type(#xmlel{attrs = Attrs}) ->
+    case xml:get_attr(<<"type">>, Attrs) of
+        false ->
+            <<"normal">>;
+        {value, Type} ->
+            Type
+    end.
+
+-spec has_non_empty_body(xmlel()) -> boolean().
+
+%% True if the character data of the <body/> child is not empty.
+has_non_empty_body(Message) ->
+    Body = xml:get_subtag_cdata(Message, <<"body">>),
+    Body =/= <<"">>.
+
+-spec has_no_storage_hint(xmlel()) -> boolean().
+
+%% True if the message carries a <no-storage/> or <no-permanent-storage/>
+%% child element in the ?NS_HINTS namespace.
+has_no_storage_hint(Message) ->
+    HasHint = fun(Name) ->
+                      xml:get_subtag_with_xmlns(Message, Name, ?NS_HINTS)
+                          =/= false
+              end,
+    lists:any(HasHint, [<<"no-storage">>, <<"no-permanent-storage">>]).
+
+-spec is_bare_copy(route(), jid(), jid()) -> boolean().
%% Tell whether an incoming message is an additional copy of a message that
%% was addressed to the bare JID (or to a resource that isn't present) and is
%% therefore delivered to several resources.  Only the copy received by the
%% resource returned by top_priority_resource/1 is not considered a copy.
%% Outgoing messages are never copies.
is_bare_copy(incoming, #jid{luser = U, lserver = S, lresource = R}, To) ->
    PrioRes = ejabberd_sm:get_user_present_resources(U, S),
    MaxRes = top_priority_resource(PrioRes),
    IsBareTo = case To of
                   #jid{lresource = <<"">>} ->
                       true;
                   #jid{lresource = LRes} ->
                       %% Unavailable resources are handled like bare JIDs.
                       lists:keyfind(LRes, 2, PrioRes) =:= false
               end,
    case {IsBareTo, R} of
        {true, MaxRes} ->
            ?DEBUG("Recipient of message to bare JID has top priority: ~s@~s/~s",
                   [U, S, R]),
            false;
        {true, _R} ->
            %% The message was sent to our bare JID, and we currently have
            %% multiple resources with the same highest priority, so the session
            %% manager routes the message to each of them.  We store the message
            %% only from the resource where R equals MaxRes.
            ?DEBUG("Additional recipient of message to bare JID: ~s@~s/~s",
                   [U, S, R]),
            true;
        {false, _R} ->
            false
    end;
is_bare_copy(outgoing, _JID, _To) -> false.

%% Resource of the largest element of the present-resources list, or
%% 'undefined' if the list is empty or its largest element isn't a
%% {Priority, Resource} pair with a binary resource.  The empty list is
%% matched explicitly rather than wrapping lists:max/1 in a bare 'catch',
%% which would also hide unrelated failures.
top_priority_resource([]) ->
    undefined;
top_priority_resource([_ | _] = PrioRes) ->
    case lists:max(PrioRes) of
        {_Prio, Res} when is_binary(Res) ->
            Res;
        _ ->
            undefined
    end.

-spec is_resent(xmlel()) -> boolean().

%% True if the character data of the <delay/> child contains "Resent".
is_resent(El) ->
    case xml:get_subtag_cdata(El, <<"delay">>) of
        <<"">> ->
            false;
        Desc ->
            binary:match(Desc, <<"Resent">>) =/= nomatch
    end.

%%--------------------------------------------------------------------
%% Store message.
%%--------------------------------------------------------------------

-spec maybe_store_message(state(), mam_jid(), mam_msg()) -> ok.

%% Look up the storage backend configured for the host and dispatch to
%% maybe_store_message/4.
maybe_store_message(#state{host = Host} = State, US, Msg) ->
    DBType = gen_mod:db_type(Host, ?MODULE),
    maybe_store_message(State, US, Msg, DBType).

-spec maybe_store_message(state(), mam_jid(), mam_msg(), db_type()) -> ok.
%% Store Msg for the user US in Mnesia.  The archive's meta record is read
%% with a write lock and yields the ID for the new message (last_id + 1, or 1
%% for a fresh archive).  If maybe_init_meta/3 refuses to create a meta record
%% nothing is stored; if the user's message limit is 0 the existing archive is
%% removed instead.  The whole update runs as a synchronous Mnesia transaction
%% inside global:trans/2 on the {mod_mam_table_write, mod_mam} lock ID.
%% Afterwards, the fragment size check is (occasionally) triggered.
maybe_store_message(#state{host = Host, access_max_messages = AccessMaxMsgs} =
                        State, {U, S} = US, Msg, mnesia) ->
    UpdateTables =
        fun() ->
                {Meta, ID} =
                    case mnesia:read(mam_meta, US, write) of
                        [#mam_meta{first_id = undefined} = M] ->
                            {M#mam_meta{first_id = 1}, 1};
                        [M] ->
                            {M, M#mam_meta.last_id + 1};
                        [] ->
                            {maybe_init_meta(State, US, mnesia), 1}
                    end,
                case Meta of
                    #mam_meta{} ->
                        case get_max_messages(AccessMaxMsgs, US, Host) of
                            0 ->
                                ?DEBUG("Not storing MAM message for ~s@~s", [U, S]),
                                remove_messages(Meta, mnesia);
                            MaxMsgs ->
                                ?DEBUG("Storing MAM message for ~s@~s", [U, S]),
                                store_message(Meta, ID, Msg, MaxMsgs, mnesia)
                        end;
                    undefined ->
                        ?DEBUG("Not storing MAM message for ~s@~s", [U, S]),
                        ok
                end
        end,
    Transaction = fun() -> mnesia:sync_transaction(UpdateTables) end,
    {atomic, ok} = global:trans({mod_mam_table_write, mod_mam}, Transaction),
    manage_mnesia_fragments().

-spec maybe_init_meta(state(), mam_jid(), db_type()) -> mam_meta() | undefined.

%% Create the initial meta record for a user without an archive, unless the
%% request_activates_archiving setting is true, in which case 'undefined' is
%% returned.
maybe_init_meta(#state{request_activates_archiving = false}, US, mnesia) ->
    #mam_meta{mam_jid = US, mam_type = user, first_id = 1};
maybe_init_meta(#state{request_activates_archiving = true}, _US, mnesia) ->
    undefined.

-spec store_message(mam_meta(), mam_msg_id(), mam_msg(), mam_max_msgs(),
                    db_type()) -> ok.

%% Write Msg under the key {US, ID} and update the meta record.  The first
%% clause applies while there is room (no limit, empty archive, or fewer than
%% MaxMsgs messages stored).  The second clause first deletes as many of the
%% oldest messages as needed and advances first_id accordingly.  The message
%% table is accessed through the mnesia_frag access module.
store_message(#mam_meta{mam_jid = US, first_id = FirstID, last_id = LastID} =
                  Meta, ID, Msg, MaxMsgs, mnesia)
  when MaxMsgs =:= infinity;
       LastID =:= undefined;
       LastID - FirstID + 1 < MaxMsgs ->
    UpdateMsgTab =
        fun() ->
                mnesia:write(Msg#mam_msg{key = {US, ID}, time = os:timestamp()})
        end,
    mnesia:activity(transaction, UpdateMsgTab, [], mnesia_frag),
    mnesia:write(Meta#mam_meta{last_id = ID});
store_message(#mam_meta{mam_jid = US, first_id = FirstID, last_id = LastID} =
                  Meta, ID, Msg, MaxMsgs, mnesia) ->
    NumStored = LastID - FirstID + 1,
    NumDelete = NumStored - MaxMsgs + 1, % + 1 to make room for the new message.
    NewFirstID = FirstID + NumDelete,
    UpdateMsgTab =
        fun() ->
                DeleteMsg = fun(DelID) ->
                                    mnesia:delete({mam_msg, {US, DelID}})
                            end,
                lists:foreach(DeleteMsg, lists:seq(FirstID, NewFirstID - 1)),
                mnesia:write(Msg#mam_msg{key = {US, ID}, time = os:timestamp()})
        end,
    mnesia:activity(transaction, UpdateMsgTab, [], mnesia_frag),
    mnesia:write(Meta#mam_meta{first_id = NewFirstID, last_id = ID}).

-spec remove_messages(mam_meta(), db_type()) -> ok.

%% Remove the user's archive via remove_user/3, unless the meta record shows
%% that no message was stored yet (last_id is undefined).
remove_messages(#mam_meta{last_id = undefined}, mnesia) -> ok;
remove_messages(#mam_meta{mam_jid = {U, S}}, mnesia) ->
    remove_user(U, S, mnesia).

-spec get_max_messages(atom(), mam_jid(), binary()) -> mam_max_msgs().

%% Evaluate the access rule for the user's bare JID.  A non-negative integer
%% or 'infinity' is used as the limit; any other result yields
%% ?DEFAULT_MAX_MESSAGES.
get_max_messages(AccessRule, {U, S}, Host) ->
    case acl:match_rule(Host, AccessRule, jlib:make_jid(U, S, <<"">>)) of
        Max when is_integer(Max), Max >= 0 ->
            Max;
        infinity ->
            infinity;
        _ ->
            ?DEFAULT_MAX_MESSAGES
    end.

%%--------------------------------------------------------------------
%% Manage Mnesia fragments.
%%--------------------------------------------------------------------

-spec manage_mnesia_fragments() -> ok.

manage_mnesia_fragments() ->
    %% Check the table fragment sizes only occasionally.
    manage_mnesia_fragments(random:uniform(100) =:= 1).

-spec manage_mnesia_fragments(boolean()) -> ok.

%% When called with 'true', compute the average fragment size of the mam_msg
%% table and, if it is outside the thresholds below, spawn a linked process
%% running maybe_change_mnesia_fragment_count/2 to adjust the number of
%% fragments.  When called with 'false', do nothing.
manage_mnesia_fragments(true) ->
    Info = fun(Item) -> mnesia:table_info(mam_msg, Item) end,
    %%
    %% Size is the number of bytes:
    %%
    %% http://erlang.org/pipermail/erlang-questions/2009-July/044970.html
    %%
    Size = mnesia:activity(sync_dirty, Info, [memory], mnesia_frag),
    Number = mnesia:activity(sync_dirty, Info, [n_fragments], mnesia_frag),
    FragSize = Size div Number,
    ?DEBUG("Average MAM table fragment size: ~B", [FragSize]),
    %%
    %% With disc_only_copies, the (Dets) limit on each fragment is 2 GB.  We're
    %% very conservative and double the number of fragments when the average size
    %% reaches 500 MB.  See also:
    %%
    %% http://www.tamewildsystems.com/2010/05/mnesia-one-year-later-part-3.html
    %%
    %% Action is the argument list for the spawned function: the operation
    %% and the number of fragments to add or delete.
    Action = if FragSize > 500000000 ->
                     [add, Number];
                FragSize < 100000000, Number >= 4 ->
                     [del, Number div 2];
                true ->
                     none
             end,
    if Action =/= none ->
            case spawn_link(?MODULE, maybe_change_mnesia_fragment_count, Action)
            of
                Pid when is_pid(Pid) -> ok
            end;
       Action =:= none -> ok
    end;
manage_mnesia_fragments(false) -> ok.

-spec maybe_change_mnesia_fragment_count(add | del, pos_integer()) -> ok.

%% Entry point of the process spawned by manage_mnesia_fragments/1.  The
%% fragment count is changed only if the mod_mam_table_refrag lock can be set
%% on all known nodes without retrying (retry count 0); while changing it, the
%% mod_mam_table_write resource is locked with this process as the requester.
maybe_change_mnesia_fragment_count(Operation, Number) ->
    WriteLock = {mod_mam_table_write, self()},
    ReFragLock = {mod_mam_table_refrag, self()},
    ChangeFragCount =
        fun() ->
                DoIt = fun() ->
                               change_mnesia_fragment_count(Operation, Number)
                       end,
                global:trans(WriteLock, DoIt)
        end,
    case global:trans(ReFragLock, ChangeFragCount, [node() | nodes()], 0) of
        ok ->
            ?DEBUG("Done changing number of MAM table fragments", []),
            ok;
        aborted ->
            ?DEBUG("Got no lock to change number of MAM table fragments", []),
            ok
    end.

-spec change_mnesia_fragment_count(add | del, pos_integer()) -> ok.

%% Add or delete Number fragments of the mam_msg table, one at a time.  New
%% fragments are placed according to the table's current frag_dist layout,
%% which is re-read before each addition.
change_mnesia_fragment_count(add, Number) ->
    ?INFO_MSG("Adding ~B table fragments for MAM storage", [Number]),
    AddFrag = fun(_N) ->
                      Info = fun(Item) -> mnesia:table_info(mam_msg, Item) end,
                      Layout = mnesia:activity(sync_dirty, Info, [frag_dist],
                                               mnesia_frag),
                      {atomic, ok} =
                          mnesia:change_table_frag(mam_msg, {add_frag, Layout})
              end,
    lists:foreach(AddFrag, lists:seq(1, Number));
change_mnesia_fragment_count(del, Number) ->
    ?INFO_MSG("Removing ~B table fragments for MAM storage", [Number]),
    DelFrag = fun(_N) ->
                      {atomic, ok} =
                          mnesia:change_table_frag(mam_msg, del_frag)
              end,
    lists:foreach(DelFrag, lists:seq(1, Number)).

%%--------------------------------------------------------------------
%% Handle IQ requests.
%%--------------------------------------------------------------------

-spec handle_iq(jid(), jid(), iq_request()) -> iq_reply().

%% IQ handler.  Users may only address their own archive (sender and
%% recipient must share user and server part):
%%   - a <query/> 'get' returns the search form,
%%   - a <query/> 'set' runs an archive query,
%%   - <prefs/> requests are refused as not implemented.
%% Anything else is refused as forbidden.
handle_iq(#jid{luser = LUser, lserver = LServer},
          #jid{luser = LUser, lserver = LServer},
          #iq{type = get, sub_el = #xmlel{name = <<"query">>}} = IQ) ->
    ?DEBUG("Got MAM form request from ~s@~s", [LUser, LServer]),
    handle_form_request(IQ);
handle_iq(#jid{luser = LUser, lserver = LServer} = From,
          #jid{luser = LUser, lserver = LServer},
          #iq{type = set, sub_el = #xmlel{name = <<"query">>}} = IQ) ->
    ?DEBUG("Got MAM archive request from ~s@~s", [LUser, LServer]),
    handle_archive_request(From, IQ);
handle_iq(#jid{luser = LUser, lserver = LServer},
          #jid{luser = LUser, lserver = LServer},
          #iq{sub_el = #xmlel{name = <<"prefs">>} = SubEl} = IQ) ->
    ?DEBUG("Refusing MAM preferences request from ~s@~s (not implemented)",
           [LUser, LServer]),
    IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]};
handle_iq(From, To, #iq{sub_el = SubEl} = IQ) ->
    ?DEBUG("Refusing MAM request from ~s to ~s (forbidden)",
           [jlib:jid_to_string(From), jlib:jid_to_string(To)]),
    IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}.

-spec handle_form_request(iq_request()) -> iq_result().

%% Answer with the data form listing the supported search fields: the hidden
%% FORM_TYPE plus 'with', 'start' and 'end'.
handle_form_request(IQ) ->
    Field = fun(Type, Var) ->
                    #xmlel{name = <<"field">>,
                           attrs = [{<<"type">>, Type}, {<<"var">>, Var}]}
            end,
    FormTypeValue = #xmlel{name = <<"value">>,
                           children = [{xmlcdata, ?NS_MAM}]},
    FormType = (Field(<<"hidden">>, <<"FORM_TYPE">>))
                 #xmlel{children = [FormTypeValue]},
    Fields = [FormType,
              Field(<<"jid-single">>, <<"with">>),
              Field(<<"text-single">>, <<"start">>),
              Field(<<"text-single">>, <<"end">>)],
    Form = #xmlel{name = <<"x">>,
                  attrs = [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"form">>}],
                  children = Fields},
    Query = #xmlel{name = <<"query">>,
                   attrs = [{<<"xmlns">>, ?NS_MAM}],
                   children = [Form]},
    IQ#iq{type = result, sub_el = [Query]}.

-spec handle_archive_request(jid(), iq_set()) -> ignore | iq_error().
%% Run an archive query for the requesting user.  On success the IQ result,
%% the matching messages and the closing <fin/> message are routed to the
%% requester by this function itself, in that order, and 'ignore' is returned.
%% On failure an error IQ is returned to the caller.
handle_archive_request(#jid{luser = LUser, lserver = LServer} = JID,
                       #iq{sub_el = SubEl} = IQ) ->
    Parsed = parse_request(LServer, SubEl),
    Query = Parsed#mam_query{mam_jid = {LUser, LServer}},
    case query_archive(Query, gen_mod:db_type(LServer, ?MODULE)) of
        {error, Error} ->
            ?DEBUG("MAM archive query for ~s returned an error: ~p",
                   [jlib:jid_to_string(JID), Error]),
            IQ#iq{type = error, sub_el = [SubEl, Error]};
        #mam_result{messages = Msgs} = Result ->
            ?DEBUG("MAM archive query for ~s successful",
                   [jlib:jid_to_string(JID)]),
            QueryID = Query#mam_query.query_id,
            send_iq_result(JID, IQ),
            send_mam_messages(JID, QueryID, Msgs),
            send_fin_message(JID, QueryID, Result),
            ignore % gen_iq_handler:process_iq/6 shouldn't respond.
    end.

%%--------------------------------------------------------------------
%% Parse request.
%%--------------------------------------------------------------------

-spec parse_request(binary(), xmlel()) -> mam_query().

%% Turn the <query/> element into a #mam_query{}.  Values that cannot be
%% parsed are stored as the atom 'error' rather than rejected here.  A missing
%% or empty RSM item ID becomes 'undefined', and every paging direction other
%% than 'before' is treated as 'aft'.
parse_request(Host, Query) ->
    #rsm_in{max = ReqMax,
            index = Index,
            id = RawID,
            direction = RawDirection} =
        case jlib:rsm_decode(Query) of
            none ->
                #rsm_in{};
            #rsm_in{} = Decoded ->
                Decoded
        end,
    Filter = parse_form(Query),
    Max = normalize_max(ReqMax, get_page_size_conf(Host)),
    ID = case RawID of
             undefined ->
                 undefined;
             <<"">> ->
                 undefined;
             Bin ->
                 case str:to_integer(Bin) of
                     {Int, _Rest} when is_integer(Int), Int > 0 ->
                         Int;
                     _ ->
                         error
                 end
         end,
    Direction = case RawDirection of
                    before ->
                        before;
                    _ ->
                        aft
                end,
    QueryID = case xml:get_tag_attr(<<"queryid">>, Query) of
                  false ->
                      undefined;
                  {value, Value} ->
                      Value
              end,
    Parsed = #mam_query{query_id = QueryID,
                        direction = Direction,
                        id = ID,
                        index = Index,
                        max = Max,
                        filter = Filter},
    ?DEBUG("Got MAM query: ~p", [Parsed]),
    Parsed.

-spec parse_form(xmlel() | [xmlel()]) -> mam_filter().
%% Extract the search filter from the query's data form.  Given the <query/>
%% element, the form's fields are looked up first; given a list of fields,
%% each <field/> holding exactly one single-child <value/> is folded into a
%% #mam_filter{}.  Unparsable 'start', 'end' and 'with' values are recorded as
%% the atom 'error'; unknown variables and other elements are skipped.
parse_form(#xmlel{} = Query) ->
    case xml:get_subtag_with_xmlns(Query, <<"x">>, ?NS_XDATA) of
        false ->
            #mam_filter{};
        #xmlel{children = Fields} ->
            parse_form(Fields)
    end;
parse_form(Fields) when is_list(Fields) ->
    ParseTime =
        fun(Value) ->
                CData = xml:get_cdata([Value]),
                case jlib:datetime_string_to_timestamp(CData) of
                    undefined ->
                        error;
                    Timestamp ->
                        Timestamp
                end
        end,
    ParseField =
        fun(#xmlel{name = <<"field">>,
                   attrs = Attrs,
                   children = [#xmlel{name = <<"value">>,
                                      children = [Value]}]},
            Filter) ->
                case xml:get_attr_s(<<"var">>, Attrs) of
                    <<"FORM_TYPE">> ->
                        Filter;
                    <<"start">> ->
                        Filter#mam_filter{start = ParseTime(Value)};
                    <<"end">> ->
                        Filter#mam_filter{fin = ParseTime(Value)};
                    <<"with">> ->
                        case jlib:string_to_jid(xml:get_cdata([Value])) of
                            error ->
                                Filter#mam_filter{with = error};
                            #jid{} = JID ->
                                Filter#mam_filter{with = jlib:jid_tolower(JID)}
                        end;
                    Var ->
                        ?DEBUG("Got unexpected form variable: ~p", [Var]),
                        Filter
                end;
           (El, Filter) ->
                ?DEBUG("Got unexpected form element: ~p", [El]),
                Filter
        end,
    lists:foldl(ParseField, #mam_filter{}, Fields).

-spec get_page_size_conf(binary()) -> mam_page_size_conf().

%% Read the default_page_size and max_page_size module options of the host.
%% Both must be positive integers and fall back to ?DEFAULT_PAGE_SIZE and
%% ?MAX_PAGE_SIZE, respectively.
get_page_size_conf(Host) ->
    PositiveInt = fun(I) when is_integer(I), I > 0 -> I end,
    Default = gen_mod:get_module_opt(Host, ?MODULE, default_page_size,
                                     PositiveInt, ?DEFAULT_PAGE_SIZE),
    Max = gen_mod:get_module_opt(Host, ?MODULE, max_page_size,
                                 PositiveInt, ?MAX_PAGE_SIZE),
    #mam_page_size_conf{default = Default, max = Max}.

-spec normalize_max(integer() | undefined | error, mam_page_size_conf())
      -> non_neg_integer() | error.
%% Clamp the requested page size: 'error' is passed through, a missing value
%% becomes the configured default, values above the configured maximum are
%% cut down to it, and negative values become 0.
normalize_max(error, _Conf) ->
    error;
normalize_max(undefined, #mam_page_size_conf{default = Default}) ->
    Default;
normalize_max(Max, Conf) ->
    case Conf of
        #mam_page_size_conf{max = Limit} when Max > Limit ->
            Limit;
        _ when Max < 0 -> % Huh.
            0;
        _ ->
            Max
    end.

-spec get_start_id(mam_query(), mam_meta()) -> mam_msg_id() | undefined.

%% Determine the ID of the first message to look at:
%%   - 'undefined' for an archive without any IDs,
%%   - without an index, paging 'before' starts just below the given ID (or
%%     at the newest message if no ID is given or it lies beyond the newest
%%     one), and paging 'aft' starts just above the given ID (or at the
%%     oldest message, analogously),
%%   - with an index, the start is the oldest ID plus the index.
get_start_id(_Query,
             #mam_meta{first_id = undefined, last_id = undefined}) ->
    undefined;
get_start_id(#mam_query{id = ID, index = undefined, direction = before},
             Meta) ->
    case Meta of
        #mam_meta{last_id = LastID} when ID =:= undefined; ID > LastID ->
            LastID;
        _ ->
            ID - 1
    end;
get_start_id(#mam_query{id = ID, index = undefined, direction = aft},
             Meta) ->
    case Meta of
        #mam_meta{first_id = FirstID} when ID =:= undefined; ID < FirstID ->
            FirstID;
        _ ->
            ID + 1
    end;
get_start_id(#mam_query{index = Index},
             #mam_meta{first_id = FirstID}) ->
    FirstID + Index.

-spec check_request(mam_query()) -> ok | {error, _}.
%% Validate a parsed query.  Every field that parse_request/2 or parse_form/1
%% marked with the atom 'error' yields a bad-request error; combining an RSM
%% index with a non-empty filter is refused as not implemented.  Each debug
%% message names the offending field so the failures can be told apart in the
%% log.
check_request(#mam_query{id = error}) ->
    ?DEBUG("Got invalid RSM 'before' or 'after' value", []),
    {error, ?ERR_BAD_REQUEST};
check_request(#mam_query{index = error}) ->
    ?DEBUG("Got invalid RSM 'index' value", []),
    {error, ?ERR_BAD_REQUEST};
check_request(#mam_query{max = error}) ->
    ?DEBUG("Got invalid RSM 'max' value", []),
    {error, ?ERR_BAD_REQUEST};
check_request(#mam_query{filter = #mam_filter{start = error}}) ->
    ?DEBUG("Got invalid 'start' filter value", []),
    {error, ?ERR_BAD_REQUEST};
check_request(#mam_query{filter = #mam_filter{fin = error}}) ->
    ?DEBUG("Got invalid 'end' filter value", []),
    {error, ?ERR_BAD_REQUEST};
check_request(#mam_query{filter = #mam_filter{with = error}}) ->
    ?DEBUG("Got invalid 'with' filter value", []),
    {error, ?ERR_BAD_REQUEST};
check_request(#mam_query{index = Index, filter = Filter})
  when is_integer(Index), Filter =/= #mam_filter{} ->
    %% We don't support both an index and filters.
    ?DEBUG("Cannot use RSM 'index' with filters", []),
    {error, ?ERR_FEATURE_NOT_IMPLEMENTED};
check_request(_Query) ->
    ok.

%%--------------------------------------------------------------------
%% Send responses.
%%--------------------------------------------------------------------

-spec send_iq_result(jid(), iq_request()) -> ok.

%% Route an empty IQ result for the request to JID, using the bare server JID
%% as the sender.
send_iq_result(#jid{lserver = Host} = JID, IQ) ->
    ?DEBUG("Sending IQ result to ~s", [jlib:jid_to_string(JID)]),
    Response = jlib:make_result_iq_reply(jlib:iq_to_xml(IQ#iq{sub_el = []})),
    ejabberd_router:route(jlib:make_jid(<<"">>, Host, <<"">>), JID, Response).

-spec send_mam_messages(jid(), mam_query_id() | undefined, [mam_msg()]) -> ok.

%% Send every archived message of the result page, in list order.
send_mam_messages(JID, QueryID, Msgs) ->
    lists:foreach(fun(Msg) -> send_mam_message(JID, QueryID, Msg) end, Msgs).

-spec send_mam_message(jid(), mam_query_id() | undefined, mam_msg()) -> ok.
%% Wrap one archived stanza into a <message/> carrying a <result/> element
%% (with the archive ID and, if given, the query ID) around the <forwarded/>
%% original, add delay information for the storage time, and route it to JID
%% from the bare server JID.  The message gets a random 'id' and a <no-copy/>
%% hint.
send_mam_message(#jid{lserver = Host} = JID, QueryID,
                 #mam_msg{key = {_US, MamID}, stanza = Stanza, time = Time}) ->
    StanzaID = jlib:encode_base64(crypto:rand_bytes(9)),
    Recipient = jlib:jid_to_string(JID),
    QueryIDAttr = if QueryID =:= undefined ->
                          [];
                     is_binary(QueryID) ->
                          [{<<"queryid">>, QueryID}]
                  end,
    Forwarded = #xmlel{name = <<"forwarded">>,
                       attrs = [{<<"xmlns">>, ?NS_FORWARD}],
                       children = [Stanza]},
    Delayed = jlib:add_delay_info(Forwarded, Host, Time),
    ResultAttrs = [{<<"xmlns">>, ?NS_MAM},
                   {<<"to">>, Recipient},
                   {<<"id">>, jlib:integer_to_binary(MamID)}
                   | QueryIDAttr],
    Result = #xmlel{name = <<"result">>,
                    attrs = ResultAttrs,
                    children = [Delayed]},
    NoCopy = #xmlel{name = <<"no-copy">>,
                    attrs = [{<<"xmlns">>, ?NS_HINTS}]},
    Message = #xmlel{name = <<"message">>,
                     attrs = [{<<"id">>, StanzaID}, {<<"to">>, Recipient}],
                     children = [Result, NoCopy]},
    ?DEBUG("Sending MAM message ~B to ~s", [MamID, Recipient]),
    ejabberd_router:route(jlib:make_jid(<<"">>, Host, <<"">>), JID, Message).

-spec send_fin_message(jid(), mam_query_id(), mam_result()) -> ok.
%% Route the <message/> that closes a result page: a <fin/> element with the
%% RSM data of the page, the query ID (if any) and complete='true' if the
%% result is complete, plus a <no-copy/> hint.
send_fin_message(#jid{lserver = Host} = JID, QueryID,
                 #mam_result{count = Count,
                             index = Index,
                             first = First,
                             last = Last,
                             is_complete = IsComplete}) ->
    StanzaID = jlib:encode_base64(crypto:rand_bytes(9)),
    Recipient = jlib:jid_to_string(JID),
    QueryIDAttr = if QueryID =:= undefined ->
                          [];
                     is_binary(QueryID) ->
                          [{<<"queryid">>, QueryID}]
                  end,
    CompleteAttr = if not IsComplete ->
                           [];
                      IsComplete ->
                           [{<<"complete">>, <<"true">>}]
                   end,
    %% The RSM encoder gets the boundary IDs as binaries.
    FirstBin = if First =:= undefined ->
                       undefined;
                  is_integer(First) ->
                       jlib:integer_to_binary(First)
               end,
    LastBin = if Last =:= undefined ->
                      undefined;
                 is_integer(Last) ->
                      jlib:integer_to_binary(Last)
              end,
    RSM = #rsm_out{count = Count,
                   index = Index,
                   first = FirstBin,
                   last = LastBin},
    FinAttrs = [{<<"xmlns">>, ?NS_MAM}] ++ QueryIDAttr ++ CompleteAttr,
    Fin = #xmlel{name = <<"fin">>,
                 attrs = FinAttrs,
                 children = jlib:rsm_encode(RSM)},
    NoCopy = #xmlel{name = <<"no-copy">>, attrs = [{<<"xmlns">>, ?NS_HINTS}]},
    Message = #xmlel{name = <<"message">>,
                     attrs = [{<<"id">>, StanzaID}, {<<"to">>, Recipient}],
                     children = [Fin, NoCopy]},
    ?DEBUG("Sending MAM result to ~s: ~p (~w)", [Recipient, RSM, IsComplete]),
    ejabberd_router:route(jlib:make_jid(<<"">>, Host, <<"">>), JID, Message).

%%--------------------------------------------------------------------
%% Query MAM archive.
%%--------------------------------------------------------------------

-spec query_archive(mam_query(), db_type()) -> mam_result() | {error, _}.

%% Collect the requested messages within a single Mnesia transaction.
query_archive(Query, mnesia) ->
    Collect = fun() -> collect_messages(Query, mnesia) end,
    {atomic, Result} = mnesia:transaction(Collect),
    Result.

-spec read_meta(mam_jid(), db_type()) -> mam_meta().

%% Return the user's meta record.  If there is none yet, a fresh one is
%% written and returned.
read_meta(US, mnesia) ->
    case mnesia:read(mam_meta, US) of
        [] ->
            Fresh = #mam_meta{mam_jid = US, mam_type = user},
            mnesia:write(Fresh), % Initialize MAM for this user.
            Fresh;
        [Meta] ->
            Meta
    end.

-spec collect_messages(mam_query(), db_type())
      -> mam_result() | {error, _}.
%% Validate the query, then walk the archive starting at the ID computed by
%% get_start_id/2 until the page is full or the archive's ID range is left.
collect_messages(#mam_query{mam_jid = {U, S}, max = Max} = Query, DBType) ->
    case check_request(Query) of
        {error, Error} ->
            {error, Error};
        ok ->
            Meta = read_meta({U, S}, DBType),
            Initial = #mam_query_state{current = get_start_id(Query, Meta),
                                       n_remaining = Max},
            collect_messages(Query, Initial, Meta, DBType)
    end.

-spec collect_messages(mam_query(), mam_query_state(), mam_meta(), db_type())
      -> mam_result().

%% One step of the archive walk.  The first clause ends the walk and builds
%% the result; the second clause reads the message with the current ID.  A
%% message that passes the filter is prepended to the collected messages and
%% counts against the page size, a filtered message doesn't count, and a
%% missing message is logged and counts as well.
collect_messages(Query,
                 #mam_query_state{n_remaining = Remaining,
                                  current = ID} = QueryState,
                 #mam_meta{first_id = FirstID,
                           last_id = LastID} = Meta,
                 DBType) when Remaining =:= 0;
                              ID =:= undefined;
                              ID < FirstID;
                              ID > LastID -> % We're done!
    #mam_result{messages = resulting_messages(Query, QueryState),
                count = resulting_count(Query, Meta),
                index = resulting_index(Query, QueryState, Meta),
                first = resulting_first(Query, QueryState),
                last = resulting_last(Query, QueryState),
                is_complete = result_is_complete(Query, QueryState, Meta,
                                                 DBType)};
collect_messages(#mam_query{mam_jid = {U, S},
                            filter = Filter} = Query,
                 #mam_query_state{messages = Collected,
                                  current = ID,
                                  n_remaining = N} = QueryState,
                 Meta, DBType) ->
    case read_message({{U, S}, ID}, Filter, DBType) of
        filtered ->
            collect_next(Query, QueryState, Meta, N, DBType);
        not_found ->
            ?ERROR_MSG("MAM message ~B of ~s@~s not found", [ID, U, S]),
            collect_next(Query, QueryState, Meta, N - 1, DBType);
        #mam_msg{} = Msg ->
            Updated =
                case QueryState#mam_query_state.first of
                    undefined ->
                        QueryState#mam_query_state{first = ID,
                                                   last = ID,
                                                   messages = [Msg]};
                    _ ->
                        QueryState#mam_query_state{last = ID,
                                                   messages = [Msg | Collected]}
                end,
            collect_next(Query, Updated, Meta, N - 1, DBType)
    end.

-spec collect_next(mam_query(), mam_query_state(), mam_meta(),
                   non_neg_integer(), db_type()) -> mam_result().
%% Continue the archive walk with the neighbouring ID (one lower when paging
%% 'before', one higher when paging 'aft') and N remaining messages.
collect_next(#mam_query{direction = before} = Query,
             #mam_query_state{current = ID} = QueryState, Meta, N, DBType) ->
    NextState = QueryState#mam_query_state{current = ID - 1,
                                           n_remaining = N},
    collect_messages(Query, NextState, Meta, DBType);
collect_next(#mam_query{direction = aft} = Query,
             #mam_query_state{current = ID} = QueryState, Meta, N, DBType) ->
    NextState = QueryState#mam_query_state{current = ID + 1,
                                           n_remaining = N},
    collect_messages(Query, NextState, Meta, DBType).

-spec read_message(mam_msg_key(), mam_filter(), db_type())
      -> mam_msg() | filtered | not_found.

%% Read the message stored under Key (through the mnesia_frag access module)
%% and apply the filter to it.
read_message(Key, Filter, mnesia) ->
    Read = fun() -> mnesia:read(mam_msg, Key) end,
    case mnesia:activity(transaction, Read, [], mnesia_frag) of
        [] ->
            not_found;
        [#mam_msg{} = Msg] ->
            case filter_message(Msg, Filter) of
                drop ->
                    ?DEBUG("Message ~p dropped by filter ~p", [Msg, Filter]),
                    filtered;
                pass ->
                    ?DEBUG("Message ~p passes filter ~p", [Msg, Filter]),
                    Msg
            end
    end.

-spec filter_message(mam_msg(), mam_filter()) -> pass | drop.

%% A message passes if it passes the 'start', 'fin' and 'with' checks, which
%% are tried in this order until one of them drops the message.  An empty
%% filter passes everything.
filter_message(_Msg, Filter) when Filter =:= #mam_filter{} -> pass;
filter_message(Msg, Filter) ->
    Passes = fun(FilterType) ->
                     filter_message(FilterType, Msg, Filter) =:= pass
             end,
    case lists:all(Passes, [start, fin, with]) of
        true ->
            pass;
        false ->
            drop
    end.

-spec filter_message(mam_filter_type(), mam_msg(), mam_filter()) -> pass | drop.
%% Apply a single filter criterion:
%%   start - passes if unset or the message time is not before it,
%%   fin   - passes if unset or the message time is not after it,
%%   with  - passes if unset; otherwise the JID is compared as a bare JID if
%%           its resource is empty and as a full JID if not.
filter_message(start, Msg, Filter) ->
    case {Msg, Filter} of
        {_, #mam_filter{start = undefined}} ->
            pass;
        {#mam_msg{time = Time}, #mam_filter{start = Start}}
          when Time >= Start ->
            pass;
        _ ->
            drop
    end;
filter_message(fin, Msg, Filter) ->
    case {Msg, Filter} of
        {_, #mam_filter{fin = undefined}} ->
            pass;
        {#mam_msg{time = Time}, #mam_filter{fin = End}}
          when Time =< End ->
            pass;
        _ ->
            drop
    end;
filter_message(with, Msg, Filter) ->
    case Filter of
        #mam_filter{with = undefined} ->
            pass;
        #mam_filter{with = {_U, _S, <<"">>}} ->
            filter_message_with(bare, Msg, Filter);
        _ ->
            filter_message_with(full, Msg, Filter)
    end.

-spec filter_message_with(bare | full, mam_msg(), mam_filter()) -> pass | drop.

%% Compare the filter's 'with' JID to the message's sender and recipient.
%% In 'bare' mode only user and server are compared; in 'full' mode the
%% resource must match as well.
filter_message_with(bare,
                    #mam_msg{from = {User, Server, _}, to = {User, Server, _}},
                    #mam_filter{with = {User, Server, _}}) ->
    %% XEP-0313 (0.3) says: "If the 'with' field's value is the bare JID of the
    %% archive, the server must only return results where both the 'to' and
    %% 'from' match the bare JID".
    pass;
filter_message_with(bare,
                    #mam_msg{key = {{User, Server}, _}},
                    #mam_filter{with = {User, Server, _}}) ->
    %% 'with' names the archive owner, but the clause above didn't match.
    drop;
filter_message_with(bare,
                    #mam_msg{from = {User, Server, _}},
                    #mam_filter{with = {User, Server, _}}) ->
    pass;
filter_message_with(bare,
                    #mam_msg{to = {User, Server, _}},
                    #mam_filter{with = {User, Server, _}}) ->
    pass;
filter_message_with(bare, _Msg, _Filter) ->
    drop;
filter_message_with(full,
                    #mam_msg{from = {User, Server, Resource}},
                    #mam_filter{with = {User, Server, Resource}}) ->
    pass;
filter_message_with(full,
                    #mam_msg{to = {User, Server, Resource}},
                    #mam_filter{with = {User, Server, Resource}}) ->
    pass;
filter_message_with(full, _Msg, _Filter) ->
    drop.

-spec another_message_exists(mam_query(), mam_msg_id(), db_type()) -> boolean().
%% Starting at ID and moving in the paging direction, skip filtered messages
%% until a message that passes the filter is found (true) or no message is
%% stored under the ID (false).
another_message_exists(#mam_query{mam_jid = {U, S},
                                  direction = Direction,
                                  filter = Filter} = Query, ID, DBType) ->
    case read_message({{U, S}, ID}, Filter, DBType) of
        filtered ->
            NextID = case Direction of
                         before ->
                             ID - 1;
                         aft ->
                             ID + 1
                     end,
            another_message_exists(Query, NextID, DBType);
        not_found ->
            ?DEBUG("Found no other message for ~s@~s: ~B", [U, S, ID]),
            false;
        #mam_msg{} ->
            ?DEBUG("Found another message for ~s@~s: ~B", [U, S, ID]),
            true
    end.

%%--------------------------------------------------------------------
%% Extract collect_messages/4 results.
%%--------------------------------------------------------------------

-spec resulting_messages(mam_query(), mam_query_state()) -> [mam_msg()].

%% Messages are collected by prepending, so the list is already in ascending
%% ID order after paging 'before' and must be reversed after paging 'aft'.
resulting_messages(#mam_query{direction = before},
                   #mam_query_state{messages = Collected}) ->
    Collected;
resulting_messages(#mam_query{direction = aft},
                   #mam_query_state{messages = Collected}) ->
    lists:reverse(Collected).

-spec resulting_count(mam_query(), mam_meta()) -> non_neg_integer() | undefined.

%% Size of the ID range of the archive.  Only reported for unfiltered queries
%% on archives with integer boundary IDs.
resulting_count(#mam_query{filter = Filter},
                #mam_meta{first_id = Oldest, last_id = Newest})
  when Filter =:= #mam_filter{},
       is_integer(Oldest),
       is_integer(Newest) ->
    Newest - Oldest + 1;
resulting_count(_Query, _Meta) -> undefined.

-spec resulting_index(mam_query(), mam_query_state(), mam_meta())
      -> non_neg_integer() | undefined.

%% Offset of the page's lowest ID from the archive's first ID.  Only reported
%% for unfiltered queries that collected at least one message.
resulting_index(#mam_query{filter = Filter, direction = before},
                #mam_query_state{last = Lowest},
                #mam_meta{first_id = Oldest})
  when Filter =:= #mam_filter{},
       is_integer(Lowest),
       is_integer(Oldest) ->
    Lowest - Oldest;
resulting_index(#mam_query{filter = Filter, direction = aft},
                #mam_query_state{first = Lowest},
                #mam_meta{first_id = Oldest})
  when Filter =:= #mam_filter{},
       is_integer(Lowest),
       is_integer(Oldest) ->
    Lowest - Oldest;
resulting_index(_Query, _QueryState, _Meta) -> undefined.
-spec resulting_first(mam_query(), mam_query_state())
      -> mam_msg_id() | undefined.

%% ID of the first message of the page.  The query state's 'first' and 'last'
%% are in collection order, so after paging 'before' (descending IDs) the
%% page starts with the state's 'last'.
resulting_first(#mam_query{direction = before},
                #mam_query_state{last = Last}) ->
    Last;
resulting_first(#mam_query{direction = aft},
                #mam_query_state{first = First}) ->
    First.

-spec resulting_last(mam_query(), mam_query_state())
      -> mam_msg_id() | undefined.

%% ID of the last message of the page; the counterpart of resulting_first/2.
resulting_last(#mam_query{direction = before},
               #mam_query_state{first = First}) ->
    First;
resulting_last(#mam_query{direction = aft},
               #mam_query_state{last = Last}) ->
    Last.

-spec result_is_complete(mam_query(), mam_query_state(), mam_meta(), db_type())
      -> boolean().

%% Tell whether there are no further results in the paging direction.
%%
%% For filtered queries, the archive is searched for another matching message
%% beyond the point where collecting stopped.  For unfiltered queries, the
%% message collected last is compared with the archive boundary in the paging
%% direction.  The state's 'last' field holds the ID collected last in either
%% direction, so it is the lowest ID when paging 'before' and the highest when
%% paging 'aft'.
result_is_complete(#mam_query{filter = Filter} = Query,
                   #mam_query_state{current = ID},
                   _Meta, DBType)
  when Filter =/= #mam_filter{}, ID =/= undefined ->
    not another_message_exists(Query, ID, DBType);
result_is_complete(#mam_query{direction = before},
                   #mam_query_state{last = Last},
                   #mam_meta{first_id = FirstID}, _DBType) ->
    %% Compare the lowest collected ID ('last'), not the highest one
    %% ('first'), with the oldest message of the archive.
    Last =:= FirstID;
result_is_complete(#mam_query{direction = aft},
                   #mam_query_state{last = Last},
                   #mam_meta{last_id = LastID}, _DBType) ->
    Last =:= LastID.

%%--------------------------------------------------------------------
%% Remove user.
%%--------------------------------------------------------------------

-spec remove_user(binary(), binary()) -> ok.

%% Hook callback: normalize user and server name, then remove the user's
%% archive from the backend configured for the server.
remove_user(User, Server) ->
    LUser = jlib:nodeprep(User),
    LServer = jlib:nameprep(Server),
    DBType = gen_mod:db_type(LServer, ?MODULE),
    ?INFO_MSG("Removing MAM archive of ~s@~s", [LUser, LServer]),
    remove_user(LUser, LServer, DBType).

-spec remove_user(binary(), binary(), db_type()) -> ok.
%% Delete the user's archive in one Mnesia transaction: every message in the
%% ID range recorded in the meta record (if it has one), then the meta record
%% itself.  Nothing happens if the user has no meta record.
remove_user(LUser, LServer, mnesia) ->
    US = {LUser, LServer},
    DeleteRange =
        fun(FirstID, LastID) ->
                DeleteAll =
                    fun() ->
                            lists:foreach(
                              fun(ID) ->
                                      mnesia:delete({mam_msg, {US, ID}})
                              end,
                              lists:seq(FirstID, LastID))
                    end,
                %% The message table is accessed through mnesia_frag.
                mnesia:activity(transaction, DeleteAll, [], mnesia_frag)
        end,
    Remove =
        fun() ->
                case mnesia:read(mam_meta, US, write) of
                    [] ->
                        ok;
                    [#mam_meta{first_id = undefined, last_id = undefined}] ->
                        mnesia:delete({mam_meta, US});
                    [#mam_meta{first_id = FirstID, last_id = LastID}] ->
                        DeleteRange(FirstID, LastID),
                        mnesia:delete({mam_meta, US})
                end
        end,
    {atomic, ok} = mnesia:transaction(Remove),
    ok.