diff --git a/litecord/blueprints/auth.py b/litecord/blueprints/auth.py index 9f0428e..a9c0ea3 100644 --- a/litecord/blueprints/auth.py +++ b/litecord/blueprints/auth.py @@ -101,9 +101,11 @@ async def consent_required(): async def verify_user(): user_id = await token_check() + # TODO: actually verify a user by sending an email await app.db.execute(""" UPDATE users SET verified = true + WHERE id = $1 """, user_id) return '', 204 diff --git a/litecord/dispatcher.py b/litecord/dispatcher.py index f821f01..aa5b12d 100644 --- a/litecord/dispatcher.py +++ b/litecord/dispatcher.py @@ -36,14 +36,16 @@ class EventDispatcher: async def subscribe(self, backend: str, key: Any, identifier: Any): """Subscribe a single element to the given backend.""" - log.debug('SUB bacjend={} key={} <= id={}', + log.debug('SUB backend={} key={} <= id={}', backend, key, identifier, backend) + return await self.action(backend, 'sub', key, identifier) async def unsubscribe(self, backend: str, key: Any, identifier: Any): """Unsubscribe an element from the given backend.""" - log.debug('UNSUB bacjend={} key={} => id={}', + log.debug('UNSUB backend={} key={} => id={}', backend, key, identifier, backend) + return await self.action(backend, 'unsub', key, identifier) async def sub(self, backend, key, identifier): diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index fdcdd27..3c8eb2f 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -10,16 +10,16 @@ import earl import websockets from logbook import Logger -from litecord.errors import WebsocketClose, Unauthorized, Forbidden +from litecord.errors import WebsocketClose, Unauthorized, Forbidden, BadRequest from litecord.auth import raw_token_check +from litecord.enums import RelationshipType +from litecord.schemas import validate, GW_STATUS_UPDATE +from litecord.utils import task_wrapper + from .errors import DecodeError, UnknownOPCode, \ InvalidShard, ShardingRequired from .opcodes import OP from .state import GatewayState -from ..errors import BadRequest - -from ..schemas import validate, GW_STATUS_UPDATE -from ..utils import task_wrapper log = Logger(__name__) @@ -205,10 +205,18 @@ class GatewayWebsocket: user_id = self.state.user_id + relationships = await self.storage.get_relationships(user_id) + + friend_ids = [int(r['user']['id']) for r in relationships + if r['type'] == RelationshipType.FRIEND.value] + + friend_presences = await self.ext.presence.friend_presences(friend_ids) + return { 'user_settings': await self.storage.get_user_settings(user_id), 'notes': await self.storage.fetch_notes(user_id), - 'relationships': await self.storage.get_relationships(user_id), + 'relationships': relationships, + 'presences': friend_presences, 'read_state': await self.storage.get_read_state(user_id), 'friend_suggestion_count': 0, @@ -216,12 +224,7 @@ class GatewayWebsocket: # TODO 'user_guild_settings': [], - # TODO - 'presences': [], - - # TODO 'connected_accounts': [], - 'experiments': [], 'guild_experiments': [], 'analytics_token': 'transbian', diff --git a/litecord/presence.py b/litecord/presence.py index 1eba7f8..45a9bdd 100644 --- a/litecord/presence.py +++ b/litecord/presence.py @@ -1,4 +1,58 @@ from typing import List, Dict, Any +from random import choice + +from quart import current_app as app + + +def status_cmp(status: str, other_status: str) -> bool: + """Compare if `status` is better than the `other_status` + in the status hierarchy. + """ + + hierarchy = { + 'online': 3, + 'idle': 2, + 'dnd': 1, + 'offline': 0, + None: -1, + } + + return hierarchy[status] > hierarchy[other_status] + + +def _best_presence(shards): + """Find the 'best' presence given a list of GatewayState.""" + best = {'status': None, 'game': None} + + for state in shards: + presence = state.presence + + status = presence['status'] + + if not presence: + continue + + # shards with a better status + # in the hierarchy are treated as best + if status_cmp(status, best['status']): + best['status'] = status + + # if we have any game, use it + if presence['game'] is not None: + best['game'] = presence['game'] + + # best['status'] is None when no + # status was good enough. + return None if not best['status'] else best + + +async def _pres(storage, user_id: int, status_obj: dict) -> dict: + ext = { + 'user': await storage.get_user(user_id), + 'activities': [], + } + + return {**status_obj, **ext} class PresenceManager: @@ -70,3 +124,47 @@ class PresenceManager: for guild_id in guild_ids: await self.dispatch_guild_pres(guild_id, user_id, state) + + async def friend_presences(self, friend_ids: int) -> List[Dict[str, Any]]: + """Fetch presences for a group of users. + + This assumes the users are friends and so + only gets states that are single or have ID 0. + """ + storage = self.storage + res = [] + + for friend_id in friend_ids: + friend_states = self.state_manager.user_states(friend_id) + + if not friend_states: + # append offline + res.append(await _pres(storage, friend_id, { + 'afk': False, + 'status': 'offline', + 'game': None, + 'since': 0 + })) + + continue + + # filter the best shards: + # - all with id 0 (are the first shards in the collection) or + # - all shards with count = 1 (single shards) + good_shards = list(filter( + lambda state: state.shard[0] == 0 or state.shard[1] == 1, + friend_states + )) + + if good_shards: + best_pres = _best_presence(good_shards) + best_pres = await _pres(storage, friend_id, best_pres) + res.append(best_pres) + continue + + # if there aren't any shards with id 0 + # AND none that are single, just go with a random + shard = choice(friend_states) + res.append(await _pres(storage, friend_id, shard.presence)) + + return res diff --git a/litecord/pubsub/channel.py b/litecord/pubsub/channel.py index 4b722a4..9991510 100644 --- a/litecord/pubsub/channel.py +++ b/litecord/pubsub/channel.py @@ -3,47 +3,32 @@ from collections import defaultdict from logbook import Logger -from .dispatcher import Dispatcher +from .dispatcher import DispatcherWithState log = Logger(__name__) -class ChannelDispatcher(Dispatcher): +class ChannelDispatcher(DispatcherWithState): """Main channel Pub/Sub logic.""" - def __init__(self, main): - super().__init__(main) - - self.channels = defaultdict(set) - - async def sub(self, channel_id: int, user_id: int): - self.channels[channel_id].add(user_id) - - async def unsub(self, channel_id: int, user_id: int): - self.channels[channel_id].discard(user_id) - - async def reset(self, channel_id: int): - self.channels[channel_id] = set() - - async def remove(self, channel_id: int): - try: - self.channels.pop(channel_id) - except KeyError: - pass + KEY_TYPE = int + VAL_TYPE = int async def dispatch(self, channel_id, event: str, data: Any): - user_ids = self.channels[channel_id] + """Dispatch an event to a channel.""" + user_ids = self.state[channel_id] dispatched = 0 for user_id in set(user_ids): guild_id = await self.app.storage.guild_from_channel(channel_id) - if guild_id: - states = self.sm.fetch_states(user_id, guild_id) - else: - # TODO: maybe a fetch_states with guild_id 0 - # to get the shards with id 0 AND the single shards? - states = self.sm.user_states(user_id) + states = (self.sm.fetch_states(user_id, guild_id) + if guild_id else + + # TODO: use a fetch_states with guild_id 0 + # or maybe something to fetch all shards + # with id 0 and single shards + self.sm.user_states(user_id)) dispatched += await self._dispatch_states(states, event, data) diff --git a/litecord/pubsub/dispatcher.py b/litecord/pubsub/dispatcher.py index f15f0fb..0eaad6a 100644 --- a/litecord/pubsub/dispatcher.py +++ b/litecord/pubsub/dispatcher.py @@ -1,10 +1,13 @@ +from collections import defaultdict from logbook import Logger log = Logger(__name__) class Dispatcher: - """Main dispatcher class.""" + """Pub/Sub backend dispatcher.""" + + # the _ parameter is for (self) KEY_TYPE = lambda _, x: x VAL_TYPE = lambda _, x: x @@ -14,21 +17,33 @@ class Dispatcher: self.app = main.app async def sub(self, _key, _id): + """Subscribe an element to the channel/key.""" raise NotImplementedError async def unsub(self, _key, _id): + """Unsubscribe an elemtnt from the channel/key.""" raise NotImplementedError - async def dispatch(self, _key, *_args, **_kwargs): + async def dispatch(self, _key, *_args): + """Dispatch an event to the given channel/key.""" raise NotImplementedError async def reset(self, _key): + """Reset a key from the backend.""" raise NotImplementedError async def remove(self, _key): + """Remove a key from the backend. + + The meaning from reset() and remove() + is different, reset() is to clear all + subscribers from the given key, + remove() is to remove the key as well. + """ raise NotImplementedError async def _dispatch_states(self, states: list, event: str, data) -> int: + """Dispatch an event to a list of states.""" dispatched = 0 for state in states: @@ -39,3 +54,34 @@ class Dispatcher: log.exception('error while dispatching') return dispatched + + +class DispatcherWithState(Dispatcher): + """Pub/Sub backend with a state dictionary. + + This class was made to decrease the amount + of boilerplate code on Pub/Sub backends + that have that dictionary. + """ + def __init__(self, main): + super().__init__(main) + + self.state = defaultdict(set) + + async def sub(self, key, identifier): + self.state[key].add(identifier) + + async def unsub(self, key, identifier): + self.state[key].discard(identifier) + + async def reset(self, key): + self.state[key] = set() + + async def remove(self, key): + try: + self.state.pop(key) + except KeyError: + pass + + async def dispatch(self, key, *args): + raise NotImplementedError diff --git a/litecord/pubsub/guild.py b/litecord/pubsub/guild.py index df97bdb..add72f2 100644 --- a/litecord/pubsub/guild.py +++ b/litecord/pubsub/guild.py @@ -3,20 +3,16 @@ from typing import Any from logbook import Logger -from .dispatcher import Dispatcher +from .dispatcher import DispatcherWithState log = Logger(__name__) -class GuildDispatcher(Dispatcher): +class GuildDispatcher(DispatcherWithState): """Guild backend for Pub/Sub""" KEY_TYPE = int VAL_TYPE = int - def __init__(self, main): - super().__init__(main) - self.guild_buckets = defaultdict(set) - async def _chan_action(self, action: str, guild_id: int, user_id: int): chan_ids = await self.app.storage.get_channel_ids(guild_id) @@ -41,7 +37,8 @@ class GuildDispatcher(Dispatcher): await method(chan_id, *args) async def sub(self, guild_id: int, user_id: int): - self.guild_buckets[guild_id].add(user_id) + """Subscribe a user to the guild.""" + await super().sub(guild_id, user_id) # when subbing a user to the guild, we should sub them # to every channel they have access to, in the guild. @@ -49,24 +46,14 @@ class GuildDispatcher(Dispatcher): await self._chan_action('sub', guild_id, user_id) async def unsub(self, guild_id: int, user_id: int): - self.guild_buckets[guild_id].discard(user_id) + """Unsubscribe a user from the guild.""" + await super().unsub(guild_id, user_id) await self._chan_action('unsub', guild_id, user_id) - async def reset(self, guild_id: int): - self.guild_buckets[guild_id] = set() - await self._chan_call(guild_id, 'reset') - - async def remove(self, guild_id: int): - try: - self.guild_buckets.pop(guild_id) - except KeyError: - pass - - await self._chan_call(guild_id, 'remove') - async def dispatch(self, guild_id: int, - event_name: str, event_payload: Any): - user_ids = self.guild_buckets[guild_id] + event: str, data: Any): + """Dispatch an event to all subscribers of the guild.""" + user_ids = self.state[guild_id] dispatched = 0 # acquire a copy since we will be modifying @@ -83,7 +70,7 @@ class GuildDispatcher(Dispatcher): continue dispatched += await self._dispatch_states( - states, event_name, event_payload) + states, event, data) log.info('Dispatched {} {!r} to {} states', - guild_id, event_name, dispatched) + guild_id, event, dispatched) diff --git a/litecord/pubsub/member.py b/litecord/pubsub/member.py index bdb61b2..b997417 100644 --- a/litecord/pubsub/member.py +++ b/litecord/pubsub/member.py @@ -2,6 +2,7 @@ from .dispatcher import Dispatcher class MemberDispatcher(Dispatcher): + """Member backend for Pub/Sub.""" KEY_TYPE = tuple async def dispatch(self, key, event, data): @@ -14,6 +15,8 @@ class MemberDispatcher(Dispatcher): # fetch shards states = self.sm.fetch_states(user_id, guild_id) + # if no states were found, we should + # unsub the user from the channel if not states: await self.main_dispatcher.unsub('guild', guild_id, user_id) return diff --git a/litecord/pubsub/user.py b/litecord/pubsub/user.py index 46b3526..5187f71 100644 --- a/litecord/pubsub/user.py +++ b/litecord/pubsub/user.py @@ -2,8 +2,10 @@ from .dispatcher import Dispatcher class UserDispatcher(Dispatcher): + """User backend for Pub/Sub.""" KEY_TYPE = int async def dispatch(self, user_id: int, event, data): + """Dispatch an event to all shards of a user.""" states = self.sm.user_states(user_id) return await self._dispatch_states(states, event, data) diff --git a/litecord/storage.py b/litecord/storage.py index d52ff9c..59789ce 100644 --- a/litecord/storage.py +++ b/litecord/storage.py @@ -737,12 +737,12 @@ class Storage: for drow in friends: drow['type'] = drow['rel_type'] + drow['id'] = str(drow['peer_id']) drow.pop('rel_type') # check if the receiver is a mutual # if it isnt, its still on a friend request stage if drow['peer_id'] not in mutuals: - drow['id'] = str(drow['peer_id']) drow['type'] = _outgoing drow['user'] = await self.get_user(drow['peer_id'])