mod_mam: Make filtered lookups more efficient

Try to limit the number of Mnesia queries in the case where the client
specified the desired timespan but no <before/> or <after/> UID.
This commit is contained in:
Holger Weiss 2015-02-17 23:06:05 +01:00
parent 7e9acda1c7
commit 4db159b4fa
1 changed files with 58 additions and 28 deletions

View File

@ -956,12 +956,14 @@ query_archive(Query,
last = resulting_last(Query, QueryState), last = resulting_last(Query, QueryState),
is_complete = result_is_complete(Query, QueryState, Meta, is_complete = result_is_complete(Query, QueryState, Meta,
DBType)}; DBType)};
query_archive(#mam_query{mam_jid = {U, S}, filter = Filter} = Query, query_archive(#mam_query{mam_jid = {U, S},
direction = Direction,
filter = Filter} = Query,
#mam_query_state{messages = Msgs, #mam_query_state{messages = Msgs,
current = ID, current = ID,
n_remaining = N} = QueryState, n_remaining = N} = QueryState,
Meta, DBType) -> Meta, DBType) ->
case read_message({{U, S}, ID}, Filter, DBType) of case read_message({{U, S}, ID}, Filter, Direction, DBType) of
#mam_msg{} = Msg -> #mam_msg{} = Msg ->
NewQueryState = NewQueryState =
case QueryState of case QueryState of
@ -974,8 +976,10 @@ query_archive(#mam_query{mam_jid = {U, S}, filter = Filter} = Query,
messages = [Msg | Msgs]} messages = [Msg | Msgs]}
end, end,
query_next(Query, NewQueryState, Meta, N - 1, DBType); query_next(Query, NewQueryState, Meta, N - 1, DBType);
filtered -> drop ->
query_next(Query, QueryState, Meta, N, DBType); query_next(Query, QueryState, Meta, N, DBType);
stop ->
query_next(Query, QueryState, Meta, 0, DBType);
not_found -> not_found ->
?DEBUG("MAM message ~B of ~s@~s not found", [ID, U, S]), ?DEBUG("MAM message ~B of ~s@~s not found", [ID, U, S]),
query_next(Query, QueryState, Meta, N - 1, DBType) query_next(Query, QueryState, Meta, N - 1, DBType)
@ -1007,66 +1011,89 @@ read_meta(US, mnesia) ->
M M
end. end.
-spec read_message(mam_msg_key(), mam_filter(), db_type()) -spec read_message(mam_msg_key(), mam_filter(), direction(), db_type())
-> mam_msg() | filtered | not_found. -> mam_msg() | drop | stop | not_found.
read_message(Key, Filter, mnesia) -> read_message(Key, Filter, Direction, mnesia) ->
ReadMsg = fun() -> mnesia:read(mam_msg, Key) end, ReadMsg = fun() -> mnesia:read(mam_msg, Key) end,
case mnesia:activity(sync_dirty, ReadMsg, [], mnesia_frag) of case mnesia:activity(sync_dirty, ReadMsg, [], mnesia_frag) of
[#mam_msg{} = Msg] -> [#mam_msg{} = Msg] ->
case filter_message(Msg, Filter) of case filter_message(Msg, Filter, Direction) of
pass -> pass ->
?DEBUG("Message ~p passes filter ~p", [Msg, Filter]), ?DEBUG("Message ~p passes filter ~p", [Msg, Filter]),
Msg; Msg;
drop -> DropOrStop ->
?DEBUG("Message ~p dropped by filter ~p", [Msg, Filter]), ?DEBUG("Message ~p filtered: ~s (~p)", [Msg, DropOrStop,
filtered Filter]),
DropOrStop
end; end;
[] -> not_found [] -> not_found
end. end.
-spec filter_message(mam_msg(), mam_filter()) -> pass | drop. -spec filter_message(mam_msg(), mam_filter(), direction())
-> pass | drop | stop.
filter_message(_Msg, Filter) when Filter =:= #mam_filter{} -> pass; filter_message(_Msg, Filter, _Direction) when Filter =:= #mam_filter{} -> pass;
filter_message(Msg, Filter) -> filter_message(Msg, Filter, Direction) ->
lists:foldl(fun(FilterType, pass) -> lists:foldl(fun(FilterType, pass) ->
filter_message(FilterType, Msg, Filter); filter_message(FilterType, Msg, Filter, Direction);
(_FilterType, drop) -> (FilterType, drop) ->
drop case filter_message(FilterType, Msg, Filter, Direction)
of
pass ->
drop;
DropOrStop ->
DropOrStop
end;
(_FilterType, stop) ->
stop
end, pass, [start, fin, with]). end, pass, [start, fin, with]).
-spec filter_message(mam_filter_type(), mam_msg(), mam_filter()) -> pass | drop. -spec filter_message(mam_filter_type(), mam_msg(), mam_filter(), direction())
-> pass | drop | stop.
filter_message(start, filter_message(start,
_Msg, _Msg,
#mam_filter{start = undefined}) -> #mam_filter{start = undefined},
_Direction) ->
pass; pass;
filter_message(start, filter_message(start,
#mam_msg{time = Time}, #mam_msg{time = Time},
#mam_filter{start = Start}) when Time >= Start -> #mam_filter{start = Start},
_Direction) when Time >= Start ->
pass; pass;
filter_message(start, _Msg, _Filter) -> filter_message(start, _Msg, _Filter, before) ->
stop;
filter_message(start, _Msg, _Filter, aft) ->
drop; drop;
filter_message(fin, filter_message(fin,
_Msg, _Msg,
#mam_filter{fin = undefined}) -> #mam_filter{fin = undefined},
_Direction) ->
pass; pass;
filter_message(fin, filter_message(fin,
#mam_msg{time = Time}, #mam_msg{time = Time},
#mam_filter{fin = End}) when Time =< End -> #mam_filter{fin = End},
_Direction) when Time =< End ->
pass; pass;
filter_message(fin, _Msg, _Filter) -> filter_message(fin, _Msg, _Filter, aft) ->
stop;
filter_message(fin, _Msg, _Filter, before) ->
drop; drop;
filter_message(with, filter_message(with,
_Msg, _Msg,
#mam_filter{with = undefined}) -> #mam_filter{with = undefined},
_Direction) ->
pass; pass;
filter_message(with, Msg, #mam_filter{with = {_U, _S, <<"">>}} = Filter) -> filter_message(with,
Msg,
#mam_filter{with = {_U, _S, <<"">>}} = Filter,
_Direction) ->
filter_message_with(bare, Msg, Filter); filter_message_with(bare, Msg, Filter);
filter_message(with, Msg, Filter) -> filter_message(with, Msg, Filter, _Direction) ->
filter_message_with(full, Msg, Filter). filter_message_with(full, Msg, Filter).
-spec filter_message_with(bare | full, mam_msg(), mam_filter()) -> pass | drop. -spec filter_message_with(bare | full, mam_msg(), mam_filter()) -> pass | drop.
@ -1110,14 +1137,17 @@ filter_message_with(full, _Msg, _Filter) ->
another_message_exists(#mam_query{mam_jid = {U, S}, another_message_exists(#mam_query{mam_jid = {U, S},
direction = Direction, direction = Direction,
filter = Filter } = Query, ID, DBType) -> filter = Filter } = Query, ID, DBType) ->
case read_message({{U, S}, ID}, Filter, DBType) of case read_message({{U, S}, ID}, Filter, Direction, DBType) of
#mam_msg{} -> #mam_msg{} ->
?DEBUG("Found another message for ~s@~s: ~B", [U, S, ID]), ?DEBUG("Found another message for ~s@~s: ~B", [U, S, ID]),
true; true;
not_found -> not_found ->
?DEBUG("Found no other message for ~s@~s: ~B", [U, S, ID]), ?DEBUG("Found no other message for ~s@~s: ~B", [U, S, ID]),
false; false;
filtered -> stop ->
?DEBUG("Found no other unfiltered message for ~s@~s: ~B", [U, S, ID]),
false;
drop ->
NextID = NextID =
case Direction of case Direction of
before -> before ->