diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index 6db0a59..009ac5e 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -10,7 +10,21 @@ log = Logger(__name__) class EventDispatcher: - """Pub/Sub routines for litecord.""" + """Pub/Sub routines for litecord. + + EventDispatcher is the middle man between + REST code and gateway event logic. + + It sets up Pub/Sub backends and each of them + have their own ways of dispatching a single event. + + "key" and "identifier" are the "channel" and "subscriber id" + of pub/sub. clients can subscribe to a channel using its backend + and the key inside the backend. + + when dispatching, the backend can do its own logic, given + its subscriber ids. + """ def __init__(self, app): self.state_manager = app.state_manager self.app = app @@ -24,10 +38,14 @@ class EventDispatcher: } async def action(self, backend_str: str, action: str, key, identifier): - """Send an action regarding a key/identifier pair to a backend.""" - backend = self.backends[backend_str] - method = getattr(backend, f'{action}') + """Send an action regarding a key/identifier pair to a backend. + Action is usually "sub" or "unsub". + """ + backend = self.backends[backend_str] + method = getattr(backend, action) + + # convert keys to the types the backend wants key = backend.KEY_TYPE(key) identifier = backend.VAL_TYPE(identifier) @@ -48,17 +66,32 @@ class EventDispatcher: return await self.action(backend, 'unsub', key, identifier) async def sub(self, backend, key, identifier): + """Alias to subscribe().""" return await self.subscribe(backend, key, identifier) async def unsub(self, backend, key, identifier): + """Alias to unsubscribe().""" return await self.unsubscribe(backend, key, identifier) + async def sub_many(self, backend_str: str, identifier: Any, keys: list): + """Subscribe to multiple channels (all in a single backend) + at a time. + + Usually used when connecting to the gateway and the client + needs to subscribe to all their guids. + """ + for key in keys: + await self.subscribe(backend_str, key, identifier) + async def dispatch(self, backend_str: str, key: Any, *args, **kwargs): """Dispatch an event to the backend. - The backend is responsible for everything regarding the dispatch. + The backend is responsible for everything regarding the + actual dispatch. """ backend = self.backends[backend_str] + + # convert types key = backend.KEY_TYPE(key) return await backend.dispatch(key, *args, **kwargs) @@ -67,6 +100,7 @@ class EventDispatcher: """Dispatch to multiple keys in a single backend.""" log.info('MULTI DISPATCH: {!r}, {} keys', backend_str, len(keys)) + for key in keys: await self.dispatch(backend_str, key, *args, **kwargs) @@ -83,20 +117,14 @@ class EventDispatcher: key = backend.KEY_TYPE(key) return await backend.remove(key) - async def sub_many(self, backend_str: str, identifier: Any, keys: list): - """Subscribe to many buckets inside a single backend - at a time.""" - for key in keys: - await self.subscribe(backend_str, key, identifier) - async def dispatch_guild(self, guild_id, event, data): - """Backwards compatibility.""" + """Backwards compatibility with old EventDispatcher.""" return await self.dispatch('guild', guild_id, event, data) async def dispatch_user_guild(self, user_id, guild_id, event, data): - """Backwards compatibility.""" + """Backwards compatibility with old EventDispatcher.""" return await self.dispatch('member', (guild_id, user_id), event, data) async def dispatch_user(self, user_id, event, data): - """Backwards compatibility.""" + """Backwards compatibility with old EventDispatcher.""" return await self.dispatch('user', user_id, event, data) diff --git a/litecord/pubsub/channel.py b/litecord/pubsub/channel.py index 9991510..3c3cb2c 100644 --- a/litecord/pubsub/channel.py +++ b/litecord/pubsub/channel.py @@ -16,20 +16,33 @@ class ChannelDispatcher(DispatcherWithState): async def dispatch(self, channel_id, event: str, data: Any): """Dispatch an event to a channel.""" + # get everyone who is subscribed + # and store the number of states we dispatched the event to user_ids = self.state[channel_id] dispatched = 0 + # making a copy of user_ids since + # we'll modify it later on. for user_id in set(user_ids): guild_id = await self.app.storage.guild_from_channel(channel_id) + # if we are dispatching to a guild channel, + # we should only dispatch to the states / shards + # that are connected to the guild (via their shard id). + + # if we aren't, we just get all states tied to the user. + # TODO: make a fetch_states that fetches shards + # - with id 0 (count any) OR + # - single shards (id=0, count=1) states = (self.sm.fetch_states(user_id, guild_id) if guild_id else - - # TODO: use a fetch_states with guild_id 0 - # or maybe something to fetch all shards - # with id 0 and single shards self.sm.user_states(user_id)) + # unsub people who don't have any states tied to the channel. + if not states: + await self.unsub(channel_id, user_id) + continue + dispatched += await self._dispatch_states(states, event, data) log.info('Dispatched chan={} {!r} to {} states', diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index 0eaad6a..f65104d 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -1,19 +1,32 @@ +""" +litecord.pubsub.dispatcher: main dispatcher class +""" from collections import defaultdict + from logbook import Logger log = Logger(__name__) class Dispatcher: - """Pub/Sub backend dispatcher.""" + """Pub/Sub backend dispatcher. + + This just declares functions all Dispatcher subclasses + can implement. This does not mean all Dispatcher + subclasses have them implemented. + """ # the _ parameter is for (self) KEY_TYPE = lambda _, x: x VAL_TYPE = lambda _, x: x def __init__(self, main): + #: main EventDispatcher self.main_dispatcher = main + + #: gateway state storage self.sm = main.state_manager + self.app = main.app async def sub(self, _key, _id): @@ -66,6 +79,10 @@ class DispatcherWithState(Dispatcher): def __init__(self, main): super().__init__(main) + #: the default dict is to a set + # so we make sure someone calling sub() + # twice won't get 2x the events for the + # same channel. self.state = defaultdict(set) async def sub(self, key, identifier): diff --git a/litecord/pubsub/friend.py b/litecord/pubsub/friend.py index eef5cfe..d80ac6d 100644 --- a/litecord/pubsub/friend.py +++ b/litecord/pubsub/friend.py @@ -6,15 +6,25 @@ log = Logger(__name__) class FriendDispatcher(DispatcherWithState): + """Friend Pub/Sub logic. + + When connecting, a client will subscribe to all their friends + channels. If that friend updates their presence, it will be + broadcasted through that channel to basically all their friends. + """ KEY_TYPE = int VAL_TYPE = int async def dispatch(self, user_id: int, event, data): """Dispatch an event to all of a users' friends.""" + # all friends that are connected and subscribed + # to the one we're dispatching from peer_ids = self.state[user_id] dispatched = 0 for peer_id in peer_ids: + # dispatch to the user instead of the "shards tied to a guild" + # since relationships broadcast to all shards. dispatched += await self.main_dispatcher.dispatch( 'user', peer_id, event, data) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index add72f2..a05373a 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -14,6 +14,7 @@ class GuildDispatcher(DispatcherWithState): VAL_TYPE = int async def _chan_action(self, action: str, guild_id: int, user_id: int): + """Send an action to all channels of the guild.""" chan_ids = await self.app.storage.get_channel_ids(guild_id) # TODO: check READ_MESSAGE permissions for the user @@ -27,7 +28,10 @@ class GuildDispatcher(DispatcherWithState): ) async def _chan_call(self, meth: str, guild_id: int, *args): + """Call a method on the ChannelDispatcher, for all channels + in the guild.""" chan_ids = await self.app.storage.get_channel_ids(guild_id) + chan_dispatcher = self.main_dispatcher.backends['channel'] method = getattr(chan_dispatcher, meth) @@ -48,6 +52,8 @@ class GuildDispatcher(DispatcherWithState): async def unsub(self, guild_id: int, user_id: int): """Unsubscribe a user from the guild.""" await super().unsub(guild_id, user_id) + + # same thing happening from sub() happens on unsub() await self._chan_action('unsub', guild_id, user_id) async def dispatch(self, guild_id: int, @@ -56,11 +62,11 @@ class GuildDispatcher(DispatcherWithState): user_ids = self.state[guild_id] dispatched = 0 - # acquire a copy since we will be modifying + # acquire a copy since we may be modifying # the original user_ids for user_id in set(user_ids): - # fetch all states related to the user id and guild id. + # fetch all states / shards that are tied to the guild. states = self.sm.fetch_states(user_id, guild_id) if not states: diff --git a/litecord/pubsub/member.py b/litecord/pubsub/member.py index b997417..8b6f351 100644 --- a/litecord/pubsub/member.py +++ b/litecord/pubsub/member.py @@ -10,15 +10,17 @@ class MemberDispatcher(Dispatcher): This is shard-aware. """ + # we don't keep any state on this dispatcher, so the key + # is just (guild_id, user_id) guild_id, user_id = key # fetch shards states = self.sm.fetch_states(user_id, guild_id) # if no states were found, we should - # unsub the user from the channel + # unsub the user from the GUILD channel if not states: await self.main_dispatcher.unsub('guild', guild_id, user_id) return - await self._dispatch_states(states, event, data) + return await self._dispatch_states(states, event, data)