Move some things from session to region
This commit is contained in:
@@ -49,6 +49,11 @@ class Circuit:
|
||||
raise
|
||||
return self.send_datagram(serialized, message.direction, transport=transport)
|
||||
|
||||
def disconnect(self):
|
||||
self.packet_id_base = 0
|
||||
self.unacked_reliable.clear()
|
||||
self.is_alive = False
|
||||
|
||||
def send_datagram(self, data: bytes, direction: Direction, transport=None):
|
||||
self.last_packet_at = dt.datetime.now()
|
||||
src_addr, dst_addr = self.host, self.near_host
|
||||
|
||||
@@ -110,6 +110,10 @@ class HippoClientRegion(BaseClientRegion):
|
||||
self.transfer_manager = TransferManager(proxify(self), session.agent_id, session.id)
|
||||
self.asset_uploader = AssetUploader(proxify(self))
|
||||
self.objects = ClientObjectManager(proxify(self))
|
||||
self._llsd_serializer = LLSDMessageSerializer()
|
||||
self._eq_task: Optional[asyncio.Task] = None
|
||||
|
||||
self.message_handler.subscribe("StartPingCheck", self._handle_ping_check)
|
||||
|
||||
def update_caps(self, caps: Mapping[str, str]) -> None:
|
||||
self.caps.update(caps)
|
||||
@@ -118,6 +122,56 @@ class HippoClientRegion(BaseClientRegion):
|
||||
def cap_urls(self) -> multidict.MultiDict:
|
||||
return self.caps.copy()
|
||||
|
||||
async def connect(self):
|
||||
# Disconnect if we're already connected
|
||||
await self.disconnect()
|
||||
|
||||
self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())
|
||||
self.circuit.is_alive = True
|
||||
|
||||
async def disconnect(self):
|
||||
if self._eq_task is not None:
|
||||
self._eq_task.cancel()
|
||||
self._eq_task = None
|
||||
self.circuit.disconnect()
|
||||
|
||||
async def _poll_event_queue(self):
|
||||
ack: Optional[int] = None
|
||||
while True:
|
||||
payload = {"ack": ack, "done": False}
|
||||
async with self.caps_client.post("EventQueueGet", llsd=payload) as resp:
|
||||
if resp.status != 200:
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
polled = await resp.read_llsd()
|
||||
for event in polled["events"]:
|
||||
if self._llsd_serializer.can_handle(event["message"]):
|
||||
msg = self._llsd_serializer.deserialize(event)
|
||||
else:
|
||||
# If this isn't a templated message (like some EQ-only events are),
|
||||
# then we wrap it in a synthetic `Message` so that the API for handling
|
||||
# both EQ-only and templated message events can be the same. Ick.
|
||||
msg = Message(event["message"])
|
||||
if isinstance(event["body"], dict):
|
||||
msg.add_block(Block("EventData", **event["body"]))
|
||||
else:
|
||||
# Shouldn't be any events that have anything other than a dict
|
||||
# as a body, but just to be sure...
|
||||
msg.add_block(Block("EventData", Data=event["body"]))
|
||||
msg.synthetic = True
|
||||
self.session().message_handler.handle(msg)
|
||||
self.message_handler.handle(msg)
|
||||
ack = polled["id"]
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
async def _handle_ping_check(self, message: Message):
|
||||
self.circuit.send(
|
||||
Message(
|
||||
"CompletePingCheck",
|
||||
Block("PingID", PingID=message["PingID"]["PingID"]),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class HippoClientSession(BaseClientSession):
|
||||
"""Represents a client's view of a remote session"""
|
||||
@@ -125,6 +179,7 @@ class HippoClientSession(BaseClientSession):
|
||||
|
||||
region_by_handle: Callable[[int], Optional[HippoClientRegion]]
|
||||
region_by_circuit_addr: Callable[[ADDR_TUPLE], Optional[HippoClientRegion]]
|
||||
regions: List[HippoClientRegion]
|
||||
session_manager: HippoClient
|
||||
|
||||
def __init__(self, id, secure_session_id, agent_id, circuit_code, session_manager: Optional[HippoClient] = None,
|
||||
@@ -155,8 +210,6 @@ class HippoClientSession(BaseClientSession):
|
||||
valid_circuit = True
|
||||
|
||||
if valid_circuit:
|
||||
# TODO: This is a little bit crap, we need to know if a UseCircuitCode was ever ACKed
|
||||
# before we can start sending other packets, otherwise we might have a race.
|
||||
await region.circuit.send_reliable(
|
||||
Message(
|
||||
"UseCircuitCode",
|
||||
@@ -322,8 +375,6 @@ class HippoClient(BaseClientSessionManager):
|
||||
self.session: Optional[HippoClientSession] = None
|
||||
self.settings = ClientSettings()
|
||||
self._resend_task: Optional[asyncio.Task] = None
|
||||
self._eq_task: Optional[asyncio.Task] = None
|
||||
self._llsd_serializer = LLSDMessageSerializer()
|
||||
|
||||
async def aclose(self):
|
||||
try:
|
||||
@@ -459,8 +510,7 @@ class HippoClient(BaseClientSessionManager):
|
||||
async with seed_resp_fut as seed_resp:
|
||||
seed_resp.raise_for_status()
|
||||
region.update_caps(await seed_resp.read_llsd())
|
||||
self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())
|
||||
self.session.main_region.message_handler.subscribe("StartPingCheck", self._handle_ping_check)
|
||||
await region.connect()
|
||||
|
||||
async def logout(self):
|
||||
if not self.session:
|
||||
@@ -468,9 +518,6 @@ class HippoClient(BaseClientSessionManager):
|
||||
if self._resend_task:
|
||||
self._resend_task.cancel()
|
||||
self._resend_task = None
|
||||
if self._eq_task:
|
||||
self._eq_task.cancel()
|
||||
self._eq_task = None
|
||||
|
||||
session = self.session
|
||||
self.session = None
|
||||
@@ -482,6 +529,8 @@ class HippoClient(BaseClientSessionManager):
|
||||
Block("AgentData", AgentID=session.agent_id, SessionID=session.id),
|
||||
)
|
||||
)
|
||||
for region in session.regions:
|
||||
await region.disconnect()
|
||||
session.transport.close()
|
||||
|
||||
def send_chat(self, message: Union[bytes, str], channel: int = 0, chat_type=ChatType.NORMAL) -> asyncio.Future:
|
||||
@@ -493,49 +542,10 @@ class HippoClient(BaseClientSessionManager):
|
||||
|
||||
async def _attempt_resends(self):
|
||||
while True:
|
||||
await asyncio.sleep(0.5)
|
||||
if self.session is None:
|
||||
continue
|
||||
break
|
||||
for region in self.session.regions:
|
||||
if not region.circuit or not region.circuit.is_alive:
|
||||
if not region.circuit.is_alive:
|
||||
continue
|
||||
region.circuit.resend_unacked()
|
||||
|
||||
async def _poll_event_queue(self):
|
||||
ack: Optional[int] = None
|
||||
while True:
|
||||
if self.session is None or self.session.main_region is None:
|
||||
return
|
||||
payload = {"ack": ack, "done": False}
|
||||
async with self.session.main_region.caps_client.post("EventQueueGet", llsd=payload) as resp:
|
||||
if resp.status != 200:
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
polled = await resp.read_llsd()
|
||||
for event in polled["events"]:
|
||||
if self._llsd_serializer.can_handle(event["message"]):
|
||||
msg = self._llsd_serializer.deserialize(event)
|
||||
else:
|
||||
# If this isn't a templated message (like some EQ-only events are),
|
||||
# then we wrap it in a synthetic `Message` so that the API for handling
|
||||
# both EQ-only and templated message events can be the same. Ick.
|
||||
msg = Message(event["message"])
|
||||
if isinstance(event["body"], dict):
|
||||
msg.add_block(Block("EventData", **event["body"]))
|
||||
else:
|
||||
# Shouldn't be any events that have anything other than a dict
|
||||
# as a body, but just to be sure...
|
||||
msg.add_block(Block("EventData", Data=event["body"]))
|
||||
msg.synthetic = True
|
||||
self.session.message_handler.handle(msg)
|
||||
self.session.main_region.message_handler.handle(msg)
|
||||
ack = polled["id"]
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
async def _handle_ping_check(self, message: Message):
|
||||
self.session.main_region.circuit.send(
|
||||
Message(
|
||||
"CompletePingCheck",
|
||||
Block("PingID", PingID=message["PingID"]["PingID"]),
|
||||
)
|
||||
)
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
Reference in New Issue
Block a user