mirror of https://gitlab.com/litecord/litecord.git
litecord.pubsub: add docstrings for all methods
This commit is contained in:
parent
3f54f35dce
commit
6be85ea305
|
|
@ -10,7 +10,21 @@ log = Logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class EventDispatcher:
|
class EventDispatcher:
|
||||||
"""Pub/Sub routines for litecord."""
|
"""Pub/Sub routines for litecord.
|
||||||
|
|
||||||
|
EventDispatcher is the middle man between
|
||||||
|
REST code and gateway event logic.
|
||||||
|
|
||||||
|
It sets up Pub/Sub backends and each of them
|
||||||
|
have their own ways of dispatching a single event.
|
||||||
|
|
||||||
|
"key" and "identifier" are the "channel" and "subscriber id"
|
||||||
|
of pub/sub. clients can subscribe to a channel using its backend
|
||||||
|
and the key inside the backend.
|
||||||
|
|
||||||
|
when dispatching, the backend can do its own logic, given
|
||||||
|
its subscriber ids.
|
||||||
|
"""
|
||||||
def __init__(self, app):
|
def __init__(self, app):
|
||||||
self.state_manager = app.state_manager
|
self.state_manager = app.state_manager
|
||||||
self.app = app
|
self.app = app
|
||||||
|
|
@ -24,10 +38,14 @@ class EventDispatcher:
|
||||||
}
|
}
|
||||||
|
|
||||||
async def action(self, backend_str: str, action: str, key, identifier):
|
async def action(self, backend_str: str, action: str, key, identifier):
|
||||||
"""Send an action regarding a key/identifier pair to a backend."""
|
"""Send an action regarding a key/identifier pair to a backend.
|
||||||
backend = self.backends[backend_str]
|
|
||||||
method = getattr(backend, f'{action}')
|
|
||||||
|
|
||||||
|
Action is usually "sub" or "unsub".
|
||||||
|
"""
|
||||||
|
backend = self.backends[backend_str]
|
||||||
|
method = getattr(backend, action)
|
||||||
|
|
||||||
|
# convert keys to the types the backend wants
|
||||||
key = backend.KEY_TYPE(key)
|
key = backend.KEY_TYPE(key)
|
||||||
identifier = backend.VAL_TYPE(identifier)
|
identifier = backend.VAL_TYPE(identifier)
|
||||||
|
|
||||||
|
|
@ -48,17 +66,32 @@ class EventDispatcher:
|
||||||
return await self.action(backend, 'unsub', key, identifier)
|
return await self.action(backend, 'unsub', key, identifier)
|
||||||
|
|
||||||
async def sub(self, backend, key, identifier):
|
async def sub(self, backend, key, identifier):
|
||||||
|
"""Alias to subscribe()."""
|
||||||
return await self.subscribe(backend, key, identifier)
|
return await self.subscribe(backend, key, identifier)
|
||||||
|
|
||||||
async def unsub(self, backend, key, identifier):
|
async def unsub(self, backend, key, identifier):
|
||||||
|
"""Alias to unsubscribe()."""
|
||||||
return await self.unsubscribe(backend, key, identifier)
|
return await self.unsubscribe(backend, key, identifier)
|
||||||
|
|
||||||
|
async def sub_many(self, backend_str: str, identifier: Any, keys: list):
|
||||||
|
"""Subscribe to multiple channels (all in a single backend)
|
||||||
|
at a time.
|
||||||
|
|
||||||
|
Usually used when connecting to the gateway and the client
|
||||||
|
needs to subscribe to all their guids.
|
||||||
|
"""
|
||||||
|
for key in keys:
|
||||||
|
await self.subscribe(backend_str, key, identifier)
|
||||||
|
|
||||||
async def dispatch(self, backend_str: str, key: Any, *args, **kwargs):
|
async def dispatch(self, backend_str: str, key: Any, *args, **kwargs):
|
||||||
"""Dispatch an event to the backend.
|
"""Dispatch an event to the backend.
|
||||||
|
|
||||||
The backend is responsible for everything regarding the dispatch.
|
The backend is responsible for everything regarding the
|
||||||
|
actual dispatch.
|
||||||
"""
|
"""
|
||||||
backend = self.backends[backend_str]
|
backend = self.backends[backend_str]
|
||||||
|
|
||||||
|
# convert types
|
||||||
key = backend.KEY_TYPE(key)
|
key = backend.KEY_TYPE(key)
|
||||||
return await backend.dispatch(key, *args, **kwargs)
|
return await backend.dispatch(key, *args, **kwargs)
|
||||||
|
|
||||||
|
|
@ -67,6 +100,7 @@ class EventDispatcher:
|
||||||
"""Dispatch to multiple keys in a single backend."""
|
"""Dispatch to multiple keys in a single backend."""
|
||||||
log.info('MULTI DISPATCH: {!r}, {} keys',
|
log.info('MULTI DISPATCH: {!r}, {} keys',
|
||||||
backend_str, len(keys))
|
backend_str, len(keys))
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
await self.dispatch(backend_str, key, *args, **kwargs)
|
await self.dispatch(backend_str, key, *args, **kwargs)
|
||||||
|
|
||||||
|
|
@ -83,20 +117,14 @@ class EventDispatcher:
|
||||||
key = backend.KEY_TYPE(key)
|
key = backend.KEY_TYPE(key)
|
||||||
return await backend.remove(key)
|
return await backend.remove(key)
|
||||||
|
|
||||||
async def sub_many(self, backend_str: str, identifier: Any, keys: list):
|
|
||||||
"""Subscribe to many buckets inside a single backend
|
|
||||||
at a time."""
|
|
||||||
for key in keys:
|
|
||||||
await self.subscribe(backend_str, key, identifier)
|
|
||||||
|
|
||||||
async def dispatch_guild(self, guild_id, event, data):
|
async def dispatch_guild(self, guild_id, event, data):
|
||||||
"""Backwards compatibility."""
|
"""Backwards compatibility with old EventDispatcher."""
|
||||||
return await self.dispatch('guild', guild_id, event, data)
|
return await self.dispatch('guild', guild_id, event, data)
|
||||||
|
|
||||||
async def dispatch_user_guild(self, user_id, guild_id, event, data):
|
async def dispatch_user_guild(self, user_id, guild_id, event, data):
|
||||||
"""Backwards compatibility."""
|
"""Backwards compatibility with old EventDispatcher."""
|
||||||
return await self.dispatch('member', (guild_id, user_id), event, data)
|
return await self.dispatch('member', (guild_id, user_id), event, data)
|
||||||
|
|
||||||
async def dispatch_user(self, user_id, event, data):
|
async def dispatch_user(self, user_id, event, data):
|
||||||
"""Backwards compatibility."""
|
"""Backwards compatibility with old EventDispatcher."""
|
||||||
return await self.dispatch('user', user_id, event, data)
|
return await self.dispatch('user', user_id, event, data)
|
||||||
|
|
|
||||||
|
|
@ -16,20 +16,33 @@ class ChannelDispatcher(DispatcherWithState):
|
||||||
async def dispatch(self, channel_id,
|
async def dispatch(self, channel_id,
|
||||||
event: str, data: Any):
|
event: str, data: Any):
|
||||||
"""Dispatch an event to a channel."""
|
"""Dispatch an event to a channel."""
|
||||||
|
# get everyone who is subscribed
|
||||||
|
# and store the number of states we dispatched the event to
|
||||||
user_ids = self.state[channel_id]
|
user_ids = self.state[channel_id]
|
||||||
dispatched = 0
|
dispatched = 0
|
||||||
|
|
||||||
|
# making a copy of user_ids since
|
||||||
|
# we'll modify it later on.
|
||||||
for user_id in set(user_ids):
|
for user_id in set(user_ids):
|
||||||
guild_id = await self.app.storage.guild_from_channel(channel_id)
|
guild_id = await self.app.storage.guild_from_channel(channel_id)
|
||||||
|
|
||||||
|
# if we are dispatching to a guild channel,
|
||||||
|
# we should only dispatch to the states / shards
|
||||||
|
# that are connected to the guild (via their shard id).
|
||||||
|
|
||||||
|
# if we aren't, we just get all states tied to the user.
|
||||||
|
# TODO: make a fetch_states that fetches shards
|
||||||
|
# - with id 0 (count any) OR
|
||||||
|
# - single shards (id=0, count=1)
|
||||||
states = (self.sm.fetch_states(user_id, guild_id)
|
states = (self.sm.fetch_states(user_id, guild_id)
|
||||||
if guild_id else
|
if guild_id else
|
||||||
|
|
||||||
# TODO: use a fetch_states with guild_id 0
|
|
||||||
# or maybe something to fetch all shards
|
|
||||||
# with id 0 and single shards
|
|
||||||
self.sm.user_states(user_id))
|
self.sm.user_states(user_id))
|
||||||
|
|
||||||
|
# unsub people who don't have any states tied to the channel.
|
||||||
|
if not states:
|
||||||
|
await self.unsub(channel_id, user_id)
|
||||||
|
continue
|
||||||
|
|
||||||
dispatched += await self._dispatch_states(states, event, data)
|
dispatched += await self._dispatch_states(states, event, data)
|
||||||
|
|
||||||
log.info('Dispatched chan={} {!r} to {} states',
|
log.info('Dispatched chan={} {!r} to {} states',
|
||||||
|
|
|
||||||
|
|
@ -1,19 +1,32 @@
|
||||||
|
"""
|
||||||
|
litecord.pubsub.dispatcher: main dispatcher class
|
||||||
|
"""
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from logbook import Logger
|
from logbook import Logger
|
||||||
|
|
||||||
log = Logger(__name__)
|
log = Logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Dispatcher:
|
class Dispatcher:
|
||||||
"""Pub/Sub backend dispatcher."""
|
"""Pub/Sub backend dispatcher.
|
||||||
|
|
||||||
|
This just declares functions all Dispatcher subclasses
|
||||||
|
can implement. This does not mean all Dispatcher
|
||||||
|
subclasses have them implemented.
|
||||||
|
"""
|
||||||
|
|
||||||
# the _ parameter is for (self)
|
# the _ parameter is for (self)
|
||||||
KEY_TYPE = lambda _, x: x
|
KEY_TYPE = lambda _, x: x
|
||||||
VAL_TYPE = lambda _, x: x
|
VAL_TYPE = lambda _, x: x
|
||||||
|
|
||||||
def __init__(self, main):
|
def __init__(self, main):
|
||||||
|
#: main EventDispatcher
|
||||||
self.main_dispatcher = main
|
self.main_dispatcher = main
|
||||||
|
|
||||||
|
#: gateway state storage
|
||||||
self.sm = main.state_manager
|
self.sm = main.state_manager
|
||||||
|
|
||||||
self.app = main.app
|
self.app = main.app
|
||||||
|
|
||||||
async def sub(self, _key, _id):
|
async def sub(self, _key, _id):
|
||||||
|
|
@ -66,6 +79,10 @@ class DispatcherWithState(Dispatcher):
|
||||||
def __init__(self, main):
|
def __init__(self, main):
|
||||||
super().__init__(main)
|
super().__init__(main)
|
||||||
|
|
||||||
|
#: the default dict is to a set
|
||||||
|
# so we make sure someone calling sub()
|
||||||
|
# twice won't get 2x the events for the
|
||||||
|
# same channel.
|
||||||
self.state = defaultdict(set)
|
self.state = defaultdict(set)
|
||||||
|
|
||||||
async def sub(self, key, identifier):
|
async def sub(self, key, identifier):
|
||||||
|
|
|
||||||
|
|
@ -6,15 +6,25 @@ log = Logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class FriendDispatcher(DispatcherWithState):
|
class FriendDispatcher(DispatcherWithState):
|
||||||
|
"""Friend Pub/Sub logic.
|
||||||
|
|
||||||
|
When connecting, a client will subscribe to all their friends
|
||||||
|
channels. If that friend updates their presence, it will be
|
||||||
|
broadcasted through that channel to basically all their friends.
|
||||||
|
"""
|
||||||
KEY_TYPE = int
|
KEY_TYPE = int
|
||||||
VAL_TYPE = int
|
VAL_TYPE = int
|
||||||
|
|
||||||
async def dispatch(self, user_id: int, event, data):
|
async def dispatch(self, user_id: int, event, data):
|
||||||
"""Dispatch an event to all of a users' friends."""
|
"""Dispatch an event to all of a users' friends."""
|
||||||
|
# all friends that are connected and subscribed
|
||||||
|
# to the one we're dispatching from
|
||||||
peer_ids = self.state[user_id]
|
peer_ids = self.state[user_id]
|
||||||
dispatched = 0
|
dispatched = 0
|
||||||
|
|
||||||
for peer_id in peer_ids:
|
for peer_id in peer_ids:
|
||||||
|
# dispatch to the user instead of the "shards tied to a guild"
|
||||||
|
# since relationships broadcast to all shards.
|
||||||
dispatched += await self.main_dispatcher.dispatch(
|
dispatched += await self.main_dispatcher.dispatch(
|
||||||
'user', peer_id, event, data)
|
'user', peer_id, event, data)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ class GuildDispatcher(DispatcherWithState):
|
||||||
VAL_TYPE = int
|
VAL_TYPE = int
|
||||||
|
|
||||||
async def _chan_action(self, action: str, guild_id: int, user_id: int):
|
async def _chan_action(self, action: str, guild_id: int, user_id: int):
|
||||||
|
"""Send an action to all channels of the guild."""
|
||||||
chan_ids = await self.app.storage.get_channel_ids(guild_id)
|
chan_ids = await self.app.storage.get_channel_ids(guild_id)
|
||||||
|
|
||||||
# TODO: check READ_MESSAGE permissions for the user
|
# TODO: check READ_MESSAGE permissions for the user
|
||||||
|
|
@ -27,7 +28,10 @@ class GuildDispatcher(DispatcherWithState):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _chan_call(self, meth: str, guild_id: int, *args):
|
async def _chan_call(self, meth: str, guild_id: int, *args):
|
||||||
|
"""Call a method on the ChannelDispatcher, for all channels
|
||||||
|
in the guild."""
|
||||||
chan_ids = await self.app.storage.get_channel_ids(guild_id)
|
chan_ids = await self.app.storage.get_channel_ids(guild_id)
|
||||||
|
|
||||||
chan_dispatcher = self.main_dispatcher.backends['channel']
|
chan_dispatcher = self.main_dispatcher.backends['channel']
|
||||||
method = getattr(chan_dispatcher, meth)
|
method = getattr(chan_dispatcher, meth)
|
||||||
|
|
||||||
|
|
@ -48,6 +52,8 @@ class GuildDispatcher(DispatcherWithState):
|
||||||
async def unsub(self, guild_id: int, user_id: int):
|
async def unsub(self, guild_id: int, user_id: int):
|
||||||
"""Unsubscribe a user from the guild."""
|
"""Unsubscribe a user from the guild."""
|
||||||
await super().unsub(guild_id, user_id)
|
await super().unsub(guild_id, user_id)
|
||||||
|
|
||||||
|
# same thing happening from sub() happens on unsub()
|
||||||
await self._chan_action('unsub', guild_id, user_id)
|
await self._chan_action('unsub', guild_id, user_id)
|
||||||
|
|
||||||
async def dispatch(self, guild_id: int,
|
async def dispatch(self, guild_id: int,
|
||||||
|
|
@ -56,11 +62,11 @@ class GuildDispatcher(DispatcherWithState):
|
||||||
user_ids = self.state[guild_id]
|
user_ids = self.state[guild_id]
|
||||||
dispatched = 0
|
dispatched = 0
|
||||||
|
|
||||||
# acquire a copy since we will be modifying
|
# acquire a copy since we may be modifying
|
||||||
# the original user_ids
|
# the original user_ids
|
||||||
for user_id in set(user_ids):
|
for user_id in set(user_ids):
|
||||||
|
|
||||||
# fetch all states related to the user id and guild id.
|
# fetch all states / shards that are tied to the guild.
|
||||||
states = self.sm.fetch_states(user_id, guild_id)
|
states = self.sm.fetch_states(user_id, guild_id)
|
||||||
|
|
||||||
if not states:
|
if not states:
|
||||||
|
|
|
||||||
|
|
@ -10,15 +10,17 @@ class MemberDispatcher(Dispatcher):
|
||||||
|
|
||||||
This is shard-aware.
|
This is shard-aware.
|
||||||
"""
|
"""
|
||||||
|
# we don't keep any state on this dispatcher, so the key
|
||||||
|
# is just (guild_id, user_id)
|
||||||
guild_id, user_id = key
|
guild_id, user_id = key
|
||||||
|
|
||||||
# fetch shards
|
# fetch shards
|
||||||
states = self.sm.fetch_states(user_id, guild_id)
|
states = self.sm.fetch_states(user_id, guild_id)
|
||||||
|
|
||||||
# if no states were found, we should
|
# if no states were found, we should
|
||||||
# unsub the user from the channel
|
# unsub the user from the GUILD channel
|
||||||
if not states:
|
if not states:
|
||||||
await self.main_dispatcher.unsub('guild', guild_id, user_id)
|
await self.main_dispatcher.unsub('guild', guild_id, user_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
await self._dispatch_states(states, event, data)
|
return await self._dispatch_states(states, event, data)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue