Add basic EQ handling to client
This commit is contained in:
@@ -12,15 +12,16 @@ from typing import *
|
||||
import aiohttp
|
||||
import multidict
|
||||
|
||||
from hippolyzer.lib.base.helpers import proxify
|
||||
from hippolyzer.lib.base.helpers import proxify, get_resource_filename
|
||||
from hippolyzer.lib.base.message.circuit import Circuit
|
||||
from hippolyzer.lib.base.message.llsd_msg_serializer import LLSDMessageSerializer
|
||||
from hippolyzer.lib.base.message.message import Message, Block
|
||||
from hippolyzer.lib.base.message.message_dot_xml import MessageDotXML
|
||||
from hippolyzer.lib.base.message.message_handler import MessageHandler
|
||||
from hippolyzer.lib.base.message.udpdeserializer import UDPMessageDeserializer
|
||||
from hippolyzer.lib.base.network.caps_client import CapsClient
|
||||
from hippolyzer.lib.base.network.caps_client import CapsClient, CAPS_DICT
|
||||
from hippolyzer.lib.base.network.transport import ADDR_TUPLE, Direction, SocketUDPTransport, AbstractUDPTransport
|
||||
from hippolyzer.lib.base.settings import Settings
|
||||
from hippolyzer.lib.base.settings import Settings, SettingDescriptor
|
||||
from hippolyzer.lib.base.templates import RegionHandshakeReplyFlags
|
||||
from hippolyzer.lib.base.transfer_manager import TransferManager
|
||||
from hippolyzer.lib.base.xfer_manager import XferManager
|
||||
@@ -32,10 +33,25 @@ from hippolyzer.lib.client.state import BaseClientSession, BaseClientRegion, Bas
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ClientSettings(Settings):
|
||||
# Off by default for now, the cert validation is a big mess due to LL using an internal CA.
|
||||
SSL_VERIFY: bool = SettingDescriptor(False)
|
||||
SSL_CERT_PATH: str = SettingDescriptor(get_resource_filename("lib/base/network/data/ca-bundle.crt"))
|
||||
|
||||
|
||||
class HippoCapsClient(CapsClient):
|
||||
def __init__(
|
||||
self,
|
||||
settings: ClientSettings,
|
||||
caps: Optional[CAPS_DICT] = None,
|
||||
session: Optional[aiohttp.ClientSession] = None,
|
||||
) -> None:
|
||||
super().__init__(caps, session)
|
||||
self._settings = settings
|
||||
|
||||
def _request_fixups(self, cap_or_url: str, headers: Dict, proxy: Optional[bool], ssl: Any):
|
||||
headers["User-Agent"] = f"Hippolyzer/v{version('hippolyzer')}"
|
||||
return cap_or_url, headers, proxy, False
|
||||
return cap_or_url, headers, proxy, self._settings.SSL_VERIFY
|
||||
|
||||
|
||||
class HippoClientProtocol(asyncio.DatagramProtocol):
|
||||
@@ -87,7 +103,7 @@ class HippoClientRegion(BaseClientRegion):
|
||||
if seed_cap:
|
||||
self.caps["Seed"] = seed_cap
|
||||
self.session: Callable[[], HippoClientSession] = weakref.ref(session)
|
||||
self.caps_client = HippoCapsClient(self.caps, session.http_session)
|
||||
self.caps_client = HippoCapsClient(session.session_manager.settings, self.caps, session.http_session)
|
||||
self.xfer_manager = XferManager(proxify(self), self.session().secure_session_id)
|
||||
self.transfer_manager = TransferManager(proxify(self), session.agent_id, session.id)
|
||||
self.asset_uploader = AssetUploader(proxify(self))
|
||||
@@ -150,6 +166,8 @@ class HippoClientSession(BaseClientSession):
|
||||
|
||||
|
||||
class HippoClient(BaseClientSessionManager):
|
||||
"""A simple client, only connects to one region at a time currently."""
|
||||
|
||||
SUPPORTED_CAPS: Set[str] = {
|
||||
"AbuseCategories",
|
||||
"AcceptFriendship",
|
||||
@@ -298,8 +316,10 @@ class HippoClient(BaseClientSessionManager):
|
||||
self._options = options if options is not None else self.DEFAULT_OPTIONS
|
||||
self.http_session: Optional[aiohttp.ClientSession] = aiohttp.ClientSession()
|
||||
self.session: Optional[HippoClientSession] = None
|
||||
self.settings = Settings()
|
||||
self.settings = ClientSettings()
|
||||
self._resend_task: Optional[asyncio.Task] = None
|
||||
self._eq_task: Optional[asyncio.Task] = None
|
||||
self._llsd_serializer = LLSDMessageSerializer()
|
||||
|
||||
async def aclose(self):
|
||||
try:
|
||||
@@ -381,7 +401,7 @@ class HippoClient(BaseClientSessionManager):
|
||||
self.session.main_region = region
|
||||
|
||||
# Kick this off and await it later
|
||||
seed_resp_fut = region.caps_client.post("Seed", session=self.http_session, llsd=list(self.SUPPORTED_CAPS))
|
||||
seed_resp_fut = region.caps_client.post("Seed", llsd=list(self.SUPPORTED_CAPS))
|
||||
|
||||
# Register first so we can handle it even if the ack happens after the message is sent
|
||||
region_handshake_fut = region.message_handler.wait_for(("RegionHandshake",))
|
||||
@@ -428,7 +448,9 @@ class HippoClient(BaseClientSessionManager):
|
||||
)
|
||||
)
|
||||
async with seed_resp_fut as seed_resp:
|
||||
seed_resp.raise_for_status()
|
||||
region.update_caps(await seed_resp.read_llsd())
|
||||
self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())
|
||||
|
||||
async def logout(self):
|
||||
if not self.session:
|
||||
@@ -436,6 +458,9 @@ class HippoClient(BaseClientSessionManager):
|
||||
if self._resend_task:
|
||||
self._resend_task.cancel()
|
||||
self._resend_task = None
|
||||
if self._eq_task:
|
||||
self._eq_task.cancel()
|
||||
self._eq_task = None
|
||||
|
||||
session = self.session
|
||||
self.session = None
|
||||
@@ -451,10 +476,31 @@ class HippoClient(BaseClientSessionManager):
|
||||
|
||||
async def _attempt_resends(self):
|
||||
while True:
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.5)
|
||||
if self.session is None:
|
||||
continue
|
||||
for region in self.session.regions:
|
||||
if not region.circuit or not region.circuit.is_alive:
|
||||
continue
|
||||
region.circuit.resend_unacked()
|
||||
|
||||
async def _poll_event_queue(self):
|
||||
ack: Optional[int] = None
|
||||
while True:
|
||||
if self.session is None or self.session.main_region is None:
|
||||
return
|
||||
payload = {"ack": ack, "done": False}
|
||||
async with self.session.main_region.caps_client.post("EventQueueGet", llsd=payload) as resp:
|
||||
if resp.status != 200:
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
polled = await resp.read_llsd()
|
||||
for event in polled["events"]:
|
||||
if not self._llsd_serializer.can_handle(event["message"]):
|
||||
# TODO: handle non-message EQ events
|
||||
continue
|
||||
msg = self._llsd_serializer.deserialize(event)
|
||||
self.session.message_handler.handle(msg)
|
||||
self.session.main_region.message_handler.handle(msg)
|
||||
ack = polled["id"]
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Tuple, Optional
|
||||
|
||||
import aioresponses
|
||||
|
||||
from hippolyzer.lib.base import llsd
|
||||
from hippolyzer.lib.base.datatypes import UUID
|
||||
from hippolyzer.lib.base.message.circuit import Circuit
|
||||
from hippolyzer.lib.base.message.message import Message, Block
|
||||
@@ -83,6 +84,13 @@ class TestHippoClient(unittest.IsolatedAsyncioTestCase):
|
||||
"region_y": 123,
|
||||
"seed_capability": "https://127.0.0.1:4/foo",
|
||||
}
|
||||
FAKE_SEED_RESP = {
|
||||
"EventQueueGet": "https://127.0.0.1:5/",
|
||||
}
|
||||
FAKE_EQ_RESP = {
|
||||
"id": 1,
|
||||
"events": [],
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self.server_handler = MessageHandler()
|
||||
@@ -99,7 +107,8 @@ class TestHippoClient(unittest.IsolatedAsyncioTestCase):
|
||||
async def _do_login():
|
||||
with aioresponses.aioresponses() as m:
|
||||
m.post(self.FAKE_LOGIN_URI, body=self._make_fake_login_body())
|
||||
m.post(self.FAKE_LOGIN_RESP['seed_capability'], body="<llsd><map></map></llsd>")
|
||||
m.post(self.FAKE_LOGIN_RESP['seed_capability'], body=llsd.format_xml(self.FAKE_SEED_RESP))
|
||||
m.post(self.FAKE_SEED_RESP['EventQueueGet'], body=llsd.format_xml(self.FAKE_EQ_RESP))
|
||||
await client.login("foo", "bar", login_uri=self.FAKE_LOGIN_URI)
|
||||
await client.logout()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user