users: use dispatch_many_filter when sending USER_UPDATE events

Related to #19.

 - dispatcher: add EventDispatcher.dispatch_many_filter
 - pubsub.channel, guild: return session ids on dispatch() return
 - pubsub.friend, user: add dispatch_filter handler
This commit is contained in:
Luna Mendes 2018-11-19 15:33:16 -03:00
parent 8f56648544
commit 48c7dc539a
7 changed files with 84 additions and 21 deletions

View File

@ -197,18 +197,33 @@ async def patch_me():
user.pop('password_hash') user.pop('password_hash')
public_user = await app.storage.get_user(user_id) public_user = await app.storage.get_user(user_id)
await app.dispatcher.dispatch_user( session_ids = []
user_id, 'USER_UPDATE', public_user)
# by using dispatch_with_filter
# we're guaranteeing all shards will get
# a USER_UPDATE once and not any others.
session_ids.extend(
await app.dispatcher.dispatch_user(
user_id, 'USER_UPDATE', public_user)
)
guild_ids = await app.user_storage.get_user_guilds(user_id) guild_ids = await app.user_storage.get_user_guilds(user_id)
friend_ids = await app.user_storage.get_friend_ids(user_id) friend_ids = await app.user_storage.get_friend_ids(user_id)
await app.dispatcher.dispatch_many( session_ids.extend(
'guild', guild_ids, 'USER_UPDATE', public_user await app.dispatcher.dispatch_many_filter(
'guild', guild_ids,
lambda sess_id: sess_id not in session_ids,
'USER_UPDATE', public_user
)
) )
await app.dispatcher.dispatch_many( session_ids.extend(
'friend', friend_ids, 'USER_UPDATE', public_user await app.dispatcher.dispatch_many_filter(
'friend', friend_ids,
lambda sess_id: sess_id not in session_ids,
'USER_UPDATE', public_user
)
) )
return jsonify(user) return jsonify(user)

View File

@ -114,6 +114,21 @@ class EventDispatcher:
key = backend.KEY_TYPE(key) key = backend.KEY_TYPE(key)
return await backend.dispatch_filter(key, func, *args) return await backend.dispatch_filter(key, func, *args)
async def dispatch_many_filter(self, backend_str, keys: List[Any],
func, *args) -> List[str]:
"""Call the dispatch_filter method to many keys.
Not all backends will handle this function.
"""
res = []
for key in keys:
res.extend(
await self.dispatch_filter(backend_str, key, func, *args)
)
return res
async def reset(self, backend_str: str, key: Any): async def reset(self, backend_str: str, key: Any):
"""Reset the bucket in the given backend.""" """Reset the bucket in the given backend."""
backend = self.backends[backend_str] backend = self.backends[backend_str]

View File

@ -19,6 +19,7 @@ class ChannelDispatcher(DispatcherWithState):
# and store the number of states we dispatched the event to # and store the number of states we dispatched the event to
user_ids = self.state[channel_id] user_ids = self.state[channel_id]
dispatched = 0 dispatched = 0
sessions = []
# making a copy of user_ids since # making a copy of user_ids since
# we'll modify it later on. # we'll modify it later on.
@ -42,7 +43,12 @@ class ChannelDispatcher(DispatcherWithState):
await self.unsub(channel_id, user_id) await self.unsub(channel_id, user_id)
continue continue
dispatched += await self._dispatch_states(states, event, data) cur_sess = await self._dispatch_states(states, event, data)
sessions.extend(cur_sess)
dispatched += len(cur_sess)
log.info('Dispatched chan={} {!r} to {} states', log.info('Dispatched chan={} {!r} to {} states',
channel_id, event, dispatched) channel_id, event, dispatched)
return sessions

View File

