Compare commits

...

4 Commits

Author SHA1 Message Date
Luna 2180fbca02 tests: remove unecessary asyncio event
queue's get() already blocks if no items are in the queue.
2022-01-28 23:52:46 -03:00
Luna 4a70d9580d tests: don't double-close on test_broken_identify 2022-01-28 23:38:33 -03:00
Luna 049523b03f tests: assert we are in a good state on autoreply 2022-01-28 23:38:12 -03:00
Luna 38b560205f tests: use an internal queue for wsproto events 2022-01-28 23:25:39 -03:00
1 changed files with 45 additions and 16 deletions

View File

@ -28,6 +28,7 @@ import pytest
import websockets import websockets
from logbook import Logger from logbook import Logger
from wsproto import WSConnection, ConnectionType from wsproto import WSConnection, ConnectionType
from wsproto.connection import ConnectionState
from wsproto.events import ( from wsproto.events import (
Request, Request,
Message, Message,
@ -55,6 +56,8 @@ class AsyncWebsocket:
self.url = url self.url = url
self.ws = WSConnection(ConnectionType.CLIENT) self.ws = WSConnection(ConnectionType.CLIENT)
self.reader, self.writer = None, None self.reader, self.writer = None, None
self.reader_task = None
self._events = asyncio.Queue()
async def send(self, data): async def send(self, data):
assert self.writer is not None assert self.writer is not None
@ -68,23 +71,49 @@ class AsyncWebsocket:
self.writer.write(self.ws.send(data)) self.writer.write(self.ws.send(data))
await self.writer.drain() await self.writer.drain()
async def recv(self, *, expect=Message, process_event: bool = True): async def _reader_loop_task(self):
in_data = await self.reader.read(4096) # continuously read messages from the socket
if not in_data: # and fill up the _events queue with them
log.info("connection closed (no data)") while True:
self.ws.receive_data(None) log.info("reading data")
else: in_data = await self.reader.read(4096)
log.debug("received {} bytes", len(in_data)) if not in_data:
self.ws.receive_data(in_data) log.info("connection closed (no data)")
self.ws.receive_data(None)
else:
log.debug("received {} bytes", len(in_data))
self.ws.receive_data(in_data)
# if we get a ping, reply with pong immediately for event in self.ws.events():
# and fetch the next event log.debug("queued ws event {}", event)
event = next(self.ws.events()) await self._events.put(event)
if isinstance(event, Ping):
await self.send(event.response()) # since we closed, we don't have to continue reading
event = next(self.ws.events()) if not in_data:
return
async def recv(self, *, expect=Message, process_event: bool = True):
# this loop is only done so we reply to pings while also being
# able to receive any other event in the middle.
#
# CloseConnection does not lead us to reading other events, so
# that's why it's left out.
while True:
# if we get a ping, reply with pong immediately
# and fetch the next event
event = await self._events.get()
log.debug("processing {}", event)
if isinstance(event, Ping):
await self.send(event.response())
continue
break
if isinstance(event, CloseConnection): if isinstance(event, CloseConnection):
assert self.ws.state is ConnectionState.REMOTE_CLOSING
await self.send(event.response()) await self.send(event.response())
if process_event: if process_event:
raise websockets.ConnectionClosed( raise websockets.ConnectionClosed(
@ -125,6 +154,8 @@ class AsyncWebsocket:
log.info("connecting to {!r} {}", host, port) log.info("connecting to {!r} {}", host, port)
self.reader, self.writer = await asyncio.open_connection(host, port) self.reader, self.writer = await asyncio.open_connection(host, port)
self.reader_task = asyncio.create_task(self._reader_loop_task())
path = parsed.path or "/" path = parsed.path or "/"
target = f"{path}?{parsed.query}" if parsed.query else path target = f"{path}?{parsed.query}" if parsed.query else path
await self.send(Request(host=parsed.netloc, target=target)) await self.send(Request(host=parsed.netloc, target=target))
@ -282,8 +313,6 @@ async def test_broken_identify(test_cli_user):
raise AssertionError("Received a JSON message but expected close") raise AssertionError("Received a JSON message but expected close")
except websockets.ConnectionClosed as exc: except websockets.ConnectionClosed as exc:
assert exc.code == 4002 assert exc.code == 4002
finally:
await _close(conn)
@pytest.mark.asyncio @pytest.mark.asyncio