mirror of https://gitlab.com/litecord/litecord.git
Compare commits
7 Commits
97f2af5a2b...8fa78b47f5
| SHA1 |
|---|
| 8fa78b47f5 |
| ddd2469066 |
| c85f0806c7 |
| f792769656 |
| 549d5992fd |
| fbc15219f7 |
| e9edd8ba37 |
```diff
@@ -268,16 +268,13 @@ class GatewayWebsocket:
         """Split data in chunk_size-big chunks and send them
         over the websocket."""
         log.debug(
-            "zlib-stream: chunking {} bytes into {}-byte chunks", len(data), chunk_size
+            "zlib-stream: sending {} bytes into {}-byte chunks", len(data), chunk_size
         )
 
-        total_chunks = 0
-        for chunk in yield_chunks(data, chunk_size):
-            total_chunks += 1
-            log.debug("zlib-stream: chunk {}", total_chunks)
-            await self.ws.send(chunk)
-
-        log.debug("zlib-stream: sent {} chunks", total_chunks)
+        # we send the entire iterator as per websockets documentation
+        # to prevent setting FIN when we don't want to
+        # see https://gitlab.com/litecord/litecord/-/issues/139
+        await self.ws.send(yield_chunks(data, chunk_size))
 
     async def _zlib_stream_send(self, encoded):
         """Sending a single payload across multiple compressed
```
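Note: the rewritten `_chunked_send` relies on the `websockets` library's documented behaviour that `send()` accepts an iterable and transmits it as a single fragmented message, so FIN is only set on the final frame. A minimal sketch of what a chunking helper like `yield_chunks` could look like (the real helper lives elsewhere in the repository; this version is illustrative only):

```python
from typing import Iterator


def yield_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield successive chunk_size-sized slices of data."""
    for start in range(0, len(data), chunk_size):
        yield data[start : start + chunk_size]
```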
```diff
@@ -286,39 +283,19 @@ class GatewayWebsocket:
         # compress and flush (for the rest of compressed data + ZLIB_SUFFIX)
         data1 = self.ws_properties.zctx.compress(encoded)
         data2 = self.ws_properties.zctx.flush(zlib.Z_FULL_FLUSH)
+        data = data1 + data2
 
         log.debug(
-            "zlib-stream: length {} -> compressed ({} + {})",
+            "zlib-stream: length {} -> compressed ({})",
             len(encoded),
-            len(data1),
-            len(data2),
+            len(data),
         )
 
-        if not data1:
-            # if data1 is nothing, that might cause problems
-            # to clients, since they'll receive an empty message
-            data1 = bytes([data2[0]])
-            data2 = data2[1:]
-
-            log.debug(
-                "zlib-stream: len(data1) == 0, remaking as ({} + {})",
-                len(data1),
-                len(data2),
-            )
-
-        # NOTE: the old approach was ws.send(data1 + data2).
-        # I changed this to a chunked send of data1 and data2
-        # because that can bring some problems to the network
-        # since we can be potentially sending a really big packet
-        # as a single message.
-
-        # clients should handle chunked sends (via detection
-        # of the ZLIB_SUFFIX suffix appended to data2), so
-        # this shouldn't being problems.
+        # since we always chunk the entire compressed message, we shouldn't
+        # worry about sending big frames to the clients
 
         # TODO: the chunks are 1024 bytes, 1KB, is this good enough?
-        await self._chunked_send(data1, 1024)
-        await self._chunked_send(data2, 1024)
+        await self._chunked_send(data, 1024)
 
     async def _zstd_stream_send(self, encoded):
         compressor = self.ws_properties.zsctx.stream_writer(
```
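For context, a single chunked send of `data1 + data2` is enough because flushing the compressor appends the zlib sync-flush marker `00 00 ff ff` to the end of every payload, which is what zlib-stream clients use to find payload boundaries regardless of how the bytes were split across websocket frames. A self-contained sketch of that behaviour (the payload is illustrative):

```python
import zlib

zctx = zlib.compressobj()
payload = b'{"op": 0, "d": null}'

# compress and flush, as _zlib_stream_send does for the encoded gateway payload
data = zctx.compress(payload) + zctx.flush(zlib.Z_FULL_FLUSH)

# every flushed payload ends with the Z_SYNC_FLUSH suffix that clients look for
assert data.endswith(b"\x00\x00\xff\xff")
```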
```diff
@@ -191,6 +191,12 @@ def validate(
     if reqjson is None:
         raise BadRequest("No JSON provided")
 
+    try:
+        if schema["type"]["type"] == "channel_type" and reqjson["name"][-1] == "-":
+            reqjson["name"] = reqjson["name"][:-1]
+    except:
+        pass
+
     try:
         valid = validator.validate(reqjson)
     except Exception:
```
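The new pre-validation block strips a single trailing dash from channel names before the schema validator runs; the bare `except` presumably swallows the cases where the schema has no `channel_type` type or the payload has no `name`. Illustratively, with a hypothetical request body:

```python
# hypothetical request body for a channel schema
reqjson = {"type": 0, "name": "general-"}

# same normalisation as the hunk above applies before validation
if reqjson["name"][-1] == "-":
    reqjson["name"] = reqjson["name"][:-1]

assert reqjson["name"] == "general"
```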
```diff
@@ -18,6 +18,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
 """
 
 import json
+import zlib
+from typing import Optional
 
 import pytest
 import websockets
```
```diff
@@ -25,15 +27,42 @@ import websockets
 from litecord.gateway.opcodes import OP
 from litecord.gateway.websocket import decode_etf
 
-
-async def _json(conn):
-    frame = await conn.recv()
-    return json.loads(frame)
-
-
-async def _etf(conn):
-    frame = await conn.recv()
-    return decode_etf(frame)
+# Z_SYNC_FLUSH suffix
+ZLIB_SUFFIX = b"\x00\x00\xff\xff"
+
+
+async def _recv(conn, *, zlib_stream: bool):
+    if zlib_stream:
+        try:
+            conn._zlib_context
+        except AttributeError:
+            conn._zlib_context = zlib.decompressobj()
+
+        # inspired by
+        # https://discord.com/developers/docs/topics/gateway#transport-compression-transport-compression-example
+        zlib_buffer = bytearray()
+        while True:
+            # keep receiving frames until we find the zlib suffix inside
+            msg = await conn.recv()
+            zlib_buffer.extend(msg)
+            if len(msg) < 4 or msg[-4:] != ZLIB_SUFFIX:
+                continue
+
+            # NOTE: the message is utf-8 encoded.
+            msg = conn._zlib_context.decompress(zlib_buffer)
+            return msg
+    else:
+        return await conn.recv()
+
+
+async def _json(conn, *, zlib_stream: bool = False):
+    data = await _recv(conn, zlib_stream=zlib_stream)
+    return json.loads(data)
+
+
+async def _etf(conn, *, zlib_stream: bool = False):
+    data = await _recv(conn, zlib_stream=zlib_stream)
+    return decode_etf(data)
 
 
 async def _json_send(conn, data):
```
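The test helper attaches its `zlib.decompressobj()` to the connection object so decompression state survives across payloads, mirroring how the server side keeps one compression context (`ws_properties.zctx`) per connection. A short standalone sketch of why a shared context is required:

```python
import zlib

# one compressor (server side) and one decompressor (client side) per connection:
# decompression state must persist across payloads of the same zlib stream
zctx = zlib.compressobj()
dctx = zlib.decompressobj()

for payload in (b'{"op": 10}', b'{"op": 0, "t": "READY"}'):
    frame = zctx.compress(payload) + zctx.flush(zlib.Z_FULL_FLUSH)
    assert dctx.decompress(frame) == payload
```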
```diff
@@ -49,8 +78,8 @@ async def _close(conn):
     await conn.close(1000, "test end")
 
 
-async def extract_and_verify_ready(conn):
-    ready = await _json(conn)
+async def extract_and_verify_ready(conn, **kwargs):
+    ready = await _json(conn, **kwargs)
     assert ready["op"] == OP.DISPATCH
     assert ready["t"] == "READY"
```
```diff
@@ -78,7 +107,9 @@ async def get_gw(test_cli, version: int) -> str:
     return gw_json["url"]
 
 
-async def gw_start(test_cli, *, version: int = 6, etf=False):
+async def gw_start(
+    test_cli, *, version: int = 6, etf=False, compress: Optional[str] = None
+):
     """Start a websocket connection"""
     gw_url = await get_gw(test_cli, version)
```
```diff
@@ -87,7 +118,8 @@ async def gw_start(test_cli, *, version: int = 6, etf=False):
     else:
         gw_url = f"{gw_url}?v={version}&encoding=json"
 
-    return await websockets.connect(gw_url)
+    compress = f"&compress={compress}" if compress else ""
+    return await websockets.connect(f"{gw_url}{compress}")
 
 
 @pytest.mark.asyncio
```
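With the `compress` parameter threaded through `gw_start`, a zlib-stream test connection ends up requesting a URL of roughly this shape (hostname and port are illustrative only):

```python
# illustrative reproduction of the URL gw_start builds for compress="zlib-stream"
gw_url = "ws://localhost:8080/gateway"
version = 6
compress = "zlib-stream"

url = f"{gw_url}?v={version}&encoding=json" + (f"&compress={compress}" if compress else "")
assert url == "ws://localhost:8080/gateway?v=6&encoding=json&compress=zlib-stream"
```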
```diff
@@ -318,3 +350,18 @@ async def test_ready_bot(test_cli_bot):
         await extract_and_verify_ready(conn)
     finally:
         await _close(conn)
+
+
+@pytest.mark.asyncio
+async def test_ready_bot_zlib_stream(test_cli_bot):
+    conn = await gw_start(test_cli_bot.cli, compress="zlib-stream")
+    await _json(conn, zlib_stream=True)  # ignore hello
+    await _json_send(
+        conn,
+        {"op": OP.IDENTIFY, "d": {"token": test_cli_bot.user["token"]}},
+    )
+
+    try:
+        await extract_and_verify_ready(conn, zlib_stream=True)
+    finally:
+        await _close(conn)
```