gateway.websocket: add presences field to ready payload

- presences: add PresenceManager.friend_presences
 - blueprints.auth: fix resend route
 - pubsub: add DispatcherWithState to decrease amount of
    repeated code between GuildDispatcher and ChannelDispatcher
 - storage: fix relationship id field
This commit is contained in:
Luna Mendes 2018-10-12 02:10:31 -03:00
parent c4db99aa9a
commit 46fac95979
10 changed files with 196 additions and 68 deletions

View File

@ -101,9 +101,11 @@ async def consent_required():
async def verify_user(): async def verify_user():
user_id = await token_check() user_id = await token_check()
# TODO: actually verify a user by sending an email
await app.db.execute(""" await app.db.execute("""
UPDATE users UPDATE users
SET verified = true SET verified = true
WHERE id = $1
""", user_id) """, user_id)
return '', 204 return '', 204

View File

@ -36,14 +36,16 @@ class EventDispatcher:
async def subscribe(self, backend: str, key: Any, identifier: Any): async def subscribe(self, backend: str, key: Any, identifier: Any):
"""Subscribe a single element to the given backend.""" """Subscribe a single element to the given backend."""
log.debug('SUB bacjend={} key={} <= id={}', log.debug('SUB backend={} key={} <= id={}',
backend, key, identifier, backend) backend, key, identifier, backend)
return await self.action(backend, 'sub', key, identifier) return await self.action(backend, 'sub', key, identifier)
async def unsubscribe(self, backend: str, key: Any, identifier: Any): async def unsubscribe(self, backend: str, key: Any, identifier: Any):
"""Unsubscribe an element from the given backend.""" """Unsubscribe an element from the given backend."""
log.debug('UNSUB bacjend={} key={} => id={}', log.debug('UNSUB backend={} key={} => id={}',
backend, key, identifier, backend) backend, key, identifier, backend)
return await self.action(backend, 'unsub', key, identifier) return await self.action(backend, 'unsub', key, identifier)
async def sub(self, backend, key, identifier): async def sub(self, backend, key, identifier):

View File

