Add tests for voice connector setup

This commit is contained in:
Salad Dais
2023-12-18 06:10:51 +00:00
parent 1165769aca
commit 670acef0b4
3 changed files with 60 additions and 31 deletions

View File

@@ -9,14 +9,13 @@ import subprocess
import tempfile
import urllib.parse
import uuid
from collections import UserDict
from typing import Optional, Union, Any
from hippolyzer.lib.base.datatypes import Vector3
from hippolyzer.lib.base.events import Event
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.objects import handle_to_gridxy
from .connection import VivoxConnection
from .connection import VivoxConnection, VivoxMessage
LOG = logging.getLogger(__name__)
RESP_LOG = logging.getLogger(__name__ + ".responses")
@@ -40,12 +39,6 @@ def vivox_to_uuid(val):
return str(uuid.UUID(bytes=base64.b64decode(val, b"-_")))
class VoiceEvent(UserDict):
def __init__(self, name: str, event_dict: dict):
self.name = name
super().__init__(event_dict)
class VoiceClient:
SERVER_URL = "http://www.bhr.vivox.com/api2/" # noqa
@@ -82,7 +75,7 @@ class VoiceClient:
self.vivox_conn: Optional[VivoxConnection] = None
self._poll_task = asyncio.get_event_loop().create_task(self._poll_messages())
self.message_handler: MessageHandler[VoiceEvent, str] = MessageHandler()
self.message_handler: MessageHandler[VivoxMessage, str] = MessageHandler()
@property
def username(self):
@@ -358,10 +351,10 @@ class VoiceClient:
try:
RESP_LOG.debug(repr(msg))
if msg.type == "Event":
self.message_handler.handle(VoiceEvent(msg.action, msg.data))
self.message_handler.handle(msg)
# Spin off handler tasks for each event so that we don't block polling
_ = asyncio.get_event_loop().create_task(self._dispatch_received_event(msg.action, msg.data))
_ = asyncio.get_event_loop().create_task(self._dispatch_received_event(msg.name, msg.data))
elif msg.type == "Response":
# Might not have this request ID if it was sent directly via the socket
if msg.request_id in self._pending_req_futures:

View File

@@ -12,7 +12,7 @@ import lxml.etree
class VivoxMessage(NamedTuple):
type: str
action: str
name: str
request_id: Optional[str]
data: dict

View File

@@ -98,7 +98,7 @@ class TestVivoxConnection(unittest.IsolatedAsyncioTestCase):
self.assertEqual("quux", msg.request_id)
self.assertEqual("Request", msg.type)
self.assertEqual("Aux.GetRenderDevices.1", msg.action)
self.assertEqual("Aux.GetRenderDevices.1", msg.name)
self.assertDictEqual({"Foo": "1"}, msg.data)
i += 1
@@ -149,22 +149,58 @@ class TestVoiceClient(unittest.IsolatedAsyncioTestCase):
self.client._make_request_id = _make_request_id
async def test_connection(self):
async def _serve_login():
await self.server_connection.send_event(
"VoiceServiceConnectionStateChangedEvent",
{
"Connected": 1,
"Platform": "Linux",
"Version": 1,
"DataDirectory": "/tmp/whatever",
}
)
self.assertEqual(
('Request', 'Aux.GetCaptureDevices.1', '1', {}),
await self.server_connection.read_message()
)
async def _handle_next_message(self, name: str):
msg = await self.server_connection.read_message()
self.assertEqual(name, msg.name)
await self.server_connection.send_response(msg.request_id, msg.name, {
"ReturnCode": 0,
"Results": {}
})
serve_coro = asyncio.get_event_loop().create_task(_serve_login())
# Await this here so we can see any exceptions
await serve_coro
async def _serve_connector_setup(self):
await self.server_connection.send_event(
"VoiceServiceConnectionStateChangedEvent",
{
"Connected": 1,
"Platform": "Linux",
"Version": 1,
"DataDirectory": "/tmp/whatever",
}
)
msg = await self.server_connection.read_message()
self.assertEqual('Aux.GetCaptureDevices.1', msg.name)
await self.server_connection.send_response(msg.request_id, msg.name, {
"ReturnCode": 0,
"Results": {
"StatusCode": 0,
"StatusString": None,
"CaptureDevices": []
}
})
msg = await self.server_connection.read_message()
self.assertEqual('Aux.GetRenderDevices.1', msg.name)
await self.server_connection.send_response(msg.request_id, msg.name, {
"ReturnCode": 0,
"Results": {
"StatusCode": 0,
"StatusString": None,
"RenderDevices": []
}
})
await self._handle_next_message("Connector.MuteLocalSpeaker.1")
await self._handle_next_message("Connector.SetLocalSpeakerVolume.1")
await self._handle_next_message("Connector.MuteLocalMic.1")
await self._handle_next_message("Connector.SetLocalMicVolume.1")
msg = await self.server_connection.read_message()
self.assertEqual('Connector.Create.1', msg.name)
await self.server_connection.send_response(msg.request_id, msg.name, {
"ReturnCode": 0,
"Results": {
"ConnectorHandle": 2,
}
})
async def test_connection(self):
await self._serve_connector_setup()