@ -65,16 +65,16 @@ class Dispatcher:
async def _dispatch_states(self, states: list, event: str, data) -> int: async def _dispatch_states(self, states: list, event: str, data) -> int:
"""Dispatch an event to a list of states.""" """Dispatch an event to a list of states."""
dispatched = 0 res = []
for state in states: for state in states:
try: try:
await state.ws.dispatch(event, data) await state.ws.dispatch(event, data)
dispatched += 1 res.append(state.session_id)
except: except:
log.exception('error while dispatching') log.exception('error while dispatching')
return dispatched return res
class DispatcherWithState(Dispatcher): class DispatcherWithState(Dispatcher):

View File

@ -15,18 +15,27 @@ class FriendDispatcher(DispatcherWithState):
KEY_TYPE = int KEY_TYPE = int
VAL_TYPE = int VAL_TYPE = int
async def dispatch(self, user_id: int, event, data): async def dispatch_filter(self, user_id: int, func, event, data):
"""Dispatch an event to all of a users' friends.""" """Dispatch an event to all of a users' friends."""
# all friends that are connected and subscribed
# to the one we're dispatching from
peer_ids = self.state[user_id] peer_ids = self.state[user_id]
dispatched = 0 sessions = []
for peer_id in peer_ids: for peer_id in peer_ids:
# dispatch to the user instead of the "shards tied to a guild" # dispatch to the user instead of the "shards tied to a guild"
# since relationships broadcast to all shards. # since relationships broadcast to all shards.
dispatched += await self.main_dispatcher.dispatch( sessions.extend(
'user', peer_id, event, data) await self.main_dispatcher.dispatch_filter(
'user', peer_id, func, event, data)
)
log.info('dispatched uid={} {!r} to {} states', log.info('dispatched uid={} {!r} to {} states',
user_id, event, dispatched) user_id, event, len(sessions))
return sessions
async def dispatch(self, user_id, event, data):
return await self.dispatch_filter(
user_id,
lambda sess_id: True,
event, data,
)

View File

@ -63,6 +63,7 @@ class GuildDispatcher(DispatcherWithState):
func(session_id) true.""" func(session_id) true."""
user_ids = self.state[guild_id] user_ids = self.state[guild_id]
dispatched = 0 dispatched = 0
sessions = []
# acquire a copy since we may be modifying # acquire a copy since we may be modifying
# the original user_ids # the original user_ids
@ -82,16 +83,20 @@ class GuildDispatcher(DispatcherWithState):
lambda state: func(state.session_id), states lambda state: func(state.session_id), states
)) ))
dispatched += await self._dispatch_states( cur_sess = await self._dispatch_states(
states, event, data) states, event, data)
sessions.extend(cur_sess)
dispatched += len(cur_sess)
log.info('Dispatched {} {!r} to {} states', log.info('Dispatched {} {!r} to {} states',
guild_id, event, dispatched) guild_id, event, dispatched)
return sessions
async def dispatch(self, guild_id: int, async def dispatch(self, guild_id: int,
event: str, data: Any): event: str, data: Any):
"""Dispatch an event to all subscribers of the guild.""" """Dispatch an event to all subscribers of the guild."""
await self.dispatch_filter( return await self.dispatch_filter(
guild_id, guild_id,
lambda sess_id: True, lambda sess_id: True,
event, data, event, data,

View File

@ -5,7 +5,20 @@ class UserDispatcher(Dispatcher):
"""User backend for Pub/Sub.""" """User backend for Pub/Sub."""
KEY_TYPE = int KEY_TYPE = int
async def dispatch(self, user_id: int, event, data): async def dispatch_filter(self, user_id: int, func, event, data):
"""Dispatch an event to all shards of a user.""" """Dispatch an event to all shards of a user."""
states = self.sm.user_states(user_id)
# filter only states where func() gives true
states = list(filter(
lambda state: func(state.session_id),
self.sm.user_states(user_id)
))
return await self._dispatch_states(states, event, data) return await self._dispatch_states(states, event, data)
async def dispatch(self, user_id: int, event, data):
return await self.dispatch_filter(
user_id,
lambda sess_id: True,
event, data,
)