pubsub.lazy_guild: use member ids instead of presences on main list

This commit is contained in:
Luna Mendes 2018-11-11 00:14:59 -03:00
parent 864f6a5d9f
commit ea6e228bb4
1 changed files with 250 additions and 161 deletions

View File

@ -3,7 +3,7 @@ Main code for Lazy Guild implementation in litecord.
""" """
import pprint import pprint
import asyncio import asyncio
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict, field
from collections import defaultdict from collections import defaultdict
from typing import Any, List, Dict, Union from typing import Any, List, Dict, Union
@ -40,38 +40,43 @@ class MemberList:
Attributes Attributes
---------- ----------
groups: List[:class:`GroupInfo`] groups:
List with all group information, sorted List with all group information, sorted
by their actual position in the member list. by their actual position in the member list.
data: data:
Actual dictionary holding a list of presences Dictionary holding a list of member IDs
that are connected to the given group. for each group.
members:
Dictionary holding member information for
each member in the list.
presences:
Dictionary holding presence data for each
member.
overwrites: overwrites:
Holds the channel overwrite information Holds the channel overwrite information
for the list (a list is tied to a single for the list (a list is tied to a single
channel, and since only roles with Read Messages channel, and since only roles with Read Messages
can be in the list, we need to store that information) can be in the list, we need to store that information)
""" """
groups: List[GroupInfo] = None groups: List[GroupInfo] = field(default_factory=list)
data: Dict[GroupID, List[int]] = field(default_factory=dict)
#: this attribute is not actively used presences: Dict[int, Presence] = field(default_factory=dict)
# but i'm keeping it here to future-proof members: Dict[int, Dict[str, Any]] = field(default_factory=dict)
# in the case where we need to fetch info overwrites: Dict[int, Dict[str, Any]] = field(default_factory=dict)
# by the group id.
group_info: Dict[GroupID, GroupInfo] = None
data: Dict[GroupID, List[Presence]] = None
overwrites: Dict[int, Dict[str, Any]] = None
def __bool__(self): def __bool__(self):
"""Return if the current member list is fully initialized.""" """Return if the current member list is fully initialized."""
list_dict = asdict(self) list_dict = asdict(self)
return all(v is not None for v in list_dict.values())
# ignore the bool status of overwrites
return all(bool(list_dict[k])
for k in ('groups', 'data', 'presences', 'members'))
def __iter__(self): def __iter__(self):
"""Iterate over all groups in the correct order. """Iterate over all groups in the correct order.
Yields a tuple containing :class:`GroupInfo` and Yields a tuple containing :class:`GroupInfo` and
the List[Presence] for the group. the List[int] for the group.
""" """
if not self.groups: if not self.groups:
return return
@ -79,6 +84,41 @@ class MemberList:
for group in self.groups: for group in self.groups:
yield group, self.data[group.gid] yield group, self.data[group.gid]
@property
def groups_complete(self):
"""Yield only group info for groups that have more than
1 member.
Always will output the 'offline' group.
"""
for group, member_ids in self:
count = len(member_ids)
if group.gid == 'offline':
yield group, count
continue
if count == 0:
continue
yield group, count
@property
def group_info(self) -> dict:
"""Return a dictionary with group information."""
# this isn't actively used.
return {g.gid: g for g in self.groups}
def is_empty(self, group_id: GroupID) -> bool:
"""Return if a group is empty."""
return len(self.data[group_id]) == 0
def is_birth(self, group_id: GroupID) -> bool:
"""Return if a group is with a single presence."""
return len(self.data[group_id]) == 1
@dataclass @dataclass
class Operation: class Operation:
@ -126,6 +166,21 @@ def display_name(member_nicks: Dict[str, str], presence: Presence) -> str:
return nick or uname return nick or uname
def merge(member: dict, presence: Presence) -> dict:
"""Merge a member dictionary and a presence dictionary
into an item."""
return {**member, **{
'presence': {
'user': {
'id': str(member['user']['id']),
},
'status': presence['status'],
'game': presence['game'],
'activities': presence['activities']
}
}}
class GuildMemberList: class GuildMemberList:
"""This class stores the current member list information """This class stores the current member list information
for a guild (by channel). for a guild (by channel).
@ -159,10 +214,10 @@ class GuildMemberList:
self.channel_id = channel_id self.channel_id = channel_id
self.main = main_lg self.main = main_lg
self.list = MemberList(None, None, None, None) self.list = MemberList()
#: store the states that are subscribed to the list #: store the states that are subscribed to the list.
# type is{session_id: set[list]} # type is {session_id: set[list]}
self.state = defaultdict(set) self.state = defaultdict(set)
self._list_lock = asyncio.Lock() self._list_lock = asyncio.Lock()
@ -242,7 +297,7 @@ class GuildMemberList:
# part of the group. # part of the group.
return final_perms.bits.read_messages return final_perms.bits.read_messages
async def get_roles(self) -> List[GroupInfo]: async def get_role_groups(self) -> List[GroupInfo]:
"""Get role information, but only: """Get role information, but only:
- the ID - the ID
- the name - the name
@ -254,7 +309,7 @@ class GuildMemberList:
being referred to this :class:`GuildMemberList` being referred to this :class:`GuildMemberList`
instance. instance.
The list is sorted by position. The list is sorted by each role's position.
""" """
roledata = await self.storage.db.fetch(""" roledata = await self.storage.db.fetch("""
SELECT id, name, hoist, position, permissions SELECT id, name, hoist, position, permissions
@ -273,14 +328,15 @@ class GuildMemberList:
hoisted = sorted(hoisted, key=lambda group: group.position, hoisted = sorted(hoisted, key=lambda group: group.position,
reverse=True) reverse=True)
# we need to store them since # we need to store the overwrites since
# we have incoming presences to manage. # we have incoming presences to manage.
await self._fetch_overwrites() await self._fetch_overwrites()
return list(filter(self._can_read_chan, hoisted)) return list(filter(self._can_read_chan, hoisted))
async def set_groups(self): async def set_groups(self):
"""Get the groups for the member list.""" """Get the groups for the member list."""
role_groups = await self.get_roles() role_groups = await self.get_role_groups()
# inject default groups 'online' and 'offline' # inject default groups 'online' and 'offline'
# their position is always going to be the last ones. # their position is always going to be the last ones.
@ -289,12 +345,10 @@ class GuildMemberList:
GroupInfo('offline', 'offline', MAX_ROLES + 2, 0) GroupInfo('offline', 'offline', MAX_ROLES + 2, 0)
] ]
self.list.group_info = {g.gid: g for g in role_groups} async def get_group_for_member(self, member_id: int,
async def get_group(self, member_id: int,
roles: List[Union[str, int]], roles: List[Union[str, int]],
status: str) -> int: status: str) -> GroupID:
"""Return a fitting group ID for the user.""" """Return a fitting group ID for the member."""
member_roles = list(map(int, roles)) member_roles = list(map(int, roles))
# get the member's permissions relative to the channel # get the member's permissions relative to the channel
@ -312,19 +366,21 @@ class GuildMemberList:
return group_id return group_id
async def _pass_1(self, guild_presences: List[Presence]): async def _list_fill_groups(self, member_ids: List[int]):
"""First pass on generating the member list. """Fill in groups with the member ids."""
for member_id in member_ids:
presence = self.list.presences[member_id]
This assigns all presences a single group. group_id = await self.get_group_for_member(
"""
for presence in guild_presences:
member_id = int(presence['user']['id'])
group_id = await self.get_group(
member_id, presence['roles'], presence['status'] member_id, presence['roles'], presence['status']
) )
self.list.data[group_id].append(presence) member = await self.storage.get_member_data_one(
self.guild_id, member_id
)
self.list.members[member_id] = member
self.list.data[group_id].append(member_id)
async def get_member_nicks_dict(self) -> dict: async def get_member_nicks_dict(self) -> dict:
"""Get a dictionary with nickname information.""" """Get a dictionary with nickname information."""
@ -338,37 +394,45 @@ class GuildMemberList:
return member_nicks return member_nicks
async def _sort_groups(self): def display_name(self, member_id: int):
member_nicks = await self.get_member_nicks_dict() member = self.list.members.get(member_id)
for group_members in self.list.data.values(): if not member_id:
return None
username = member['user']['username']
nickname = member['nick']
return nickname or username
async def _sort_groups(self):
for member_ids in self.list.data.values():
# this should update the list in-place # this should update the list in-place
group_members.sort( member_ids.sort(
key=lambda p: display_name(member_nicks, p), key=self.display_name)
reverse=True)
print('post sort')
pprint.pprint(group_members)
async def __init_member_list(self): async def __init_member_list(self):
"""Generate the main member list with groups.""" """Generate the main member list with groups."""
member_ids = await self.storage.get_member_ids(self.guild_id) member_ids = await self.storage.get_member_ids(self.guild_id)
guild_presences = await self.presence.guild_presences( presences = await self.presence.guild_presences(
member_ids, self.guild_id) member_ids, self.guild_id)
# set presences in the list
self.list.presences = {int(p['user']['id']): p
for p in presences}
await self.set_groups() await self.set_groups()
log.debug('{} presences, {} groups', log.debug('init: {} members, {} groups',
len(guild_presences), len(member_ids),
len(self.list.groups)) len(self.list.groups))
# allocate a list per group
self.list.data = {group.gid: [] for group in self.list.groups} self.list.data = {group.gid: [] for group in self.list.groups}
# first pass: set which presences await self._list_fill_groups(member_ids)
# go to which groups
await self._pass_1(guild_presences)
# second pass: sort each group's members # second pass: sort each group's members
# by the display name # by the display name
@ -381,6 +445,13 @@ class GuildMemberList:
finally: finally:
self._list_lock.release() self._list_lock.release()
def get_member_as_item(self, member_id: int) -> dict:
"""Get an item representing a member."""
member = self.list.members[member_id]
presence = self.list.presences[member_id]
return merge(member, presence)
@property @property
def items(self) -> list: def items(self) -> list:
"""Main items list.""" """Main items list."""
@ -394,17 +465,23 @@ class GuildMemberList:
res = [] res = []
# NOTE: maybe use map()? # NOTE: maybe use map()?
for group, presences in self.list: for group, member_ids in self.list:
# do not send information on groups
# that don't have anyone
if not member_ids:
continue
res.append({ res.append({
'group': { 'group': {
'id': str(group.gid), 'id': str(group.gid),
'count': len(presences), 'count': len(member_ids),
} }
}) })
for presence in presences: for member_id in member_ids:
res.append({ res.append({
'member': presence 'member': self.get_member_as_item(member_id)
}) })
return res return res
@ -434,11 +511,12 @@ class GuildMemberList:
state = self.state_man.fetch_raw(session_id) state = self.state_man.fetch_raw(session_id)
return state return state
except KeyError: except KeyError:
self.unsub(session_id) return None
return
async def _dispatch_sess(self, session_ids: List[str], async def _dispatch_sess(self, session_ids: List[str],
operations: List[Operation]): operations: List[Operation]):
"""Dispatch a GUILD_MEMBER_LIST_UPDATE to the
given session ids."""
# construct the payload to dispatch # construct the payload to dispatch
payload = { payload = {
@ -447,9 +525,9 @@ class GuildMemberList:
'groups': [ 'groups': [
{ {
'count': len(presences),
'id': str(group.gid), 'id': str(group.gid),
} for group, presences in self.list 'count': count,
} for group, count in self.list.groups_complete
], ],
'ops': [ 'ops': [
@ -527,19 +605,39 @@ class GuildMemberList:
'items': self.items[start:end] 'items': self.items[start:end]
})) }))
# send SYNCs to the state that requested
await self._dispatch_sess([session_id], ops) await self._dispatch_sess([session_id], ops)
def get_item_index(self, user_id: Union[str, int]): def get_item_index(self, user_id: Union[str, int]) -> int:
"""Get the item index a user is on.""" """Get the item index a user is on."""
def _get_id(item): user_id = int(user_id)
# item can be a group item or a member item index = 0
return item.get('member', {}).get('user', {}).get('id')
# get the updated item's index for _, member_ids in self.list:
return index_by_func( try:
lambda p: _get_id(p) == str(user_id), relative_index = member_ids.index(user_id)
self.items index += relative_index
) return index
except ValueError:
pass
# +1 is for the group item
index = (index or 0) + len(member_ids) + 1
return None
def get_group_item_index(self, group_id: GroupID) -> int:
"""Get the item index a group is on."""
index = None
for group, member_ids in self.list:
if group.gid == group_id:
index += 1
return index
index = (index or 0) + 1 + len(member_ids)
return None
def state_is_subbed(self, item_index, session_id: str) -> bool: def state_is_subbed(self, item_index, session_id: str) -> bool:
"""Return if a state's ranges include the given """Return if a state's ranges include the given
@ -568,6 +666,9 @@ class GuildMemberList:
user_id) user_id)
return [] return []
print('item found', item_index)
pprint.pprint(self.items)
item = self.items[item_index] item = self.items[item_index]
session_ids = self.get_subs(item_index) session_ids = self.get_subs(item_index)
@ -583,87 +684,79 @@ class GuildMemberList:
] ]
) )
async def _pres_update_complex(self, user_id: int, async def _pres_update_complex(
old_group: str, old_index: int, self, user_id: int,
new_group: str): old_group: GroupID, rel_index: int,
"""Move a member between groups.""" new_group: GroupID):
log.debug('complex update: uid={} old={} old_idx={} new={}', """Move a member between groups.
user_id, old_group, old_index, new_group)
old_group_presences = self.list.data[old_group]
old_item_index = self.get_item_index(user_id)
# make a copy of current presence to insert in the new group Parameters
current_presence = dict(old_group_presences[old_index]) ----------
user_id:
The user that is moving.
old_group:
The group the user is currently in.
rel_index:
The relative index of the user inside old_group's list.
new_group:
The group the user has to move to.
"""
# step 1: remove the old presence (old_index is relative log.debug('complex update: uid={} old={} rel_idx={} new={}',
# to the group, and not the items list) user_id, old_group, rel_index, new_group)
del old_group_presences[old_index]
# we need to insert current_presence to the new group ops = []
# but we also need to calculate its index to insert on.
presences = self.list.data[new_group]
best_index = 0 old_user_index = self.get_item_index(user_id)
member_nicks = await self.get_member_nicks_dict() ops.append(Operation('DELETE', {
current_name = display_name(member_nicks, current_presence) 'index': old_user_index
}))
# go through each one until we find the best placement # do the necessary changes
for presence in presences: self.list.data[old_group].remove(user_id)
name = display_name(member_nicks, presence)
print(name, current_name, current_name < name) # if self.list.is_empty(old_group):
# ops.append(Operation('DELETE', {
# 'index': self.get_group_item_index(old_group)
# }))
# TODO: check if this works self.list.data[new_group].append(user_id)
if current_name < name:
break
best_index += 1 # put a INSERT operation if this is
# the first member in the group.
# insert the presence at the index if self.list.is_birth(new_group):
print('pre insert') ops.append(Operation('INSERT', {
pprint.pprint(presences) 'index': self.get_group_item_index(new_group),
presences.insert(best_index - 1, current_presence)
log.debug('inserted cur pres @ pres idx {}', best_index - 1)
print('post insert')
pprint.pprint(presences)
new_item_index = self.get_item_index(user_id)
log.debug('assigned new item index {} to uid {}',
new_item_index, user_id)
print('items')
pprint.pprint(self.items)
session_ids_old = self.get_subs(old_item_index)
session_ids_new = self.get_subs(new_item_index)
# dispatch events to both the old states and
# new states.
return await self._dispatch_sess(
# inefficient, but necessary since we
# want to merge both session ids.
list(session_ids_old) + list(session_ids_new),
[
Operation('DELETE', {
'index': old_item_index,
}),
Operation('INSERT', {
'index': new_item_index,
'item': { 'item': {
'member': current_presence 'group': str(new_group), 'count': 1
} }
}) }))
]
await self._sort_groups()
new_user_index = self.get_item_index(user_id)
ops.append(Operation('INSERT', {
'index': new_user_index,
# TODO: maybe construct the new item manually
# instead of resorting to items list?
'item': self.items[new_user_index]
}))
session_ids_old = self.get_subs(old_user_index)
session_ids_new = self.get_subs(new_user_index)
return await self._dispatch_sess(
list(session_ids_old) + list(session_ids_new),
ops
) )
async def pres_update(self, user_id: int, async def pres_update(self, user_id: int,
partial_presence: Dict[str, Any]): partial_presence: Presence):
"""Update a presence inside the member list. """Update a presence inside the member list.
There are 4 types of updates that can happen for a user in a group: There are 5 types of updates that can happen for a user in a group:
- from 'offline' to any - from 'offline' to any
- from any to 'offline' - from any to 'offline'
- from any to any - from any to any
@ -683,7 +776,8 @@ class GuildMemberList:
""" """
await self._init_check() await self._init_check()
old_group, old_index, old_presence = None, None, None old_group = None
old_presence = self.list.presences[user_id]
has_nick = 'nick' in partial_presence has_nick = 'nick' in partial_presence
# partial presences don't have 'nick'. we only use it # partial presences don't have 'nick'. we only use it
@ -694,40 +788,42 @@ class GuildMemberList:
except KeyError: except KeyError:
pass pass
for group, presences in self.list: for group, member_ids in self.list:
p_idx = index_by_func( try:
lambda p: p['user']['id'] == str(user_id), old_index = member_ids.index(user_id)
presences) except ValueError:
log.debug('p_idx for group {!r} = {}',
group.gid, p_idx)
if p_idx is None:
log.debug('skipping group {}', group) log.debug('skipping group {}', group)
continue continue
# make a copy since we're modifying in-place log.debug('found index for uid={}: gid={}',
old_group = group.gid user_id, group.gid)
old_index = p_idx
old_presence = dict(presences[p_idx])
# be ready if it is a simple update old_group = group.gid
presences[p_idx].update(partial_presence)
break break
# if we didn't find any old group for
# the member, then that means the member
# wasn't in the list in the first place
if not old_group: if not old_group:
log.warning('pres update with unknown old group uid={}', log.warning('pres update with unknown old group uid={}',
user_id) user_id)
return [] return []
roles = partial_presence.get('roles', old_presence['roles']) roles = partial_presence.get('roles', old_presence['roles'])
new_status = partial_presence.get('status', old_presence['status']) status = partial_presence.get('status', old_presence['status'])
new_group = await self.get_group(user_id, roles, new_status) # calculate a possible new group
new_group = await self.get_group_for_member(
user_id, roles, status)
log.debug('pres update: gid={} cid={} old_g={} new_g={}', log.debug('pres update: gid={} cid={} old_g={} new_g={}',
self.guild_id, self.channel_id, old_group, new_group) self.guild_id, self.channel_id, old_group, new_group)
# update our presence with the given partial presence
# since in both cases we'd update it anyways
self.list.presences[user_id].update(partial_presence)
# if we're going to the same group AND there are no # if we're going to the same group AND there are no
# nickname changes, treat this as a simple update # nickname changes, treat this as a simple update
if old_group == new_group and not has_nick: if old_group == new_group and not has_nick:
@ -834,6 +930,7 @@ class GuildMemberList:
'res={}', 'res={}',
role_id, group.position, self.guild_id, self.channel_id, role_id, group.position, self.guild_id, self.channel_id,
[g.gid for g in new_groups]) [g.gid for g in new_groups])
self.list.groups = new_groups self.list.groups = new_groups
async def role_update(self, role: dict): async def role_update(self, role: dict):
@ -896,10 +993,7 @@ class GuildMemberList:
# states we'll resend the list info to. # states we'll resend the list info to.
# find the item id for the group info # find the item id for the group info
role_item_index = index_by_func( role_item_index = self.get_group_item_index(role_id)
lambda d: str_(d.get('group', {}).get('id')) == str(role_id),
self.items
)
# we only resync when we actually have an item to resync # we only resync when we actually have an item to resync
# we don't have items to resync when we: # we don't have items to resync when we:
@ -926,29 +1020,24 @@ class GuildMemberList:
else: else:
log.warning('list unstable: {} not on group list', role_id) log.warning('list unstable: {} not on group list', role_id)
# then the state of it in the group_info list
try:
self.list.group_info.pop(role_id)
except KeyError:
log.warning('list unstable: {} not on group info', role_id)
# now the data info # now the data info
try: try:
# we need to reassign those orphan presences # we need to reassign those orphan presences
# into a new group # into a new group
presences = self.list.data.pop(role_id) member_ids = self.list.data.pop(role_id)
# by calling the same functions we'd be calling # by calling the same functions we'd be calling
# when generating the guild, we can reassign # when generating the guild, we can reassign
# the presences into new groups and sort # the presences into new groups and sort
# the new presences so we achieve the correct state # the new presences so we achieve the correct state
log.debug('reassigning {} presences', len(presences)) log.debug('reassigning {} presences', len(member_ids))
await self._pass_1(presences) await self._list_fill_groups(
member_ids
)
await self._sort_groups() await self._sort_groups()
except KeyError: except KeyError:
log.warning('list unstable: {} not in data dict', role_id) log.warning('list unstable: {} not in data dict', role_id)
# and then overwrites.
try: try:
self.list.overwrites.pop(role_id) self.list.overwrites.pop(role_id)
except KeyError: except KeyError: