diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 6344bd4..284123c 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -57,7 +57,6 @@ class AsyncWebsocket: self.ws = WSConnection(ConnectionType.CLIENT) self.reader, self.writer = None, None self.reader_task = None - self._waiting_for_message_event = None self._events = asyncio.Queue() async def send(self, data): @@ -73,13 +72,8 @@ class AsyncWebsocket: await self.writer.drain() async def _reader_loop_task(self): - # continuously read messages from the socket # and fill up the _events queue with them - # - # if a recv() coroutine has been waiting for an event - # (via _waiting_for_message_event), then set that event so that - # we immediately process it while True: log.info("reading data") in_data = await self.reader.read(4096) @@ -94,26 +88,12 @@ class AsyncWebsocket: log.debug("queued ws event {}", event) await self._events.put(event) - if not self._events.empty() and self._waiting_for_message_event: - self._waiting_for_message_event.set() - # since we closed, we don't have to continue reading if not in_data: return async def recv(self, *, expect=Message, process_event: bool = True): - # if queue is empty, wait until it's filled up - if self._events.empty(): - self._waiting_for_message_event = asyncio.Event() - try: - await asyncio.wait( - [self._waiting_for_message_event.wait(), self.reader_task], - return_when=asyncio.FIRST_COMPLETED, - ) - finally: - self._waiting_for_message_event = None - # this loop is only done so we reply to pings while also being # able to receive any other event in the middle. # @@ -123,7 +103,7 @@ class AsyncWebsocket: while True: # if we get a ping, reply with pong immediately # and fetch the next event - event = self._events.get_nowait() + event = await self._events.get() log.debug("processing {}", event) if isinstance(event, Ping):