mirror of https://gitlab.com/litecord/litecord.git
dispatcher: add dispatch_filter
- presence: (basic) handle member lists when presence update.
Note that the respective GUILD_UPDATE_MEMBER_LIST doesn't happen
yet. we'll need roles beforehand.
This commit is contained in:
parent
cd4181c327
commit
2605836790
|
|
@ -106,6 +106,15 @@ class EventDispatcher:
|
||||||
for key in keys:
|
for key in keys:
|
||||||
await self.dispatch(backend_str, key, *args, **kwargs)
|
await self.dispatch(backend_str, key, *args, **kwargs)
|
||||||
|
|
||||||
|
async def dispatch_filter(self, backend_str: str,
|
||||||
|
key: Any, func, *args):
|
||||||
|
"""Dispatch to a backend that only accepts
|
||||||
|
(event, data) arguments with an optional filter
|
||||||
|
function."""
|
||||||
|
backend = self.backends[backend_str]
|
||||||
|
key = backend.KEY_TYPE(key)
|
||||||
|
return await backend.dispatch_filter(key, func, *args)
|
||||||
|
|
||||||
async def reset(self, backend_str: str, key: Any):
|
async def reset(self, backend_str: str, key: Any):
|
||||||
"""Reset the bucket in the given backend."""
|
"""Reset the bucket in the given backend."""
|
||||||
backend = self.backends[backend_str]
|
backend = self.backends[backend_str]
|
||||||
|
|
|
||||||
|
|
@ -685,26 +685,6 @@ class GatewayWebsocket:
|
||||||
}, ...
|
}, ...
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
# Implementation defails.
|
|
||||||
|
|
||||||
Lazy guilds are complicated to deal with in the backend level
|
|
||||||
as there are a lot of computation to be done for each request.
|
|
||||||
|
|
||||||
The current implementation is rudimentary and does not account
|
|
||||||
for any roles inside the guild.
|
|
||||||
|
|
||||||
A correct implementation would take account of roles and make
|
|
||||||
the correct groups on list_data:
|
|
||||||
|
|
||||||
For each channel in lazy_request['channels']:
|
|
||||||
- get all roles that have Read Messages on the channel:
|
|
||||||
- Also fetch their member counts, as it'll be important
|
|
||||||
- with the role list, order them like you normally would
|
|
||||||
(by their role priority)
|
|
||||||
- based on the channel's range's min and max and the ordered
|
|
||||||
role list, you can get the roles wanted for your list_data reply.
|
|
||||||
- make new groups ONLY when the role is hoisted.
|
|
||||||
"""
|
"""
|
||||||
data = payload['d']
|
data = payload['d']
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,11 @@
|
||||||
from typing import List, Dict, Any
|
from typing import List, Dict, Any
|
||||||
from random import choice
|
from random import choice
|
||||||
|
|
||||||
|
from logbook import Logger
|
||||||
from quart import current_app as app
|
from quart import current_app as app
|
||||||
|
|
||||||
|
log = Logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def status_cmp(status: str, other_status: str) -> bool:
|
def status_cmp(status: str, other_status: str) -> bool:
|
||||||
"""Compare if `status` is better than the `other_status`
|
"""Compare if `status` is better than the `other_status`
|
||||||
|
|
@ -100,11 +103,31 @@ class PresenceManager:
|
||||||
|
|
||||||
game = state['game']
|
game = state['game']
|
||||||
|
|
||||||
await self.dispatcher.dispatch_guild(
|
lazy_guild_store = self.dispatcher.backends['lazy_guild']
|
||||||
guild_id, 'PRESENCE_UPDATE', {
|
lists = lazy_guild_store.get_gml_guild(guild_id)
|
||||||
|
|
||||||
|
# shards that are in lazy guilds with 'everyone'
|
||||||
|
# enabled
|
||||||
|
in_lazy = []
|
||||||
|
|
||||||
|
for member_list in lists:
|
||||||
|
session_ids = await member_list.pres_update(
|
||||||
|
int(member['user']['id']),
|
||||||
|
member['roles'],
|
||||||
|
state['status'],
|
||||||
|
game
|
||||||
|
)
|
||||||
|
|
||||||
|
log.debug('Lazy Dispatch to {}',
|
||||||
|
len(session_ids))
|
||||||
|
|
||||||
|
if member_list.channel_id == 'everyone':
|
||||||
|
in_lazy.extend(session_ids)
|
||||||
|
|
||||||
|
pres_update_payload = {
|
||||||
'user': member['user'],
|
'user': member['user'],
|
||||||
'roles': member['roles'],
|
'roles': member['roles'],
|
||||||
'guild_id': guild_id,
|
'guild_id': str(guild_id),
|
||||||
|
|
||||||
'status': state['status'],
|
'status': state['status'],
|
||||||
|
|
||||||
|
|
@ -112,8 +135,18 @@ class PresenceManager:
|
||||||
'game': game,
|
'game': game,
|
||||||
'activities': [game] if game else []
|
'activities': [game] if game else []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# everyone not in lazy guild mode
|
||||||
|
# gets a PRESENCE_UPDATE
|
||||||
|
await self.dispatcher.dispatch_filter(
|
||||||
|
'guild', guild_id,
|
||||||
|
lambda session_id: session_id not in in_lazy,
|
||||||
|
|
||||||
|
'PRESENCE_UPDATE', pres_update_payload
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return in_lazy
|
||||||
|
|
||||||
async def dispatch_pres(self, user_id: int, state: dict):
|
async def dispatch_pres(self, user_id: int, state: dict):
|
||||||
"""Dispatch a new presence to all guilds the user is in.
|
"""Dispatch a new presence to all guilds the user is in.
|
||||||
|
|
||||||
|
|
@ -122,10 +155,12 @@ class PresenceManager:
|
||||||
if state['status'] == 'invisible':
|
if state['status'] == 'invisible':
|
||||||
state['status'] = 'offline'
|
state['status'] = 'offline'
|
||||||
|
|
||||||
|
# TODO: shard-aware
|
||||||
guild_ids = await self.storage.get_user_guilds(user_id)
|
guild_ids = await self.storage.get_user_guilds(user_id)
|
||||||
|
|
||||||
for guild_id in guild_ids:
|
for guild_id in guild_ids:
|
||||||
await self.dispatch_guild_pres(guild_id, user_id, state)
|
await self.dispatch_guild_pres(
|
||||||
|
guild_id, user_id, state)
|
||||||
|
|
||||||
# dispatch to all friends that are subscribed to them
|
# dispatch to all friends that are subscribed to them
|
||||||
user = await self.storage.get_user(user_id)
|
user = await self.storage.get_user(user_id)
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,14 @@ class Dispatcher:
|
||||||
"""Unsubscribe an elemtnt from the channel/key."""
|
"""Unsubscribe an elemtnt from the channel/key."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def dispatch_filter(self, _key, _func, *_args):
|
||||||
|
"""Selectively dispatch to the list of subscribed users.
|
||||||
|
|
||||||
|
The selection logic is completly arbitraty and up to the
|
||||||
|
Pub/Sub backend.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
async def dispatch(self, _key, *_args):
|
async def dispatch(self, _key, *_args):
|
||||||
"""Dispatch an event to the given channel/key."""
|
"""Dispatch an event to the given channel/key."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
|
||||||
|
|
@ -55,9 +55,10 @@ class GuildDispatcher(DispatcherWithState):
|
||||||
# same thing happening from sub() happens on unsub()
|
# same thing happening from sub() happens on unsub()
|
||||||
await self._chan_action('unsub', guild_id, user_id)
|
await self._chan_action('unsub', guild_id, user_id)
|
||||||
|
|
||||||
async def dispatch(self, guild_id: int,
|
async def dispatch_filter(self, guild_id: int, func,
|
||||||
event: str, data: Any):
|
event: str, data: Any):
|
||||||
"""Dispatch an event to all subscribers of the guild."""
|
"""Selectively dispatch to session ids that have
|
||||||
|
func(session_id) true."""
|
||||||
user_ids = self.state[guild_id]
|
user_ids = self.state[guild_id]
|
||||||
dispatched = 0
|
dispatched = 0
|
||||||
|
|
||||||
|
|
@ -74,8 +75,22 @@ class GuildDispatcher(DispatcherWithState):
|
||||||
await self.unsub(guild_id, user_id)
|
await self.unsub(guild_id, user_id)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# filter the ones that matter
|
||||||
|
states = list(filter(
|
||||||
|
lambda state: func(state.session_id), states
|
||||||
|
))
|
||||||
|
|
||||||
dispatched += await self._dispatch_states(
|
dispatched += await self._dispatch_states(
|
||||||
states, event, data)
|
states, event, data)
|
||||||
|
|
||||||
log.info('Dispatched {} {!r} to {} states',
|
log.info('Dispatched {} {!r} to {} states',
|
||||||
guild_id, event, dispatched)
|
guild_id, event, dispatched)
|
||||||
|
|
||||||
|
async def dispatch(self, guild_id: int,
|
||||||
|
event: str, data: Any):
|
||||||
|
"""Dispatch an event to all subscribers of the guild."""
|
||||||
|
await self.dispatch_filter(
|
||||||
|
guild_id,
|
||||||
|
lambda sess_id: True,
|
||||||
|
event, data,
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -256,6 +256,9 @@ class GuildMemberList:
|
||||||
# subscribe the state to this list
|
# subscribe the state to this list
|
||||||
await self.sub(session_id)
|
await self.sub(session_id)
|
||||||
|
|
||||||
|
# TODO: subscribe shard to 'everyone'
|
||||||
|
# and forward the query to that list
|
||||||
|
|
||||||
reply = {
|
reply = {
|
||||||
'guild_id': str(self.guild_id),
|
'guild_id': str(self.guild_id),
|
||||||
|
|
||||||
|
|
@ -290,8 +293,14 @@ class GuildMemberList:
|
||||||
'items': items[start:end],
|
'items': items[start:end],
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# the first GUILD_MEMBER_LIST_UPDATE for a shard
|
||||||
|
# is dispatched here.
|
||||||
await state.ws.dispatch('GUILD_MEMBER_LIST_UPDATE', reply)
|
await state.ws.dispatch('GUILD_MEMBER_LIST_UPDATE', reply)
|
||||||
|
|
||||||
|
async def pres_update(self, user_id: int, roles: List[str],
|
||||||
|
status: str, game: dict) -> List[str]:
|
||||||
|
return list(self.state)
|
||||||
|
|
||||||
async def dispatch(self, event: str, data: Any):
|
async def dispatch(self, event: str, data: Any):
|
||||||
"""The dispatch() method here, instead of being
|
"""The dispatch() method here, instead of being
|
||||||
about dispatching a single event to the subscribed
|
about dispatching a single event to the subscribed
|
||||||
|
|
@ -328,7 +337,14 @@ class LazyGuildDispatcher(Dispatcher):
|
||||||
# {chan_id: gml, ...}
|
# {chan_id: gml, ...}
|
||||||
self.state = {}
|
self.state = {}
|
||||||
|
|
||||||
|
#: store which guilds have their
|
||||||
|
# respective GMLs
|
||||||
|
# {guild_id: [chan_id, ...], ...}
|
||||||
|
self.guild_map = defaultdict(list)
|
||||||
|
|
||||||
async def get_gml(self, channel_id: int):
|
async def get_gml(self, channel_id: int):
|
||||||
|
"""Get a guild list for a channel ID,
|
||||||
|
generating it if it doesn't exist."""
|
||||||
try:
|
try:
|
||||||
return self.state[channel_id]
|
return self.state[channel_id]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
@ -338,8 +354,16 @@ class LazyGuildDispatcher(Dispatcher):
|
||||||
|
|
||||||
gml = GuildMemberList(guild_id, channel_id, self)
|
gml = GuildMemberList(guild_id, channel_id, self)
|
||||||
self.state[channel_id] = gml
|
self.state[channel_id] = gml
|
||||||
|
self.guild_map[guild_id].append(channel_id)
|
||||||
return gml
|
return gml
|
||||||
|
|
||||||
|
def get_gml_guild(self, guild_id: int) -> List[GuildMemberList]:
|
||||||
|
"""Get all member lists for a given guild."""
|
||||||
|
return list(map(
|
||||||
|
self.state.get,
|
||||||
|
self.guild_map[guild_id]
|
||||||
|
))
|
||||||
|
|
||||||
async def sub(self, chan_id, session_id):
|
async def sub(self, chan_id, session_id):
|
||||||
gml = await self.get_gml(chan_id)
|
gml = await self.get_gml(chan_id)
|
||||||
await gml.sub(session_id)
|
await gml.sub(session_id)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue