diff --git a/mod_mam/src/mod_mam.erl b/mod_mam/src/mod_mam.erl index b946a59..3ac35bc 100644 --- a/mod_mam/src/mod_mam.erl +++ b/mod_mam/src/mod_mam.erl @@ -956,12 +956,14 @@ query_archive(Query, last = resulting_last(Query, QueryState), is_complete = result_is_complete(Query, QueryState, Meta, DBType)}; -query_archive(#mam_query{mam_jid = {U, S}, filter = Filter} = Query, +query_archive(#mam_query{mam_jid = {U, S}, + direction = Direction, + filter = Filter} = Query, #mam_query_state{messages = Msgs, current = ID, n_remaining = N} = QueryState, Meta, DBType) -> - case read_message({{U, S}, ID}, Filter, DBType) of + case read_message({{U, S}, ID}, Filter, Direction, DBType) of #mam_msg{} = Msg -> NewQueryState = case QueryState of @@ -974,8 +976,10 @@ query_archive(#mam_query{mam_jid = {U, S}, filter = Filter} = Query, messages = [Msg | Msgs]} end, query_next(Query, NewQueryState, Meta, N - 1, DBType); - filtered -> + drop -> query_next(Query, QueryState, Meta, N, DBType); + stop -> + query_next(Query, QueryState, Meta, 0, DBType); not_found -> ?DEBUG("MAM message ~B of ~s@~s not found", [ID, U, S]), query_next(Query, QueryState, Meta, N - 1, DBType) @@ -1007,66 +1011,89 @@ read_meta(US, mnesia) -> M end. --spec read_message(mam_msg_key(), mam_filter(), db_type()) - -> mam_msg() | filtered | not_found. +-spec read_message(mam_msg_key(), mam_filter(), direction(), db_type()) + -> mam_msg() | drop | stop | not_found. -read_message(Key, Filter, mnesia) -> +read_message(Key, Filter, Direction, mnesia) -> ReadMsg = fun() -> mnesia:read(mam_msg, Key) end, case mnesia:activity(sync_dirty, ReadMsg, [], mnesia_frag) of [#mam_msg{} = Msg] -> - case filter_message(Msg, Filter) of + case filter_message(Msg, Filter, Direction) of pass -> ?DEBUG("Message ~p passes filter ~p", [Msg, Filter]), Msg; - drop -> - ?DEBUG("Message ~p dropped by filter ~p", [Msg, Filter]), - filtered + DropOrStop -> + ?DEBUG("Message ~p filtered: ~s (~p)", [Msg, DropOrStop, + Filter]), + DropOrStop end; [] -> not_found end. --spec filter_message(mam_msg(), mam_filter()) -> pass | drop. +-spec filter_message(mam_msg(), mam_filter(), direction()) + -> pass | drop | stop. -filter_message(_Msg, Filter) when Filter =:= #mam_filter{} -> pass; -filter_message(Msg, Filter) -> +filter_message(_Msg, Filter, _Direction) when Filter =:= #mam_filter{} -> pass; +filter_message(Msg, Filter, Direction) -> lists:foldl(fun(FilterType, pass) -> - filter_message(FilterType, Msg, Filter); - (_FilterType, drop) -> - drop + filter_message(FilterType, Msg, Filter, Direction); + (FilterType, drop) -> + case filter_message(FilterType, Msg, Filter, Direction) + of + pass -> + drop; + DropOrStop -> + DropOrStop + end; + (_FilterType, stop) -> + stop end, pass, [start, fin, with]). --spec filter_message(mam_filter_type(), mam_msg(), mam_filter()) -> pass | drop. +-spec filter_message(mam_filter_type(), mam_msg(), mam_filter(), direction()) + -> pass | drop | stop. filter_message(start, _Msg, - #mam_filter{start = undefined}) -> + #mam_filter{start = undefined}, + _Direction) -> pass; filter_message(start, #mam_msg{time = Time}, - #mam_filter{start = Start}) when Time >= Start -> + #mam_filter{start = Start}, + _Direction) when Time >= Start -> pass; -filter_message(start, _Msg, _Filter) -> +filter_message(start, _Msg, _Filter, before) -> + stop; +filter_message(start, _Msg, _Filter, aft) -> drop; filter_message(fin, _Msg, - #mam_filter{fin = undefined}) -> + #mam_filter{fin = undefined}, + _Direction) -> pass; filter_message(fin, #mam_msg{time = Time}, - #mam_filter{fin = End}) when Time =< End -> + #mam_filter{fin = End}, + _Direction) when Time =< End -> pass; -filter_message(fin, _Msg, _Filter) -> +filter_message(fin, _Msg, _Filter, aft) -> + stop; +filter_message(fin, _Msg, _Filter, before) -> drop; filter_message(with, _Msg, - #mam_filter{with = undefined}) -> + #mam_filter{with = undefined}, + _Direction) -> pass; -filter_message(with, Msg, #mam_filter{with = {_U, _S, <<"">>}} = Filter) -> +filter_message(with, + Msg, + #mam_filter{with = {_U, _S, <<"">>}} = Filter, + _Direction) -> filter_message_with(bare, Msg, Filter); -filter_message(with, Msg, Filter) -> +filter_message(with, Msg, Filter, _Direction) -> filter_message_with(full, Msg, Filter). -spec filter_message_with(bare | full, mam_msg(), mam_filter()) -> pass | drop. @@ -1110,14 +1137,17 @@ filter_message_with(full, _Msg, _Filter) -> another_message_exists(#mam_query{mam_jid = {U, S}, direction = Direction, filter = Filter } = Query, ID, DBType) -> - case read_message({{U, S}, ID}, Filter, DBType) of + case read_message({{U, S}, ID}, Filter, Direction, DBType) of #mam_msg{} -> ?DEBUG("Found another message for ~s@~s: ~B", [U, S, ID]), true; not_found -> ?DEBUG("Found no other message for ~s@~s: ~B", [U, S, ID]), false; - filtered -> + stop -> + ?DEBUG("Found no other unfiltered message for ~s@~s: ~B", [U, S, ID]), + false; + drop -> NextID = case Direction of before ->