diff --git a/litecord/blueprints/guilds.py b/litecord/blueprints/guilds.py index 1d3fdc3..fb76001 100644 --- a/litecord/blueprints/guilds.py +++ b/litecord/blueprints/guilds.py @@ -70,7 +70,11 @@ async def create_guild(): guild_json = await app.storage.get_guild(guild_id, user_id) guild_extra = await app.storage.get_guild_extra(guild_id, user_id, 250) - return jsonify({**guild_json, **guild_extra}) + guild_total = {**guild_json, **guild_extra} + + app.dispatcher.sub_guild(guild_id, user_id) + await app.dispatcher.dispatch_guild(guild_id, 'GUILD_CREATE', guild_total) + return jsonify(guild_total) @bp.route('/', methods=['GET']) @@ -84,6 +88,7 @@ async def get_guild(guild_id): @bp.route('/', methods=['DELETE']) async def delete_guild(guild_id): + """Delete a guild.""" user_id = await token_check() owner_id = await app.db.fetchval(""" @@ -98,7 +103,20 @@ async def delete_guild(guild_id): if user_id != owner_id: raise Forbidden('You are not the owner of the guild') - # TODO: delete guild, fire GUILD_DELETE to guild + await app.db.execute(""" + DELETE FROM guild + WHERE guilds.id = $1 + """, guild_id) + + await app.dispatcher.dispatch_guild(guild_id, 'GUILD_DELETE', { + 'id': guild_id, + 'unavailable': False, + }) + + # remove from the dispatcher so nobody + # becomes the little memer that tries to fuck up with + # everybody's gateway + app.dispatcher.remove_guild(guild_id) return '', 204 @@ -166,8 +184,7 @@ async def create_channel(guild_id): raise NotImplementedError() - # TODO: fire Channel Create event - + await app.dispatcher.dispatch_guild(guild_id, 'CHANNEL_CREATE', channel) return jsonify(channel) diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index f0b9512..200ca47 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -1,4 +1,55 @@ +import collections +from typing import Any + +from logbook import Logger + +log = Logger(__name__) + class EventDispatcher: """Pub/Sub routines for litecord.""" - pass + def __init__(self, sm): + self.state_manager = sm + self.guild_buckets = collections.defaultdict(set) + + def sub_guild(self, guild_id: int, user_id: int): + """Subscribe to a guild's events, given the user ID.""" + self.guild_buckets[guild_id].add(user_id) + + def unsub_guild(self, guild_id: int, user_id: int): + """Unsubscribe from a guild, given user ID""" + self.guild_buckets[guild_id].discard(user_id) + + def remove_guild(self, guild_id): + """Reset the guild bucket.""" + self.guild_buckets[guild_id] = set() + + async def dispatch_guild(self, guild_id: int, + event_name: str, event_payload: Any): + """Dispatch an event to a guild""" + users = self.guild_buckets[guild_id] + dispatched = 0 + + log.info('Dispatching {} {!r} to {} users', + guild_id, event_name, len(users)) + + for user_id in set(users): + # fetch all connections that are tied to the guild, + # this includes all connections that are just a single shard + # and all shards that are nicely working + states = self.state_manager.fetch_states(user_id, guild_id) + + # if there are no more states tied to the guild, + # why keep the user as a subscriber? + if not states: + self.unsub_guild(guild_id, user_id) + continue + + # for each reasonable state/shard, dispatch event + for state in states: + # NOTE: maybe a separate task for that async? + await state.ws.dispatch(event_name, event_payload) + dispatched += 1 + + log.info('Dispatched {} {!r} to {} states', + guild_id, event_name, dispatched) diff --git a/litecord/gateway/state_manager.py b/litecord/gateway/state_manager.py index 5735943..67c4dd5 100644 --- a/litecord/gateway/state_manager.py +++ b/litecord/gateway/state_manager.py @@ -37,11 +37,14 @@ class StateManager: except KeyError: pass - def fetch_states(self, user_id, guild_id) -> List[GatewayState]: + def fetch_states(self, user_id: int, guild_id: int) -> List[GatewayState]: """Fetch all states that are tied to a guild.""" states = [] - for state in self.states[user_id]: + for state in self.states[user_id].values(): + # find out if we are the shard for the guild id + # this works if shard_count == 1 (the default for + # single gw connections) since N % 1 is always 0 shard_id = (guild_id >> 22) % state.shard_count if shard_id == state.current_shard: diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index c319bdb..d8dc530 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -204,6 +204,8 @@ class GatewayWebsocket: compress=compress, large=large, shard=shard, + current_shard=shard[0], + shard_count=shard[1], presence=presence, ws=self ) diff --git a/run.py b/run.py index 537cc87..c35516a 100644 --- a/run.py +++ b/run.py @@ -13,6 +13,7 @@ from litecord.gateway import websocket_handler from litecord.errors import LitecordError from litecord.gateway.state_manager import StateManager from litecord.storage import Storage +from litecord.dispatcher import EventDispatcher # setup logbook handler = StreamHandler(sys.stdout, level=logbook.INFO) @@ -49,6 +50,7 @@ async def app_before_serving(): g.loop = asyncio.get_event_loop() app.state_manager = StateManager() + app.dispatcher = EventDispatcher(app.state_manager) app.storage = Storage(app.db) # start the websocket, etc diff --git a/schema.sql b/schema.sql index 581d42c..9e9bf62 100644 --- a/schema.sql +++ b/schema.sql @@ -143,7 +143,7 @@ CREATE TABLE IF NOT EXISTS guilds ( CREATE TABLE IF NOT EXISTS guild_channels ( id bigint REFERENCES channels (id) PRIMARY KEY, - guild_id bigint REFERENCES guilds (id), + guild_id bigint REFERENCES guilds (id) ON DELETE CASCADE, -- an id to guild_channels parent_id bigint DEFAULT NULL, @@ -155,12 +155,12 @@ CREATE TABLE IF NOT EXISTS guild_channels ( CREATE TABLE IF NOT EXISTS guild_text_channels ( - id bigint REFERENCES guild_channels (id) PRIMARY KEY, + id bigint REFERENCES guild_channels (id) PRIMARY KEY ON DELETE CASCADE, topic text DEFAULT '' ); CREATE TABLE IF NOT EXISTS guild_voice_channels ( - id bigint REFERENCES guild_channels (id) PRIMARY KEY, + id bigint REFERENCES guild_channels (id) PRIMARY KEY ON DELETE CASCADE, -- default bitrate for discord is 64kbps bitrate int DEFAULT 64, @@ -176,7 +176,7 @@ CREATE TABLE IF NOT EXISTS dm_channels ( CREATE TABLE IF NOT EXISTS group_dm_channels ( - id bigint REFERENCES channels (id) PRIMARY KEY, + id bigint REFERENCES channels (id) PRIMARY KEY ON DELETE CASCADE, owner_id bigint REFERENCES users (id), icon bigint REFERENCES files (id) );