mirror of https://gitlab.com/litecord/litecord.git
pubsub.lazy_guild: add implementation for pres_update
- utils: fix index_by_func
This commit is contained in:
parent
c212cbd392
commit
bd9c4cb26c
|
|
@ -710,7 +710,7 @@ class GatewayWebsocket:
|
|||
list_op = 'SYNC' | 'INVALIDATE' | 'INSERT' | 'UPDATE' | 'DELETE'
|
||||
|
||||
list_data = {
|
||||
'id': "everyone" // ??
|
||||
'id': channel_id | 'everyone',
|
||||
'guild_id': guild_id,
|
||||
|
||||
'ops': [
|
||||
|
|
@ -723,10 +723,10 @@ class GatewayWebsocket:
|
|||
// exists if op = 'SYNC'
|
||||
'items': sync_item[],
|
||||
|
||||
// exists if op = 'INSERT' or 'DELETE'
|
||||
// exists if op == 'INSERT' | 'DELETE' | 'UPDATE'
|
||||
'index': num,
|
||||
|
||||
// exists if op = 'INSERT'
|
||||
// exists if op == 'INSERT' | 'UPDATE'
|
||||
'item': sync_item,
|
||||
}
|
||||
],
|
||||
|
|
|
|||
|
|
@ -113,9 +113,11 @@ class PresenceManager:
|
|||
for member_list in lists:
|
||||
session_ids = await member_list.pres_update(
|
||||
int(member['user']['id']),
|
||||
member['roles'],
|
||||
state['status'],
|
||||
game
|
||||
{
|
||||
'roles': member['roles'],
|
||||
'status': state['status'],
|
||||
'game': game
|
||||
}
|
||||
)
|
||||
|
||||
log.debug('Lazy Dispatch to {}',
|
||||
|
|
|
|||
|
|
@ -48,10 +48,40 @@ class MemberList:
|
|||
Yields a tuple containing :class:`GroupInfo` and
|
||||
the List[Presence] for the group.
|
||||
"""
|
||||
if not self.groups:
|
||||
return
|
||||
|
||||
for group in self.groups:
|
||||
yield group, self.data[group.gid]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Operation:
|
||||
"""Represents a member list operation."""
|
||||
list_op: str
|
||||
params: Dict[str, Any]
|
||||
|
||||
@property
|
||||
def to_dict(self) -> dict:
|
||||
res = {
|
||||
'op': self.list_op
|
||||
}
|
||||
|
||||
if self.list_op == 'SYNC':
|
||||
res['items'] = self.params['items']
|
||||
|
||||
if self.list_op in ('SYNC', 'INVALIDATE'):
|
||||
res['range'] = self.params['range']
|
||||
|
||||
if self.list_op in ('INSERT', 'DELETE', 'UPDATE'):
|
||||
res['index'] = self.params['index']
|
||||
|
||||
if self.list_op in ('INSERT', 'UPDATE'):
|
||||
res['item'] = self.params['item']
|
||||
|
||||
return res
|
||||
|
||||
|
||||
def _to_simple_group(presence: dict) -> str:
|
||||
"""Return a simple group (not a role), given a presence."""
|
||||
return 'offline' if presence['status'] == 'offline' else 'online'
|
||||
|
|
@ -111,6 +141,13 @@ class GuildMemberList:
|
|||
"""Get the global :class:`StateManager` instance."""
|
||||
return self.main.app.state_manager
|
||||
|
||||
@property
|
||||
def list_id(self):
|
||||
"""get the id of the member list."""
|
||||
return ('everyone'
|
||||
if self.channel_id == self.guild_id
|
||||
else str(self.channel_id))
|
||||
|
||||
def _set_empty_list(self):
|
||||
"""Set the member list as being empty."""
|
||||
self.list = MemberList(None, None, None, None)
|
||||
|
|
@ -311,7 +348,7 @@ class GuildMemberList:
|
|||
"""Subscribe a shard to the member list."""
|
||||
await self._init_check()
|
||||
|
||||
async def unsub(self, session_id: str):
|
||||
def unsub(self, session_id: str):
|
||||
"""Unsubscribe a shard from the member list"""
|
||||
try:
|
||||
self.state.pop(session_id)
|
||||
|
|
@ -326,6 +363,41 @@ class GuildMemberList:
|
|||
if not self.state:
|
||||
self._set_empty_list()
|
||||
|
||||
def get_state(self, session_id: str):
|
||||
state = self.state_man.fetch_raw(session_id)
|
||||
|
||||
if not state:
|
||||
self.unsub(session_id)
|
||||
return
|
||||
|
||||
return state
|
||||
|
||||
async def _dispatch_sess(self, session_ids: List[str],
|
||||
operations: List[Operation]):
|
||||
|
||||
# construct the payload to dispatch
|
||||
payload = {
|
||||
'id': self.list_id,
|
||||
'guild_id': str(self.guild_id),
|
||||
|
||||
'groups': [
|
||||
{
|
||||
'count': len(presences),
|
||||
'id': group.gid
|
||||
} for group, presences in self.list
|
||||
],
|
||||
|
||||
'ops': [
|
||||
operation.to_dict
|
||||
for operation in operations
|
||||
]
|
||||
}
|
||||
|
||||
states = map(self.get_state, session_ids)
|
||||
for state in (s for s in states if s is not None):
|
||||
await state.ws.dispatch(
|
||||
'GUILD_MEMBER_LIST_UPDATE', payload)
|
||||
|
||||
async def shard_query(self, session_id: str, ranges: list):
|
||||
"""Send a GUILD_MEMBER_LIST_UPDATE event
|
||||
for a shard that is querying about the member list.
|
||||
|
|
@ -342,9 +414,7 @@ class GuildMemberList:
|
|||
|
||||
# a guild list with a channel id of the guild
|
||||
# represents the 'everyone' global list.
|
||||
list_id = ('everyone'
|
||||
if self.channel_id == self.guild_id
|
||||
else str(self.channel_id))
|
||||
list_id = self.list_id
|
||||
|
||||
# if everyone can read the channel,
|
||||
# we direct the request to the 'everyone' gml instance
|
||||
|
|
@ -365,25 +435,7 @@ class GuildMemberList:
|
|||
|
||||
await self._init_check()
|
||||
|
||||
# make sure this is a sane state
|
||||
state = self.state_man.fetch_raw(session_id)
|
||||
if not state:
|
||||
await self.unsub(session_id)
|
||||
return
|
||||
|
||||
reply = {
|
||||
'guild_id': str(self.guild_id),
|
||||
'id': list_id,
|
||||
|
||||
'groups': [
|
||||
{
|
||||
'count': len(presences),
|
||||
'id': group.gid
|
||||
} for group, presences in self.list
|
||||
],
|
||||
|
||||
'ops': [],
|
||||
}
|
||||
ops = []
|
||||
|
||||
for start, end in ranges:
|
||||
itemcount = end - start
|
||||
|
|
@ -394,19 +446,64 @@ class GuildMemberList:
|
|||
|
||||
self.state[session_id].add((start, end))
|
||||
|
||||
reply['ops'].append({
|
||||
'op': 'SYNC',
|
||||
ops.append(Operation('SYNC', {
|
||||
'range': [start, end],
|
||||
'items': self.items[start:end],
|
||||
})
|
||||
'items': self.items[start:end]
|
||||
}))
|
||||
|
||||
# the first GUILD_MEMBER_LIST_UPDATE for a shard
|
||||
# is dispatched here.
|
||||
await state.ws.dispatch('GUILD_MEMBER_LIST_UPDATE', reply)
|
||||
await self._dispatch_sess([session_id], ops)
|
||||
|
||||
async def pres_update(self, user_id: int, roles: List[str],
|
||||
status: str, game: dict) -> List[str]:
|
||||
return list(self.state)
|
||||
async def pres_update(self, user_id: int,
|
||||
partial_presence: Dict[str, Any]):
|
||||
"""Update a presence inside the member listlist."""
|
||||
await self._init_check()
|
||||
|
||||
for _group, presences in self.list:
|
||||
p_idx = index_by_func(
|
||||
lambda p: p['user']['id'] == str(user_id),
|
||||
presences)
|
||||
|
||||
if not p_idx:
|
||||
continue
|
||||
|
||||
presences[p_idx].update(partial_presence)
|
||||
|
||||
item_index = index_by_func(
|
||||
lambda p: p.get('user', {}).get('id') == str(user_id),
|
||||
self.items
|
||||
)
|
||||
|
||||
pprint.pprint(self.items)
|
||||
|
||||
if not item_index:
|
||||
log.warning('lazy guild got invalid pres update uid={}',
|
||||
user_id)
|
||||
return []
|
||||
|
||||
item = self.items[item_index]
|
||||
|
||||
def _is_in(sess_id):
|
||||
ranges = self.state[sess_id]
|
||||
|
||||
for range_start, range_end in ranges:
|
||||
if range_start <= item_index <= range_end:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
session_ids = filter(_is_in, self.state.keys())
|
||||
|
||||
await self._dispatch_sess(
|
||||
session_ids,
|
||||
[
|
||||
Operation('UPDATE', {
|
||||
'index': item_index,
|
||||
'item': item,
|
||||
})
|
||||
]
|
||||
)
|
||||
|
||||
return list(session_ids)
|
||||
|
||||
async def dispatch(self, event: str, data: Any):
|
||||
"""Modify the member list and dispatch the respective
|
||||
|
|
@ -473,4 +570,4 @@ class LazyGuildDispatcher(Dispatcher):
|
|||
|
||||
async def unsub(self, chan_id, session_id):
|
||||
gml = await self.get_gml(chan_id)
|
||||
await gml.unsub(session_id)
|
||||
gml.unsub(session_id)
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ def dict_get(mapping, key, default):
|
|||
def index_by_func(function, indexable: iter) -> int:
|
||||
"""Search in an idexable and return the index number
|
||||
for an iterm that has func(item) = True."""
|
||||
for index, item in indexable:
|
||||
for index, item in enumerate(indexable):
|
||||
if function(item):
|
||||
return index
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue