embed.sanitizer: full refactor

- refactor the three main mediaproxy api client functions into a
    template function
 - simplify common code between everyone
 - add typings to fill_embed()

 - embed.schemas: add querystring to EmbedURL.to_md_path
This commit is contained in:
Luna 2019-04-25 22:12:04 -03:00
parent c9a7b3dfb5
commit ff9d421a3c
2 changed files with 107 additions and 75 deletions

View File

@ -22,7 +22,8 @@ litecord.embed.sanitizer
sanitize embeds by giving common values
such as type: rich
"""
from typing import Dict, Any, Optional, Union, List
import urllib.parse
from typing import Dict, Any, Optional, Union, List, Tuple
from logbook import Logger
from quart import current_app as app
@ -76,27 +77,9 @@ def path_exists(embed: Embed, components_in: Union[List[str], str]):
return False
def proxify(url, *, config=None) -> str:
"""Return a mediaproxy url for the given EmbedURL."""
if not config:
config = app.config
if isinstance(url, str):
url = EmbedURL(url)
md_base_url = config['MEDIA_PROXY']
parsed = url.parsed
proto = 'https' if config['IS_SSL'] else 'http'
return (
# base mediaproxy url
f'{proto}://{md_base_url}/img/'
f'{parsed.scheme}/{parsed.netloc}{parsed.path}'
)
def _mk_cfg_sess(config, session) -> tuple:
"""Return a tuple of (config, session)."""
if config is None:
config = app.config
@ -107,86 +90,132 @@ def _mk_cfg_sess(config, session) -> tuple:
def _md_base(config) -> tuple:
"""Return the protocol and base url for the mediaproxy."""
md_base_url = config['MEDIA_PROXY']
proto = 'https' if config['IS_SSL'] else 'http'
return proto, md_base_url
async def fetch_metadata(url, *, config=None, session=None) -> Optional[Dict]:
"""Fetch metadata for a url."""
def _make_md_req_url(config, scope: str, url):
"""Make a mediaproxy request URL given the config, scope, and the url
to be proxied."""
proto, base_url = _md_base(config)
return f'{proto}://{base_url}/{scope}/{url.to_md_path}'
def proxify(url, *, config=None) -> str:
"""Return a mediaproxy url for the given EmbedURL. Returns an
/img/ scope."""
config, _sess = _mk_cfg_sess(config, None)
if isinstance(url, str):
url = EmbedURL(url)
return _make_md_req_url(config, 'img', url)
async def _md_client_req(config, session, scope: str,
url, *, ret_resp=False) -> Optional[Union[Tuple, Dict]]:
"""Makes a request to the mediaproxy.
This has common code between all the main mediaproxy request functions
to decrease code repetition.
Note that config and session exist because there are cases where the app
isn't retrievable (as those functions usually run in background tasks,
not in the app itself).
Parameters
----------
config: dict-like
the app configuration, if None, this will get the global one from the
app instance.
session: aiohttp client session
the aiohttp ClientSession instance to use, if None, this will get
the global one from the app.
scope: str
the scope of your request. one of 'meta', 'img', or 'embed' are
available for the mediaproxy's API.
url: string or EmbedURL
the url in question to give to the mediaproxy.
ret_resp: bool, default false
if this function returns the response and its bytes as a tuple, instead
of the raw json object. used by 'img' scope to proxy images, as we want
the raw bytes of the response, but by the time this function is
returned, the response object is invalid and the socket is closed
"""
config, session = _mk_cfg_sess(config, session)
if not isinstance(url, EmbedURL):
url = EmbedURL(url)
proto, md_base_url = _md_base(config)
request_url = f'{proto}://{md_base_url}/meta/{url.to_md_path}'
request_url = _make_md_req_url(config, scope, url)
async with session.get(request_url) as resp:
if resp.status != 200:
body = await resp.text()
if resp.status == 200:
if ret_resp:
return resp, await resp.read()
log.warning('failed to generate meta for {!r}: {} {!r}',
url, resp.status, body)
return None
return await resp.json()
return await resp.json()
body = await resp.text()
log.warning('failed to call {!r}, {} {!r}',
request_url, resp.status, body)
return None
async def fetch_metadata(url, *, config=None,
session=None) -> Optional[Dict]:
"""Fetch metadata for a url (image width, mime, etc)."""
return await _md_client_req(
config, session, 'meta', url
)
async def fetch_raw_img(url, *, config=None, session=None) -> Optional[tuple]:
"""Fetch metadata for a url."""
config, session = _mk_cfg_sess(config, session)
"""Fetch raw data for a url (the bytes given off, used to proxy images).
if not isinstance(url, EmbedURL):
url = EmbedURL(url)
Returns a tuple containing the response object and the raw bytes given by
the website.
"""
tup = await _md_client_req(
config, session, 'img', url, ret_resp=True
)
proto, md_base_url = _md_base(config)
# NOTE: the img, instead of /meta/.
request_url = f'{proto}://{md_base_url}/img/{url.to_md_path}'
if not tup:
return None
async with session.get(request_url) as resp:
if resp.status != 200:
body = await resp.text()
log.warning('failed to get img for {!r}: {} {!r}',
url, resp.status, body)
return None
return resp, await resp.read()
return tup
async def fetch_embed(url, *, config=None, session=None) -> dict:
"""Fetch an embed"""
config, session = _mk_cfg_sess(config, session)
async def fetch_embed(url, *, config=None, session=None) -> Dict[str, Any]:
"""Fetch an embed for a given webpage (an automatically generated embed
by the mediaproxy, look over the project on how it generates embeds).
if not isinstance(url, EmbedURL):
url = EmbedURL(url)
parsed = url.parsed
# TODO: handle query string
md_path = f'{parsed.scheme}/{parsed.netloc}{parsed.path}'
md_base_url = config['MEDIA_PROXY']
secure = 's' if config['IS_SSL'] else ''
request_url = f'http{secure}://{md_base_url}/embed/{md_path}'
async with session.get(request_url) as resp:
if resp.status != 200:
body = await resp.text()
log.warning('failed to embed {!r}, {} {!r}',
parsed, resp.status, body)
return
return await resp.json()
Returns a discord embed object.
"""
return await _md_client_req(
config, session, 'embed', url
)
async def fill_embed(embed: Embed) -> Embed:
"""Fill an embed with more information, such as proxy URLs."""
async def fill_embed(embed: Optional[Embed]) -> Optional[Embed]:
"""Fill an embed with more information, such as proxy URLs.
Uses path_exists() to check if a given element exists in an embed by
checking if its parent fields also exist, which is why we do
`path_exists(embed, 'footer.icon_url')`
instead of
`embed.get('icon_url', embed.get('footer', {}))`.
Uses the proxify function so that clients don't directly contact websites
in embeds and instead use the mediaproxy.
"""
if embed is None:
return
return None
embed = sanitize_embed(embed)

View File

@ -47,9 +47,12 @@ class EmbedURL:
@property
def to_md_path(self) -> str:
"""Convert the EmbedURL to a mediaproxy path."""
"""Convert the EmbedURL to a mediaproxy path (post img/meta)."""
parsed = self.parsed
return f'{parsed.scheme}/{parsed.netloc}{parsed.path}'
return (
f'{parsed.scheme}/{parsed.netloc}'
f'{parsed.path}?{parsed.query}'
)
EMBED_FOOTER = {