Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor sync APIs to reuse pagination API #3199

Merged
merged 7 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,19 @@ def _load_filtered_recents(self, room_id, sync_config, now_token,
since_key = since_token.room_key

while limited and len(recents) < timeline_limit and max_repeat:
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
if since_key:
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
end_token=end_key,
)
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
Expand Down
73 changes: 35 additions & 38 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,52 +233,49 @@ def get_rooms_that_changed(self, room_ids, from_key):
@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
# Note: If from_key is None then we return in topological order. This
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be worth preserving this comment, but moving it to sync.py?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it I don't really know why we don't always use the topological ordering, but I've added the comment.

# is because in that case we're using this as a "get the last few messages
# in a room" function, rather than "get new messages since last sync"
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
from_id = None
to_id = RoomStreamToken.parse_stream_token(to_key).stream

"""Get new room events in stream ordering since `from_key`.

Args:
room_id (str)
from_key (str): Token from which no events are returned before
to_key (str): Token from which no events are returned after. (This
is typically the current stream token)
limit (int): Maximum number of events to return
order (str): Either "DESC" or "ASC". Determines which events are
returned when the result is limited. If "DESC" then the most
recent `limit` events are returned, otherwise returns the
oldest `limit` events.

Returns:
Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
events (in ascending order) and the token from the start of
the chunk of events returned.
"""
if from_key == to_key:
defer.returnValue(([], from_key))

if from_id:
has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id
)
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream

if not has_changed:
defer.returnValue(([], from_key))
has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id
)

def f(txn):
if from_id is not None:
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))

rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
else:
sql = (
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
" ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))
if not has_changed:
defer.returnValue(([], from_key))

rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
def f(txn):
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))

rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
return rows

rows = yield self.runInteraction("get_room_events_stream_for_room", f)
Expand Down