From df7f2b1b21fcca918243511e618a920a654c49e6 Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 30 May 2019 20:59:32 -0300 Subject: [PATCH 1/4] pipfile: add zstandard library --- Pipfile | 1 + Pipfile.lock | 108 ++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 78 insertions(+), 31 deletions(-) diff --git a/Pipfile b/Pipfile index ec43aa5..768ee87 100644 --- a/Pipfile +++ b/Pipfile @@ -14,6 +14,7 @@ Cerberus = "==1.2" quart = "==0.9.0" pillow = "*" aiohttp = "==3.5.4" +zstandard = "*" [dev-packages] pytest = "==4.4.1" diff --git a/Pipfile.lock b/Pipfile.lock index 587200d..787e283 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "038042e1d75ebc675f8595e63e3373984ae3cc16973991150e26ce2ff56f435b" + "sha256": "b2a0f4104d15d6f12cf1a4f4d05cb242a623972f4199c80809470712959ef9d4" }, "pipfile-spec": 6, "requires": { @@ -187,10 +187,10 @@ }, "h11": { "hashes": [ - "sha256:acca6a44cb52a32ab442b1779adf0875c443c689e9e028f8d831a3769f9c5208", - "sha256:f2b1ca39bfed357d1f19ac732913d5f9faa54a5062eca7d2ec3a916cfb7ae4c7" + "sha256:33d4bca7be0fa039f4e84d50ab00531047e53d6ee8ffbc83501ea602c169cae1", + "sha256:4bc6d6a1238b7615b266ada57e0618568066f57dd6fa967d1290ec9309b2f2f1" ], - "version": "==0.8.1" + "version": "==0.9.0" }, "h2": { "hashes": [ @@ -419,10 +419,10 @@ }, "wsproto": { "hashes": [ - "sha256:55c3da870460e8838b2fbe4d10f3accc0cea3a13d5e8dbbdc6da5d537d6d44dc", - "sha256:c7f35e0af250b9f25583b090039eb2159a079fbe71b7daf86cc3ddcd2f3a70b3" + "sha256:2b870f5b5b4a6d23dce080a4ee1cbb119b2378f82593bd6d66ae2cbd72a7c0ad", + "sha256:ed222c812aaea55d72d18a87df429cfd602e15b6c992a07a53b495858f083a14" ], - "version": "==0.14.0" + "version": "==0.14.1" }, "yarl": { "hashes": [ @@ -439,6 +439,39 @@ "sha256:e060906c0c585565c718d1c3841747b61c5439af2211e185f6739a9412dfbde1" ], "version": "==1.3.0" + }, + "zstandard": { + "hashes": [ + "sha256:19f5ad81590acd20dbdfb930b87a035189778662fdc67ab8cbcc106269ed1be8", + "sha256:1a1db0c9774181e806a418c32d511aa085c7e2c28c257a58f6c107f5decb3109", + "sha256:22d7aa898f36f78108cc1ef0c8da8225f0add518441d815ad4fdd1d577378209", + "sha256:357873afdd7cd0e653d169c36ce837ce2b3e5926dd4a5c0f0476c813f6765373", + "sha256:3c31da5d78a7b07e722e8a3e0b1295bc9b316b7e90a1666659c451a42750ffe4", + "sha256:3f76562ec63fabc6f4b5be0cd986f911c97105c35c31b4d655b90c4d2fe07f40", + "sha256:42fa4462e0563fe17e73dfeb95eef9b00429b86282f8f6ca0e2765b1855a8324", + "sha256:51aad01a5709ca6f45768c69ffd4c887528e5ad9e09302426b735560752c4e82", + "sha256:6cd81819a02e57e38e27c53c5c0a7015e059b0e148a18bf27b46b4f808840879", + "sha256:717fd2494f222164396e03d08ef57174d2a889920b81ca49f276caf9381e6405", + "sha256:71c8711458212c973a9b719275db8111f22803e0caf675affde50703b96e9be1", + "sha256:76a331b5a6258fce3906551557db9be83bdd89a62f66f509a55a4a307239c782", + "sha256:7c92dfcdf7e0c540f9718b40b4c54516a968ef6b81567b75df81866a1af2189d", + "sha256:7f3db21223a8bb4ffcf6c36b9c20d38278967723b47fce249dcb6ec6d4082b83", + "sha256:7fa9deba4c904e76870e08324adff94ec3a4bc56a50bbe1a9f859a4aed11c0d2", + "sha256:88912cbcf68cc40037c113460a166ebfbbb24864ceebb89ad221ea346f22e995", + "sha256:94aa5bb817f1c747b21214f6ef83a022bcb63bf81e4dae2954768165c13a510b", + "sha256:951e382a2ea47179ecb3e314e8c70f2e5189e3652ccbbcb71c6443dd71bc20fc", + "sha256:978a500ae1184f602dc902977ec208c7cf02c10caae9c159b10976a7cb29f879", + "sha256:991c4a40171d87854b219cdf2ba56c1c34b3b3a8ebe5d1ab63bd357ff71271b2", + "sha256:9ca84187182743d2e6bbf9d3f79d3834db205cddc98add27ad20f2189d080a60", + "sha256:ae50bc839cf1ff549f55a3e55922563f246fb692f77497175a8d8d4cddc294da", + "sha256:b7abae5b17e82d5f78aaa641077b4619c6ad204e30c6f3445d422acff5f35d3e", + "sha256:b8fce0c961654f77c81a6ae1f2cd40633b41ef16a12ae02f0382ed6692f9bb90", + "sha256:d8f047d3647a5cd1b77b4580f35208c938da00c101a092571c85bcefaa2d725d", + "sha256:f1785b31bf428e964a9670dd4f721023f2741ef7fd67c663bf01e3d4d3f9ec2a", + "sha256:fcf70e1e9d38035a15482e954ba064f3b701cf84cfe571576d15af93ac2a2fb1" + ], + "index": "pypi", + "version": "==0.11.1" } }, "develop": { @@ -456,6 +489,13 @@ ], "version": "==19.1.0" }, + "importlib-metadata": { + "hashes": [ + "sha256:a9f185022cfa69e9ca5f7eabfd5a58b689894cb78a11e3c8c89398a8ccbb8e7f", + "sha256:df1403cd3aebeb2b1dcd3515ca062eecb5bd3ea7611f18cba81130c68707e879" + ], + "version": "==0.17" + }, "more-itertools": { "hashes": [ "sha256:2112d2ca570bb7c3e53ea1a35cd5df42bb0fd10c45f0fb97178679c3c03d64c7", @@ -490,10 +530,10 @@ }, "pluggy": { "hashes": [ - "sha256:19ecf9ce9db2fce065a7a0586e07cfb4ac8614fe96edf628a264b1c70116cf8f", - "sha256:84d306a647cc805219916e62aab89caa97a33a1dd8c342e87a37f91073cd4746" + "sha256:0825a152ac059776623854c1543d65a4ad408eb3d33ee114dff91e57ec6ae6fc", + "sha256:b9817417e95936bf75d85d3f8767f7df6cdde751fc40aed3bb3074cbcb77757c" ], - "version": "==0.9.0" + "version": "==0.12.0" }, "py": { "hashes": [ @@ -535,28 +575,34 @@ }, "typed-ast": { "hashes": [ - "sha256:04894d268ba6eab7e093d43107869ad49e7b5ef40d1a94243ea49b352061b200", - "sha256:16616ece19daddc586e499a3d2f560302c11f122b9c692bc216e821ae32aa0d0", - "sha256:252fdae740964b2d3cdfb3f84dcb4d6247a48a6abe2579e8029ab3be3cdc026c", - "sha256:2af80a373af123d0b9f44941a46df67ef0ff7a60f95872412a145f4500a7fc99", - "sha256:2c88d0a913229a06282b285f42a31e063c3bf9071ff65c5ea4c12acb6977c6a7", - "sha256:2ea99c029ebd4b5a308d915cc7fb95b8e1201d60b065450d5d26deb65d3f2bc1", - "sha256:3d2e3ab175fc097d2a51c7a0d3fda442f35ebcc93bb1d7bd9b95ad893e44c04d", - "sha256:4766dd695548a15ee766927bf883fb90c6ac8321be5a60c141f18628fb7f8da8", - "sha256:56b6978798502ef66625a2e0f80cf923da64e328da8bbe16c1ff928c70c873de", - "sha256:5cddb6f8bce14325b2863f9d5ac5c51e07b71b462361fd815d1d7706d3a9d682", - "sha256:644ee788222d81555af543b70a1098f2025db38eaa99226f3a75a6854924d4db", - "sha256:64cf762049fc4775efe6b27161467e76d0ba145862802a65eefc8879086fc6f8", - "sha256:68c362848d9fb71d3c3e5f43c09974a0ae319144634e7a47db62f0f2a54a7fa7", - "sha256:6c1f3c6f6635e611d58e467bf4371883568f0de9ccc4606f17048142dec14a1f", - "sha256:b213d4a02eec4ddf622f4d2fbc539f062af3788d1f332f028a2e19c42da53f15", - "sha256:bb27d4e7805a7de0e35bd0cb1411bc85f807968b2b0539597a49a23b00a622ae", - "sha256:c9d414512eaa417aadae7758bc118868cd2396b0e6138c1dd4fda96679c079d3", - "sha256:f0937165d1e25477b01081c4763d2d9cdc3b18af69cb259dd4f640c9b900fe5e", - "sha256:fb96a6e2c11059ecf84e6741a319f93f683e440e341d4489c9b161eca251cf2a", - "sha256:fc71d2d6ae56a091a8d94f33ec9d0f2001d1cb1db423d8b4355debfe9ce689b7" + "sha256:132eae51d6ef3ff4a8c47c393a4ef5ebf0d1aecc96880eb5d6c8ceab7017cc9b", + "sha256:18141c1484ab8784006c839be8b985cfc82a2e9725837b0ecfa0203f71c4e39d", + "sha256:2baf617f5bbbfe73fd8846463f5aeafc912b5ee247f410700245d68525ec584a", + "sha256:3d90063f2cbbe39177e9b4d888e45777012652d6110156845b828908c51ae462", + "sha256:4304b2218b842d610aa1a1d87e1dc9559597969acc62ce717ee4dfeaa44d7eee", + "sha256:4983ede548ffc3541bae49a82675996497348e55bafd1554dc4e4a5d6eda541a", + "sha256:5315f4509c1476718a4825f45a203b82d7fdf2a6f5f0c8f166435975b1c9f7d4", + "sha256:6cdfb1b49d5345f7c2b90d638822d16ba62dc82f7616e9b4caa10b72f3f16649", + "sha256:7b325f12635598c604690efd7a0197d0b94b7d7778498e76e0710cd582fd1c7a", + "sha256:8d3b0e3b8626615826f9a626548057c5275a9733512b137984a68ba1598d3d2f", + "sha256:8f8631160c79f53081bd23446525db0bc4c5616f78d04021e6e434b286493fd7", + "sha256:912de10965f3dc89da23936f1cc4ed60764f712e5fa603a09dd904f88c996760", + "sha256:b010c07b975fe853c65d7bbe9d4ac62f1c69086750a574f6292597763781ba18", + "sha256:c908c10505904c48081a5415a1e295d8403e353e0c14c42b6d67f8f97fae6616", + "sha256:c94dd3807c0c0610f7c76f078119f4ea48235a953512752b9175f9f98f5ae2bd", + "sha256:ce65dee7594a84c466e79d7fb7d3303e7295d16a83c22c7c4037071b059e2c21", + "sha256:eaa9cfcb221a8a4c2889be6f93da141ac777eb8819f077e1d09fb12d00a09a93", + "sha256:f3376bc31bad66d46d44b4e6522c5c21976bf9bca4ef5987bb2bf727f4506cbb", + "sha256:f9202fa138544e13a4ec1a6792c35834250a85958fde1251b6a22e07d1260ae7" ], - "version": "==1.3.4" + "version": "==1.3.5" + }, + "zipp": { + "hashes": [ + "sha256:8c1019c6aad13642199fbe458275ad6a84907634cc9f0989877ccc4a2840139d", + "sha256:ca943a7e809cc12257001ccfb99e3563da9af99d52f261725e96dfe0f9275bc3" + ], + "version": "==0.5.1" } } } From 9142e26152cd19a901e49ad9025008dc4b31e822 Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 30 May 2019 21:46:16 -0300 Subject: [PATCH 2/4] gateway: move encoding to litecord.gateway.encoding --- litecord/gateway/encoding.py | 84 +++++++++++++++++++++++++++++++++++ litecord/gateway/gateway.py | 2 +- litecord/gateway/websocket.py | 73 ++++-------------------------- 3 files changed, 94 insertions(+), 65 deletions(-) create mode 100644 litecord/gateway/encoding.py diff --git a/litecord/gateway/encoding.py b/litecord/gateway/encoding.py new file mode 100644 index 0000000..07957f6 --- /dev/null +++ b/litecord/gateway/encoding.py @@ -0,0 +1,84 @@ +""" + +Litecord +Copyright (C) 2018-2019 Luna Mendes + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +""" + +import json +import earl + +from litecord.utils import LitecordJSONEncoder + + +def encode_json(payload) -> str: + """Encode a given payload to JSON.""" + return json.dumps(payload, separators=(',', ':'), + cls=LitecordJSONEncoder) + + +def decode_json(data: str): + """Decode from JSON.""" + return json.loads(data) + + +def encode_etf(payload) -> str: + """Encode a payload to ETF (External Term Format). + + This gives a JSON pass on the given payload (via calling encode_json and + then decode_json) because we may want to encode objects that can only be + encoded by LitecordJSONEncoder. + + Earl-ETF does not give the same interface for extensibility, hence why we + do the pass. + """ + sanitized = encode_json(payload) + sanitized = decode_json(sanitized) + return earl.pack(sanitized) + + +def _etf_decode_dict(data): + """Decode a given dictionary.""" + # NOTE: this is very slow. + + if isinstance(data, bytes): + return data.decode() + + if not isinstance(data, dict): + return data + + _copy = dict(data) + result = {} + + for key in _copy.keys(): + # assuming key is bytes rn. + new_k = key.decode() + + # maybe nested dicts, so... + result[new_k] = _etf_decode_dict(data[key]) + + return result + +def decode_etf(data: bytes): + """Decode data in ETF to any.""" + res = earl.unpack(data) + + if isinstance(res, bytes): + return data.decode() + + if isinstance(res, dict): + return _etf_decode_dict(res) + + return res diff --git a/litecord/gateway/gateway.py b/litecord/gateway/gateway.py index f435d16..a213528 100644 --- a/litecord/gateway/gateway.py +++ b/litecord/gateway/gateway.py @@ -50,7 +50,7 @@ async def websocket_handler(app, ws, url): except (KeyError, IndexError): gw_compress = None - if gw_compress and gw_compress not in ('zlib-stream',): + if gw_compress and gw_compress not in ('zlib-stream', 'zstd-stream'): return await ws.close(1000, 'Invalid gateway compress') gws = GatewayWebsocket( diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 335cfd1..7283f17 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -21,19 +21,17 @@ import collections import asyncio import pprint import zlib -import json from typing import List, Dict, Any from random import randint import websockets from logbook import Logger -import earl from litecord.auth import raw_token_check from litecord.enums import RelationshipType, ChannelType from litecord.schemas import validate, GW_STATUS_UPDATE from litecord.utils import ( - task_wrapper, LitecordJSONEncoder, yield_chunks + task_wrapper, yield_chunks ) from litecord.permissions import get_permissions @@ -46,6 +44,9 @@ from litecord.errors import ( from litecord.gateway.errors import ( DecodeError, UnknownOPCode, InvalidShard, ShardingRequired ) +from litecord.gateway.encoding import ( + encode_json, decode_json, encode_etf, decode_etf +) from litecord.storage import int_ @@ -64,67 +65,6 @@ WebsocketObjects = collections.namedtuple( ) -def encode_json(payload) -> str: - """Encode a given payload to JSON.""" - return json.dumps(payload, separators=(',', ':'), - cls=LitecordJSONEncoder) - - -def decode_json(data: str): - """Decode from JSON.""" - return json.loads(data) - - -def encode_etf(payload) -> str: - """Encode a payload to ETF (External Term Format). - - This gives a JSON pass on the given payload (via calling encode_json and - then decode_json) because we may want to encode objects that can only be - encoded by LitecordJSONEncoder. - - Earl-ETF does not give the same interface for extensibility, hence why we - do the pass. - """ - sanitized = encode_json(payload) - sanitized = decode_json(sanitized) - return earl.pack(sanitized) - - -def _etf_decode_dict(data): - """Decode a given dictionary.""" - # NOTE: this is very slow. - - if isinstance(data, bytes): - return data.decode() - - if not isinstance(data, dict): - return data - - _copy = dict(data) - result = {} - - for key in _copy.keys(): - # assuming key is bytes rn. - new_k = key.decode() - - # maybe nested dicts, so... - result[new_k] = _etf_decode_dict(data[key]) - - return result - -def decode_etf(data: bytes): - """Decode data in ETF to any.""" - res = earl.unpack(data) - - if isinstance(res, bytes): - return data.decode() - - if isinstance(res, dict): - return _etf_decode_dict(res) - - return res - - class GatewayWebsocket: """Main gateway websocket logic.""" @@ -210,6 +150,9 @@ class GatewayWebsocket: await self._chunked_send(data1, 1024) await self._chunked_send(data2, 1024) + async def _zstd_stream_send(self, encoded): + pass + async def send(self, payload: Dict[str, Any]): """Send a payload to the websocket. @@ -233,6 +176,8 @@ class GatewayWebsocket: if self.wsp.compress == 'zlib-stream': await self._zlib_stream_send(encoded) + elif self.wsp.compress == 'zstd-stream': + await self._zstd_stream_send(encoded) elif self.state and self.state.compress and len(encoded) > 1024: # TODO: should we only compress on >1KB packets? or maybe we # should do all? From 59f3f6bbb772fe4a04f16b245465ef1fb18fb5d8 Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 30 May 2019 21:58:23 -0300 Subject: [PATCH 3/4] gateway.websocket: add ZstdCompressor at start --- litecord/gateway/websocket.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index 7283f17..eea81a8 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -25,6 +25,7 @@ from typing import List, Dict, Any from random import randint import websockets +import zstandard as zstd from logbook import Logger from litecord.auth import raw_token_check @@ -53,7 +54,7 @@ from litecord.storage import int_ log = Logger(__name__) WebsocketProperties = collections.namedtuple( - 'WebsocketProperties', 'v encoding compress zctx tasks' + 'WebsocketProperties', 'v encoding compress zctx zsctx tasks' ) WebsocketObjects = collections.namedtuple( @@ -80,11 +81,14 @@ class GatewayWebsocket: self.presence = self.ext.presence self.ws = ws - self.wsp = WebsocketProperties(kwargs.get('v'), - kwargs.get('encoding', 'json'), - kwargs.get('compress', None), - zlib.compressobj(), - {}) + self.wsp = WebsocketProperties( + kwargs.get('v'), + kwargs.get('encoding', 'json'), + kwargs.get('compress', None), + zlib.compressobj(), + zstd.ZstdCompressor(), + {} + ) log.debug('websocket properties: {!r}', self.wsp) From 288c839806a5f6080cde23c92ad84406538cc37f Mon Sep 17 00:00:00 2001 From: Luna Date: Thu, 30 May 2019 22:17:20 -0300 Subject: [PATCH 4/4] gateway: add impl for zstd_stream_send - gateway: add utils pkg for a basic wrapper for zstandard's own stream writer --- litecord/gateway/utils.py | 30 ++++++++++++++++++++++++++++++ litecord/gateway/websocket.py | 8 +++++++- 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 litecord/gateway/utils.py diff --git a/litecord/gateway/utils.py b/litecord/gateway/utils.py new file mode 100644 index 0000000..d52d7df --- /dev/null +++ b/litecord/gateway/utils.py @@ -0,0 +1,30 @@ +""" + +Litecord +Copyright (C) 2018-2019 Luna Mendes + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +""" + +import asyncio + +class WebsocketFileHandler: + """A handler around a websocket that wraps normal I/O calls into + the websocket's respective asyncio calls via asyncio.ensure_future.""" + def __init__(self, ws): + self.ws = ws + + def write(self, data): + """Write data into the websocket""" + asyncio.ensure_future(self.ws.send(data)) diff --git a/litecord/gateway/websocket.py b/litecord/gateway/websocket.py index eea81a8..365163c 100644 --- a/litecord/gateway/websocket.py +++ b/litecord/gateway/websocket.py @@ -49,6 +49,8 @@ from litecord.gateway.encoding import ( encode_json, decode_json, encode_etf, decode_etf ) +from litecord.gateway.utils import WebsocketFileHandler + from litecord.storage import int_ log = Logger(__name__) @@ -155,7 +157,11 @@ class GatewayWebsocket: await self._chunked_send(data2, 1024) async def _zstd_stream_send(self, encoded): - pass + compressor = self.wsp.zsctx.stream_writer( + WebsocketFileHandler(self.ws)) + + compressor.write(encoded) + compressor.flush(zstd.FLUSH_FRAME) async def send(self, payload: Dict[str, Any]): """Send a payload to the websocket.