From bd9c4cb26cfd8ca009488f5371403c58250f6ed3 Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Wed, 7 Nov 2018 18:52:50 -0300 Subject: [PATCH] pubsub.lazy_guild: add implementation for pres_update - utils: fix index_by_func --- litecord/gateway/websocket.py | 6 +- litecord/presence.py | 8 +- litecord/pubsub/lazy_guild.py | 165 +++++++++++++++++++++++++++------- litecord/utils.py | 2 +- 4 files changed, 140 insertions(+), 41 deletions(-) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 6226fe6..8001a28 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -710,7 +710,7 @@ class GatewayWebsocket: list_op = 'SYNC' | 'INVALIDATE' | 'INSERT' | 'UPDATE' | 'DELETE' list_data = { - 'id': "everyone" // ?? + 'id': channel_id | 'everyone', 'guild_id': guild_id, 'ops': [ @@ -723,10 +723,10 @@ class GatewayWebsocket: // exists if op = 'SYNC' 'items': sync_item[], - // exists if op = 'INSERT' or 'DELETE' + // exists if op == 'INSERT' | 'DELETE' | 'UPDATE' 'index': num, - // exists if op = 'INSERT' + // exists if op == 'INSERT' | 'UPDATE' 'item': sync_item, } ], diff --git a/litecord/presence.py b/litecord/presence.py index 48ad80f..67f8edd 100644 --- a/litecord/presence.py +++ b/litecord/presence.py @@ -113,9 +113,11 @@ class PresenceManager: for member_list in lists: session_ids = await member_list.pres_update( int(member['user']['id']), - member['roles'], - state['status'], - game + { + 'roles': member['roles'], + 'status': state['status'], + 'game': game + } ) log.debug('Lazy Dispatch to {}', diff --git a/litecord/pubsub/lazy_guild.py b/litecord/pubsub/lazy_guild.py index 8063383..6467d1c 100644 --- a/litecord/pubsub/lazy_guild.py +++ b/litecord/pubsub/lazy_guild.py @@ -48,10 +48,40 @@ class MemberList: Yields a tuple containing :class:`GroupInfo` and the List[Presence] for the group. """ + if not self.groups: + return + for group in self.groups: yield group, self.data[group.gid] +@dataclass +class Operation: + """Represents a member list operation.""" + list_op: str + params: Dict[str, Any] + + @property + def to_dict(self) -> dict: + res = { + 'op': self.list_op + } + + if self.list_op == 'SYNC': + res['items'] = self.params['items'] + + if self.list_op in ('SYNC', 'INVALIDATE'): + res['range'] = self.params['range'] + + if self.list_op in ('INSERT', 'DELETE', 'UPDATE'): + res['index'] = self.params['index'] + + if self.list_op in ('INSERT', 'UPDATE'): + res['item'] = self.params['item'] + + return res + + def _to_simple_group(presence: dict) -> str: """Return a simple group (not a role), given a presence.""" return 'offline' if presence['status'] == 'offline' else 'online' @@ -111,6 +141,13 @@ class GuildMemberList: """Get the global :class:`StateManager` instance.""" return self.main.app.state_manager + @property + def list_id(self): + """get the id of the member list.""" + return ('everyone' + if self.channel_id == self.guild_id + else str(self.channel_id)) + def _set_empty_list(self): """Set the member list as being empty.""" self.list = MemberList(None, None, None, None) @@ -311,7 +348,7 @@ class GuildMemberList: """Subscribe a shard to the member list.""" await self._init_check() - async def unsub(self, session_id: str): + def unsub(self, session_id: str): """Unsubscribe a shard from the member list""" try: self.state.pop(session_id) @@ -326,6 +363,41 @@ class GuildMemberList: if not self.state: self._set_empty_list() + def get_state(self, session_id: str): + state = self.state_man.fetch_raw(session_id) + + if not state: + self.unsub(session_id) + return + + return state + + async def _dispatch_sess(self, session_ids: List[str], + operations: List[Operation]): + + # construct the payload to dispatch + payload = { + 'id': self.list_id, + 'guild_id': str(self.guild_id), + + 'groups': [ + { + 'count': len(presences), + 'id': group.gid + } for group, presences in self.list + ], + + 'ops': [ + operation.to_dict + for operation in operations + ] + } + + states = map(self.get_state, session_ids) + for state in (s for s in states if s is not None): + await state.ws.dispatch( + 'GUILD_MEMBER_LIST_UPDATE', payload) + async def shard_query(self, session_id: str, ranges: list): """Send a GUILD_MEMBER_LIST_UPDATE event for a shard that is querying about the member list. @@ -342,9 +414,7 @@ class GuildMemberList: # a guild list with a channel id of the guild # represents the 'everyone' global list. - list_id = ('everyone' - if self.channel_id == self.guild_id - else str(self.channel_id)) + list_id = self.list_id # if everyone can read the channel, # we direct the request to the 'everyone' gml instance @@ -365,25 +435,7 @@ class GuildMemberList: await self._init_check() - # make sure this is a sane state - state = self.state_man.fetch_raw(session_id) - if not state: - await self.unsub(session_id) - return - - reply = { - 'guild_id': str(self.guild_id), - 'id': list_id, - - 'groups': [ - { - 'count': len(presences), - 'id': group.gid - } for group, presences in self.list - ], - - 'ops': [], - } + ops = [] for start, end in ranges: itemcount = end - start @@ -394,19 +446,64 @@ class GuildMemberList: self.state[session_id].add((start, end)) - reply['ops'].append({ - 'op': 'SYNC', + ops.append(Operation('SYNC', { 'range': [start, end], - 'items': self.items[start:end], - }) + 'items': self.items[start:end] + })) - # the first GUILD_MEMBER_LIST_UPDATE for a shard - # is dispatched here. - await state.ws.dispatch('GUILD_MEMBER_LIST_UPDATE', reply) + await self._dispatch_sess([session_id], ops) - async def pres_update(self, user_id: int, roles: List[str], - status: str, game: dict) -> List[str]: - return list(self.state) + async def pres_update(self, user_id: int, + partial_presence: Dict[str, Any]): + """Update a presence inside the member listlist.""" + await self._init_check() + + for _group, presences in self.list: + p_idx = index_by_func( + lambda p: p['user']['id'] == str(user_id), + presences) + + if not p_idx: + continue + + presences[p_idx].update(partial_presence) + + item_index = index_by_func( + lambda p: p.get('user', {}).get('id') == str(user_id), + self.items + ) + + pprint.pprint(self.items) + + if not item_index: + log.warning('lazy guild got invalid pres update uid={}', + user_id) + return [] + + item = self.items[item_index] + + def _is_in(sess_id): + ranges = self.state[sess_id] + + for range_start, range_end in ranges: + if range_start <= item_index <= range_end: + return True + + return False + + session_ids = filter(_is_in, self.state.keys()) + + await self._dispatch_sess( + session_ids, + [ + Operation('UPDATE', { + 'index': item_index, + 'item': item, + }) + ] + ) + + return list(session_ids) async def dispatch(self, event: str, data: Any): """Modify the member list and dispatch the respective @@ -473,4 +570,4 @@ class LazyGuildDispatcher(Dispatcher): async def unsub(self, chan_id, session_id): gml = await self.get_gml(chan_id) - await gml.unsub(session_id) + gml.unsub(session_id) diff --git a/litecord/utils.py b/litecord/utils.py index 1a2b676..2fda9d5 100644 --- a/litecord/utils.py +++ b/litecord/utils.py @@ -32,7 +32,7 @@ def dict_get(mapping, key, default): def index_by_func(function, indexable: iter) -> int: """Search in an idexable and return the index number for an iterm that has func(item) = True.""" - for index, item in indexable: + for index, item in enumerate(indexable): if function(item): return index