From aa76cc2c7d94afd2689aee8750e2f6f64fe1249c Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Tue, 9 Oct 2018 18:56:34 -0300 Subject: [PATCH] litecord.dispatcher: change dispatch_* methods into pubsub backends - litecord: add pubsub module - schemas: change type to snowflake in MESSAGE_CREATE's nonce --- litecord/dispatcher.py | 97 +++++++++++++---------------------- litecord/pubsub/__init__.py | 3 ++ litecord/pubsub/dispatcher.py | 34 ++++++++++++ litecord/pubsub/guild.py | 51 ++++++++++++++++++ litecord/pubsub/member.py | 20 ++++++++ litecord/pubsub/user.py | 9 ++++ litecord/schemas.py | 2 +- 7 files changed, 155 insertions(+), 61 deletions(-) create mode 100644 litecord/pubsub/__init__.py create mode 100644 litecord/pubsub/dispatcher.py create mode 100644 litecord/pubsub/guild.py create mode 100644 litecord/pubsub/member.py create mode 100644 litecord/pubsub/user.py diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index e949bf6..224cccf 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -3,6 +3,9 @@ from typing import Any from logbook import Logger +from .pubsub import GuildDispatcher, MemberDispatcher, \ + UserDispatcher + log = Logger(__name__) @@ -10,74 +13,48 @@ class EventDispatcher: """Pub/Sub routines for litecord.""" def __init__(self, sm): self.state_manager = sm - self.guild_buckets = collections.defaultdict(set) - def sub_guild(self, guild_id: int, user_id: int): - """Subscribe to a guild's events, given the user ID.""" - self.guild_buckets[guild_id].add(user_id) + self.backends = { + 'guild': GuildDispatcher(self), + 'member': MemberDispatcher(self), + 'user': UserDispatcher(self), + } - def unsub_guild(self, guild_id: int, user_id: int): - """Unsubscribe from a guild, given user ID""" - self.guild_buckets[guild_id].discard(user_id) + async def action(self, backend_str: str, action: str, key, identifier): + """Send an action regarding a key/identifier pair to a backend.""" + backend = self.backends[backend_str] + method = getattr(backend, f'{action}') - def remove_guild(self, guild_id): - """Reset the guild bucket.""" - self.guild_buckets[guild_id] = set() + key = backend.KEY_TYPE(key) + identifier = backend.VAL_TYPE(identifier) - def sub_many(self, user_id: int, guild_ids: list): - """Subscribe to many guilds at a time.""" - for guild_id in guild_ids: - self.sub_guild(guild_id, user_id) + return await method(key, identifier) - async def dispatch_guild(self, guild_id: int, - event_name: str, event_payload: Any): - """Dispatch an event to a guild""" - users = self.guild_buckets[guild_id] - dispatched = 0 + async def subscribe(self, backend: str, key: Any, identifier: Any): + """Subscribe a single element to the given backend.""" + return await self.action(backend, 'sub', key, identifier) - log.debug('Dispatching {} {!r} to {} users', - guild_id, event_name, len(users)) + async def unsubscribe(self, backend: str, key: Any, identifier: Any): + """Unsubscribe an element from the given backend.""" + return await self.action(backend, 'unsub', key, identifier) - for user_id in set(users): - # fetch all connections that are tied to the guild, - # this includes all connections that are just a single shard - # and all shards that are nicely working - states = self.state_manager.fetch_states(user_id, guild_id) + async def dispatch(self, backend_str: str, key: Any, *args, **kwargs): + """Dispatch an event to the backend. - # if there are no more states tied to the guild, - # why keep the user as a subscriber? - if not states: - self.unsub_guild(guild_id, user_id) - continue - - # for each reasonable state/shard, dispatch event - for state in states: - # NOTE: maybe a separate task for that async? - await state.ws.dispatch(event_name, event_payload) - dispatched += 1 - - log.info('Dispatched {} {!r} to {} states', - guild_id, event_name, dispatched) - - async def _dispatch_states(self, states: list, event: str, data: Any): - for state in states: - await state.ws.dispatch(event, data) - - async def dispatch_user_guild(self, user_id: int, guild_id: int, - event: str, data: Any): - """Dispatch a single event to a user inside a guild. - - The difference between dispatch_user and dispatch_user_guild - is sharding management happening here, via StateManager.fetch_states + The backend is responsible for everything regarding the dispatch. """ - states = self.state_manager.fetch_states(user_id, guild_id) + backend = self.backends[backend_str] + key = backend.KEY_TYPE(key) + return await backend._dispatch(key, *args, **kwargs) - if not states: - self.unsub_guild(guild_id, user_id) + async def reset(self, backend_str: str, key: Any): + """Reset the bucket in the given backend.""" + backend = self.backends[backend_str] + key = backend.KEY_TYPE(key) + return await backend._reset(key) - await self._dispatch_states(states, event, data) - - async def dispatch_user(self, user_id: int, event: str, data: Any): - """Dispatch an event to a single user.""" - states = self.state_manager.user_states(user_id) - await self._dispatch_states(states, event, data) + async def sub_many(self, backend_str: str, identifier: Any, keys: list): + """Subscribe to many buckets inside a single backend + at a time.""" + for key in keys: + await self.subscribe(backend_str, key, identifier) diff --git a/litecord/pubsub/__init__.py b/litecord/pubsub/__init__.py new file mode 100644 index 0000000..9586ed9 --- /dev/null +++ b/litecord/pubsub/__init__.py @@ -0,0 +1,3 @@ +from .guild import GuildDispatcher +from .member import MemberDispatcher +from .user import UserDispatcher diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py new file mode 100644 index 0000000..8c93d67 --- /dev/null +++ b/litecord/pubsub/dispatcher.py @@ -0,0 +1,34 @@ +from logbook import Logger + +log = Logger(__name__) + + +class Dispatcher: + """Main dispatcher class.""" + KEY_TYPE = lambda x: x + VAL_TYPE = lambda x: x + + def __init__(self, main): + self.main_dispatcher = main + self.sm = main.state_manager + + async def sub(self, _key, _id): + raise NotImplementedError + + async def unsub(self, _key, _id): + raise NotImplementedError + + async def dispatch(self, _key, *_args, **_kwargs): + raise NotImplementedError + + async def _dispatch_states(self, states: list, event: str, data) -> int: + dispatched = 0 + + for state in states: + try: + await state.ws.dispatch(event, data) + dispatched += 1 + except: + log.exception('error while dispatching') + + return dispatched diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py new file mode 100644 index 0000000..c6f0ecf --- /dev/null +++ b/litecord/pubsub/guild.py @@ -0,0 +1,51 @@ +from collections import defaultdict +from typing import Any + +from logbook import Logger + +from .dispatcher import Dispatcher + +log = Logger(__name__) + + +class GuildDispatcher(Dispatcher): + """Guild backend for Pub/Sub""" + KEY_TYPE = int + VAL_TYPE = int + + def __init__(self, main): + super().__init__(main) + self.guild_buckets = defaultdict(set) + + async def sub(self, guild_id: int, user_id: int): + self.guild_buckets[guild_id].add(user_id) + + async def unsub(self, guild_id: int, user_id: int): + self.guild_buckets[guild_id].discard(user_id) + + async def reset(self, guild_id: int): + self.guild_buckets[guild_id] = set() + + async def dispatch(self, guild_id: int, + event_name: str, event_payload: Any): + user_ids = self.guild_buckets[guild_id] + dispatched = 0 + + # acquire a copy since we will be modifying + # the original user_ids + for user_id in set(user_ids): + + # fetch all states related to the user id and guild id. + states = self.sm.fetch_states(user_id, guild_id) + + if not states: + # user is actually disconnected, + # so we should just unsub it + await self._unsub(guild_id, user_id) + continue + + dispatched += await self._dispatch_states( + states, event_name, event_payload) + + log.info('Dispatched {} {!r} to {} states', + guild_id, event_name, dispatched) diff --git a/litecord/pubsub/member.py b/litecord/pubsub/member.py new file mode 100644 index 0000000..328f452 --- /dev/null +++ b/litecord/pubsub/member.py @@ -0,0 +1,20 @@ +from .dispatcher import Dispatcher + + +class MemberDispatcher(Dispatcher): + KEY_TYPE = int + VAL_TYPE = int + + async def dispatch(self, guild_id: int, user_id: int, event, data): + """Dispatch a single event to a member. + + This is shard-aware. + """ + # fetch shards + states = self.sm.fetch_states(user_id, guild_id) + + if not states: + await self.main_dispatcher.unsub('guild', guild_id, user_id) + return + + await self._dispatch_states(states, event, data) diff --git a/litecord/pubsub/user.py b/litecord/pubsub/user.py new file mode 100644 index 0000000..46b3526 --- /dev/null +++ b/litecord/pubsub/user.py @@ -0,0 +1,9 @@ +from .dispatcher import Dispatcher + + +class UserDispatcher(Dispatcher): + KEY_TYPE = int + + async def dispatch(self, user_id: int, event, data): + states = self.sm.user_states(user_id) + return await self._dispatch_states(states, event, data) diff --git a/litecord/schemas.py b/litecord/schemas.py index d265e47..b4ecdf9 100644 --- a/litecord/schemas.py +++ b/litecord/schemas.py @@ -123,7 +123,7 @@ MEMBER_UPDATE = { MESSAGE_CREATE = { 'content': {'type': 'string', 'minlength': 1, 'maxlength': 2000}, - 'nonce': {'type': 'string', 'required': False}, + 'nonce': {'type': 'snowflake', 'required': False}, 'tts': {'type': 'boolean', 'required': False}, # TODO: file, embed, payload_json