From bbea185a7d0ae095fd38389852b862b0c7fe12ea Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 12:29:51 -0300 Subject: [PATCH 01/13] gateway.websocket: add basic getting of guild_subscriptions field --- litecord/gateway/websocket.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 365163c..af660f2 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -421,7 +421,7 @@ class GatewayWebsocket: return list(filtered) - async def subscribe_all(self): + async def subscribe_all(self, guild_subscriptions: bool): """Subscribe to all guilds, DM channels, and friends. Note: subscribing to channels is already handled @@ -441,6 +441,8 @@ class GatewayWebsocket: log.info('subscribing to {} dms', len(dm_ids)) log.info('subscribing to {} group dms', len(gdm_ids)) + # TODO(gw-guild-subscriptions) + # make a channel:typing and guild:presence subchannels await self.ext.dispatcher.mass_sub(user_id, [ ('guild', guild_ids), ('channel', dm_ids), @@ -573,7 +575,7 @@ class GatewayWebsocket: self.ext.state_manager.insert(self.state) await self.update_status(presence) - await self.subscribe_all() + await self.subscribe_all(data.get('guild_subscriptions', True)) await self.dispatch_ready() async def handle_3(self, payload: Dict[str, Any]): From e2cb49669edd04bb73741fdabc10f206e97e71be Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 12:46:59 -0300 Subject: [PATCH 02/13] dispatcher: add basics of subscription flags --- litecord/dispatcher.py | 41 ++++++++++++++++++++++++++--------- litecord/gateway/websocket.py | 24 ++++++++++++-------- litecord/pubsub/dispatcher.py | 2 +- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index 8d38303..29748a9 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -17,7 +17,7 @@ along with this program. If not, see . """ -from typing import List, Any +from typing import List, Any, Dict from logbook import Logger @@ -57,7 +57,7 @@ class EventDispatcher: 'lazy_guild': LazyGuildDispatcher(self), } - async def action(self, backend_str: str, action: str, key, identifier): + async def action(self, backend_str: str, action: str, key, identifier, *args): """Send an action regarding a key/identifier pair to a backend. Action is usually "sub" or "unsub". @@ -69,13 +69,24 @@ class EventDispatcher: key = backend.KEY_TYPE(key) identifier = backend.VAL_TYPE(identifier) - return await method(key, identifier) + return await method(key, identifier, *args) - async def subscribe(self, backend: str, key: Any, identifier: Any): + async def subscribe(self, backend: str, key: Any, identifier: Any, + flags: Dict[str, Any] = None): """Subscribe a single element to the given backend.""" + flags = flags or {} + log.debug('SUB backend={} key={} <= id={}', backend, key, identifier, backend) + # this is a hacky solution for backwards compatibility between backends + # that implement flags and backends that don't. + + # passing flags to backends that don't implement flags will + # cause errors as expected. + if flags: + return await self.action(backend, 'sub', key, identifier, flags) + return await self.action(backend, 'sub', key, identifier) async def unsubscribe(self, backend: str, key: Any, identifier: Any): @@ -93,24 +104,34 @@ class EventDispatcher: """Alias to unsubscribe().""" return await self.unsubscribe(backend, key, identifier) - async def sub_many(self, backend_str: str, identifier: Any, keys: list): + async def sub_many(self, backend_str: str, identifier: Any, + keys: list, flags: Dict[str, Any] = None): """Subscribe to multiple channels (all in a single backend) at a time. Usually used when connecting to the gateway and the client needs to subscribe to all their guids. """ + flags = flags or {} for key in keys: - await self.subscribe(backend_str, key, identifier) + await self.subscribe(backend_str, key, identifier, flags) async def mass_sub(self, identifier: Any, backends: List[tuple]): """Mass subscribe to many backends at once.""" - for backend_str, keys in backends: - log.debug('subscribing {} to {} keys in backend {}', - identifier, len(keys), backend_str) + for bcall in backends: + backend_str, keys = bcall[0], bcall[1] - await self.sub_many(backend_str, identifier, keys) + if len(bcall) == 2: + flags = {} + elif len(bcall == 3): + # we have flags + flags = bcall[2] + + log.debug('subscribing {} to {} keys in backend {}, flags: {}', + identifier, len(keys), backend_str, flags) + + await self.sub_many(backend_str, identifier, keys, flags) async def dispatch(self, backend_str: str, key: Any, *args, **kwargs): """Dispatch an event to the backend. diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index af660f2..d68f2d6 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -437,17 +437,23 @@ class GatewayWebsocket: # fetch all group dms the user is a member of. gdm_ids = await self.user_storage.get_gdms_internal(user_id) - log.info('subscribing to {} guilds', len(guild_ids)) - log.info('subscribing to {} dms', len(dm_ids)) - log.info('subscribing to {} group dms', len(gdm_ids)) + log.info('subscribing to {} guilds {} dms {} gdms', + len(guild_ids), len(dm_ids), len(gdm_ids)) - # TODO(gw-guild-subscriptions) - # make a channel:typing and guild:presence subchannels - await self.ext.dispatcher.mass_sub(user_id, [ - ('guild', guild_ids), + # guild_subscriptions: + # enables dispatching of guild subscription events + # (presence and typing events) + + # we enable processing of guild_subscriptions by adding flags + # when subscribing to the given backend. those are optional. + channels_to_sub = [ + ('guild', guild_ids, + {'presence': guild_subscriptions, 'typing': guild_subscriptions}), ('channel', dm_ids), - ('channel', gdm_ids) - ]) + ('channel', gdm_ids), + ] + + await self.ext.dispatcher.mass_sub(user_id, channels_to_sub) if not self.state.bot: # subscribe to all friends diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index dd03ef2..4ad5ee7 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -89,7 +89,7 @@ class Dispatcher: try: await state.ws.dispatch(event, data) res.append(state.session_id) - except: + except Exception: log.exception('error while dispatching') return res From dda99c9a6ac911d284c0b8a881b8d0ed1e43465f Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 12:51:13 -0300 Subject: [PATCH 03/13] add basic flags support to GuildDispatcher --- litecord/pubsub/guild.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 5a419e2..069b664 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -18,6 +18,7 @@ along with this program. If not, see . """ from typing import Any +from collections import defaultdict from logbook import Logger @@ -32,9 +33,18 @@ class GuildDispatcher(DispatcherWithState): KEY_TYPE = int VAL_TYPE = int + def __init__(self, main): + super().__init__(main) + + #: keep flags for subscribers, so for example + # a subscriber could drop all presence events at the + # pubsub level. see gateway's guild_subscriptions field for more + self.flags = defaultdict(dict) + async def _chan_action(self, action: str, - guild_id: int, user_id: int): + guild_id: int, user_id: int, flags=None): """Send an action to all channels of the guild.""" + flags = flags or {} chan_ids = await self.app.storage.get_channel_ids(guild_id) for chan_id in chan_ids: @@ -54,7 +64,7 @@ class GuildDispatcher(DispatcherWithState): action, chan_id) await self.main_dispatcher.action( - 'channel', action, chan_id, user_id + 'channel', action, chan_id, user_id, flags ) async def _chan_call(self, meth: str, guild_id: int, *args): @@ -70,14 +80,17 @@ class GuildDispatcher(DispatcherWithState): meth, chan_id) await method(chan_id, *args) - async def sub(self, guild_id: int, user_id: int): + async def sub(self, guild_id: int, user_id: int, flags = None): """Subscribe a user to the guild.""" await super().sub(guild_id, user_id) - await self._chan_action('sub', guild_id, user_id) + self.flags[guild_id][user_id] = flags or {} + + await self._chan_action('sub', guild_id, user_id, flags) async def unsub(self, guild_id: int, user_id: int): """Unsubscribe a user from the guild.""" await super().unsub(guild_id, user_id) + self.flags[guild_id].pop(user_id) await self._chan_action('unsub', guild_id, user_id) async def dispatch_filter(self, guild_id: int, func, From 4d7e6ec93c4e2aaf792a5dd4ed8b3216e514b720 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 12:55:14 -0300 Subject: [PATCH 04/13] pubsub.guild: skip presence events based on flags.presence --- litecord/pubsub/guild.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 069b664..c03a5db 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -114,6 +114,15 @@ class GuildDispatcher(DispatcherWithState): await self.unsub(guild_id, user_id) continue + # skip the given subscriber if event starts with PRESENCE_ + # and the flags say they don't want it. + + # note that this does not equate to any unsubscription + # of the channel. + flags = self.flags[guild_id][user_id] + if event.startswith('PRESENCE_') and not flags.get('presence'): + continue + # filter the ones that matter states = list(filter( lambda state: func(state.session_id), states @@ -121,6 +130,7 @@ class GuildDispatcher(DispatcherWithState): cur_sess = await self._dispatch_states( states, event, data) + sessions.extend(cur_sess) dispatched += len(cur_sess) From 99e29ad39fa9a0e82f3f7313a3837b07ab2cc926 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 12:55:39 -0300 Subject: [PATCH 05/13] pubsub.guild: default to always getting presence --- litecord/pubsub/guild.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index c03a5db..fdfdf2c 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -120,7 +120,7 @@ class GuildDispatcher(DispatcherWithState): # note that this does not equate to any unsubscription # of the channel. flags = self.flags[guild_id][user_id] - if event.startswith('PRESENCE_') and not flags.get('presence'): + if event.startswith('PRESENCE_') and not flags.get('presence', True): continue # filter the ones that matter From 4c29575b2c8e163b1206d02858c165af0fa7ae06 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 12:59:20 -0300 Subject: [PATCH 06/13] pubsub.dispatcher: add DispatcherWithFlags --- litecord/pubsub/dispatcher.py | 22 ++++++++++++++++++++++ litecord/pubsub/guild.py | 19 ++++--------------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index 4ad5ee7..7d40c07 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -128,3 +128,25 @@ class DispatcherWithState(Dispatcher): async def dispatch(self, key, *args): raise NotImplementedError + + +class DispatcherWithFlags(DispatcherWithState): + """Pub/Sub backend with both a state and a flags store.""" + + def __init__(self, main): + super().__init__(main) + + #: keep flags for subscribers, so for example + # a subscriber could drop all presence events at the + # pubsub level. see gateway's guild_subscriptions field for more + self.flags = defaultdict(dict) + + async def sub(self, key, identifier, flags=None): + """Subscribe a user to the guild.""" + await super().sub(key, identifier) + self.flags[key][identifier] = flags or {} + + async def unsub(self, key, identifier): + """Unsubscribe a user from the guild.""" + await super().unsub(key, identifier) + self.flags[key].pop(identifier) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index fdfdf2c..20b3bd1 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -22,25 +22,17 @@ from collections import defaultdict from logbook import Logger -from .dispatcher import DispatcherWithState +from .dispatcher import DispatcherWithFlags from litecord.permissions import get_permissions log = Logger(__name__) -class GuildDispatcher(DispatcherWithState): +class GuildDispatcher(DispatcherWithFlags): """Guild backend for Pub/Sub""" KEY_TYPE = int VAL_TYPE = int - def __init__(self, main): - super().__init__(main) - - #: keep flags for subscribers, so for example - # a subscriber could drop all presence events at the - # pubsub level. see gateway's guild_subscriptions field for more - self.flags = defaultdict(dict) - async def _chan_action(self, action: str, guild_id: int, user_id: int, flags=None): """Send an action to all channels of the guild.""" @@ -80,17 +72,14 @@ class GuildDispatcher(DispatcherWithState): meth, chan_id) await method(chan_id, *args) - async def sub(self, guild_id: int, user_id: int, flags = None): + async def sub(self, guild_id: int, user_id: int, flags=None): """Subscribe a user to the guild.""" - await super().sub(guild_id, user_id) - self.flags[guild_id][user_id] = flags or {} - + await super().sub(guild_id, user_id, flags) await self._chan_action('sub', guild_id, user_id, flags) async def unsub(self, guild_id: int, user_id: int): """Unsubscribe a user from the guild.""" await super().unsub(guild_id, user_id) - self.flags[guild_id].pop(user_id) await self._chan_action('unsub', guild_id, user_id) async def dispatch_filter(self, guild_id: int, func, From 29d13c53b875897381862f610fcd9bfc061b7603 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 13:00:19 -0300 Subject: [PATCH 07/13] pubsub.channel: add flags to ChannelDispatcher --- litecord/pubsub/channel.py | 2 +- litecord/pubsub/guild.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/litecord/pubsub/channel.py b/litecord/pubsub/channel.py index 15e4010..5b3cf55 100644 --- a/litecord/pubsub/channel.py +++ b/litecord/pubsub/channel.py @@ -48,7 +48,7 @@ def gdm_recipient_view(orig: dict, user_id: int) -> dict: return data -class ChannelDispatcher(DispatcherWithState): +class ChannelDispatcher(DispatcherWithFlags): """Main channel Pub/Sub logic.""" KEY_TYPE = int VAL_TYPE = int diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 20b3bd1..7b56fcf 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -18,7 +18,6 @@ along with this program. If not, see . """ from typing import Any -from collections import defaultdict from logbook import Logger From 49ed61438ca7bc9460a14d4a144214444ac19977 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 13:05:47 -0300 Subject: [PATCH 08/13] pubsub: add indirection layer to access flags --- litecord/pubsub/channel.py | 5 +++++ litecord/pubsub/dispatcher.py | 7 +++++++ litecord/pubsub/guild.py | 4 ++-- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/litecord/pubsub/channel.py b/litecord/pubsub/channel.py index 5b3cf55..8ad47b5 100644 --- a/litecord/pubsub/channel.py +++ b/litecord/pubsub/channel.py @@ -84,6 +84,11 @@ class ChannelDispatcher(DispatcherWithFlags): await self.unsub(channel_id, user_id) continue + # skip typing events for users that don't want it + if event.startswith('TYPING_') and \ + not self.flags_get(channel_id, user_id, 'typing', True): + continue + cur_sess = [] if event in ('CHANNEL_CREATE', 'CHANNEL_UPDATE') \ diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index 7d40c07..493162a 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -150,3 +150,10 @@ class DispatcherWithFlags(DispatcherWithState): """Unsubscribe a user from the guild.""" await super().unsub(key, identifier) self.flags[key].pop(identifier) + + def flags_get(self, key, identifier, field: str, default): + """Get a single field from the flags store.""" + # yes, i know its simply an indirection from the main flags store, + # but i'd rather have this than change every call if i ever change + # the structure of the flags store. + return self.flags[key][identifier].get(field, default) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 7b56fcf..54d77e4 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -107,8 +107,8 @@ class GuildDispatcher(DispatcherWithFlags): # note that this does not equate to any unsubscription # of the channel. - flags = self.flags[guild_id][user_id] - if event.startswith('PRESENCE_') and not flags.get('presence', True): + if event.startswith('PRESENCE_') and \ + not self.flags_get(guild_id, user_id, 'presence', True): continue # filter the ones that matter From 91679c3aee09d3a1029c603ebf0a808e1ed50a08 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 13:08:08 -0300 Subject: [PATCH 09/13] fix import --- litecord/pubsub/channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litecord/pubsub/channel.py b/litecord/pubsub/channel.py index 8ad47b5..443d1e3 100644 --- a/litecord/pubsub/channel.py +++ b/litecord/pubsub/channel.py @@ -21,7 +21,7 @@ from typing import Any, List from logbook import Logger -from .dispatcher import DispatcherWithState +from .dispatcher import DispatcherWithFlags from litecord.enums import ChannelType from litecord.utils import index_by_func From fb18662af46fdeafc8002166e0f427f0cd023dd5 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 13:16:41 -0300 Subject: [PATCH 10/13] dispatcher: fix typo --- litecord/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index 29748a9..96f29d3 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -124,7 +124,7 @@ class EventDispatcher: if len(bcall) == 2: flags = {} - elif len(bcall == 3): + elif len(bcall) == 3: # we have flags flags = bcall[2] From ad751cdad7afa958270ce3e06167956fc7b15870 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 13:21:32 -0300 Subject: [PATCH 11/13] pubsub.guild: only pass flags when required --- litecord/pubsub/guild.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 54d77e4..aef55cc 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -54,8 +54,14 @@ class GuildDispatcher(DispatcherWithFlags): log.debug('sending raw action {!r} to chan={}', action, chan_id) + # for now, only sub() has support for flags. + # it is an idea to have flags support for other actions + args = [] + if action == 'sub': + args.append(flags) + await self.main_dispatcher.action( - 'channel', action, chan_id, user_id, flags + 'channel', action, chan_id, user_id, *args ) async def _chan_call(self, meth: str, guild_id: int, *args): From f99ad7a207d949d34f9771978ee113dd77eae632 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 13:22:56 -0300 Subject: [PATCH 12/13] pubsub.guild: drop presence flags when passing it to channels --- litecord/pubsub/guild.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index aef55cc..5ebc101 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -58,7 +58,11 @@ class GuildDispatcher(DispatcherWithFlags): # it is an idea to have flags support for other actions args = [] if action == 'sub': - args.append(flags) + chanflags = dict(flags) + + # channels don't need presence flags + chanflags.pop('presence') + args.append(chanflags) await self.main_dispatcher.action( 'channel', action, chan_id, user_id, *args From 80a54e8b8d853383a65125758229b5f253352ab3 Mon Sep 17 00:00:00 2001 From: Luna Date: Sat, 20 Jul 2019 14:39:22 -0300 Subject: [PATCH 13/13] ignore when flags.presence isn't given --- litecord/pubsub/guild.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index 5ebc101..462fb63 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -61,7 +61,11 @@ class GuildDispatcher(DispatcherWithFlags): chanflags = dict(flags) # channels don't need presence flags - chanflags.pop('presence') + try: + chanflags.pop('presence') + except KeyError: + pass + args.append(chanflags) await self.main_dispatcher.action(