diff --git a/litecord/embed/sanitizer.py b/litecord/embed/sanitizer.py index c04e208..b6b3570 100644 --- a/litecord/embed/sanitizer.py +++ b/litecord/embed/sanitizer.py @@ -22,7 +22,8 @@ litecord.embed.sanitizer sanitize embeds by giving common values such as type: rich """ -from typing import Dict, Any, Optional, Union, List +import urllib.parse +from typing import Dict, Any, Optional, Union, List, Tuple from logbook import Logger from quart import current_app as app @@ -76,27 +77,9 @@ def path_exists(embed: Embed, components_in: Union[List[str], str]): return False -def proxify(url, *, config=None) -> str: - """Return a mediaproxy url for the given EmbedURL.""" - - if not config: - config = app.config - - if isinstance(url, str): - url = EmbedURL(url) - - md_base_url = config['MEDIA_PROXY'] - parsed = url.parsed - proto = 'https' if config['IS_SSL'] else 'http' - - return ( - # base mediaproxy url - f'{proto}://{md_base_url}/img/' - f'{parsed.scheme}/{parsed.netloc}{parsed.path}' - ) - def _mk_cfg_sess(config, session) -> tuple: + """Return a tuple of (config, session).""" if config is None: config = app.config @@ -107,86 +90,132 @@ def _mk_cfg_sess(config, session) -> tuple: def _md_base(config) -> tuple: + """Return the protocol and base url for the mediaproxy.""" md_base_url = config['MEDIA_PROXY'] proto = 'https' if config['IS_SSL'] else 'http' return proto, md_base_url -async def fetch_metadata(url, *, config=None, session=None) -> Optional[Dict]: - """Fetch metadata for a url.""" +def _make_md_req_url(config, scope: str, url): + """Make a mediaproxy request URL given the config, scope, and the url + to be proxied.""" + proto, base_url = _md_base(config) + return f'{proto}://{base_url}/{scope}/{url.to_md_path}' + + +def proxify(url, *, config=None) -> str: + """Return a mediaproxy url for the given EmbedURL. Returns an + /img/ scope.""" + config, _sess = _mk_cfg_sess(config, None) + + if isinstance(url, str): + url = EmbedURL(url) + + return _make_md_req_url(config, 'img', url) + + +async def _md_client_req(config, session, scope: str, + url, *, ret_resp=False) -> Optional[Union[Tuple, Dict]]: + """Makes a request to the mediaproxy. + + This has common code between all the main mediaproxy request functions + to decrease code repetition. + + Note that config and session exist because there are cases where the app + isn't retrievable (as those functions usually run in background tasks, + not in the app itself). + + Parameters + ---------- + config: dict-like + the app configuration, if None, this will get the global one from the + app instance. + session: aiohttp client session + the aiohttp ClientSession instance to use, if None, this will get + the global one from the app. + + scope: str + the scope of your request. one of 'meta', 'img', or 'embed' are + available for the mediaproxy's API. + url: string or EmbedURL + the url in question to give to the mediaproxy. + + ret_resp: bool, default false + if this function returns the response and its bytes as a tuple, instead + of the raw json object. used by 'img' scope to proxy images, as we want + the raw bytes of the response, but by the time this function is + returned, the response object is invalid and the socket is closed + """ config, session = _mk_cfg_sess(config, session) if not isinstance(url, EmbedURL): url = EmbedURL(url) - proto, md_base_url = _md_base(config) - request_url = f'{proto}://{md_base_url}/meta/{url.to_md_path}' + request_url = _make_md_req_url(config, scope, url) async with session.get(request_url) as resp: - if resp.status != 200: - body = await resp.text() + if resp.status == 200: + if ret_resp: + return resp, await resp.read() - log.warning('failed to generate meta for {!r}: {} {!r}', - url, resp.status, body) - return None + return await resp.json() - return await resp.json() + body = await resp.text() + log.warning('failed to call {!r}, {} {!r}', + request_url, resp.status, body) + return None + + +async def fetch_metadata(url, *, config=None, + session=None) -> Optional[Dict]: + """Fetch metadata for a url (image width, mime, etc).""" + return await _md_client_req( + config, session, 'meta', url + ) async def fetch_raw_img(url, *, config=None, session=None) -> Optional[tuple]: - """Fetch metadata for a url.""" - config, session = _mk_cfg_sess(config, session) + """Fetch raw data for a url (the bytes given off, used to proxy images). - if not isinstance(url, EmbedURL): - url = EmbedURL(url) + Returns a tuple containing the response object and the raw bytes given by + the website. + """ + tup = await _md_client_req( + config, session, 'img', url, ret_resp=True + ) - proto, md_base_url = _md_base(config) - # NOTE: the img, instead of /meta/. - request_url = f'{proto}://{md_base_url}/img/{url.to_md_path}' + if not tup: + return None - async with session.get(request_url) as resp: - if resp.status != 200: - body = await resp.text() - - log.warning('failed to get img for {!r}: {} {!r}', - url, resp.status, body) - return None - - return resp, await resp.read() + return tup -async def fetch_embed(url, *, config=None, session=None) -> dict: - """Fetch an embed""" - config, session = _mk_cfg_sess(config, session) +async def fetch_embed(url, *, config=None, session=None) -> Dict[str, Any]: + """Fetch an embed for a given webpage (an automatically generated embed + by the mediaproxy, look over the project on how it generates embeds). - if not isinstance(url, EmbedURL): - url = EmbedURL(url) - - parsed = url.parsed - - # TODO: handle query string - md_path = f'{parsed.scheme}/{parsed.netloc}{parsed.path}' - - md_base_url = config['MEDIA_PROXY'] - secure = 's' if config['IS_SSL'] else '' - - request_url = f'http{secure}://{md_base_url}/embed/{md_path}' - - async with session.get(request_url) as resp: - if resp.status != 200: - body = await resp.text() - log.warning('failed to embed {!r}, {} {!r}', - parsed, resp.status, body) - return - - return await resp.json() + Returns a discord embed object. + """ + return await _md_client_req( + config, session, 'embed', url + ) -async def fill_embed(embed: Embed) -> Embed: - """Fill an embed with more information, such as proxy URLs.""" +async def fill_embed(embed: Optional[Embed]) -> Optional[Embed]: + """Fill an embed with more information, such as proxy URLs. + + Uses path_exists() to check if a given element exists in an embed by + checking if its parent fields also exist, which is why we do + `path_exists(embed, 'footer.icon_url')` + instead of + `embed.get('icon_url', embed.get('footer', {}))`. + + Uses the proxify function so that clients don't directly contact websites + in embeds and instead use the mediaproxy. + """ if embed is None: - return + return None embed = sanitize_embed(embed) diff --git a/litecord/embed/schemas.py b/litecord/embed/schemas.py index 819e072..ef60702 100644 --- a/litecord/embed/schemas.py +++ b/litecord/embed/schemas.py @@ -47,9 +47,12 @@ class EmbedURL: @property def to_md_path(self) -> str: - """Convert the EmbedURL to a mediaproxy path.""" + """Convert the EmbedURL to a mediaproxy path (post img/meta).""" parsed = self.parsed - return f'{parsed.scheme}/{parsed.netloc}{parsed.path}' + return ( + f'{parsed.scheme}/{parsed.netloc}' + f'{parsed.path}?{parsed.query}' + ) EMBED_FOOTER = {