Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send events to ASes concurrently #3088

Merged
merged 1 commit into from
Apr 12, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)

import logging

Expand Down Expand Up @@ -84,11 +86,16 @@ def notify_interested_services(self, current_id):
if not events:
break

events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)

@defer.inlineCallbacks
def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
continue # no services need notifying
return # no services need notifying

# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
Expand All @@ -108,6 +115,16 @@ def notify_interested_services(self, current_id):
service, event
)

@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)

yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))

events_processed_counter.inc_by(len(events))

yield self.store.set_appservice_last_pos(upper_bound)
Expand Down