From fad8484a749f819989781745ac2b7e7c81af5eaf Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Mon, 12 Nov 2018 02:02:56 -0300 Subject: [PATCH] pubsub.lazy_guild: use SYNC instead of DELETE->INSERT->DELETE This makes our implementation go off-spec (depends on what you'd even call a spec, considering lazy guilds are not documented and probably won't be), but well... *it works* There are also a lot of print-debugs on this commit, will clean them up. - pubsub.lazy_guild: filter empty groups out of items list - pubsub.lazy_guild: check operator's op on Operation.to_dict - pubsub.lazy_guild: add resync() method to deal with re-SYNCs easily - pubsub.lazy_guild: fix get_item_index and get_group_item_index - pubsub.lazy_guild: drop the ops in complex presence updates in favour of using resync() - pubsub.lazy_guild: resync on group resorting - pubsub.lazy_guild: use resync() on role delete --- litecord/gateway/websocket.py | 5 +- litecord/pubsub/lazy_guild.py | 180 +++++++++++++++++++++++----------- 2 files changed, 125 insertions(+), 60 deletions(-) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index e14ca41..c2d7080 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -116,7 +116,7 @@ class GatewayWebsocket: """ encoded = self.encoder(payload) - if len(encoded) < 1024: + if len(encoded) < 2048: log.debug('sending\n{}', pprint.pformat(payload)) else: # log.debug('sending {}', pprint.pformat(payload)) @@ -750,6 +750,9 @@ class GatewayWebsocket: if guild_id not in gids: return + log.debug('lazy request: members: {}', + data.get('members', [])) + # make shard query lazy_guilds = self.ext.dispatcher.backends['lazy_guild'] diff --git a/litecord/pubsub/lazy_guild.py b/litecord/pubsub/lazy_guild.py index 48f0f0b..c086852 100644 --- a/litecord/pubsub/lazy_guild.py +++ b/litecord/pubsub/lazy_guild.py @@ -84,6 +84,22 @@ class MemberList: for group in self.groups: yield group, self.data[group.gid] + @property + def iter_non_empty(self): + """Only iterate through non-empty groups""" + + for group, member_ids in self: + count = len(member_ids) + + if group.gid == 'offline': + yield group, member_ids + continue + + if count == 0: + continue + + yield group, member_ids + @property def groups_complete(self): """Yield only group info for groups that have more than @@ -91,17 +107,8 @@ class MemberList: Always will output the 'offline' group. """ - - for group, member_ids in self: + for group, member_ids in self.iter_non_empty: count = len(member_ids) - - if group.gid == 'offline': - yield group, count - continue - - if count == 0: - continue - yield group, count @property @@ -110,7 +117,6 @@ class MemberList: # this isn't actively used. return {g.gid: g for g in self.groups} - def is_empty(self, group_id: GroupID) -> bool: """Return if a group is empty.""" return len(self.data[group_id]) == 0 @@ -129,6 +135,10 @@ class Operation: @property def to_dict(self) -> dict: """Return a dictionary representation of the operation.""" + if self.list_op not in ('SYNC', 'INVALIDATE', + 'INSERT', 'UPDATE', 'DELETE'): + raise ValueError('Invalid list operator') + res = { 'op': self.list_op } @@ -549,6 +559,48 @@ class GuildMemberList: return dispatched + async def resync(self, session_ids: int, item_index: int) -> List[str]: + """Send a SYNC event to all states that are subscribed to an item. + + Returns + ------- + List[str] + The list of session ids that had the SYNC operation + resent to. + """ + + result = [] + + for session_id in session_ids: + # find the list range that the group was on + # so we resync only the given range, instead + # of the whole list state. + ranges = self.state[session_id] + + try: + # get the only range where the group is in + role_range = next((r_min, r_max) for r_min, r_max in ranges + if r_min <= item_index <= r_max) + except StopIteration: + log.debug('ignoring sess_id={}, no range for item {}, {}', + session_id, item_index, ranges) + continue + + # do resync-ing in the background + result.append(session_id) + self.loop.create_task( + self.shard_query(session_id, [role_range]) + ) + + return result + + async def resync_by_item(self, item_index: int): + """Resync but only giving the item index.""" + return await self.resync( + self.get_subs(item_index), + item_index + ) + async def shard_query(self, session_id: str, ranges: list): """Send a GUILD_MEMBER_LIST_UPDATE event for a shard that is querying about the member list. @@ -611,31 +663,36 @@ class GuildMemberList: def get_item_index(self, user_id: Union[str, int]) -> int: """Get the item index a user is on.""" user_id = int(user_id) - index = 0 + index = 1 + + for g, member_ids in self.list.iter_non_empty: + print('step', index, g.gid, user_id, member_ids) - for _, member_ids in self.list: try: relative_index = member_ids.index(user_id) index += relative_index + + print('found, finish', index, relative_index) return index except ValueError: pass # +1 is for the group item - index = (index or 0) + len(member_ids) + 1 + print('not found, skip', index, len(member_ids)) + index += 1 + len(member_ids) + print('not found, finish') return None def get_group_item_index(self, group_id: GroupID) -> int: """Get the item index a group is on.""" - index = None + index = 0 - for group, member_ids in self.list: + for group, count in self.list.groups_complete: if group.gid == group_id: - index += 1 return index - index = (index or 0) + 1 + len(member_ids) + index += 1 + count return None @@ -708,30 +765,16 @@ class GuildMemberList: ops = [] old_user_index = self.get_item_index(user_id) + old_group_index = self.get_group_item_index(old_group) + ops.append(Operation('DELETE', { 'index': old_user_index })) # do the necessary changes self.list.data[old_group].remove(user_id) - - # if self.list.is_empty(old_group): - # ops.append(Operation('DELETE', { - # 'index': self.get_group_item_index(old_group) - # })) - self.list.data[new_group].append(user_id) - # put a INSERT operation if this is - # the first member in the group. - if self.list.is_birth(new_group): - ops.append(Operation('INSERT', { - 'index': self.get_group_item_index(new_group), - 'item': { - 'group': str(new_group), 'count': 1 - } - })) - await self._sort_groups() new_user_index = self.get_item_index(user_id) @@ -744,13 +787,43 @@ class GuildMemberList: 'item': self.items[new_user_index] })) - session_ids_old = self.get_subs(old_user_index) - session_ids_new = self.get_subs(new_user_index) + # put a INSERT operation if this is + # the first member in the group. + if self.list.is_birth(new_group) and new_group != 'offline': + ops.append(Operation('INSERT', { + 'index': self.get_group_item_index(new_group), + 'item': { + 'group': str(new_group), 'count': 1 + } + })) - return await self._dispatch_sess( - list(session_ids_old) + list(session_ids_new), - ops - ) + # only add DELETE for the old group after + # both operations. + if self.list.is_empty(old_group): + ops.append(Operation('DELETE', { + 'index': old_group_index, + })) + + session_ids_old = list(self.get_subs(old_user_index)) + session_ids_new = list(self.get_subs(new_user_index)) + # session_ids = set(session_ids_old + session_ids_new) + + # NOTE: this section is what a realistic implementation + # of lazy guilds would do. i've been tackling the same issue + # for a week without success, something alongside the indexes + # of the UPDATE operation don't match up with the official client. + + # from now on i'm pulling a mass-SYNC for both session ids, + # which should be handled gracefully, but then we're going off-spec. + + # return await self._dispatch_sess( + # session_ids, + # ops + # ) + + # merge both results together + return (await self.resync(session_ids_old, old_user_index) + + await self.resync(session_ids_new, new_user_index)) async def pres_update(self, user_id: int, partial_presence: Presence): @@ -909,6 +982,9 @@ class GuildMemberList: """ role_id = int(role['id']) + old_index = self.get_group_item_index + old_sessions = list(self.get_subs(old_index)) + groups_idx = self._get_role_as_group_idx(role_id) if groups_idx is None: log.debug('ignoring rid={} because not group (gid={}, cid={})', @@ -932,6 +1008,10 @@ class GuildMemberList: [g.gid for g in new_groups]) self.list.groups = new_groups + new_index = self.get_group_item_index(role_id) + + return (await self.resync(old_sessions, old_index) + + await self.resync_by_item(new_index)) async def role_update(self, role: dict): """Update a role. @@ -1055,25 +1135,7 @@ class GuildMemberList: log.debug('there are {} session ids to resync (for item {})', len(sess_ids_resync), role_item_index) - for session_id in sess_ids_resync: - # find the list range that the group was on - # so we resync only the given range, instead - # of the whole list state. - ranges = self.state[session_id] - - try: - # get the only range where the group is in - role_range = next((r_min, r_max) for r_min, r_max in ranges - if r_min <= role_item_index <= r_max) - except StopIteration: - log.debug('ignoring sess_id={}, no range for item {}, {}', - session_id, role_item_index, ranges) - continue - - # do resync-ing in the background - self.loop.create_task( - self.shard_query(session_id, [role_range]) - ) + return await self.resync(sess_ids_resync, role_item_index) class LazyGuildDispatcher(Dispatcher):