diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index e7e392b..10d2f7a 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -106,6 +106,15 @@ class EventDispatcher: for key in keys: await self.dispatch(backend_str, key, *args, **kwargs) + async def dispatch_filter(self, backend_str: str, + key: Any, func, *args): + """Dispatch to a backend that only accepts + (event, data) arguments with an optional filter + function.""" + backend = self.backends[backend_str] + key = backend.KEY_TYPE(key) + return await backend.dispatch_filter(key, func, *args) + async def reset(self, backend_str: str, key: Any): """Reset the bucket in the given backend.""" backend = self.backends[backend_str] diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 7fce369..6ad0b22 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -685,26 +685,6 @@ class GatewayWebsocket: }, ... ] } - - # Implementation defails. - - Lazy guilds are complicated to deal with in the backend level - as there are a lot of computation to be done for each request. - - The current implementation is rudimentary and does not account - for any roles inside the guild. - - A correct implementation would take account of roles and make - the correct groups on list_data: - - For each channel in lazy_request['channels']: - - get all roles that have Read Messages on the channel: - - Also fetch their member counts, as it'll be important - - with the role list, order them like you normally would - (by their role priority) - - based on the channel's range's min and max and the ordered - role list, you can get the roles wanted for your list_data reply. - - make new groups ONLY when the role is hoisted. """ data = payload['d'] diff --git a/litecord/presence.py b/litecord/presence.py index 3666f3e..8e107c9 100644 --- a/litecord/presence.py +++ b/litecord/presence.py @@ -1,8 +1,11 @@ from typing import List, Dict, Any from random import choice +from logbook import Logger from quart import current_app as app +log = Logger(__name__) + def status_cmp(status: str, other_status: str) -> bool: """Compare if `status` is better than the `other_status` @@ -100,20 +103,50 @@ class PresenceManager: game = state['game'] - await self.dispatcher.dispatch_guild( - guild_id, 'PRESENCE_UPDATE', { - 'user': member['user'], - 'roles': member['roles'], - 'guild_id': guild_id, + lazy_guild_store = self.dispatcher.backends['lazy_guild'] + lists = lazy_guild_store.get_gml_guild(guild_id) - 'status': state['status'], + # shards that are in lazy guilds with 'everyone' + # enabled + in_lazy = [] - # rich presence stuff - 'game': game, - 'activities': [game] if game else [] - } + for member_list in lists: + session_ids = await member_list.pres_update( + int(member['user']['id']), + member['roles'], + state['status'], + game + ) + + log.debug('Lazy Dispatch to {}', + len(session_ids)) + + if member_list.channel_id == 'everyone': + in_lazy.extend(session_ids) + + pres_update_payload = { + 'user': member['user'], + 'roles': member['roles'], + 'guild_id': str(guild_id), + + 'status': state['status'], + + # rich presence stuff + 'game': game, + 'activities': [game] if game else [] + } + + # everyone not in lazy guild mode + # gets a PRESENCE_UPDATE + await self.dispatcher.dispatch_filter( + 'guild', guild_id, + lambda session_id: session_id not in in_lazy, + + 'PRESENCE_UPDATE', pres_update_payload ) + return in_lazy + async def dispatch_pres(self, user_id: int, state: dict): """Dispatch a new presence to all guilds the user is in. @@ -122,10 +155,12 @@ class PresenceManager: if state['status'] == 'invisible': state['status'] = 'offline' + # TODO: shard-aware guild_ids = await self.storage.get_user_guilds(user_id) for guild_id in guild_ids: - await self.dispatch_guild_pres(guild_id, user_id, state) + await self.dispatch_guild_pres( + guild_id, user_id, state) # dispatch to all friends that are subscribed to them user = await self.storage.get_user(user_id) diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index f65104d..c9da2e7 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -37,6 +37,14 @@ class Dispatcher: """Unsubscribe an elemtnt from the channel/key.""" raise NotImplementedError + async def dispatch_filter(self, _key, _func, *_args): + """Selectively dispatch to the list of subscribed users. + + The selection logic is completly arbitraty and up to the + Pub/Sub backend. + """ + raise NotImplementedError + async def dispatch(self, _key, *_args): """Dispatch an event to the given channel/key.""" raise NotImplementedError diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 6896d93..6613fcb 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -55,9 +55,10 @@ class GuildDispatcher(DispatcherWithState): # same thing happening from sub() happens on unsub() await self._chan_action('unsub', guild_id, user_id) - async def dispatch(self, guild_id: int, - event: str, data: Any): - """Dispatch an event to all subscribers of the guild.""" + async def dispatch_filter(self, guild_id: int, func, + event: str, data: Any): + """Selectively dispatch to session ids that have + func(session_id) true.""" user_ids = self.state[guild_id] dispatched = 0 @@ -74,8 +75,22 @@ class GuildDispatcher(DispatcherWithState): await self.unsub(guild_id, user_id) continue + # filter the ones that matter + states = list(filter( + lambda state: func(state.session_id), states + )) + dispatched += await self._dispatch_states( states, event, data) log.info('Dispatched {} {!r} to {} states', guild_id, event, dispatched) + + async def dispatch(self, guild_id: int, + event: str, data: Any): + """Dispatch an event to all subscribers of the guild.""" + await self.dispatch_filter( + guild_id, + lambda sess_id: True, + event, data, + ) diff --git a/litecord/pubsub/lazy_guild.py b/litecord/pubsub/lazy_guild.py index ef7e0b2..0bcf1f5 100644 --- a/litecord/pubsub/lazy_guild.py +++ b/litecord/pubsub/lazy_guild.py @@ -256,6 +256,9 @@ class GuildMemberList: # subscribe the state to this list await self.sub(session_id) + # TODO: subscribe shard to 'everyone' + # and forward the query to that list + reply = { 'guild_id': str(self.guild_id), @@ -290,8 +293,14 @@ class GuildMemberList: 'items': items[start:end], }) + # the first GUILD_MEMBER_LIST_UPDATE for a shard + # is dispatched here. await state.ws.dispatch('GUILD_MEMBER_LIST_UPDATE', reply) + async def pres_update(self, user_id: int, roles: List[str], + status: str, game: dict) -> List[str]: + return list(self.state) + async def dispatch(self, event: str, data: Any): """The dispatch() method here, instead of being about dispatching a single event to the subscribed @@ -328,7 +337,14 @@ class LazyGuildDispatcher(Dispatcher): # {chan_id: gml, ...} self.state = {} + #: store which guilds have their + # respective GMLs + # {guild_id: [chan_id, ...], ...} + self.guild_map = defaultdict(list) + async def get_gml(self, channel_id: int): + """Get a guild list for a channel ID, + generating it if it doesn't exist.""" try: return self.state[channel_id] except KeyError: @@ -338,8 +354,16 @@ class LazyGuildDispatcher(Dispatcher): gml = GuildMemberList(guild_id, channel_id, self) self.state[channel_id] = gml + self.guild_map[guild_id].append(channel_id) return gml + def get_gml_guild(self, guild_id: int) -> List[GuildMemberList]: + """Get all member lists for a given guild.""" + return list(map( + self.state.get, + self.guild_map[guild_id] + )) + async def sub(self, chan_id, session_id): gml = await self.get_gml(chan_id) await gml.sub(session_id)