pubsub.lazy_guild: use SYNC instead of DELETE->INSERT->DELETE

This makes our implementation go off-spec (depends on what you'd even
call a spec, considering lazy guilds are not documented and probably
won't be), but well... *it works*

There are also a lot of print-debugs on this commit, will clean them up.

 - pubsub.lazy_guild: filter empty groups out of items list
 - pubsub.lazy_guild: check operator's op on Operation.to_dict
 - pubsub.lazy_guild: add resync() method to deal with re-SYNCs easily
 - pubsub.lazy_guild: fix get_item_index and get_group_item_index
 - pubsub.lazy_guild: drop the ops in complex presence updates in favour
    of using resync()
 - pubsub.lazy_guild: resync on group resorting
 - pubsub.lazy_guild: use resync() on role delete
This commit is contained in:
Luna Mendes 2018-11-12 02:02:56 -03:00
parent ea6e228bb4
commit fad8484a74
2 changed files with 125 additions and 60 deletions

View File

@ -116,7 +116,7 @@ class GatewayWebsocket:
"""
encoded = self.encoder(payload)
if len(encoded) < 1024:
if len(encoded) < 2048:
log.debug('sending\n{}', pprint.pformat(payload))
else:
# log.debug('sending {}', pprint.pformat(payload))
@ -750,6 +750,9 @@ class GatewayWebsocket:
if guild_id not in gids:
return
log.debug('lazy request: members: {}',
data.get('members', []))
# make shard query
lazy_guilds = self.ext.dispatcher.backends['lazy_guild']

View File

@ -84,6 +84,22 @@ class MemberList:
for group in self.groups:
yield group, self.data[group.gid]
@property
def iter_non_empty(self):
"""Only iterate through non-empty groups"""
for group, member_ids in self:
count = len(member_ids)
if group.gid == 'offline':
yield group, member_ids
continue
if count == 0:
continue
yield group, member_ids
@property
def groups_complete(self):
"""Yield only group info for groups that have more than
@ -91,17 +107,8 @@ class MemberList:
Always will output the 'offline' group.
"""
for group, member_ids in self:
for group, member_ids in self.iter_non_empty:
count = len(member_ids)
if group.gid == 'offline':
yield group, count
continue
if count == 0:
continue
yield group, count
@property
@ -110,7 +117,6 @@ class MemberList:
# this isn't actively used.
return {g.gid: g for g in self.groups}
def is_empty(self, group_id: GroupID) -> bool:
"""Return if a group is empty."""
return len(self.data[group_id]) == 0
@ -129,6 +135,10 @@ class Operation:
@property
def to_dict(self) -> dict:
"""Return a dictionary representation of the operation."""
if self.list_op not in ('SYNC', 'INVALIDATE',
'INSERT', 'UPDATE', 'DELETE'):
raise ValueError('Invalid list operator')
res = {
'op': self.list_op
}
@ -549,6 +559,48 @@ class GuildMemberList:
return dispatched
async def resync(self, session_ids: int, item_index: int) -> List[str]:
"""Send a SYNC event to all states that are subscribed to an item.
Returns
-------
List[str]
The list of session ids that had the SYNC operation
resent to.
"""
result = []
for session_id in session_ids:
# find the list range that the group was on
# so we resync only the given range, instead
# of the whole list state.
ranges = self.state[session_id]
try:
# get the only range where the group is in
role_range = next((r_min, r_max) for r_min, r_max in ranges
if r_min <= item_index <= r_max)
except StopIteration:
log.debug('ignoring sess_id={}, no range for item {}, {}',
session_id, item_index, ranges)
continue
# do resync-ing in the background
result.append(session_id)
self.loop.create_task(
self.shard_query(session_id, [role_range])
)
return result
async def resync_by_item(self, item_index: int):
"""Resync but only giving the item index."""
return await self.resync(
self.get_subs(item_index),
item_index
)
async def shard_query(self, session_id: str, ranges: list):
"""Send a GUILD_MEMBER_LIST_UPDATE event
for a shard that is querying about the member list.
@ -611,31 +663,36 @@ class GuildMemberList:
def get_item_index(self, user_id: Union[str, int]) -> int:
"""Get the item index a user is on."""
user_id = int(user_id)
index = 0
index = 1
for g, member_ids in self.list.iter_non_empty:
print('step', index, g.gid, user_id, member_ids)
for _, member_ids in self.list:
try:
relative_index = member_ids.index(user_id)
index += relative_index
print('found, finish', index, relative_index)
return index
except ValueError:
pass
# +1 is for the group item
index = (index or 0) + len(member_ids) + 1
print('not found, skip', index, len(member_ids))
index += 1 + len(member_ids)
print('not found, finish')
return None
def get_group_item_index(self, group_id: GroupID) -> int:
"""Get the item index a group is on."""
index = None
index = 0
for group, member_ids in self.list:
for group, count in self.list.groups_complete:
if group.gid == group_id:
index += 1
return index
index = (index or 0) + 1 + len(member_ids)
index += 1 + count
return None
@ -708,30 +765,16 @@ class GuildMemberList:
ops = []
old_user_index = self.get_item_index(user_id)
old_group_index = self.get_group_item_index(old_group)
ops.append(Operation('DELETE', {
'index': old_user_index
}))
# do the necessary changes
self.list.data[old_group].remove(user_id)
# if self.list.is_empty(old_group):
# ops.append(Operation('DELETE', {
# 'index': self.get_group_item_index(old_group)
# }))
self.list.data[new_group].append(user_id)
# put a INSERT operation if this is
# the first member in the group.
if self.list.is_birth(new_group):
ops.append(Operation('INSERT', {
'index': self.get_group_item_index(new_group),
'item': {
'group': str(new_group), 'count': 1
}
}))
await self._sort_groups()
new_user_index = self.get_item_index(user_id)
@ -744,13 +787,43 @@ class GuildMemberList:
'item': self.items[new_user_index]
}))
session_ids_old = self.get_subs(old_user_index)
session_ids_new = self.get_subs(new_user_index)
# put a INSERT operation if this is
# the first member in the group.
if self.list.is_birth(new_group) and new_group != 'offline':
ops.append(Operation('INSERT', {
'index': self.get_group_item_index(new_group),
'item': {
'group': str(new_group), 'count': 1
}
}))
return await self._dispatch_sess(
list(session_ids_old) + list(session_ids_new),
ops
)
# only add DELETE for the old group after
# both operations.
if self.list.is_empty(old_group):
ops.append(Operation('DELETE', {
'index': old_group_index,
}))
session_ids_old = list(self.get_subs(old_user_index))
session_ids_new = list(self.get_subs(new_user_index))
# session_ids = set(session_ids_old + session_ids_new)
# NOTE: this section is what a realistic implementation
# of lazy guilds would do. i've been tackling the same issue
# for a week without success, something alongside the indexes
# of the UPDATE operation don't match up with the official client.
# from now on i'm pulling a mass-SYNC for both session ids,
# which should be handled gracefully, but then we're going off-spec.
# return await self._dispatch_sess(
# session_ids,
# ops
# )
# merge both results together
return (await self.resync(session_ids_old, old_user_index) +
await self.resync(session_ids_new, new_user_index))
async def pres_update(self, user_id: int,
partial_presence: Presence):
@ -909,6 +982,9 @@ class GuildMemberList:
"""
role_id = int(role['id'])
old_index = self.get_group_item_index
old_sessions = list(self.get_subs(old_index))
groups_idx = self._get_role_as_group_idx(role_id)
if groups_idx is None:
log.debug('ignoring rid={} because not group (gid={}, cid={})',
@ -932,6 +1008,10 @@ class GuildMemberList:
[g.gid for g in new_groups])
self.list.groups = new_groups
new_index = self.get_group_item_index(role_id)
return (await self.resync(old_sessions, old_index) +
await self.resync_by_item(new_index))
async def role_update(self, role: dict):
"""Update a role.
@ -1055,25 +1135,7 @@ class GuildMemberList:
log.debug('there are {} session ids to resync (for item {})',
len(sess_ids_resync), role_item_index)
for session_id in sess_ids_resync:
# find the list range that the group was on
# so we resync only the given range, instead
# of the whole list state.
ranges = self.state[session_id]
try:
# get the only range where the group is in
role_range = next((r_min, r_max) for r_min, r_max in ranges
if r_min <= role_item_index <= r_max)
except StopIteration:
log.debug('ignoring sess_id={}, no range for item {}, {}',
session_id, role_item_index, ranges)
continue
# do resync-ing in the background
self.loop.create_task(
self.shard_query(session_id, [role_range])
)
return await self.resync(sess_ids_resync, role_item_index)
class LazyGuildDispatcher(Dispatcher):