From 720417d14d2781579433ef4f32c393b74e640828 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 4 Apr 2020 16:12:03 -0300 Subject: [PATCH 1/8] Add basic functions for scheduled state removal --- litecord/gateway/state_manager.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/litecord/gateway/state_manager.py b/litecord/gateway/state_manager.py index 6b92eb9..ce600fb 100644 --- a/litecord/gateway/state_manager.py +++ b/litecord/gateway/state_manager.py @@ -96,6 +96,8 @@ class StateManager: #: raw mapping from session ids to GatewayState self.states_raw = StateDictWrapper(self, {}) + self.tasks = {} + def insert(self, state: GatewayState): """Insert a new state object.""" user_states = self.states[state.user_id] @@ -239,3 +241,18 @@ class StateManager: # DMs and GDMs use all user states return self.user_states(user_id) + + async def _future_cleanup(self, state: GatewayState): + await asyncio.sleep(30) + self.remove(state) + + async def schedule_deletion(self, state: GatewayState): + task = app.loop.create_task(self._future_cleanup(state)) + self.tasks[state.session_id] = task + + async def unschedule_deletion(self, state: GatewayState): + try: + task = self.tasks.pop(state.session_id) + task.cancel() + except KeyError: + pass From 24ff453c6ba3b9cffa8de76da8e91d0aef2770b2 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 4 Apr 2020 22:50:00 -0300 Subject: [PATCH 2/8] Plug state resume into deletion unschedule --- litecord/gateway/state_manager.py | 7 +++++-- litecord/gateway/websocket.py | 8 +++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/litecord/gateway/state_manager.py b/litecord/gateway/state_manager.py index ce600fb..eb2e0fb 100644 --- a/litecord/gateway/state_manager.py +++ b/litecord/gateway/state_manager.py @@ -245,6 +245,8 @@ class StateManager: async def _future_cleanup(self, state: GatewayState): await asyncio.sleep(30) self.remove(state) + state.ws.state = None + state.ws = None async def schedule_deletion(self, state: GatewayState): task = app.loop.create_task(self._future_cleanup(state)) @@ -253,6 +255,7 @@ class StateManager: async def unschedule_deletion(self, state: GatewayState): try: task = self.tasks.pop(state.session_id) - task.cancel() except KeyError: - pass + return + + task.cancel() diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index a98752f..2a47fdc 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -824,6 +824,7 @@ class GatewayWebsocket: return await self.invalidate_session(False) # relink this connection + await self.app.state_manager.unschedule_deletion(state) self.state = state state.ws = self @@ -1085,9 +1086,10 @@ class GatewayWebsocket: task.cancel() if self.state: - self.app.state_manager.remove(self.state) - self.state.ws = None - self.state = None + self.app.state_manager.schedule_deletion(self.state) + # self.app.state_manager.remove(self.state) + # self.state.ws = None + # self.state = None async def _check_conns(self, user_id): """Check if there are any existing connections. From 1cbc4541a8bbccc1b139e39b8eaac68514270d98 Mon Sep 17 00:00:00 2001 From: Luna Date: Sun, 5 Apr 2020 01:27:29 -0300 Subject: [PATCH 3/8] Handle connection closed when sending RECONNECT --- litecord/gateway/state_manager.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/litecord/gateway/state_manager.py b/litecord/gateway/state_manager.py index eb2e0fb..9239159 100644 --- a/litecord/gateway/state_manager.py +++ b/litecord/gateway/state_manager.py @@ -190,14 +190,14 @@ class StateManager: """Send OP Reconnect to a single connection.""" websocket = state.ws - await websocket.send({"op": OP.RECONNECT}) - - # wait 200ms - # so that the client has time to process - # our payload then close the connection - await asyncio.sleep(0.2) - try: + await websocket.send({"op": OP.RECONNECT}) + + # wait 200ms + # so that the client has time to process + # our payload then close the connection + await asyncio.sleep(0.2) + # try to close the connection ourselves await websocket.ws.close(code=4000, reason="litecord shutting down") except ConnectionClosed: From 6a45da01b8383966d3ea99024e1dfd97069f8426 Mon Sep 17 00:00:00 2001 From: Luna Date: Sun, 5 Apr 2020 14:41:49 -0300 Subject: [PATCH 4/8] Handle conn closed when dispatching events --- litecord/gateway/websocket.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 2a47fdc..8f36f26 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -267,9 +267,15 @@ class GatewayWebsocket: log.debug("sending payload {!r} sid {}", event.upper(), self.state.session_id) - await self.send(payload) + try: + await self.send(payload) + except websockets.exceptions.ConnectionClosed: + log.warning( + "Failed to dispatch {!r} to {}", event.upper, self.state.session_id + ) async def _make_guild_list(self) -> List[Dict[str, Any]]: + assert self.state is not None user_id = self.state.user_id guild_ids = await self._guild_ids() From 5b0f4f01d87c4b91f0d86553440c08c52f8f889b Mon Sep 17 00:00:00 2001 From: Luna Date: Sun, 5 Apr 2020 14:46:30 -0300 Subject: [PATCH 5/8] Remove double-relationship between state and WS --- litecord/gateway/websocket.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 8f36f26..6ef17b7 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -1092,10 +1092,9 @@ class GatewayWebsocket: task.cancel() if self.state: + self.state.ws = None self.app.state_manager.schedule_deletion(self.state) - # self.app.state_manager.remove(self.state) - # self.state.ws = None - # self.state = None + self.state = None async def _check_conns(self, user_id): """Check if there are any existing connections. From 8e6bbdbe19f956286c83304e5358619fa6aa899f Mon Sep 17 00:00:00 2001 From: Luna Date: Sun, 5 Apr 2020 14:46:39 -0300 Subject: [PATCH 6/8] Add test for session resumption --- tests/test_websocket.py | 51 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 6402f21..7ac678f 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -176,3 +176,54 @@ async def test_etf(test_cli): assert hello["op"] == OP.HELLO finally: await _close(conn) + + +@pytest.mark.asyncio +async def test_resume(test_cli_user): + conn = await gw_start(test_cli_user.cli) + + # get the hello frame but ignore it + await _json(conn) + + await _json_send( + conn, {"op": OP.IDENTIFY, "d": {"token": test_cli_user.user["token"]}} + ) + + try: + ready = await _json(conn) + assert isinstance(ready, dict) + assert ready["op"] == OP.DISPATCH + assert ready["t"] == "READY" + + data = ready["d"] + assert isinstance(data, dict) + + assert isinstance(data["session_id"], str) + sess_id: str = data["session_id"] + finally: + await _close(conn) + + # try to resume + conn = await gw_start(test_cli_user.cli) + + # get the hello frame but ignore it + await _json(conn) + + await _json_send( + conn, + { + "op": OP.RESUME, + "d": { + "token": test_cli_user.user["token"], + "session_id": sess_id, + "seq": 0, + }, + }, + ) + + msg = await _json(conn) + assert isinstance(msg, dict) + assert isinstance(msg["op"], int) + assert msg["op"] == OP.DISPATCH + assert isinstance(msg["t"], str) + assert msg["t"] in ("RESUMED", "PRESENCE_REPLACE") From 7b6b69671726b6448491286b9162ddb6031883b1 Mon Sep 17 00:00:00 2001 From: Luna Date: Sun, 5 Apr 2020 15:08:18 -0300 Subject: [PATCH 7/8] Change remove() declaration Allow Session IDs to be passed, instead of full state objects. --- litecord/common/users.py | 2 +- litecord/gateway/state_manager.py | 21 ++++++++++----------- litecord/gateway/websocket.py | 5 +++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/litecord/common/users.py b/litecord/common/users.py index fa9b825..1e5bdf4 100644 --- a/litecord/common/users.py +++ b/litecord/common/users.py @@ -268,7 +268,7 @@ async def user_disconnect(user_id: int): for state in user_states: # make it unable to resume - app.state_manager.remove(state) + app.state_manager.remove(state.session_id, user_id=user_id) if not state.ws: continue diff --git a/litecord/gateway/state_manager.py b/litecord/gateway/state_manager.py index 9239159..08a4114 100644 --- a/litecord/gateway/state_manager.py +++ b/litecord/gateway/state_manager.py @@ -19,7 +19,7 @@ along with this program. If not, see . import asyncio -from typing import List +from typing import List, Optional from collections import defaultdict from quart import current_app as app @@ -121,21 +121,20 @@ class StateManager: """Fetch a single state given the Session ID.""" return self.states_raw[session_id] - def remove(self, state): + def remove(self, session_id: str, *, user_id: Optional[int] = None): """Remove a state from the registry""" - if not state: - return - try: - self.states_raw.pop(state.session_id) + state = self.states_raw.pop(session_id) + user_id = state.user_id except KeyError: pass - try: - log.debug("removing state: {!r}", state) - self.states[state.user_id].pop(state.session_id) - except KeyError: - pass + if user_id is not None: + try: + log.debug("removing state: {!r}", state) + self.states[state.user_id].pop(session_id) + except KeyError: + pass def fetch_states(self, user_id: int, guild_id: int) -> List[GatewayState]: """Fetch all states that are tied to a guild.""" diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 6ef17b7..fffc37f 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -770,10 +770,11 @@ class GatewayWebsocket: # since the state will be removed from # the manager, it will become unreachable # when trying to resume. - self.app.state_manager.remove(self.state) + self.app.state_manager.remove(self.state.user_id) async def _resume(self, replay_seqs: Iterable): - presences = [] + assert self.state is not None + presences: List[dict] = [] try: for seq in replay_seqs: From 323013cb8773118c82061da3cc7b5ec553b77b84 Mon Sep 17 00:00:00 2001 From: Luna Date: Sun, 5 Apr 2020 15:08:46 -0300 Subject: [PATCH 8/8] Add test to ensure a removed state invalidates WS --- tests/test_websocket.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 7ac678f..ed49838 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -205,9 +205,7 @@ async def test_resume(test_cli_user): # try to resume conn = await gw_start(test_cli_user.cli) - - # get the hello frame but ignore it - await _json(conn) + _ = await _json(conn) await _json_send( conn, @@ -227,3 +225,30 @@ async def test_resume(test_cli_user): assert msg["op"] == OP.DISPATCH assert isinstance(msg["t"], str) assert msg["t"] in ("RESUMED", "PRESENCE_REPLACE") + + # close again, and retry again, but this time by removing the state + # and asserting the session won't be resumed. + await _close(conn) + + conn = await gw_start(test_cli_user.cli) + _ = await _json(conn) + + async with test_cli_user.app.app_context(): + test_cli_user.app.state_manager.remove(sess_id) + + await _json_send( + conn, + { + "op": OP.RESUME, + "d": { + "token": test_cli_user.user["token"], + "session_id": sess_id, + "seq": 0, + }, + }, + ) + + msg = await _json(conn) + assert isinstance(msg, dict) + assert isinstance(msg["op"], int) + assert msg["op"] == OP.INVALID_SESSION