Compare commits

..

1 Commits

Author SHA1 Message Date
Mai 97f2af5a2b Merge branch 'patch-3' into 'master'
Channel names no longer end with a dash

See merge request litecord/litecord!70
2021-11-09 22:55:07 +00:00
2 changed files with 45 additions and 69 deletions

View File

@ -268,13 +268,16 @@ class GatewayWebsocket:
"""Split data in chunk_size-big chunks and send them
over the websocket."""
log.debug(
"zlib-stream: sending {} bytes into {}-byte chunks", len(data), chunk_size
"zlib-stream: chunking {} bytes into {}-byte chunks", len(data), chunk_size
)
# we send the entire iterator as per websockets documentation
# to pretent setting FIN when we don't want to
# see https://gitlab.com/litecord/litecord/-/issues/139
await self.ws.send(yield_chunks(data, chunk_size))
total_chunks = 0
for chunk in yield_chunks(data, chunk_size):
total_chunks += 1
log.debug("zlib-stream: chunk {}", total_chunks)
await self.ws.send(chunk)
log.debug("zlib-stream: sent {} chunks", total_chunks)
async def _zlib_stream_send(self, encoded):
"""Sending a single payload across multiple compressed
@ -283,19 +286,39 @@ class GatewayWebsocket:
# compress and flush (for the rest of compressed data + ZLIB_SUFFIX)
data1 = self.ws_properties.zctx.compress(encoded)
data2 = self.ws_properties.zctx.flush(zlib.Z_FULL_FLUSH)
data = data1 + data2
log.debug(
"zlib-stream: length {} -> compressed ({})",
"zlib-stream: length {} -> compressed ({} + {})",
len(encoded),
len(data),
len(data1),
len(data2),
)
# since we always chunk the entire compressed message, we shouldn't
# worry about sending big frames to the clients
if not data1:
# if data1 is nothing, that might cause problems
# to clients, since they'll receive an empty message
data1 = bytes([data2[0]])
data2 = data2[1:]
log.debug(
"zlib-stream: len(data1) == 0, remaking as ({} + {})",
len(data1),
len(data2),
)
# NOTE: the old approach was ws.send(data1 + data2).
# I changed this to a chunked send of data1 and data2
# because that can bring some problems to the network
# since we can be potentially sending a really big packet
# as a single message.
# clients should handle chunked sends (via detection
# of the ZLIB_SUFFIX suffix appended to data2), so
# this shouldn't being problems.
# TODO: the chunks are 1024 bytes, 1KB, is this good enough?
await self._chunked_send(data, 1024)
await self._chunked_send(data1, 1024)
await self._chunked_send(data2, 1024)
async def _zstd_stream_send(self, encoded):
compressor = self.ws_properties.zsctx.stream_writer(

View File

@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import json
import zlib
from typing import Optional
import pytest
import websockets
@ -27,42 +25,15 @@ import websockets
from litecord.gateway.opcodes import OP
from litecord.gateway.websocket import decode_etf
# Z_SYNC_FLUSH suffix
ZLIB_SUFFIX = b"\x00\x00\xff\xff"
async def _json(conn):
frame = await conn.recv()
return json.loads(frame)
async def _recv(conn, *, zlib_stream: bool):
if zlib_stream:
try:
conn._zlib_context
except AttributeError:
conn._zlib_context = zlib.decompressobj()
# inspired by
# https://discord.com/developers/docs/topics/gateway#transport-compression-transport-compression-example
zlib_buffer = bytearray()
while True:
# keep receiving frames until we find the zlib prefix inside
msg = await conn.recv()
zlib_buffer.extend(msg)
if len(msg) < 4 or msg[-4:] != ZLIB_SUFFIX:
continue
# NOTE: the message is utf-8 encoded.
msg = conn._zlib_context.decompress(zlib_buffer)
return msg
else:
return await conn.recv()
async def _json(conn, *, zlib_stream: bool = False):
data = await _recv(conn, zlib_stream=zlib_stream)
return json.loads(data)
async def _etf(conn, *, zlib_stream: bool = False):
data = await _recv(conn, zlib_stream=zlib_stream)
return decode_etf(data)
async def _etf(conn):
frame = await conn.recv()
return decode_etf(frame)
async def _json_send(conn, data):
@ -78,8 +49,8 @@ async def _close(conn):
await conn.close(1000, "test end")
async def extract_and_verify_ready(conn, **kwargs):
ready = await _json(conn, **kwargs)
async def extract_and_verify_ready(conn):
ready = await _json(conn)
assert ready["op"] == OP.DISPATCH
assert ready["t"] == "READY"
@ -107,9 +78,7 @@ async def get_gw(test_cli, version: int) -> str:
return gw_json["url"]
async def gw_start(
test_cli, *, version: int = 6, etf=False, compress: Optional[str] = None
):
async def gw_start(test_cli, *, version: int = 6, etf=False):
"""Start a websocket connection"""
gw_url = await get_gw(test_cli, version)
@ -118,8 +87,7 @@ async def gw_start(
else:
gw_url = f"{gw_url}?v={version}&encoding=json"
compress = f"&compress={compress}" if compress else ""
return await websockets.connect(f"{gw_url}{compress}")
return await websockets.connect(gw_url)
@pytest.mark.asyncio
@ -350,18 +318,3 @@ async def test_ready_bot(test_cli_bot):
await extract_and_verify_ready(conn)
finally:
await _close(conn)
@pytest.mark.asyncio
async def test_ready_bot_zlib_stream(test_cli_bot):
conn = await gw_start(test_cli_bot.cli, compress="zlib-stream")
await _json(conn, zlib_stream=True) # ignore hello
await _json_send(
conn,
{"op": OP.IDENTIFY, "d": {"token": test_cli_bot.user["token"]}},
)
try:
await extract_and_verify_ready(conn, zlib_stream=True)
finally:
await _close(conn)