From 48c7dc539aafde8fc36b6046d8b4cd87fd79e728 Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Mon, 19 Nov 2018 15:33:16 -0300 Subject: [PATCH] users: use dispatch_many_filter when sending USER_UPDATE events Related to #19. - dispatcher: add EventDispatcher.dispatch_many_filter - pubsub.channel, guild: return session ids on dispatch() return - pubsub.friend, user: add dispatch_filter handler --- litecord/blueprints/users.py | 27 +++++++++++++++++++++------ litecord/dispatcher.py | 15 +++++++++++++++ litecord/pubsub/channel.py | 8 +++++++- litecord/pubsub/dispatcher.py | 6 +++--- litecord/pubsub/friend.py | 23 ++++++++++++++++------- litecord/pubsub/guild.py | 9 +++++++-- litecord/pubsub/user.py | 17 +++++++++++++++-- 7 files changed, 84 insertions(+), 21 deletions(-) diff --git a/litecord/blueprints/users.py b/litecord/blueprints/users.py index dfe4f99..2dc65e2 100644 --- a/litecord/blueprints/users.py +++ b/litecord/blueprints/users.py @@ -197,18 +197,33 @@ async def patch_me(): user.pop('password_hash') public_user = await app.storage.get_user(user_id) - await app.dispatcher.dispatch_user( - user_id, 'USER_UPDATE', public_user) + session_ids = [] + + # by using dispatch_with_filter + # we're guaranteeing all shards will get + # a USER_UPDATE once and not any others. + session_ids.extend( + await app.dispatcher.dispatch_user( + user_id, 'USER_UPDATE', public_user) + ) guild_ids = await app.user_storage.get_user_guilds(user_id) friend_ids = await app.user_storage.get_friend_ids(user_id) - await app.dispatcher.dispatch_many( - 'guild', guild_ids, 'USER_UPDATE', public_user + session_ids.extend( + await app.dispatcher.dispatch_many_filter( + 'guild', guild_ids, + lambda sess_id: sess_id not in session_ids, + 'USER_UPDATE', public_user + ) ) - await app.dispatcher.dispatch_many( - 'friend', friend_ids, 'USER_UPDATE', public_user + session_ids.extend( + await app.dispatcher.dispatch_many_filter( + 'friend', friend_ids, + lambda sess_id: sess_id not in session_ids, + 'USER_UPDATE', public_user + ) ) return jsonify(user) diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index 85200f0..31780ce 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -114,6 +114,21 @@ class EventDispatcher: key = backend.KEY_TYPE(key) return await backend.dispatch_filter(key, func, *args) + async def dispatch_many_filter(self, backend_str, keys: List[Any], + func, *args) -> List[str]: + """Call the dispatch_filter method to many keys. + + Not all backends will handle this function. + """ + res = [] + + for key in keys: + res.extend( + await self.dispatch_filter(backend_str, key, func, *args) + ) + + return res + async def reset(self, backend_str: str, key: Any): """Reset the bucket in the given backend.""" backend = self.backends[backend_str] diff --git a/litecord/pubsub/channel.py b/litecord/pubsub/channel.py index 621eeff..38bb273 100644 --- a/litecord/pubsub/channel.py +++ b/litecord/pubsub/channel.py @@ -19,6 +19,7 @@ class ChannelDispatcher(DispatcherWithState): # and store the number of states we dispatched the event to user_ids = self.state[channel_id] dispatched = 0 + sessions = [] # making a copy of user_ids since # we'll modify it later on. @@ -42,7 +43,12 @@ class ChannelDispatcher(DispatcherWithState): await self.unsub(channel_id, user_id) continue - dispatched += await self._dispatch_states(states, event, data) + cur_sess = await self._dispatch_states(states, event, data) + + sessions.extend(cur_sess) + dispatched += len(cur_sess) log.info('Dispatched chan={} {!r} to {} states', channel_id, event, dispatched) + + return sessions diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index c9da2e7..cee9f51 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -65,16 +65,16 @@ class Dispatcher: async def _dispatch_states(self, states: list, event: str, data) -> int: """Dispatch an event to a list of states.""" - dispatched = 0 + res = [] for state in states: try: await state.ws.dispatch(event, data) - dispatched += 1 + res.append(state.session_id) except: log.exception('error while dispatching') - return dispatched + return res class DispatcherWithState(Dispatcher): diff --git a/litecord/pubsub/friend.py b/litecord/pubsub/friend.py index d80ac6d..4e3fa54 100644 --- a/litecord/pubsub/friend.py +++ b/litecord/pubsub/friend.py @@ -15,18 +15,27 @@ class FriendDispatcher(DispatcherWithState): KEY_TYPE = int VAL_TYPE = int - async def dispatch(self, user_id: int, event, data): + async def dispatch_filter(self, user_id: int, func, event, data): """Dispatch an event to all of a users' friends.""" - # all friends that are connected and subscribed - # to the one we're dispatching from peer_ids = self.state[user_id] - dispatched = 0 + sessions = [] for peer_id in peer_ids: # dispatch to the user instead of the "shards tied to a guild" # since relationships broadcast to all shards. - dispatched += await self.main_dispatcher.dispatch( - 'user', peer_id, event, data) + sessions.extend( + await self.main_dispatcher.dispatch_filter( + 'user', peer_id, func, event, data) + ) log.info('dispatched uid={} {!r} to {} states', - user_id, event, dispatched) + user_id, event, len(sessions)) + + return sessions + + async def dispatch(self, user_id, event, data): + return await self.dispatch_filter( + user_id, + lambda sess_id: True, + event, data, + ) diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index fcb0f05..c53eb60 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -63,6 +63,7 @@ class GuildDispatcher(DispatcherWithState): func(session_id) true.""" user_ids = self.state[guild_id] dispatched = 0 + sessions = [] # acquire a copy since we may be modifying # the original user_ids @@ -82,16 +83,20 @@ class GuildDispatcher(DispatcherWithState): lambda state: func(state.session_id), states )) - dispatched += await self._dispatch_states( + cur_sess = await self._dispatch_states( states, event, data) + sessions.extend(cur_sess) + dispatched += len(cur_sess) log.info('Dispatched {} {!r} to {} states', guild_id, event, dispatched) + return sessions + async def dispatch(self, guild_id: int, event: str, data: Any): """Dispatch an event to all subscribers of the guild.""" - await self.dispatch_filter( + return await self.dispatch_filter( guild_id, lambda sess_id: True, event, data, diff --git a/litecord/pubsub/user.py b/litecord/pubsub/user.py index 5187f71..78811b3 100644 --- a/litecord/pubsub/user.py +++ b/litecord/pubsub/user.py @@ -5,7 +5,20 @@ class UserDispatcher(Dispatcher): """User backend for Pub/Sub.""" KEY_TYPE = int - async def dispatch(self, user_id: int, event, data): + async def dispatch_filter(self, user_id: int, func, event, data): """Dispatch an event to all shards of a user.""" - states = self.sm.user_states(user_id) + + # filter only states where func() gives true + states = list(filter( + lambda state: func(state.session_id), + self.sm.user_states(user_id) + )) + return await self._dispatch_states(states, event, data) + + async def dispatch(self, user_id: int, event, data): + return await self.dispatch_filter( + user_id, + lambda sess_id: True, + event, data, + )