diff --git a/hippolyzer/lib/base/message/circuit.py b/hippolyzer/lib/base/message/circuit.py index c0f6c2f..05e8b52 100644 --- a/hippolyzer/lib/base/message/circuit.py +++ b/hippolyzer/lib/base/message/circuit.py @@ -49,6 +49,11 @@ class Circuit: raise return self.send_datagram(serialized, message.direction, transport=transport) + def disconnect(self): + self.packet_id_base = 0 + self.unacked_reliable.clear() + self.is_alive = False + def send_datagram(self, data: bytes, direction: Direction, transport=None): self.last_packet_at = dt.datetime.now() src_addr, dst_addr = self.host, self.near_host diff --git a/hippolyzer/lib/client/hippo_client.py b/hippolyzer/lib/client/hippo_client.py index 0830baf..ddc5d22 100644 --- a/hippolyzer/lib/client/hippo_client.py +++ b/hippolyzer/lib/client/hippo_client.py @@ -110,6 +110,10 @@ class HippoClientRegion(BaseClientRegion): self.transfer_manager = TransferManager(proxify(self), session.agent_id, session.id) self.asset_uploader = AssetUploader(proxify(self)) self.objects = ClientObjectManager(proxify(self)) + self._llsd_serializer = LLSDMessageSerializer() + self._eq_task: Optional[asyncio.Task] = None + + self.message_handler.subscribe("StartPingCheck", self._handle_ping_check) def update_caps(self, caps: Mapping[str, str]) -> None: self.caps.update(caps) @@ -118,6 +122,56 @@ class HippoClientRegion(BaseClientRegion): def cap_urls(self) -> multidict.MultiDict: return self.caps.copy() + async def connect(self): + # Disconnect if we're already connected + await self.disconnect() + + self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue()) + self.circuit.is_alive = True + + async def disconnect(self): + if self._eq_task is not None: + self._eq_task.cancel() + self._eq_task = None + self.circuit.disconnect() + + async def _poll_event_queue(self): + ack: Optional[int] = None + while True: + payload = {"ack": ack, "done": False} + async with self.caps_client.post("EventQueueGet", llsd=payload) as resp: + if resp.status != 200: + await asyncio.sleep(0.1) + continue + polled = await resp.read_llsd() + for event in polled["events"]: + if self._llsd_serializer.can_handle(event["message"]): + msg = self._llsd_serializer.deserialize(event) + else: + # If this isn't a templated message (like some EQ-only events are), + # then we wrap it in a synthetic `Message` so that the API for handling + # both EQ-only and templated message events can be the same. Ick. + msg = Message(event["message"]) + if isinstance(event["body"], dict): + msg.add_block(Block("EventData", **event["body"])) + else: + # Shouldn't be any events that have anything other than a dict + # as a body, but just to be sure... + msg.add_block(Block("EventData", Data=event["body"])) + msg.synthetic = True + self.session().message_handler.handle(msg) + self.message_handler.handle(msg) + ack = polled["id"] + await asyncio.sleep(0.001) + + async def _handle_ping_check(self, message: Message): + self.circuit.send( + Message( + "CompletePingCheck", + Block("PingID", PingID=message["PingID"]["PingID"]), + ) + ) + class HippoClientSession(BaseClientSession): """Represents a client's view of a remote session""" @@ -125,6 +179,7 @@ class HippoClientSession(BaseClientSession): region_by_handle: Callable[[int], Optional[HippoClientRegion]] region_by_circuit_addr: Callable[[ADDR_TUPLE], Optional[HippoClientRegion]] + regions: List[HippoClientRegion] session_manager: HippoClient def __init__(self, id, secure_session_id, agent_id, circuit_code, session_manager: Optional[HippoClient] = None, @@ -155,8 +210,6 @@ class HippoClientSession(BaseClientSession): valid_circuit = True if valid_circuit: - # TODO: This is a little bit crap, we need to know if a UseCircuitCode was ever ACKed - # before we can start sending other packets, otherwise we might have a race. await region.circuit.send_reliable( Message( "UseCircuitCode", @@ -322,8 +375,6 @@ class HippoClient(BaseClientSessionManager): self.session: Optional[HippoClientSession] = None self.settings = ClientSettings() self._resend_task: Optional[asyncio.Task] = None - self._eq_task: Optional[asyncio.Task] = None - self._llsd_serializer = LLSDMessageSerializer() async def aclose(self): try: @@ -459,8 +510,7 @@ class HippoClient(BaseClientSessionManager): async with seed_resp_fut as seed_resp: seed_resp.raise_for_status() region.update_caps(await seed_resp.read_llsd()) - self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue()) - self.session.main_region.message_handler.subscribe("StartPingCheck", self._handle_ping_check) + await region.connect() async def logout(self): if not self.session: @@ -468,9 +518,6 @@ class HippoClient(BaseClientSessionManager): if self._resend_task: self._resend_task.cancel() self._resend_task = None - if self._eq_task: - self._eq_task.cancel() - self._eq_task = None session = self.session self.session = None @@ -482,6 +529,8 @@ class HippoClient(BaseClientSessionManager): Block("AgentData", AgentID=session.agent_id, SessionID=session.id), ) ) + for region in session.regions: + await region.disconnect() session.transport.close() def send_chat(self, message: Union[bytes, str], channel: int = 0, chat_type=ChatType.NORMAL) -> asyncio.Future: @@ -493,49 +542,10 @@ class HippoClient(BaseClientSessionManager): async def _attempt_resends(self): while True: - await asyncio.sleep(0.5) if self.session is None: - continue + break for region in self.session.regions: - if not region.circuit or not region.circuit.is_alive: + if not region.circuit.is_alive: continue region.circuit.resend_unacked() - - async def _poll_event_queue(self): - ack: Optional[int] = None - while True: - if self.session is None or self.session.main_region is None: - return - payload = {"ack": ack, "done": False} - async with self.session.main_region.caps_client.post("EventQueueGet", llsd=payload) as resp: - if resp.status != 200: - await asyncio.sleep(0.1) - continue - polled = await resp.read_llsd() - for event in polled["events"]: - if self._llsd_serializer.can_handle(event["message"]): - msg = self._llsd_serializer.deserialize(event) - else: - # If this isn't a templated message (like some EQ-only events are), - # then we wrap it in a synthetic `Message` so that the API for handling - # both EQ-only and templated message events can be the same. Ick. - msg = Message(event["message"]) - if isinstance(event["body"], dict): - msg.add_block(Block("EventData", **event["body"])) - else: - # Shouldn't be any events that have anything other than a dict - # as a body, but just to be sure... - msg.add_block(Block("EventData", Data=event["body"])) - msg.synthetic = True - self.session.message_handler.handle(msg) - self.session.main_region.message_handler.handle(msg) - ack = polled["id"] - await asyncio.sleep(0.001) - - async def _handle_ping_check(self, message: Message): - self.session.main_region.circuit.send( - Message( - "CompletePingCheck", - Block("PingID", PingID=message["PingID"]["PingID"]), - ) - ) + await asyncio.sleep(0.5)