mirror of https://gitlab.com/litecord/litecord.git
Merge branch 'enhance/tests' into 'master'
enhance tests Closes #51 See merge request litecord/litecord!42
This commit is contained in:
commit
68e0ab186c
|
|
@ -20,5 +20,4 @@ tests:
|
||||||
- ls
|
- ls
|
||||||
- cp config.ci.py config.py
|
- cp config.ci.py config.py
|
||||||
- pipenv run ./manage.py migrate
|
- pipenv run ./manage.py migrate
|
||||||
- pipenv run ./manage.py setup_tests
|
|
||||||
- tox
|
- tox
|
||||||
|
|
|
||||||
|
|
@ -145,13 +145,7 @@ $ pipenv run ./manage.py migrate
|
||||||
|
|
||||||
## Running tests
|
## Running tests
|
||||||
|
|
||||||
Running tests involves creating dummy users with known passwords. Because of
|
|
||||||
this, you should never setup a testing environment in production.
|
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
# Setup any testing users:
|
|
||||||
$ pipenv run ./manage.py setup_tests
|
|
||||||
|
|
||||||
# Install tox:
|
# Install tox:
|
||||||
$ pip install tox
|
$ pip install tox
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,54 +0,0 @@
|
||||||
"""
|
|
||||||
|
|
||||||
Litecord
|
|
||||||
Copyright (C) 2018-2019 Luna Mendes
|
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation, version 3 of the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
from tests.credentials import CREDS
|
|
||||||
from litecord.blueprints.auth import create_user
|
|
||||||
from manage.cmd.users import set_user_staff
|
|
||||||
|
|
||||||
|
|
||||||
async def setup_tests(ctx, _args):
|
|
||||||
"""Setup users for the testing environment."""
|
|
||||||
for name, creds in CREDS.items():
|
|
||||||
uid, _ = await create_user(
|
|
||||||
creds['username'],
|
|
||||||
creds['email'],
|
|
||||||
creds['password'],
|
|
||||||
ctx.db,
|
|
||||||
ctx.loop
|
|
||||||
)
|
|
||||||
|
|
||||||
discrim = await ctx.db.fetchval("""
|
|
||||||
SELECT discriminator FROM users WHERE id = $1
|
|
||||||
""", uid)
|
|
||||||
|
|
||||||
print(f'created {name} {discrim} user: {uid}')
|
|
||||||
|
|
||||||
if name == 'admin':
|
|
||||||
await set_user_staff(uid, ctx)
|
|
||||||
|
|
||||||
print('OK')
|
|
||||||
|
|
||||||
|
|
||||||
def setup(subparser):
|
|
||||||
setup_test_parser = subparser.add_parser(
|
|
||||||
'setup_tests',
|
|
||||||
help='Create test users',
|
|
||||||
)
|
|
||||||
|
|
||||||
setup_test_parser.set_defaults(func=setup_tests)
|
|
||||||
|
|
@ -26,7 +26,7 @@ from logbook import Logger
|
||||||
|
|
||||||
from run import init_app_managers, init_app_db
|
from run import init_app_managers, init_app_db
|
||||||
from manage.cmd.migration import migration
|
from manage.cmd.migration import migration
|
||||||
from manage.cmd import users, tests, invites
|
from manage.cmd import users, invites
|
||||||
|
|
||||||
log = Logger(__name__)
|
log = Logger(__name__)
|
||||||
|
|
||||||
|
|
@ -54,7 +54,6 @@ def init_parser():
|
||||||
|
|
||||||
migration(subparser)
|
migration(subparser)
|
||||||
users.setup(subparser)
|
users.setup(subparser)
|
||||||
tests.setup(subparser)
|
|
||||||
invites.setup(subparser)
|
invites.setup(subparser)
|
||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|
|
||||||
|
|
@ -17,30 +17,51 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from .credentials import CREDS
|
import secrets
|
||||||
|
|
||||||
async def login(acc_name: str, test_cli):
|
def email() -> str:
|
||||||
creds = CREDS[acc_name]
|
return f'{secrets.token_hex(5)}@{secrets.token_hex(5)}.com'
|
||||||
|
|
||||||
resp = await test_cli.post('/api/v6/auth/login', json={
|
|
||||||
'email': creds['email'],
|
|
||||||
'password': creds['password']
|
|
||||||
})
|
|
||||||
|
|
||||||
if resp.status_code != 200:
|
|
||||||
raise RuntimeError(f'non-200 on login: {resp.status_code}')
|
|
||||||
|
|
||||||
rjson = await resp.json
|
|
||||||
return rjson['token']
|
|
||||||
|
|
||||||
|
|
||||||
async def get_uid(token, test_cli):
|
class TestClient:
|
||||||
resp = await test_cli.get('/api/v6/users/@me', headers={
|
"""Test client that wraps pytest-sanic's TestClient and a test
|
||||||
'Authorization': token
|
user and adds authorization headers to test requests."""
|
||||||
})
|
def __init__(self, test_cli, test_user):
|
||||||
|
self.cli = test_cli
|
||||||
|
self.app = test_cli.app
|
||||||
|
self.user = test_user
|
||||||
|
|
||||||
if resp.status_code != 200:
|
def __getitem__(self, key):
|
||||||
raise RuntimeError(f'non-200 on get uid: {resp.status_code}')
|
return self.user[key]
|
||||||
|
|
||||||
rjson = await resp.json
|
def _inject_auth(self, kwargs: dict) -> list:
|
||||||
return rjson['id']
|
"""Inject the test user's API key into the test request before
|
||||||
|
passing the request on to the underlying TestClient."""
|
||||||
|
headers = kwargs.get('headers', {})
|
||||||
|
headers['authorization'] = self.user['token']
|
||||||
|
return headers
|
||||||
|
|
||||||
|
async def get(self, *args, **kwargs):
|
||||||
|
"""Send a GET request."""
|
||||||
|
kwargs['headers'] = self._inject_auth(kwargs)
|
||||||
|
return await self.cli.get(*args, **kwargs)
|
||||||
|
|
||||||
|
async def post(self, *args, **kwargs):
|
||||||
|
"""Send a POST request."""
|
||||||
|
kwargs['headers'] = self._inject_auth(kwargs)
|
||||||
|
return await self.cli.post(*args, **kwargs)
|
||||||
|
|
||||||
|
async def put(self, *args, **kwargs):
|
||||||
|
"""Send a POST request."""
|
||||||
|
kwargs['headers'] = self._inject_auth(kwargs)
|
||||||
|
return await self.cli.put(*args, **kwargs)
|
||||||
|
|
||||||
|
async def patch(self, *args, **kwargs):
|
||||||
|
"""Send a PATCH request."""
|
||||||
|
kwargs['headers'] = self._inject_auth(kwargs)
|
||||||
|
return await self.cli.patch(*args, **kwargs)
|
||||||
|
|
||||||
|
async def delete(self, *args, **kwargs):
|
||||||
|
"""Send a DELETE request."""
|
||||||
|
kwargs['headers'] = self._inject_auth(kwargs)
|
||||||
|
return await self.cli.delete(*args, **kwargs)
|
||||||
|
|
|
||||||
|
|
@ -17,18 +17,24 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import secrets
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import socket
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
# this is very hacky.
|
# this is very hacky.
|
||||||
sys.path.append(os.getcwd())
|
sys.path.append(os.getcwd())
|
||||||
|
|
||||||
|
from tests.common import email, TestClient
|
||||||
|
|
||||||
from run import app as main_app, set_blueprints
|
from run import app as main_app, set_blueprints
|
||||||
|
|
||||||
|
from litecord.auth import create_user
|
||||||
|
from litecord.enums import UserFlags
|
||||||
|
from litecord.blueprints.auth import make_token
|
||||||
|
from litecord.blueprints.users import delete_user
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name='app')
|
@pytest.fixture(name='app')
|
||||||
def _test_app(unused_tcp_port, event_loop):
|
def _test_app(unused_tcp_port, event_loop):
|
||||||
|
|
@ -61,3 +67,64 @@ def _test_app(unused_tcp_port, event_loop):
|
||||||
def _test_cli(app):
|
def _test_cli(app):
|
||||||
"""Give a test client."""
|
"""Give a test client."""
|
||||||
return app.test_client()
|
return app.test_client()
|
||||||
|
|
||||||
|
# code shamelessly stolen from my elixire mr
|
||||||
|
# https://gitlab.com/elixire/elixire/merge_requests/52
|
||||||
|
async def _user_fixture_setup(app):
|
||||||
|
username = secrets.token_hex(6)
|
||||||
|
password = secrets.token_hex(6)
|
||||||
|
user_email = email()
|
||||||
|
|
||||||
|
user_id, pwd_hash = await create_user(
|
||||||
|
username, user_email, password, app.db, app.loop)
|
||||||
|
|
||||||
|
# generate a token for api access
|
||||||
|
user_token = make_token(user_id, pwd_hash)
|
||||||
|
|
||||||
|
return {'id': user_id, 'token': user_token,
|
||||||
|
'email': user_email, 'username': username,
|
||||||
|
'password': password}
|
||||||
|
|
||||||
|
|
||||||
|
async def _user_fixture_teardown(app, udata: dict):
|
||||||
|
await delete_user(udata['id'], db=app.db)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name='test_user')
|
||||||
|
async def test_user_fixture(app):
|
||||||
|
"""Yield a randomly generated test user."""
|
||||||
|
udata = await _user_fixture_setup(app)
|
||||||
|
yield udata
|
||||||
|
await _user_fixture_teardown(app, udata)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def test_cli_user(test_cli, test_user):
|
||||||
|
"""Yield a TestClient instance that contains a randomly generated
|
||||||
|
user."""
|
||||||
|
yield TestClient(test_cli, test_user)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def test_cli_staff(test_cli):
|
||||||
|
"""Yield a TestClient with a staff user."""
|
||||||
|
# This does not use the test_user because if a given test uses both
|
||||||
|
# test_cli_user and test_cli_admin, test_cli_admin will just point to that
|
||||||
|
# same test_cli_user, which isn't acceptable.
|
||||||
|
app = test_cli.app
|
||||||
|
test_user = await _user_fixture_setup(app)
|
||||||
|
user_id = test_user['id']
|
||||||
|
|
||||||
|
# copied from manage.cmd.users.set_user_staff.
|
||||||
|
old_flags = await app.db.fetchval("""
|
||||||
|
SELECT flags FROM users WHERE id = $1
|
||||||
|
""", user_id)
|
||||||
|
|
||||||
|
new_flags = old_flags | UserFlags.staff
|
||||||
|
|
||||||
|
await app.db.execute("""
|
||||||
|
UPDATE users SET flags = $1 WHERE id = $2
|
||||||
|
""", new_flags, user_id)
|
||||||
|
|
||||||
|
yield TestClient(test_cli, test_user)
|
||||||
|
await _user_fixture_teardown(test_cli.app, test_user)
|
||||||
|
|
|
||||||
|
|
@ -1,37 +0,0 @@
|
||||||
"""
|
|
||||||
|
|
||||||
Litecord
|
|
||||||
Copyright (C) 2018-2019 Luna Mendes
|
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation, version 3 of the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
"""
|
|
||||||
dummy credentials for litecord tests
|
|
||||||
"""
|
|
||||||
|
|
||||||
CREDS = {
|
|
||||||
'normal': {
|
|
||||||
'username': 'testygirl',
|
|
||||||
'email': 'girls@girls.com',
|
|
||||||
|
|
||||||
# 10 char password should work
|
|
||||||
'password': 'girls,,,,,'
|
|
||||||
},
|
|
||||||
'admin': {
|
|
||||||
'username': 'big_girl',
|
|
||||||
'email': 'big_girl@energy.com',
|
|
||||||
'password': 'big_girl_dot_com',
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -21,18 +21,13 @@ import secrets
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
from litecord.blueprints.guilds import delete_guild
|
from litecord.blueprints.guilds import delete_guild
|
||||||
from litecord.errors import GuildNotFound
|
from litecord.errors import GuildNotFound
|
||||||
|
|
||||||
async def _create_guild(test_cli, *, token=None):
|
async def _create_guild(test_cli_staff):
|
||||||
token = token or await login('admin', test_cli)
|
|
||||||
|
|
||||||
genned_name = secrets.token_hex(6)
|
genned_name = secrets.token_hex(6)
|
||||||
|
|
||||||
resp = await test_cli.post('/api/v6/guilds', headers={
|
resp = await test_cli_staff.post('/api/v6/guilds', json={
|
||||||
'Authorization': token
|
|
||||||
}, json={
|
|
||||||
'name': genned_name,
|
'name': genned_name,
|
||||||
'region': None
|
'region': None
|
||||||
})
|
})
|
||||||
|
|
@ -45,12 +40,8 @@ async def _create_guild(test_cli, *, token=None):
|
||||||
return rjson
|
return rjson
|
||||||
|
|
||||||
|
|
||||||
async def _fetch_guild(test_cli, guild_id, *, token=None, ret_early=False):
|
async def _fetch_guild(test_cli_staff, guild_id, *, ret_early=False):
|
||||||
token = token or await login('admin', test_cli)
|
resp = await test_cli_staff.get(f'/api/v6/admin/guilds/{guild_id}')
|
||||||
|
|
||||||
resp = await test_cli.get(f'/api/v6/admin/guilds/{guild_id}', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
if ret_early:
|
if ret_early:
|
||||||
return resp
|
return resp
|
||||||
|
|
@ -64,23 +55,21 @@ async def _fetch_guild(test_cli, guild_id, *, token=None, ret_early=False):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_guild_fetch(test_cli):
|
async def test_guild_fetch(test_cli_staff):
|
||||||
"""Test the creation and fetching of a guild via the Admin API."""
|
"""Test the creation and fetching of a guild via the Admin API."""
|
||||||
token = await login('admin', test_cli)
|
rjson = await _create_guild(test_cli_staff)
|
||||||
rjson = await _create_guild(test_cli, token=token)
|
|
||||||
guild_id = rjson['id']
|
guild_id = rjson['id']
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await _fetch_guild(test_cli, guild_id)
|
await _fetch_guild(test_cli_staff, guild_id)
|
||||||
finally:
|
finally:
|
||||||
await delete_guild(int(guild_id), app_=test_cli.app)
|
await delete_guild(int(guild_id), app_=test_cli_staff.app)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_guild_update(test_cli):
|
async def test_guild_update(test_cli_staff):
|
||||||
"""Test the update of a guild via the Admin API."""
|
"""Test the update of a guild via the Admin API."""
|
||||||
token = await login('admin', test_cli)
|
rjson = await _create_guild(test_cli_staff)
|
||||||
rjson = await _create_guild(test_cli, token=token)
|
|
||||||
guild_id = rjson['id']
|
guild_id = rjson['id']
|
||||||
assert not rjson['unavailable']
|
assert not rjson['unavailable']
|
||||||
|
|
||||||
|
|
@ -89,11 +78,9 @@ async def test_guild_update(test_cli):
|
||||||
# would be overkill to test the side-effects, so... I'm not
|
# would be overkill to test the side-effects, so... I'm not
|
||||||
# testing them. Yes, I know its a bad idea, but if someone has an easier
|
# testing them. Yes, I know its a bad idea, but if someone has an easier
|
||||||
# way to write that, do send an MR.
|
# way to write that, do send an MR.
|
||||||
resp = await test_cli.patch(
|
resp = await test_cli_staff.patch(
|
||||||
f'/api/v6/admin/guilds/{guild_id}',
|
f'/api/v6/admin/guilds/{guild_id}',
|
||||||
headers={
|
json={
|
||||||
'Authorization': token
|
|
||||||
}, json={
|
|
||||||
'unavailable': True
|
'unavailable': True
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -103,30 +90,25 @@ async def test_guild_update(test_cli):
|
||||||
assert rjson['id'] == guild_id
|
assert rjson['id'] == guild_id
|
||||||
assert rjson['unavailable']
|
assert rjson['unavailable']
|
||||||
|
|
||||||
rjson = await _fetch_guild(test_cli, guild_id, token=token)
|
rjson = await _fetch_guild(test_cli_staff, guild_id)
|
||||||
assert rjson['unavailable']
|
assert rjson['unavailable']
|
||||||
finally:
|
finally:
|
||||||
await delete_guild(int(guild_id), app_=test_cli.app)
|
await delete_guild(int(guild_id), app_=test_cli_staff.app)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_guild_delete(test_cli):
|
async def test_guild_delete(test_cli_staff):
|
||||||
"""Test the update of a guild via the Admin API."""
|
"""Test the update of a guild via the Admin API."""
|
||||||
token = await login('admin', test_cli)
|
rjson = await _create_guild(test_cli_staff)
|
||||||
rjson = await _create_guild(test_cli, token=token)
|
|
||||||
guild_id = rjson['id']
|
guild_id = rjson['id']
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resp = await test_cli.delete(
|
resp = await test_cli_staff.delete(f'/api/v6/admin/guilds/{guild_id}')
|
||||||
f'/api/v6/admin/guilds/{guild_id}',
|
|
||||||
headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 204
|
assert resp.status_code == 204
|
||||||
|
|
||||||
resp = await _fetch_guild(test_cli, guild_id, token=token,
|
resp = await _fetch_guild(
|
||||||
ret_early=True)
|
test_cli_staff, guild_id, ret_early=True)
|
||||||
|
|
||||||
assert resp.status_code == 404
|
assert resp.status_code == 404
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -134,4 +116,4 @@ async def test_guild_delete(test_cli):
|
||||||
assert rjson['error']
|
assert rjson['error']
|
||||||
assert rjson['code'] == GuildNotFound.error_code
|
assert rjson['code'] == GuildNotFound.error_code
|
||||||
finally:
|
finally:
|
||||||
await delete_guild(int(guild_id), app_=test_cli.app)
|
await delete_guild(int(guild_id), app_=test_cli_staff.app)
|
||||||
|
|
|
||||||
|
|
@ -19,15 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
|
|
||||||
async def _get_invs(test_cli, token=None):
|
async def _get_invs(test_cli):
|
||||||
if token is None:
|
resp = await test_cli.get('/api/v6/admin/instance/invites')
|
||||||
token = await login('admin', test_cli)
|
|
||||||
|
|
||||||
resp = await test_cli.get('/api/v6/admin/instance/invites', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -36,34 +30,25 @@ async def _get_invs(test_cli, token=None):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_invites(test_cli):
|
async def test_get_invites(test_cli_staff):
|
||||||
"""Test the listing of instance invites."""
|
"""Test the listing of instance invites."""
|
||||||
await _get_invs(test_cli)
|
await _get_invs(test_cli_staff)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_inv_delete_invalid(test_cli):
|
async def test_inv_delete_invalid(test_cli_staff):
|
||||||
"""Test errors happen when trying to delete a
|
"""Test errors happen when trying to delete a
|
||||||
non-existing instance invite."""
|
non-existing instance invite."""
|
||||||
token = await login('admin', test_cli)
|
resp = await test_cli_staff.delete('/api/v6/admin/instance/invites/aaaaaa')
|
||||||
resp = await test_cli.delete(
|
|
||||||
'/api/v6/admin/instance/invites/aaaaaaaaaa',
|
|
||||||
headers={
|
|
||||||
'Authorization': token
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert resp.status_code == 404
|
assert resp.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_create_invite(test_cli):
|
async def test_create_invite(test_cli_staff):
|
||||||
"""Test the creation of an instance invite, then listing it,
|
"""Test the creation of an instance invite, then listing it,
|
||||||
then deleting it."""
|
then deleting it."""
|
||||||
token = await login('admin', test_cli)
|
resp = await test_cli_staff.put('/api/v6/admin/instance/invites', json={
|
||||||
resp = await test_cli.put('/api/v6/admin/instance/invites', headers={
|
|
||||||
'Authorization': token
|
|
||||||
}, json={
|
|
||||||
'max_uses': 1
|
'max_uses': 1
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -73,15 +58,11 @@ async def test_create_invite(test_cli):
|
||||||
code = rjson['code']
|
code = rjson['code']
|
||||||
|
|
||||||
# assert that the invite is in the list
|
# assert that the invite is in the list
|
||||||
invites = await _get_invs(test_cli, token)
|
invites = await _get_invs(test_cli_staff)
|
||||||
assert any(inv['code'] == code for inv in invites)
|
assert any(inv['code'] == code for inv in invites)
|
||||||
|
|
||||||
# delete it, and assert it worked
|
# delete it, and assert it worked
|
||||||
resp = await test_cli.delete(
|
resp = await test_cli_staff.delete(
|
||||||
f'/api/v6/admin/instance/invites/{code}',
|
f'/api/v6/admin/instance/invites/{code}')
|
||||||
headers={
|
|
||||||
'Authorization': token
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert resp.status_code == 204
|
assert resp.status_code == 204
|
||||||
|
|
|
||||||
|
|
@ -21,28 +21,23 @@ import secrets
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
from tests.credentials import CREDS
|
|
||||||
from litecord.enums import UserFlags
|
from litecord.enums import UserFlags
|
||||||
|
|
||||||
|
|
||||||
async def _search(test_cli, *, username='', discrim='', token=None):
|
async def _search(test_cli, *, username='', discrim=''):
|
||||||
token = token or await login('admin', test_cli)
|
|
||||||
|
|
||||||
query_string = {
|
query_string = {
|
||||||
'username': username,
|
'username': username,
|
||||||
'discriminator': discrim
|
'discriminator': discrim
|
||||||
}
|
}
|
||||||
|
|
||||||
return await test_cli.get('/api/v6/admin/users', headers={
|
return await test_cli.get('/api/v6/admin/users', query_string=query_string)
|
||||||
'Authorization': token
|
|
||||||
}, query_string=query_string)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_list_users(test_cli):
|
async def test_list_users(test_cli_staff):
|
||||||
"""Try to list as many users as possible."""
|
"""Try to list as many users as possible."""
|
||||||
resp = await _search(test_cli, username=CREDS['admin']['username'])
|
resp = await _search(
|
||||||
|
test_cli_staff, username=test_cli_staff.user['username'])
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -50,13 +45,10 @@ async def test_list_users(test_cli):
|
||||||
assert rjson
|
assert rjson
|
||||||
|
|
||||||
|
|
||||||
async def _setup_user(test_cli, *, token=None) -> dict:
|
async def _setup_user(test_cli) -> dict:
|
||||||
token = token or await login('admin', test_cli)
|
|
||||||
genned = secrets.token_hex(7)
|
genned = secrets.token_hex(7)
|
||||||
|
|
||||||
resp = await test_cli.post('/api/v6/admin/users', headers={
|
resp = await test_cli.post('/api/v6/admin/users', json={
|
||||||
'Authorization': token
|
|
||||||
}, json={
|
|
||||||
'username': genned,
|
'username': genned,
|
||||||
'email': f'{genned}@{genned}.com',
|
'email': f'{genned}@{genned}.com',
|
||||||
'password': genned,
|
'password': genned,
|
||||||
|
|
@ -70,13 +62,9 @@ async def _setup_user(test_cli, *, token=None) -> dict:
|
||||||
return rjson
|
return rjson
|
||||||
|
|
||||||
|
|
||||||
async def _del_user(test_cli, user_id, *, token=None):
|
async def _del_user(test_cli, user_id):
|
||||||
"""Delete a user."""
|
"""Delete a user."""
|
||||||
token = token or await login('admin', test_cli)
|
resp = await test_cli.delete(f'/api/v6/admin/users/{user_id}')
|
||||||
|
|
||||||
resp = await test_cli.delete(f'/api/v6/admin/users/{user_id}', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -93,32 +81,29 @@ async def _del_user(test_cli, user_id, *, token=None):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_create_delete(test_cli):
|
async def test_create_delete(test_cli_staff):
|
||||||
"""Create a user. Then delete them."""
|
"""Create a user. Then delete them."""
|
||||||
token = await login('admin', test_cli)
|
rjson = await _setup_user(test_cli_staff)
|
||||||
|
|
||||||
rjson = await _setup_user(test_cli, token=token)
|
|
||||||
|
|
||||||
genned = rjson['username']
|
genned = rjson['username']
|
||||||
genned_uid = rjson['id']
|
genned_uid = rjson['id']
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# check if side-effects went through with a search
|
# check if side-effects went through with a search
|
||||||
resp = await _search(test_cli, username=genned, token=token)
|
resp = await _search(test_cli_staff, username=genned)
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
assert isinstance(rjson, list)
|
assert isinstance(rjson, list)
|
||||||
assert rjson[0]['id'] == genned_uid
|
assert rjson[0]['id'] == genned_uid
|
||||||
finally:
|
finally:
|
||||||
await _del_user(test_cli, genned_uid, token=token)
|
await _del_user(test_cli_staff, genned_uid)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_user_update(test_cli):
|
async def test_user_update(test_cli_staff):
|
||||||
"""Test user update."""
|
"""Test user update."""
|
||||||
token = await login('admin', test_cli)
|
rjson = await _setup_user(test_cli_staff)
|
||||||
rjson = await _setup_user(test_cli, token=token)
|
|
||||||
|
|
||||||
user_id = rjson['id']
|
user_id = rjson['id']
|
||||||
|
|
||||||
|
|
@ -126,9 +111,9 @@ async def test_user_update(test_cli):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# set them as partner flag
|
# set them as partner flag
|
||||||
resp = await test_cli.patch(f'/api/v6/admin/users/{user_id}', headers={
|
resp = await test_cli_staff.patch(
|
||||||
'Authorization': token
|
f'/api/v6/admin/users/{user_id}',
|
||||||
}, json={
|
json={
|
||||||
'flags': UserFlags.partner,
|
'flags': UserFlags.partner,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -140,4 +125,4 @@ async def test_user_update(test_cli):
|
||||||
# TODO: maybe we can check for side effects by fetching the
|
# TODO: maybe we can check for side effects by fetching the
|
||||||
# user manually too...
|
# user manually too...
|
||||||
finally:
|
finally:
|
||||||
await _del_user(test_cli, user_id, token=token)
|
await _del_user(test_cli_staff, user_id)
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,6 @@ sys.path.append(os.getcwd())
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_gw(test_cli):
|
async def test_gw(test_cli):
|
||||||
|
|
@ -38,13 +36,9 @@ async def test_gw(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_gw_bot(test_cli):
|
async def test_gw_bot(test_cli_user):
|
||||||
"""Test the Get Bot Gateway route"""
|
"""Test the Get Bot Gateway route"""
|
||||||
token = await login('normal', test_cli)
|
resp = await test_cli_user.get('/api/v6/gateway/bot')
|
||||||
|
|
||||||
resp = await test_cli.get('/api/v6/gateway/bot', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
|
||||||
|
|
@ -20,22 +20,18 @@ import secrets
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_guild_create(test_cli):
|
async def test_guild_create(test_cli_user):
|
||||||
"""Test the creation of a guild, in three stages:
|
"""Test the creation of a guild, in three stages:
|
||||||
- creating it
|
- creating it
|
||||||
- checking the list
|
- checking the list
|
||||||
- deleting it
|
- deleting it
|
||||||
"""
|
"""
|
||||||
token = await login('normal', test_cli)
|
|
||||||
g_name = secrets.token_hex(5)
|
g_name = secrets.token_hex(5)
|
||||||
|
|
||||||
# stage 1: create
|
# stage 1: create
|
||||||
resp = await test_cli.post('/api/v6/guilds', headers={
|
resp = await test_cli_user.post('/api/v6/guilds', json={
|
||||||
'Authorization': token
|
|
||||||
}, json={
|
|
||||||
'name': g_name,
|
'name': g_name,
|
||||||
'region': None,
|
'region': None,
|
||||||
})
|
})
|
||||||
|
|
@ -53,9 +49,7 @@ async def test_guild_create(test_cli):
|
||||||
guild_id = created['id']
|
guild_id = created['id']
|
||||||
|
|
||||||
# stage 2: test
|
# stage 2: test
|
||||||
resp = await test_cli.get('/api/v6/users/@me/guilds', headers={
|
resp = await test_cli_user.get('/api/v6/users/@me/guilds')
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -85,8 +79,6 @@ async def test_guild_create(test_cli):
|
||||||
assert our_guild['name'] == created['name']
|
assert our_guild['name'] == created['name']
|
||||||
|
|
||||||
# stage 3: deletion
|
# stage 3: deletion
|
||||||
resp = await test_cli.delete(f'/api/v6/guilds/{guild_id}', headers={
|
resp = await test_cli_user.delete(f'/api/v6/guilds/{guild_id}')
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 204
|
assert resp.status_code == 204
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_science_empty(test_cli):
|
async def test_science_empty(test_cli):
|
||||||
|
|
@ -37,15 +35,10 @@ async def test_harvest_empty(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_consent_non_consenting(test_cli):
|
async def test_consent_non_consenting(test_cli_user):
|
||||||
"""Test the consent route to see if we're still on
|
"""Test the consent route to see if we're still on
|
||||||
a non-consent status regarding data collection."""
|
a non-consent status regarding data collection."""
|
||||||
token = await login('normal', test_cli)
|
resp = await test_cli_user.get('/api/v6/users/@me/consent')
|
||||||
|
|
||||||
resp = await test_cli.get('/api/v6/users/@me/consent', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
|
||||||
|
|
@ -20,15 +20,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
import pytest
|
import pytest
|
||||||
import secrets
|
import secrets
|
||||||
|
|
||||||
from tests.common import login, get_uid
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_me(test_cli):
|
async def test_get_me(test_cli_user):
|
||||||
token = await login('normal', test_cli)
|
resp = await test_cli_user.get('/api/v6/users/@me')
|
||||||
resp = await test_cli.get('/api/v6/users/@me', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -44,11 +39,8 @@ async def test_get_me(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_me_guilds(test_cli):
|
async def test_get_me_guilds(test_cli_user):
|
||||||
token = await login('normal', test_cli)
|
resp = await test_cli_user.get('/api/v6/users/@me/guilds')
|
||||||
resp = await test_cli.get('/api/v6/users/@me/guilds', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
@ -56,13 +48,9 @@ async def test_get_me_guilds(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_profile_self(test_cli):
|
async def test_get_profile_self(test_cli_user):
|
||||||
token = await login('normal', test_cli)
|
user_id = test_cli_user.user['id']
|
||||||
user_id = await get_uid(token, test_cli)
|
resp = await test_cli_user.get(f'/api/v6/users/{user_id}/profile')
|
||||||
|
|
||||||
resp = await test_cli.get(f'/api/v6/users/{user_id}/profile', headers={
|
|
||||||
'Authorization': token
|
|
||||||
})
|
|
||||||
|
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
rjson = await resp.json
|
rjson = await resp.json
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import json
|
||||||
import pytest
|
import pytest
|
||||||
import websockets
|
import websockets
|
||||||
|
|
||||||
from tests.common import login
|
|
||||||
from litecord.gateway.opcodes import OP
|
from litecord.gateway.opcodes import OP
|
||||||
from litecord.gateway.websocket import decode_etf
|
from litecord.gateway.websocket import decode_etf
|
||||||
|
|
||||||
|
|
@ -87,9 +86,8 @@ async def test_gw(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ready(test_cli):
|
async def test_ready(test_cli_user):
|
||||||
token = await login('normal', test_cli)
|
conn = await gw_start(test_cli_user.cli)
|
||||||
conn = await gw_start(test_cli)
|
|
||||||
|
|
||||||
# get the hello frame but ignore it
|
# get the hello frame but ignore it
|
||||||
await _json(conn)
|
await _json(conn)
|
||||||
|
|
@ -97,7 +95,7 @@ async def test_ready(test_cli):
|
||||||
await _json_send(conn, {
|
await _json_send(conn, {
|
||||||
'op': OP.IDENTIFY,
|
'op': OP.IDENTIFY,
|
||||||
'd': {
|
'd': {
|
||||||
'token': token,
|
'token': test_cli_user.user['token'],
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -112,9 +110,8 @@ async def test_ready(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ready_fields(test_cli):
|
async def test_ready_fields(test_cli_user):
|
||||||
token = await login('normal', test_cli)
|
conn = await gw_start(test_cli_user.cli)
|
||||||
conn = await gw_start(test_cli)
|
|
||||||
|
|
||||||
# get the hello frame but ignore it
|
# get the hello frame but ignore it
|
||||||
await _json(conn)
|
await _json(conn)
|
||||||
|
|
@ -122,7 +119,7 @@ async def test_ready_fields(test_cli):
|
||||||
await _json_send(conn, {
|
await _json_send(conn, {
|
||||||
'op': OP.IDENTIFY,
|
'op': OP.IDENTIFY,
|
||||||
'd': {
|
'd': {
|
||||||
'token': token,
|
'token': test_cli_user.user['token'],
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -153,9 +150,8 @@ async def test_ready_fields(test_cli):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_heartbeat(test_cli):
|
async def test_heartbeat(test_cli_user):
|
||||||
token = await login('normal', test_cli)
|
conn = await gw_start(test_cli_user.cli)
|
||||||
conn = await gw_start(test_cli)
|
|
||||||
|
|
||||||
# get the hello frame but ignore it
|
# get the hello frame but ignore it
|
||||||
await _json(conn)
|
await _json(conn)
|
||||||
|
|
@ -163,7 +159,7 @@ async def test_heartbeat(test_cli):
|
||||||
await _json_send(conn, {
|
await _json_send(conn, {
|
||||||
'op': OP.IDENTIFY,
|
'op': OP.IDENTIFY,
|
||||||
'd': {
|
'd': {
|
||||||
'token': token,
|
'token': test_cli_user.user['token'],
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue