mod_mam: Simplify querying the archive

Now that the Mnesia tables are no longer queried in transaction context,
the code can be simplified a bit.  The behavior should remain unchanged.
This commit is contained in:
Holger Weiss 2015-02-13 00:55:05 +01:00
parent c825f6deea
commit d430788865
1 changed files with 49 additions and 59 deletions

View File

@ -633,14 +633,12 @@ handle_form_request(IQ) ->
handle_archive_request(#jid{luser = U, lserver = S} = JID, handle_archive_request(#jid{luser = U, lserver = S} = JID,
#iq{sub_el = SubEl} = IQ) -> #iq{sub_el = SubEl} = IQ) ->
Query1 = parse_request(S, SubEl), Query = parse_request(S, SubEl),
Query2 = Query1#mam_query{mam_jid = {U, S}}, case query_archive(Query#mam_query{mam_jid = {U, S}}) of
DBType = gen_mod:db_type(S, ?MODULE),
case query_archive(Query2, DBType) of
#mam_result{messages = Msgs} = Result -> #mam_result{messages = Msgs} = Result ->
?DEBUG("MAM archive query for ~s successful", ?DEBUG("MAM archive query for ~s successful",
[jlib:jid_to_string(JID)]), [jlib:jid_to_string(JID)]),
QueryID = Query2#mam_query.query_id, QueryID = Query#mam_query.query_id,
send_iq_result(JID, IQ), send_iq_result(JID, IQ),
send_mam_messages(JID, QueryID, Msgs), send_mam_messages(JID, QueryID, Msgs),
send_fin_message(JID, QueryID, Result), send_fin_message(JID, QueryID, Result),
@ -926,50 +924,31 @@ send_fin_message(#jid{lserver = Host} = JID, QueryID,
%% Query MAM archive. %% Query MAM archive.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec query_archive(mam_query(), db_type()) -> mam_result() | {error, _}. -spec query_archive(mam_query()) -> mam_result() | {error, _}.
query_archive(Query, mnesia) -> query_archive(#mam_query{mam_jid = {U, S}, max = Max} = Query) ->
mnesia:sync_dirty(fun() -> collect_messages(Query, mnesia) end),
-spec read_meta(mam_jid(), db_type()) -> mam_meta().
read_meta(US, mnesia) ->
case mnesia:read(mam_meta, US) of
[M] ->
M;
[] ->
M = #mam_meta{mam_jid = US, mam_type = user},
mnesia:write(M), % Initialize MAM for this user.
M
end.
-spec collect_messages(mam_query(), db_type())
-> mam_result() | {error, _}.
collect_messages(#mam_query{mam_jid = {U, S}, max = Max} = Query, DBType) ->
case check_request(Query) of case check_request(Query) of
ok -> ok ->
DBType = gen_mod:db_type(S, ?MODULE),
Meta = read_meta({U, S}, DBType), Meta = read_meta({U, S}, DBType),
StartID = get_start_id(Query, Meta), StartID = get_start_id(Query, Meta),
collect_messages(Query, #mam_query_state{current = StartID, query_archive(Query, #mam_query_state{current = StartID,
n_remaining = Max}, n_remaining = Max},
Meta, DBType); Meta, DBType);
{error, Error} -> {error, Error} ->
{error, Error} {error, Error}
end. end.
-spec collect_messages(mam_query(), mam_query_state(), mam_meta(), db_type()) -spec query_archive(mam_query(), mam_query_state(), mam_meta(), db_type())
-> mam_result(). -> mam_result().
collect_messages(Query, query_archive(Query,
#mam_query_state{n_remaining = Remaining, #mam_query_state{n_remaining = ToDo, current = ID} = QueryState,
current = ID} = QueryState, #mam_meta{first_id = FirstID, last_id = LastID} = Meta, DBType)
#mam_meta{first_id = FirstID, when ToDo =:= 0;
last_id = LastID} = Meta, ID =:= undefined;
DBType) when Remaining =:= 0; ID < FirstID;
ID =:= undefined; ID > LastID -> % We're done!
ID < FirstID;
ID > LastID -> % We're done!
#mam_result{messages = resulting_messages(Query, QueryState), #mam_result{messages = resulting_messages(Query, QueryState),
count = resulting_count(Query, Meta), count = resulting_count(Query, Meta),
index = resulting_index(Query, QueryState, Meta), index = resulting_index(Query, QueryState, Meta),
@ -977,12 +956,11 @@ collect_messages(Query,
last = resulting_last(Query, QueryState), last = resulting_last(Query, QueryState),
is_complete = result_is_complete(Query, QueryState, Meta, is_complete = result_is_complete(Query, QueryState, Meta,
DBType)}; DBType)};
collect_messages(#mam_query{mam_jid = {U, S}, query_archive(#mam_query{mam_jid = {U, S}, filter = Filter} = Query,
filter = Filter} = Query, #mam_query_state{messages = Msgs,
#mam_query_state{messages = Msgs, current = ID,
current = ID, n_remaining = N} = QueryState,
n_remaining = N} = QueryState, Meta, DBType) ->
Meta, DBType) ->
case read_message({{U, S}, ID}, Filter, DBType) of case read_message({{U, S}, ID}, Filter, DBType) of
#mam_msg{} = Msg -> #mam_msg{} = Msg ->
NewQueryState = NewQueryState =
@ -995,33 +973,45 @@ collect_messages(#mam_query{mam_jid = {U, S},
QueryState#mam_query_state{last = ID, QueryState#mam_query_state{last = ID,
messages = [Msg | Msgs]} messages = [Msg | Msgs]}
end, end,
collect_next(Query, NewQueryState, Meta, N - 1, DBType); query_next(Query, NewQueryState, Meta, N - 1, DBType);
filtered -> filtered ->
collect_next(Query, QueryState, Meta, N, DBType); query_next(Query, QueryState, Meta, N, DBType);
not_found -> not_found ->
?DEBUG("MAM message ~B of ~s@~s not found", [ID, U, S]), ?DEBUG("MAM message ~B of ~s@~s not found", [ID, U, S]),
collect_next(Query, QueryState, Meta, N - 1, DBType) query_next(Query, QueryState, Meta, N - 1, DBType)
end. end.
-spec collect_next(mam_query(), mam_query_state(), mam_meta(), -spec query_next(mam_query(), mam_query_state(), mam_meta(),
non_neg_integer(), db_type()) -> mam_result(). non_neg_integer(), db_type()) -> mam_result().
collect_next(#mam_query{direction = before} = Query, query_next(#mam_query{direction = before} = Query,
#mam_query_state{current = ID} = QueryState, Meta, N, DBType) -> #mam_query_state{current = ID} = QueryState, Meta, N, DBType) ->
collect_messages(Query, QueryState#mam_query_state{current = ID - 1, query_archive(Query, QueryState#mam_query_state{current = ID - 1,
n_remaining = N}, n_remaining = N},
Meta, DBType); Meta, DBType);
collect_next(#mam_query{direction = aft} = Query, query_next(#mam_query{direction = aft} = Query,
#mam_query_state{current = ID} = QueryState, Meta, N, DBType) -> #mam_query_state{current = ID} = QueryState, Meta, N, DBType) ->
collect_messages(Query, QueryState#mam_query_state{current = ID + 1, query_archive(Query, QueryState#mam_query_state{current = ID + 1,
n_remaining = N}, n_remaining = N},
Meta, DBType). Meta, DBType).
-spec read_meta(mam_jid(), db_type()) -> mam_meta().
read_meta(US, mnesia) ->
case mnesia:dirty_read(mam_meta, US) of
[M] ->
M;
[] ->
M = #mam_meta{mam_jid = US, mam_type = user},
mnesia:dirty_write(M), % Initialize MAM for this user.
M
end.
-spec read_message(mam_msg_key(), mam_filter(), db_type()) -spec read_message(mam_msg_key(), mam_filter(), db_type())
-> mam_msg() | filtered | not_found. -> mam_msg() | filtered | not_found.
read_message(Key, Filter, mnesia) -> read_message(Key, Filter, mnesia) ->
ReadMsg = fun() -> mnesia:read(mam_msg, Key) end, ReadMsg = fun() -> mnesia:dirty_read(mam_msg, Key) end,
case mnesia:activity(sync_dirty, ReadMsg, [], mnesia_frag) of case mnesia:activity(sync_dirty, ReadMsg, [], mnesia_frag) of
[#mam_msg{} = Msg] -> [#mam_msg{} = Msg] ->
case filter_message(Msg, Filter) of case filter_message(Msg, Filter) of
@ -1139,7 +1129,7 @@ another_message_exists(#mam_query{mam_jid = {U, S},
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Extract collect_messages/4 results. %% Extract query_archive/4 results.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec resulting_messages(mam_query(), mam_query_state()) -> [mam_msg()]. -spec resulting_messages(mam_query(), mam_query_state()) -> [mam_msg()].