mirror of https://gitlab.com/litecord/litecord.git
test.test_websocket: add test_ready_fields
- RatelimitManager: add a test flag to overwrite _ws.connect ratelimit
This commit is contained in:
parent
978ad9075e
commit
a7f8852324
|
|
@ -42,13 +42,23 @@ RATELIMITS = {
|
||||||
|
|
||||||
class RatelimitManager:
|
class RatelimitManager:
|
||||||
"""Manager for the bucket managers"""
|
"""Manager for the bucket managers"""
|
||||||
def __init__(self):
|
def __init__(self, testing_flag=False):
|
||||||
self._ratelimiters = {}
|
self._ratelimiters = {}
|
||||||
|
self._test = testing_flag
|
||||||
self.global_bucket = Ratelimit(50, 1)
|
self.global_bucket = Ratelimit(50, 1)
|
||||||
self._fill_rtl()
|
self._fill_rtl()
|
||||||
|
|
||||||
def _fill_rtl(self):
|
def _fill_rtl(self):
|
||||||
for path, rtl in RATELIMITS.items():
|
for path, rtl in RATELIMITS.items():
|
||||||
|
# overwrite rtl with a 10/1 for _ws.connect
|
||||||
|
# if we're in testing mode.
|
||||||
|
|
||||||
|
# NOTE: this is a bad way to do it, but
|
||||||
|
# we only need to change that one for now.
|
||||||
|
rtl = (Ratelimit(10, 1)
|
||||||
|
if self._test and path == '_ws.connect'
|
||||||
|
else rtl)
|
||||||
|
|
||||||
self._ratelimiters[path] = rtl
|
self._ratelimiters[path] = rtl
|
||||||
|
|
||||||
def get_ratelimit(self, key: str) -> Ratelimit:
|
def get_ratelimit(self, key: str) -> Ratelimit:
|
||||||
|
|
|
||||||
2
run.py
2
run.py
|
|
@ -183,7 +183,7 @@ async def init_app_db(app):
|
||||||
def init_app_managers(app):
|
def init_app_managers(app):
|
||||||
"""Initialize singleton classes."""
|
"""Initialize singleton classes."""
|
||||||
app.loop = asyncio.get_event_loop()
|
app.loop = asyncio.get_event_loop()
|
||||||
app.ratelimiter = RatelimitManager()
|
app.ratelimiter = RatelimitManager(app.config.get('_testing'))
|
||||||
app.state_manager = StateManager()
|
app.state_manager = StateManager()
|
||||||
|
|
||||||
app.storage = Storage(app.db)
|
app.storage = Storage(app.db)
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ from run import app as main_app, set_blueprints
|
||||||
@pytest.fixture(name='app')
|
@pytest.fixture(name='app')
|
||||||
def _test_app(unused_tcp_port, event_loop):
|
def _test_app(unused_tcp_port, event_loop):
|
||||||
set_blueprints(main_app)
|
set_blueprints(main_app)
|
||||||
|
main_app.config['_testing'] = True
|
||||||
|
|
||||||
# reassign an unused tcp port for websockets
|
# reassign an unused tcp port for websockets
|
||||||
# since the config might give a used one.
|
# since the config might give a used one.
|
||||||
|
|
|
||||||
|
|
@ -67,3 +67,39 @@ async def test_ready(test_cli):
|
||||||
await conn.close(1000, 'test end')
|
await conn.close(1000, 'test end')
|
||||||
except (Exception, websockets.ConnectionClosed):
|
except (Exception, websockets.ConnectionClosed):
|
||||||
assert False
|
assert False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ready_fields(test_cli):
|
||||||
|
token = await login('normal', test_cli)
|
||||||
|
conn = await gw_start(test_cli)
|
||||||
|
|
||||||
|
# get the hello frame but ignore it
|
||||||
|
await _json(conn)
|
||||||
|
|
||||||
|
await _json_send(conn, {
|
||||||
|
'op': OP.IDENTIFY,
|
||||||
|
'd': {
|
||||||
|
'token': token,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ready = await _json(conn)
|
||||||
|
assert isinstance(ready, dict)
|
||||||
|
assert ready['op'] == OP.DISPATCH
|
||||||
|
assert ready['t'] == 'READY'
|
||||||
|
|
||||||
|
data = ready['d']
|
||||||
|
assert isinstance(data, dict)
|
||||||
|
|
||||||
|
# NOTE: change if default gateway changes
|
||||||
|
assert data['v'] == 6
|
||||||
|
|
||||||
|
# make sure other fields exist and are with
|
||||||
|
# proper types.
|
||||||
|
assert isinstance(data['user'], dict)
|
||||||
|
assert isinstance(data['private_channels'], list)
|
||||||
|
assert isinstance(data['guilds'], list)
|
||||||
|
assert isinstance(data['session_id'], str)
|
||||||
|
|
||||||
|
await conn.close(1000, 'test end')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue