Add test for joining voice session

This commit is contained in:
Salad Dais
2023-12-18 20:11:21 +00:00
parent f278a4bfcf
commit 6da766ef22
2 changed files with 45 additions and 22 deletions

View File

@@ -75,7 +75,7 @@ class VoiceClient:
self.vivox_conn: Optional[VivoxConnection] = None
self._poll_task = asyncio.get_event_loop().create_task(self._poll_messages())
self.message_handler: MessageHandler[VivoxMessage, str] = MessageHandler()
self.message_handler: MessageHandler[VivoxMessage, str] = MessageHandler(take_by_default=False)
@property
def username(self):

View File

@@ -207,32 +207,55 @@ class TestVoiceClient(unittest.IsolatedAsyncioTestCase):
})
await self.client.ready.wait()
async def _serve_login(self):
msg = await self._expect_message("Account.Login.1")
self.assertEqual("foo", msg.data["AccountName"])
await self.server_connection.send_event("AccountLoginStateChangeEvent", {
"AccountHandle": 2,
"StatusCode": 200,
"StatusString": "OK",
"State": 1,
})
await self.server_connection.send_response(msg.request_id, msg.name, {
"ReturnCode": 0,
"Results": {
"StatusCode": 0,
"StatusString": None,
"AccountHandle": 2,
"DisplayName": "foo",
}
})
async def test_create_connector(self):
await self._serve_connector_setup()
async def test_login(self):
await self._serve_connector_setup()
async def _handle_login():
msg = await self._expect_message("Account.Login.1")
self.assertEqual("foo", msg.data["AccountName"])
await self.server_connection.send_event("AccountLoginStateChangeEvent", {
"AccountHandle": 2,
"StatusCode": 200,
"StatusString": "OK",
"State": 1,
})
await self.server_connection.send_response(msg.request_id, msg.name, {
"ReturnCode": 0,
"Results": {
"StatusCode": 0,
"StatusString": None,
"AccountHandle": 2,
"DisplayName": "foo",
}
})
login_task = asyncio.get_event_loop().create_task(_handle_login())
login_task = asyncio.create_task(self._serve_login())
await asyncio.wait_for(self.client.login("foo", "bar"), 0.5)
await asyncio.wait_for(login_task, 0.5)
async def test_create_session(self):
await self._serve_connector_setup()
login_task = asyncio.create_task(self._serve_login())
await asyncio.wait_for(self.client.login("foo", "bar"), 0.5)
await asyncio.wait_for(login_task, 0.5)
async def _serve_session():
await self._handle_message("Session.Create.1")
await self.server_connection.send_event("SessionAddedEvent", {
"SessionHandle": 4,
"SessionGroupHandle": 5,
})
await self.server_connection.send_event("ParticipantAddedEvent", {
"ParticipantUri": "uri:baz@foo",
})
serve_session_task = asyncio.create_task(_serve_session())
await self.client.join_session("uri:foo@bar", region_handle=0)
self.assertIn("uri:baz@foo", self.client.participants)
await serve_session_task