Skip to content
Snippets Groups Projects
Unverified Commit af187805 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Merge pull request #5809 from matrix-org/erikj/handle_pusher_stop

Handle pusher being deleted during processing.
parents 0b6fbb28 96bdd661
No related branches found
No related tags found
No related merge requests found
Handle pusher being deleted during processing rather than logging an exception.
...@@ -234,6 +234,7 @@ class EmailPusher(object): ...@@ -234,6 +234,7 @@ class EmailPusher(object):
return return
self.last_stream_ordering = last_stream_ordering self.last_stream_ordering = last_stream_ordering
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success( yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id, self.app_id,
self.email, self.email,
...@@ -241,6 +242,11 @@ class EmailPusher(object): ...@@ -241,6 +242,11 @@ class EmailPusher(object):
last_stream_ordering, last_stream_ordering,
self.clock.time_msec(), self.clock.time_msec(),
) )
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
def seconds_until(self, ts_msec): def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000 secs = (ts_msec - self.clock.time_msec()) / 1000
......
...@@ -199,6 +199,7 @@ class HttpPusher(object): ...@@ -199,6 +199,7 @@ class HttpPusher(object):
http_push_processed_counter.inc() http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"] self.last_stream_ordering = push_action["stream_ordering"]
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success( yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
...@@ -206,6 +207,13 @@ class HttpPusher(object): ...@@ -206,6 +207,13 @@ class HttpPusher(object):
self.last_stream_ordering, self.last_stream_ordering,
self.clock.time_msec(), self.clock.time_msec(),
) )
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return
if self.failing_since: if self.failing_since:
self.failing_since = None self.failing_since = None
yield self.store.update_pusher_failing_since( yield self.store.update_pusher_failing_since(
...@@ -234,12 +242,17 @@ class HttpPusher(object): ...@@ -234,12 +242,17 @@ class HttpPusher(object):
) )
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"] self.last_stream_ordering = push_action["stream_ordering"]
yield self.store.update_pusher_last_stream_ordering( pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_id, self.user_id,
self.last_stream_ordering, self.last_stream_ordering,
) )
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return
self.failing_since = None self.failing_since = None
yield self.store.update_pusher_failing_since( yield self.store.update_pusher_failing_since(
......
...@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore): ...@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
def update_pusher_last_stream_ordering_and_success( def update_pusher_last_stream_ordering_and_success(
self, app_id, pushkey, user_id, last_stream_ordering, last_success self, app_id, pushkey, user_id, last_stream_ordering, last_success
): ):
yield self._simple_update_one( """Update the last stream ordering position we've processed up to for
"pushers", the given pusher.
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{ Args:
app_id (str)
pushkey (str)
last_stream_ordering (int)
last_success (int)
Returns:
Deferred[bool]: True if the pusher still exists; False if it has been deleted.
"""
updated = yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={
"last_stream_ordering": last_stream_ordering, "last_stream_ordering": last_stream_ordering,
"last_success": last_success, "last_success": last_success,
}, },
desc="update_pusher_last_stream_ordering_and_success", desc="update_pusher_last_stream_ordering_and_success",
) )
return bool(updated)
@defer.inlineCallbacks @defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update_one( yield self._simple_update(
"pushers", table="pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{"failing_since": failing_since}, updatevalues={"failing_since": failing_since},
desc="update_pusher_failing_since", desc="update_pusher_failing_since",
) )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment