dispatcher: add some dispatch_* methods for backwards compatibility

- dispatcher: add EventDispatcher.remove()
 - blueprints.guilds: use remove() and unsub()
 - gateway.websocket: fix sub_many() call
This commit is contained in:
Luna Mendes 2018-10-09 20:13:39 -03:00
parent aa76cc2c7d
commit b0d1c9765c
7 changed files with 45 additions and 9 deletions

View File

@ -183,7 +183,7 @@ async def delete_guild(guild_id):
# remove from the dispatcher so nobody # remove from the dispatcher so nobody
# becomes the little memer that tries to fuck up with # becomes the little memer that tries to fuck up with
# everybody's gateway # everybody's gateway
app.dispatcher.remove_guild(guild_id) await app.dispatcher.remove('guild', guild_id)
return '', 204 return '', 204

View File

@ -106,7 +106,7 @@ async def leave_guild(guild_id: int):
} }
) )
await app.dispatcher.unsub_guild(guild_id, user_id) await app.dispatcher.unsub('guild', guild_id, user_id)
await app.dispatcher.dispatch_guild('GUILD_MEMBER_REMOVE', { await app.dispatcher.dispatch_guild('GUILD_MEMBER_REMOVE', {
'guild_id': str(guild_id), 'guild_id': str(guild_id),

View File

@ -18,6 +18,8 @@ class EventDispatcher:
'guild': GuildDispatcher(self), 'guild': GuildDispatcher(self),
'member': MemberDispatcher(self), 'member': MemberDispatcher(self),
'user': UserDispatcher(self), 'user': UserDispatcher(self),
# TODO: channel, friends
} }
async def action(self, backend_str: str, action: str, key, identifier): async def action(self, backend_str: str, action: str, key, identifier):
@ -45,16 +47,35 @@ class EventDispatcher:
""" """
backend = self.backends[backend_str] backend = self.backends[backend_str]
key = backend.KEY_TYPE(key) key = backend.KEY_TYPE(key)
return await backend._dispatch(key, *args, **kwargs) return await backend.dispatch(key, *args, **kwargs)
async def reset(self, backend_str: str, key: Any): async def reset(self, backend_str: str, key: Any):
"""Reset the bucket in the given backend.""" """Reset the bucket in the given backend."""
backend = self.backends[backend_str] backend = self.backends[backend_str]
key = backend.KEY_TYPE(key) key = backend.KEY_TYPE(key)
return await backend._reset(key) return await backend.reset(key)
async def remove(self, backend_str: str, key: Any):
"""Remove a key from the backend. This
might be a different operation than resetting."""
backend = self.backends[backend_str]
key = backend.KEY_TYPE(key)
return await backend.remove(key)
async def sub_many(self, backend_str: str, identifier: Any, keys: list): async def sub_many(self, backend_str: str, identifier: Any, keys: list):
"""Subscribe to many buckets inside a single backend """Subscribe to many buckets inside a single backend
at a time.""" at a time."""
for key in keys: for key in keys:
await self.subscribe(backend_str, key, identifier) await self.subscribe(backend_str, key, identifier)
async def dispatch_guild(self, guild_id, event, data):
"""Backwards compatibility."""
return await self.dispatch('guild', guild_id, event, data)
async def dispatch_user_guild(self, user_id, guild_id, event, data):
"""Backwards compatibility."""
return await self.dispatch('member', (guild_id, user_id), event, data)
async def dispatch_user(self, user_id, event, data):
"""Backwards compatibility."""
return await self.dispatch('user', user_id, event, data)

View File

@ -287,9 +287,11 @@ class GatewayWebsocket:
"""Subscribe to all available guilds""" """Subscribe to all available guilds"""
guild_ids = await self._guild_ids() guild_ids = await self._guild_ids()
log.info('subscribing to {} guilds', len(guild_ids)) log.info('subscribing to {} guilds', len(guild_ids))
self.ext.dispatcher.sub_many(self.state.user_id, guild_ids) await self.ext.dispatcher.sub_many('guild',
self.state.user_id, guild_ids)
async def update_status(self, status: dict): async def update_status(self, status: dict):
"""Update the status of the current websocket connection."""
if status is None: if status is None:
status = { status = {
'afk': False, 'afk': False,

View File

@ -21,6 +21,12 @@ class Dispatcher:
async def dispatch(self, _key, *_args, **_kwargs): async def dispatch(self, _key, *_args, **_kwargs):
raise NotImplementedError raise NotImplementedError
async def reset(self, _key):
raise NotImplementedError
async def remove(self, _key):
raise NotImplementedError
async def _dispatch_states(self, states: list, event: str, data) -> int: async def _dispatch_states(self, states: list, event: str, data) -> int:
dispatched = 0 dispatched = 0

View File

@ -26,6 +26,12 @@ class GuildDispatcher(Dispatcher):
async def reset(self, guild_id: int): async def reset(self, guild_id: int):
self.guild_buckets[guild_id] = set() self.guild_buckets[guild_id] = set()
async def remove(self, guild_id: int):
try:
self.guild_buckets.pop(guild_id)
except KeyError:
pass
async def dispatch(self, guild_id: int, async def dispatch(self, guild_id: int,
event_name: str, event_payload: Any): event_name: str, event_payload: Any):
user_ids = self.guild_buckets[guild_id] user_ids = self.guild_buckets[guild_id]
@ -41,7 +47,7 @@ class GuildDispatcher(Dispatcher):
if not states: if not states:
# user is actually disconnected, # user is actually disconnected,
# so we should just unsub it # so we should just unsub it
await self._unsub(guild_id, user_id) await self.unsub(guild_id, user_id)
continue continue
dispatched += await self._dispatch_states( dispatched += await self._dispatch_states(

View File

@ -2,14 +2,15 @@ from .dispatcher import Dispatcher
class MemberDispatcher(Dispatcher): class MemberDispatcher(Dispatcher):
KEY_TYPE = int KEY_TYPE = tuple
VAL_TYPE = int
async def dispatch(self, guild_id: int, user_id: int, event, data): async def dispatch(self, key, event, data):
"""Dispatch a single event to a member. """Dispatch a single event to a member.
This is shard-aware. This is shard-aware.
""" """
guild_id, user_id = key
# fetch shards # fetch shards
states = self.sm.fetch_states(user_id, guild_id) states = self.sm.fetch_states(user_id, guild_id)