@ -10,16 +10,16 @@ import earl
import websockets import websockets
from logbook import Logger from logbook import Logger
from litecord.errors import WebsocketClose, Unauthorized, Forbidden from litecord.errors import WebsocketClose, Unauthorized, Forbidden, BadRequest
from litecord.auth import raw_token_check from litecord.auth import raw_token_check
from litecord.enums import RelationshipType
from litecord.schemas import validate, GW_STATUS_UPDATE
from litecord.utils import task_wrapper
from .errors import DecodeError, UnknownOPCode, \ from .errors import DecodeError, UnknownOPCode, \
InvalidShard, ShardingRequired InvalidShard, ShardingRequired
from .opcodes import OP from .opcodes import OP
from .state import GatewayState from .state import GatewayState
from ..errors import BadRequest
from ..schemas import validate, GW_STATUS_UPDATE
from ..utils import task_wrapper
log = Logger(__name__) log = Logger(__name__)
@ -205,10 +205,18 @@ class GatewayWebsocket:
user_id = self.state.user_id user_id = self.state.user_id
relationships = await self.storage.get_relationships(user_id)
friend_ids = [int(r['user']['id']) for r in relationships
if r['type'] == RelationshipType.FRIEND.value]
friend_presences = await self.ext.presence.friend_presences(friend_ids)
return { return {
'user_settings': await self.storage.get_user_settings(user_id), 'user_settings': await self.storage.get_user_settings(user_id),
'notes': await self.storage.fetch_notes(user_id), 'notes': await self.storage.fetch_notes(user_id),
'relationships': await self.storage.get_relationships(user_id), 'relationships': relationships,
'presences': friend_presences,
'read_state': await self.storage.get_read_state(user_id), 'read_state': await self.storage.get_read_state(user_id),
'friend_suggestion_count': 0, 'friend_suggestion_count': 0,
@ -216,12 +224,7 @@ class GatewayWebsocket:
# TODO # TODO
'user_guild_settings': [], 'user_guild_settings': [],
# TODO
'presences': [],
# TODO
'connected_accounts': [], 'connected_accounts': [],
'experiments': [], 'experiments': [],
'guild_experiments': [], 'guild_experiments': [],
'analytics_token': 'transbian', 'analytics_token': 'transbian',

View File

@ -1,4 +1,58 @@
from typing import List, Dict, Any from typing import List, Dict, Any
from random import choice
from quart import current_app as app
def status_cmp(status: str, other_status: str) -> bool:
"""Compare if `status` is better than the `other_status`
in the status hierarchy.
"""
hierarchy = {
'online': 3,
'idle': 2,
'dnd': 1,
'offline': 0,
None: -1,
}
return hierarchy[status] > hierarchy[other_status]
def _best_presence(shards):
"""Find the 'best' presence given a list of GatewayState."""
best = {'status': None, 'game': None}
for state in shards:
presence = state.presence
status = presence['status']
if not presence:
continue
# shards with a better status
# in the hierarchy are treated as best
if status_cmp(status, best['status']):
best['status'] = status
# if we have any game, use it
if presence['game'] is not None:
best['game'] = presence['game']
# best['status'] is None when no
# status was good enough.
return None if not best['status'] else best
async def _pres(storage, user_id: int, status_obj: dict) -> dict:
ext = {
'user': await storage.get_user(user_id),
'activities': [],
}
return {**status_obj, **ext}
class PresenceManager: class PresenceManager:
@ -70,3 +124,47 @@ class PresenceManager:
for guild_id in guild_ids: for guild_id in guild_ids:
await self.dispatch_guild_pres(guild_id, user_id, state) await self.dispatch_guild_pres(guild_id, user_id, state)
async def friend_presences(self, friend_ids: int) -> List[Dict[str, Any]]:
"""Fetch presences for a group of users.
This assumes the users are friends and so
only gets states that are single or have ID 0.
"""
storage = self.storage
res = []
for friend_id in friend_ids:
friend_states = self.state_manager.user_states(friend_id)
if not friend_states:
# append offline
res.append(await _pres(storage, friend_id, {
'afk': False,
'status': 'offline',
'game': None,
'since': 0
}))
continue
# filter the best shards:
# - all with id 0 (are the first shards in the collection) or
# - all shards with count = 1 (single shards)
good_shards = list(filter(
lambda state: state.shard[0] == 0 or state.shard[1] == 1,
friend_states
))
if good_shards:
best_pres = _best_presence(good_shards)
best_pres = await _pres(storage, friend_id, best_pres)
res.append(best_pres)
continue
# if there aren't any shards with id 0
# AND none that are single, just go with a random
shard = choice(friend_states)
res.append(await _pres(storage, friend_id, shard.presence))
return res

View File

@ -3,47 +3,32 @@ from collections import defaultdict
from logbook import Logger from logbook import Logger
from .dispatcher import Dispatcher from .dispatcher import DispatcherWithState
log = Logger(__name__) log = Logger(__name__)
class ChannelDispatcher(Dispatcher): class ChannelDispatcher(DispatcherWithState):
"""Main channel Pub/Sub logic.""" """Main channel Pub/Sub logic."""
def __init__(self, main): KEY_TYPE = int
super().__init__(main) VAL_TYPE = int
self.channels = defaultdict(set)
async def sub(self, channel_id: int, user_id: int):
self.channels[channel_id].add(user_id)
async def unsub(self, channel_id: int, user_id: int):
self.channels[channel_id].discard(user_id)
async def reset(self, channel_id: int):
self.channels[channel_id] = set()
async def remove(self, channel_id: int):
try:
self.channels.pop(channel_id)
except KeyError:
pass
async def dispatch(self, channel_id, async def dispatch(self, channel_id,
event: str, data: Any): event: str, data: Any):
user_ids = self.channels[channel_id] """Dispatch an event to a channel."""
user_ids = self.state[channel_id]
dispatched = 0 dispatched = 0
for user_id in set(user_ids): for user_id in set(user_ids):
guild_id = await self.app.storage.guild_from_channel(channel_id) guild_id = await self.app.storage.guild_from_channel(channel_id)
if guild_id: states = (self.sm.fetch_states(user_id, guild_id)
states = self.sm.fetch_states(user_id, guild_id) if guild_id else
else:
# TODO: maybe a fetch_states with guild_id 0 # TODO: use a fetch_states with guild_id 0
# to get the shards with id 0 AND the single shards? # or maybe something to fetch all shards
states = self.sm.user_states(user_id) # with id 0 and single shards
self.sm.user_states(user_id))
dispatched += await self._dispatch_states(states, event, data) dispatched += await self._dispatch_states(states, event, data)

View File

@ -1,10 +1,13 @@
from collections import defaultdict
from logbook import Logger from logbook import Logger
log = Logger(__name__) log = Logger(__name__)
class Dispatcher: class Dispatcher:
"""Main dispatcher class.""" """Pub/Sub backend dispatcher."""
# the _ parameter is for (self)
KEY_TYPE = lambda _, x: x KEY_TYPE = lambda _, x: x
VAL_TYPE = lambda _, x: x VAL_TYPE = lambda _, x: x
@ -14,21 +17,33 @@ class Dispatcher:
self.app = main.app self.app = main.app
async def sub(self, _key, _id): async def sub(self, _key, _id):
"""Subscribe an element to the channel/key."""
raise NotImplementedError raise NotImplementedError
async def unsub(self, _key, _id): async def unsub(self, _key, _id):
"""Unsubscribe an elemtnt from the channel/key."""
raise NotImplementedError raise NotImplementedError
async def dispatch(self, _key, *_args, **_kwargs): async def dispatch(self, _key, *_args):
"""Dispatch an event to the given channel/key."""
raise NotImplementedError raise NotImplementedError
async def reset(self, _key): async def reset(self, _key):
"""Reset a key from the backend."""
raise NotImplementedError raise NotImplementedError
async def remove(self, _key): async def remove(self, _key):
"""Remove a key from the backend.
The meaning from reset() and remove()
is different, reset() is to clear all
subscribers from the given key,
remove() is to remove the key as well.
"""
raise NotImplementedError raise NotImplementedError
async def _dispatch_states(self, states: list, event: str, data) -> int: async def _dispatch_states(self, states: list, event: str, data) -> int:
"""Dispatch an event to a list of states."""
dispatched = 0 dispatched = 0
for state in states: for state in states:
@ -39,3 +54,34 @@ class Dispatcher:
log.exception('error while dispatching') log.exception('error while dispatching')
return dispatched return dispatched
class DispatcherWithState(Dispatcher):
"""Pub/Sub backend with a state dictionary.
This class was made to decrease the amount
of boilerplate code on Pub/Sub backends
that have that dictionary.
"""
def __init__(self, main):
super().__init__(main)
self.state = defaultdict(set)
async def sub(self, key, identifier):
self.state[key].add(identifier)
async def unsub(self, key, identifier):
self.state[key].discard(identifier)
async def reset(self, key):
self.state[key] = set()
async def remove(self, key):
try:
self.state.pop(key)
except KeyError:
pass
async def dispatch(self, key, *args):
raise NotImplementedError

View File

@ -3,20 +3,16 @@ from typing import Any
from logbook import Logger from logbook import Logger
from .dispatcher import Dispatcher from .dispatcher import DispatcherWithState
log = Logger(__name__) log = Logger(__name__)
class GuildDispatcher(Dispatcher): class GuildDispatcher(DispatcherWithState):
"""Guild backend for Pub/Sub""" """Guild backend for Pub/Sub"""
KEY_TYPE = int KEY_TYPE = int
VAL_TYPE = int VAL_TYPE = int
def __init__(self, main):
super().__init__(main)
self.guild_buckets = defaultdict(set)
async def _chan_action(self, action: str, guild_id: int, user_id: int): async def _chan_action(self, action: str, guild_id: int, user_id: int):
chan_ids = await self.app.storage.get_channel_ids(guild_id) chan_ids = await self.app.storage.get_channel_ids(guild_id)
@ -41,7 +37,8 @@ class GuildDispatcher(Dispatcher):
await method(chan_id, *args) await method(chan_id, *args)
async def sub(self, guild_id: int, user_id: int): async def sub(self, guild_id: int, user_id: int):
self.guild_buckets[guild_id].add(user_id) """Subscribe a user to the guild."""
await super().sub(guild_id, user_id)
# when subbing a user to the guild, we should sub them # when subbing a user to the guild, we should sub them
# to every channel they have access to, in the guild. # to every channel they have access to, in the guild.
@ -49,24 +46,14 @@ class GuildDispatcher(Dispatcher):
await self._chan_action('sub', guild_id, user_id) await self._chan_action('sub', guild_id, user_id)
async def unsub(self, guild_id: int, user_id: int): async def unsub(self, guild_id: int, user_id: int):
self.guild_buckets[guild_id].discard(user_id) """Unsubscribe a user from the guild."""
await super().unsub(guild_id, user_id)
await self._chan_action('unsub', guild_id, user_id) await self._chan_action('unsub', guild_id, user_id)
async def reset(self, guild_id: int):
self.guild_buckets[guild_id] = set()
await self._chan_call(guild_id, 'reset')
async def remove(self, guild_id: int):
try:
self.guild_buckets.pop(guild_id)
except KeyError:
pass
await self._chan_call(guild_id, 'remove')
async def dispatch(self, guild_id: int, async def dispatch(self, guild_id: int,
event_name: str, event_payload: Any): event: str, data: Any):
user_ids = self.guild_buckets[guild_id] """Dispatch an event to all subscribers of the guild."""
user_ids = self.state[guild_id]
dispatched = 0 dispatched = 0
# acquire a copy since we will be modifying # acquire a copy since we will be modifying
@ -83,7 +70,7 @@ class GuildDispatcher(Dispatcher):
continue continue
dispatched += await self._dispatch_states( dispatched += await self._dispatch_states(
states, event_name, event_payload) states, event, data)
log.info('Dispatched {} {!r} to {} states', log.info('Dispatched {} {!r} to {} states',
guild_id, event_name, dispatched) guild_id, event, dispatched)

View File

@ -2,6 +2,7 @@ from .dispatcher import Dispatcher
class MemberDispatcher(Dispatcher): class MemberDispatcher(Dispatcher):
"""Member backend for Pub/Sub."""
KEY_TYPE = tuple KEY_TYPE = tuple
async def dispatch(self, key, event, data): async def dispatch(self, key, event, data):
@ -14,6 +15,8 @@ class MemberDispatcher(Dispatcher):
# fetch shards # fetch shards
states = self.sm.fetch_states(user_id, guild_id) states = self.sm.fetch_states(user_id, guild_id)
# if no states were found, we should
# unsub the user from the channel
if not states: if not states:
await self.main_dispatcher.unsub('guild', guild_id, user_id) await self.main_dispatcher.unsub('guild', guild_id, user_id)
return return

View File

@ -2,8 +2,10 @@ from .dispatcher import Dispatcher
class UserDispatcher(Dispatcher): class UserDispatcher(Dispatcher):
"""User backend for Pub/Sub."""
KEY_TYPE = int KEY_TYPE = int
async def dispatch(self, user_id: int, event, data): async def dispatch(self, user_id: int, event, data):
"""Dispatch an event to all shards of a user."""
states = self.sm.user_states(user_id) states = self.sm.user_states(user_id)
return await self._dispatch_states(states, event, data) return await self._dispatch_states(states, event, data)

View File

@ -737,12 +737,12 @@ class Storage:
for drow in friends: for drow in friends:
drow['type'] = drow['rel_type'] drow['type'] = drow['rel_type']
drow['id'] = str(drow['peer_id'])
drow.pop('rel_type') drow.pop('rel_type')
# check if the receiver is a mutual # check if the receiver is a mutual
# if it isnt, its still on a friend request stage # if it isnt, its still on a friend request stage
if drow['peer_id'] not in mutuals: if drow['peer_id'] not in mutuals:
drow['id'] = str(drow['peer_id'])
drow['type'] = _outgoing drow['type'] = _outgoing
drow['user'] = await self.get_user(drow['peer_id']) drow['user'] = await self.get_user(drow['peer_id'])