blueprints.guild: use EventDispatcher

- dispatcher: add sub_guild, unsub_guild, remove_guild, dispatch_guild
 - gateway.state_manager: fix fetch_states
 - gateway.websocket: add current_shard, shard_count to atributes
 - schema: add ON DELETE CASCADE to channel tables
This commit is contained in:
Luna Mendes 2018-06-21 17:59:08 -03:00
parent b085df207d
commit de98d0f609
6 changed files with 86 additions and 11 deletions

View File

@ -70,7 +70,11 @@ async def create_guild():
guild_json = await app.storage.get_guild(guild_id, user_id)
guild_extra = await app.storage.get_guild_extra(guild_id, user_id, 250)
return jsonify({**guild_json, **guild_extra})
guild_total = {**guild_json, **guild_extra}
app.dispatcher.sub_guild(guild_id, user_id)
await app.dispatcher.dispatch_guild(guild_id, 'GUILD_CREATE', guild_total)
return jsonify(guild_total)
@bp.route('/<int:guild_id>', methods=['GET'])
@ -84,6 +88,7 @@ async def get_guild(guild_id):
@bp.route('/<int:guild_id>', methods=['DELETE'])
async def delete_guild(guild_id):
"""Delete a guild."""
user_id = await token_check()
owner_id = await app.db.fetchval("""
@ -98,7 +103,20 @@ async def delete_guild(guild_id):
if user_id != owner_id:
raise Forbidden('You are not the owner of the guild')
# TODO: delete guild, fire GUILD_DELETE to guild
await app.db.execute("""
DELETE FROM guild
WHERE guilds.id = $1
""", guild_id)
await app.dispatcher.dispatch_guild(guild_id, 'GUILD_DELETE', {
'id': guild_id,
'unavailable': False,
})
# remove from the dispatcher so nobody
# becomes the little memer that tries to fuck up with
# everybody's gateway
app.dispatcher.remove_guild(guild_id)
return '', 204
@ -166,8 +184,7 @@ async def create_channel(guild_id):
raise NotImplementedError()
# TODO: fire Channel Create event
await app.dispatcher.dispatch_guild(guild_id, 'CHANNEL_CREATE', channel)
return jsonify(channel)

View File

@ -1,4 +1,55 @@
import collections
from typing import Any
from logbook import Logger
log = Logger(__name__)
class EventDispatcher:
"""Pub/Sub routines for litecord."""
pass
def __init__(self, sm):
self.state_manager = sm
self.guild_buckets = collections.defaultdict(set)
def sub_guild(self, guild_id: int, user_id: int):
"""Subscribe to a guild's events, given the user ID."""
self.guild_buckets[guild_id].add(user_id)
def unsub_guild(self, guild_id: int, user_id: int):
"""Unsubscribe from a guild, given user ID"""
self.guild_buckets[guild_id].discard(user_id)
def remove_guild(self, guild_id):
"""Reset the guild bucket."""
self.guild_buckets[guild_id] = set()
async def dispatch_guild(self, guild_id: int,
event_name: str, event_payload: Any):
"""Dispatch an event to a guild"""
users = self.guild_buckets[guild_id]
dispatched = 0
log.info('Dispatching {} {!r} to {} users',
guild_id, event_name, len(users))
for user_id in set(users):
# fetch all connections that are tied to the guild,
# this includes all connections that are just a single shard
# and all shards that are nicely working
states = self.state_manager.fetch_states(user_id, guild_id)
# if there are no more states tied to the guild,
# why keep the user as a subscriber?
if not states:
self.unsub_guild(guild_id, user_id)
continue
# for each reasonable state/shard, dispatch event
for state in states:
# NOTE: maybe a separate task for that async?
await state.ws.dispatch(event_name, event_payload)
dispatched += 1
log.info('Dispatched {} {!r} to {} states',
guild_id, event_name, dispatched)

View File

@ -37,11 +37,14 @@ class StateManager:
except KeyError:
pass
def fetch_states(self, user_id, guild_id) -> List[GatewayState]:
def fetch_states(self, user_id: int, guild_id: int) -> List[GatewayState]:
"""Fetch all states that are tied to a guild."""
states = []
for state in self.states[user_id]:
for state in self.states[user_id].values():
# find out if we are the shard for the guild id
# this works if shard_count == 1 (the default for
# single gw connections) since N % 1 is always 0
shard_id = (guild_id >> 22) % state.shard_count
if shard_id == state.current_shard:

View File

@ -204,6 +204,8 @@ class GatewayWebsocket:
compress=compress,
large=large,
shard=shard,
current_shard=shard[0],
shard_count=shard[1],
presence=presence,
ws=self
)

2
run.py
View File

@ -13,6 +13,7 @@ from litecord.gateway import websocket_handler
from litecord.errors import LitecordError
from litecord.gateway.state_manager import StateManager
from litecord.storage import Storage
from litecord.dispatcher import EventDispatcher
# setup logbook
handler = StreamHandler(sys.stdout, level=logbook.INFO)
@ -49,6 +50,7 @@ async def app_before_serving():
g.loop = asyncio.get_event_loop()
app.state_manager = StateManager()
app.dispatcher = EventDispatcher(app.state_manager)
app.storage = Storage(app.db)
# start the websocket, etc

View File

@ -143,7 +143,7 @@ CREATE TABLE IF NOT EXISTS guilds (
CREATE TABLE IF NOT EXISTS guild_channels (
id bigint REFERENCES channels (id) PRIMARY KEY,
guild_id bigint REFERENCES guilds (id),
guild_id bigint REFERENCES guilds (id) ON DELETE CASCADE,
-- an id to guild_channels
parent_id bigint DEFAULT NULL,
@ -155,12 +155,12 @@ CREATE TABLE IF NOT EXISTS guild_channels (
CREATE TABLE IF NOT EXISTS guild_text_channels (
id bigint REFERENCES guild_channels (id) PRIMARY KEY,
id bigint REFERENCES guild_channels (id) PRIMARY KEY ON DELETE CASCADE,
topic text DEFAULT ''
);
CREATE TABLE IF NOT EXISTS guild_voice_channels (
id bigint REFERENCES guild_channels (id) PRIMARY KEY,
id bigint REFERENCES guild_channels (id) PRIMARY KEY ON DELETE CASCADE,
-- default bitrate for discord is 64kbps
bitrate int DEFAULT 64,
@ -176,7 +176,7 @@ CREATE TABLE IF NOT EXISTS dm_channels (
CREATE TABLE IF NOT EXISTS group_dm_channels (
id bigint REFERENCES channels (id) PRIMARY KEY,
id bigint REFERENCES channels (id) PRIMARY KEY ON DELETE CASCADE,
owner_id bigint REFERENCES users (id),
icon bigint REFERENCES files (id)
);