mirror of https://gitlab.com/litecord/litecord.git
gateway: docstrings!
also subjective readability improvements
This commit is contained in:
parent
58834f88af
commit
3b85556d89
|
|
@ -22,9 +22,8 @@ from litecord.gateway.websocket import GatewayWebsocket
|
||||||
|
|
||||||
|
|
||||||
async def websocket_handler(app, ws, url):
|
async def websocket_handler(app, ws, url):
|
||||||
"""Main websocket handler, checks query arguments
|
"""Main websocket handler, checks query arguments when connecting to
|
||||||
when connecting to the gateway and spawns a
|
the gateway and spawns a GatewayWebsocket instance for the connection."""
|
||||||
GatewayWebsocket instance for the connection."""
|
|
||||||
args = urllib.parse.parse_qs(
|
args = urllib.parse.parse_qs(
|
||||||
urllib.parse.urlparse(url).query
|
urllib.parse.urlparse(url).query
|
||||||
)
|
)
|
||||||
|
|
@ -54,6 +53,9 @@ async def websocket_handler(app, ws, url):
|
||||||
if gw_compress and gw_compress not in ('zlib-stream',):
|
if gw_compress and gw_compress not in ('zlib-stream',):
|
||||||
return await ws.close(1000, 'Invalid gateway compress')
|
return await ws.close(1000, 'Invalid gateway compress')
|
||||||
|
|
||||||
gws = GatewayWebsocket(ws, app, v=gw_version,
|
gws = GatewayWebsocket(
|
||||||
encoding=gw_encoding, compress=gw_compress)
|
ws, app, v=gw_version, encoding=gw_encoding, compress=gw_compress)
|
||||||
|
|
||||||
|
# this can be run with a single await since this whole coroutine
|
||||||
|
# is already running in the background.
|
||||||
await gws.run()
|
await gws.run()
|
||||||
|
|
|
||||||
|
|
@ -65,30 +65,34 @@ WebsocketObjects = collections.namedtuple(
|
||||||
|
|
||||||
|
|
||||||
def encode_json(payload) -> str:
|
def encode_json(payload) -> str:
|
||||||
|
"""Encode a given payload to JSON."""
|
||||||
return json.dumps(payload, separators=(',', ':'),
|
return json.dumps(payload, separators=(',', ':'),
|
||||||
cls=LitecordJSONEncoder)
|
cls=LitecordJSONEncoder)
|
||||||
|
|
||||||
|
|
||||||
def decode_json(data: str):
|
def decode_json(data: str):
|
||||||
|
"""Decode from JSON."""
|
||||||
return json.loads(data)
|
return json.loads(data)
|
||||||
|
|
||||||
|
|
||||||
def encode_etf(payload) -> str:
|
def encode_etf(payload) -> str:
|
||||||
# The thing with encoding ETF is that with json we have LitecordJSONEncoder
|
"""Encode a payload to ETF (External Term Format).
|
||||||
# which takes care of converting e.g datetime objects to their ISO
|
|
||||||
# representation.
|
|
||||||
|
|
||||||
# so, to keep things working, i'll to a json pass on the payload, then send
|
This gives a JSON pass on the given payload (via calling encode_json and
|
||||||
# the decoded payload back to earl.
|
then decode_json) because we may want to encode objects that can only be
|
||||||
|
encoded by LitecordJSONEncoder.
|
||||||
|
|
||||||
|
Earl-ETF does not give the same interface for extensibility, hence why we
|
||||||
|
do the pass.
|
||||||
|
"""
|
||||||
sanitized = encode_json(payload)
|
sanitized = encode_json(payload)
|
||||||
sanitized = decode_json(sanitized)
|
sanitized = decode_json(sanitized)
|
||||||
return earl.pack(sanitized)
|
return earl.pack(sanitized)
|
||||||
|
|
||||||
|
|
||||||
def _etf_decode_dict(data):
|
def _etf_decode_dict(data):
|
||||||
# NOTE: this is a very slow implementation to
|
"""Decode a given dictionary."""
|
||||||
# decode the dictionary.
|
# NOTE: this is very slow.
|
||||||
|
|
||||||
if isinstance(data, bytes):
|
if isinstance(data, bytes):
|
||||||
return data.decode()
|
return data.decode()
|
||||||
|
|
@ -109,6 +113,7 @@ def _etf_decode_dict(data):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def decode_etf(data: bytes):
|
def decode_etf(data: bytes):
|
||||||
|
"""Decode data in ETF to any."""
|
||||||
res = earl.unpack(data)
|
res = earl.unpack(data)
|
||||||
|
|
||||||
if isinstance(res, bytes):
|
if isinstance(res, bytes):
|
||||||
|
|
@ -269,7 +274,7 @@ class GatewayWebsocket:
|
||||||
task_wrapper('hb wait', self._hb_wait(interval))
|
task_wrapper('hb wait', self._hb_wait(interval))
|
||||||
)
|
)
|
||||||
|
|
||||||
async def send_hello(self):
|
async def _send_hello(self):
|
||||||
"""Send the OP 10 Hello packet over the websocket."""
|
"""Send the OP 10 Hello packet over the websocket."""
|
||||||
# random heartbeat intervals
|
# random heartbeat intervals
|
||||||
interval = randint(40, 46) * 1000
|
interval = randint(40, 46) * 1000
|
||||||
|
|
@ -367,7 +372,7 @@ class GatewayWebsocket:
|
||||||
|
|
||||||
'friend_suggestion_count': 0,
|
'friend_suggestion_count': 0,
|
||||||
|
|
||||||
|
# those are unused default values.
|
||||||
'connected_accounts': [],
|
'connected_accounts': [],
|
||||||
'experiments': [],
|
'experiments': [],
|
||||||
'guild_experiments': [],
|
'guild_experiments': [],
|
||||||
|
|
@ -406,6 +411,8 @@ class GatewayWebsocket:
|
||||||
self.ext.loop.create_task(self._guild_dispatch(guilds))
|
self.ext.loop.create_task(self._guild_dispatch(guilds))
|
||||||
|
|
||||||
async def _check_shards(self, shard, user_id):
|
async def _check_shards(self, shard, user_id):
|
||||||
|
"""Check if the given `shard` value in IDENTIFY has good enough values.
|
||||||
|
"""
|
||||||
current_shard, shard_count = shard
|
current_shard, shard_count = shard
|
||||||
|
|
||||||
guilds = await self.ext.db.fetchval("""
|
guilds = await self.ext.db.fetchval("""
|
||||||
|
|
@ -612,10 +619,6 @@ class GatewayWebsocket:
|
||||||
# setting new presence to state
|
# setting new presence to state
|
||||||
await self.update_status(presence)
|
await self.update_status(presence)
|
||||||
|
|
||||||
def voice_key(self, channel_id: int, guild_id: int):
|
|
||||||
"""Voice state key."""
|
|
||||||
return (self.state.user_id, self.state.session_id)
|
|
||||||
|
|
||||||
async def _vsu_get_prop(self, state, data):
|
async def _vsu_get_prop(self, state, data):
|
||||||
"""Get voice state properties from data, fallbacking to
|
"""Get voice state properties from data, fallbacking to
|
||||||
user settings."""
|
user settings."""
|
||||||
|
|
@ -837,6 +840,11 @@ class GatewayWebsocket:
|
||||||
await self._req_guild_members(gid, [], query, limit)
|
await self._req_guild_members(gid, [], query, limit)
|
||||||
|
|
||||||
async def _guild_sync(self, guild_id: int):
|
async def _guild_sync(self, guild_id: int):
|
||||||
|
"""Synchronize a guild.
|
||||||
|
|
||||||
|
Fetches the members and presences of a guild and dispatches a
|
||||||
|
GUILD_SYNC event with that info.
|
||||||
|
"""
|
||||||
members = await self.storage.get_member_data(guild_id)
|
members = await self.storage.get_member_data(guild_id)
|
||||||
member_ids = [int(m['user']['id']) for m in members]
|
member_ids = [int(m['user']['id']) for m in members]
|
||||||
|
|
||||||
|
|
@ -979,7 +987,7 @@ class GatewayWebsocket:
|
||||||
self.state.session_id, ranges
|
self.state.session_id, ranges
|
||||||
)
|
)
|
||||||
|
|
||||||
async def process_message(self, payload):
|
async def _process_message(self, payload):
|
||||||
"""Process a single message coming in from the client."""
|
"""Process a single message coming in from the client."""
|
||||||
try:
|
try:
|
||||||
op_code = payload['op']
|
op_code = payload['op']
|
||||||
|
|
@ -998,7 +1006,7 @@ class GatewayWebsocket:
|
||||||
if self._check_ratelimit('messages', self.state.session_id):
|
if self._check_ratelimit('messages', self.state.session_id):
|
||||||
raise WebsocketClose(4008, 'You are being ratelimited.')
|
raise WebsocketClose(4008, 'You are being ratelimited.')
|
||||||
|
|
||||||
async def listen_messages(self):
|
async def _listen_messages(self):
|
||||||
"""Listen for messages coming in from the websocket."""
|
"""Listen for messages coming in from the websocket."""
|
||||||
|
|
||||||
# close anyone trying to login while the
|
# close anyone trying to login while the
|
||||||
|
|
@ -1018,9 +1026,11 @@ class GatewayWebsocket:
|
||||||
await self._msg_ratelimit()
|
await self._msg_ratelimit()
|
||||||
|
|
||||||
payload = self.decoder(message)
|
payload = self.decoder(message)
|
||||||
await self.process_message(payload)
|
await self._process_message(payload)
|
||||||
|
|
||||||
def _cleanup(self):
|
def _cleanup(self):
|
||||||
|
"""Cleanup any leftover tasks, and remove the connection from the
|
||||||
|
state manager."""
|
||||||
for task in self.wsp.tasks.values():
|
for task in self.wsp.tasks.values():
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
|
|
@ -1058,11 +1068,11 @@ class GatewayWebsocket:
|
||||||
)
|
)
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
"""Wrap listen_messages inside
|
"""Wrap :meth:`listen_messages` inside
|
||||||
a try/except block for WebsocketClose handling."""
|
a try/except block for WebsocketClose handling."""
|
||||||
try:
|
try:
|
||||||
await self.send_hello()
|
await self._send_hello()
|
||||||
await self.listen_messages()
|
await self._listen_messages()
|
||||||
except websockets.exceptions.ConnectionClosed as err:
|
except websockets.exceptions.ConnectionClosed as err:
|
||||||
log.warning('conn close, state={}, err={}', self.state, err)
|
log.warning('conn close, state={}, err={}', self.state, err)
|
||||||
except WebsocketClose as err:
|
except WebsocketClose as err:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue