litecord.dispatcher: change dispatch_* methods into pubsub backends

- litecord: add pubsub module
 - schemas: change type to snowflake in MESSAGE_CREATE's nonce
This commit is contained in:
Luna Mendes 2018-10-09 18:56:34 -03:00
parent 5afc15c4f6
commit aa76cc2c7d
7 changed files with 155 additions and 61 deletions

View File

@ -3,6 +3,9 @@ from typing import Any
from logbook import Logger
from .pubsub import GuildDispatcher, MemberDispatcher, \
UserDispatcher
log = Logger(__name__)
@ -10,74 +13,48 @@ class EventDispatcher:
"""Pub/Sub routines for litecord."""
def __init__(self, sm):
self.state_manager = sm
self.guild_buckets = collections.defaultdict(set)
def sub_guild(self, guild_id: int, user_id: int):
"""Subscribe to a guild's events, given the user ID."""
self.guild_buckets[guild_id].add(user_id)
self.backends = {
'guild': GuildDispatcher(self),
'member': MemberDispatcher(self),
'user': UserDispatcher(self),
}
def unsub_guild(self, guild_id: int, user_id: int):
"""Unsubscribe from a guild, given user ID"""
self.guild_buckets[guild_id].discard(user_id)
async def action(self, backend_str: str, action: str, key, identifier):
"""Send an action regarding a key/identifier pair to a backend."""
backend = self.backends[backend_str]
method = getattr(backend, f'{action}')
def remove_guild(self, guild_id):
"""Reset the guild bucket."""
self.guild_buckets[guild_id] = set()
key = backend.KEY_TYPE(key)
identifier = backend.VAL_TYPE(identifier)
def sub_many(self, user_id: int, guild_ids: list):
"""Subscribe to many guilds at a time."""
for guild_id in guild_ids:
self.sub_guild(guild_id, user_id)
return await method(key, identifier)
async def dispatch_guild(self, guild_id: int,
event_name: str, event_payload: Any):
"""Dispatch an event to a guild"""
users = self.guild_buckets[guild_id]
dispatched = 0
async def subscribe(self, backend: str, key: Any, identifier: Any):
"""Subscribe a single element to the given backend."""
return await self.action(backend, 'sub', key, identifier)
log.debug('Dispatching {} {!r} to {} users',
guild_id, event_name, len(users))
async def unsubscribe(self, backend: str, key: Any, identifier: Any):
"""Unsubscribe an element from the given backend."""
return await self.action(backend, 'unsub', key, identifier)
for user_id in set(users):
# fetch all connections that are tied to the guild,
# this includes all connections that are just a single shard
# and all shards that are nicely working
states = self.state_manager.fetch_states(user_id, guild_id)
async def dispatch(self, backend_str: str, key: Any, *args, **kwargs):
"""Dispatch an event to the backend.
# if there are no more states tied to the guild,
# why keep the user as a subscriber?
if not states:
self.unsub_guild(guild_id, user_id)
continue
# for each reasonable state/shard, dispatch event
for state in states:
# NOTE: maybe a separate task for that async?
await state.ws.dispatch(event_name, event_payload)
dispatched += 1
log.info('Dispatched {} {!r} to {} states',
guild_id, event_name, dispatched)
async def _dispatch_states(self, states: list, event: str, data: Any):
for state in states:
await state.ws.dispatch(event, data)
async def dispatch_user_guild(self, user_id: int, guild_id: int,
event: str, data: Any):
"""Dispatch a single event to a user inside a guild.
The difference between dispatch_user and dispatch_user_guild
is sharding management happening here, via StateManager.fetch_states
The backend is responsible for everything regarding the dispatch.
"""
states = self.state_manager.fetch_states(user_id, guild_id)
backend = self.backends[backend_str]
key = backend.KEY_TYPE(key)
return await backend._dispatch(key, *args, **kwargs)
if not states:
self.unsub_guild(guild_id, user_id)
async def reset(self, backend_str: str, key: Any):
"""Reset the bucket in the given backend."""
backend = self.backends[backend_str]
key = backend.KEY_TYPE(key)
return await backend._reset(key)
await self._dispatch_states(states, event, data)
async def dispatch_user(self, user_id: int, event: str, data: Any):
"""Dispatch an event to a single user."""
states = self.state_manager.user_states(user_id)
await self._dispatch_states(states, event, data)
async def sub_many(self, backend_str: str, identifier: Any, keys: list):
"""Subscribe to many buckets inside a single backend
at a time."""
for key in keys:
await self.subscribe(backend_str, key, identifier)

View File

@ -0,0 +1,3 @@
from .guild import GuildDispatcher
from .member import MemberDispatcher
from .user import UserDispatcher

View File

@ -0,0 +1,34 @@
from logbook import Logger
log = Logger(__name__)
class Dispatcher:
"""Main dispatcher class."""
KEY_TYPE = lambda x: x
VAL_TYPE = lambda x: x
def __init__(self, main):
self.main_dispatcher = main
self.sm = main.state_manager
async def sub(self, _key, _id):
raise NotImplementedError
async def unsub(self, _key, _id):
raise NotImplementedError
async def dispatch(self, _key, *_args, **_kwargs):
raise NotImplementedError
async def _dispatch_states(self, states: list, event: str, data) -> int:
dispatched = 0
for state in states:
try:
await state.ws.dispatch(event, data)
dispatched += 1
except:
log.exception('error while dispatching')
return dispatched

51
litecord/pubsub/guild.py Normal file
View File

@ -0,0 +1,51 @@
from collections import defaultdict
from typing import Any
from logbook import Logger
from .dispatcher import Dispatcher
log = Logger(__name__)
class GuildDispatcher(Dispatcher):
"""Guild backend for Pub/Sub"""
KEY_TYPE = int
VAL_TYPE = int
def __init__(self, main):
super().__init__(main)
self.guild_buckets = defaultdict(set)
async def sub(self, guild_id: int, user_id: int):
self.guild_buckets[guild_id].add(user_id)
async def unsub(self, guild_id: int, user_id: int):
self.guild_buckets[guild_id].discard(user_id)
async def reset(self, guild_id: int):
self.guild_buckets[guild_id] = set()
async def dispatch(self, guild_id: int,
event_name: str, event_payload: Any):
user_ids = self.guild_buckets[guild_id]
dispatched = 0
# acquire a copy since we will be modifying
# the original user_ids
for user_id in set(user_ids):
# fetch all states related to the user id and guild id.
states = self.sm.fetch_states(user_id, guild_id)
if not states:
# user is actually disconnected,
# so we should just unsub it
await self._unsub(guild_id, user_id)
continue
dispatched += await self._dispatch_states(
states, event_name, event_payload)
log.info('Dispatched {} {!r} to {} states',
guild_id, event_name, dispatched)

20
litecord/pubsub/member.py Normal file
View File

@ -0,0 +1,20 @@
from .dispatcher import Dispatcher
class MemberDispatcher(Dispatcher):
KEY_TYPE = int
VAL_TYPE = int
async def dispatch(self, guild_id: int, user_id: int, event, data):
"""Dispatch a single event to a member.
This is shard-aware.
"""
# fetch shards
states = self.sm.fetch_states(user_id, guild_id)
if not states:
await self.main_dispatcher.unsub('guild', guild_id, user_id)
return
await self._dispatch_states(states, event, data)

9
litecord/pubsub/user.py Normal file
View File

@ -0,0 +1,9 @@
from .dispatcher import Dispatcher
class UserDispatcher(Dispatcher):
KEY_TYPE = int
async def dispatch(self, user_id: int, event, data):
states = self.sm.user_states(user_id)
return await self._dispatch_states(states, event, data)

View File

@ -123,7 +123,7 @@ MEMBER_UPDATE = {
MESSAGE_CREATE = {
'content': {'type': 'string', 'minlength': 1, 'maxlength': 2000},
'nonce': {'type': 'string', 'required': False},
'nonce': {'type': 'snowflake', 'required': False},
'tts': {'type': 'boolean', 'required': False},
# TODO: file, embed, payload_json