dispatcher: add basics of subscription flags

This commit is contained in:
Luna 2019-07-20 12:46:59 -03:00
parent bbea185a7d
commit e2cb49669e
3 changed files with 47 additions and 20 deletions

View File

@ -17,7 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from typing import List, Any
from typing import List, Any, Dict
from logbook import Logger
@ -57,7 +57,7 @@ class EventDispatcher:
'lazy_guild': LazyGuildDispatcher(self),
}
async def action(self, backend_str: str, action: str, key, identifier):
async def action(self, backend_str: str, action: str, key, identifier, *args):
"""Send an action regarding a key/identifier pair to a backend.
Action is usually "sub" or "unsub".
@ -69,13 +69,24 @@ class EventDispatcher:
key = backend.KEY_TYPE(key)
identifier = backend.VAL_TYPE(identifier)
return await method(key, identifier)
return await method(key, identifier, *args)
async def subscribe(self, backend: str, key: Any, identifier: Any):
async def subscribe(self, backend: str, key: Any, identifier: Any,
flags: Dict[str, Any] = None):
"""Subscribe a single element to the given backend."""
flags = flags or {}
log.debug('SUB backend={} key={} <= id={}',
backend, key, identifier, backend)
# this is a hacky solution for backwards compatibility between backends
# that implement flags and backends that don't.
# passing flags to backends that don't implement flags will
# cause errors as expected.
if flags:
return await self.action(backend, 'sub', key, identifier, flags)
return await self.action(backend, 'sub', key, identifier)
async def unsubscribe(self, backend: str, key: Any, identifier: Any):
@ -93,24 +104,34 @@ class EventDispatcher:
"""Alias to unsubscribe()."""
return await self.unsubscribe(backend, key, identifier)
async def sub_many(self, backend_str: str, identifier: Any, keys: list):
async def sub_many(self, backend_str: str, identifier: Any,
keys: list, flags: Dict[str, Any] = None):
"""Subscribe to multiple channels (all in a single backend)
at a time.
Usually used when connecting to the gateway and the client
needs to subscribe to all their guids.
"""
flags = flags or {}
for key in keys:
await self.subscribe(backend_str, key, identifier)
await self.subscribe(backend_str, key, identifier, flags)
async def mass_sub(self, identifier: Any,
backends: List[tuple]):
"""Mass subscribe to many backends at once."""
for backend_str, keys in backends:
log.debug('subscribing {} to {} keys in backend {}',
identifier, len(keys), backend_str)
for bcall in backends:
backend_str, keys = bcall[0], bcall[1]
await self.sub_many(backend_str, identifier, keys)
if len(bcall) == 2:
flags = {}
elif len(bcall == 3):
# we have flags
flags = bcall[2]
log.debug('subscribing {} to {} keys in backend {}, flags: {}',
identifier, len(keys), backend_str, flags)
await self.sub_many(backend_str, identifier, keys, flags)
async def dispatch(self, backend_str: str, key: Any, *args, **kwargs):
"""Dispatch an event to the backend.

View File

@ -437,17 +437,23 @@ class GatewayWebsocket:
# fetch all group dms the user is a member of.
gdm_ids = await self.user_storage.get_gdms_internal(user_id)
log.info('subscribing to {} guilds', len(guild_ids))
log.info('subscribing to {} dms', len(dm_ids))
log.info('subscribing to {} group dms', len(gdm_ids))
log.info('subscribing to {} guilds {} dms {} gdms',
len(guild_ids), len(dm_ids), len(gdm_ids))
# TODO(gw-guild-subscriptions)
# make a channel:typing and guild:presence subchannels
await self.ext.dispatcher.mass_sub(user_id, [
('guild', guild_ids),
# guild_subscriptions:
# enables dispatching of guild subscription events
# (presence and typing events)
# we enable processing of guild_subscriptions by adding flags
# when subscribing to the given backend. those are optional.
channels_to_sub = [
('guild', guild_ids,
{'presence': guild_subscriptions, 'typing': guild_subscriptions}),
('channel', dm_ids),
('channel', gdm_ids)
])
('channel', gdm_ids),
]
await self.ext.dispatcher.mass_sub(user_id, channels_to_sub)
if not self.state.bot:
# subscribe to all friends

View File

@ -89,7 +89,7 @@ class Dispatcher:
try:
await state.ws.dispatch(event, data)
res.append(state.session_id)
except:
except Exception:
log.exception('error while dispatching')
return res