diff --git a/hippolyzer/lib/base/message/circuit.py b/hippolyzer/lib/base/message/circuit.py index c69694c..a001c4b 100644 --- a/hippolyzer/lib/base/message/circuit.py +++ b/hippolyzer/lib/base/message/circuit.py @@ -1,14 +1,16 @@ from __future__ import annotations +import abc import datetime as dt import logging from typing import * +from typing import Optional +from .message_handler import MessageHandler from ..network.transport import AbstractUDPTransport, UDPPacket, Direction, ADDR_TUPLE -from .message import Block +from .message import Block, Message from .msgtypes import PacketFlags from .udpserializer import UDPMessageSerializer -from .message import Message class Circuit: @@ -63,3 +65,16 @@ class Circuit: def __repr__(self): return "<%s %r : %r>" % (self.__class__.__name__, self.near_host, self.host) + + +class ConnectionHolder(abc.ABC): + """ + Any object that has both a circuit and a message handler + + Preferred to explicitly passing around a circuit, message handler pair + because generally a ConnectionHolder represents a region or a client. + The same region or client may have multiple different circuits across the + lifetime of a session (due to region restarts, etc.) + """ + circuit: Optional[Circuit] + message_handler: MessageHandler[Message] diff --git a/hippolyzer/lib/base/network/caps_client.py b/hippolyzer/lib/base/network/caps_client.py index 93c7c1e..146420c 100644 --- a/hippolyzer/lib/base/network/caps_client.py +++ b/hippolyzer/lib/base/network/caps_client.py @@ -77,7 +77,7 @@ class _HippoSessionRequestContextManager: CAPS_DICT = Union[ Mapping[str, str], - multidict.MultiDict[Tuple[Any, str]] + multidict.MultiDict[str], ] @@ -88,6 +88,9 @@ class CapsClient: def _request_fixups(self, cap_or_url: str, headers: Dict, proxy: Optional[bool], ssl: Any): return cap_or_url, headers, proxy, ssl + def _get_caps(self) -> Optional[CAPS_DICT]: + return self._caps + def request(self, method: str, cap_or_url: str, *, path: str = "", data: Any = None, headers: Optional[Dict] = None, session: Optional[aiohttp.ClientSession] = None, llsd: Any = dataclasses.MISSING, params: Optional[Dict[str, Any]] = None, @@ -97,14 +100,12 @@ class CapsClient: if path: raise ValueError("Specifying both path and a full URL not supported") else: - if self._caps is None: + caps = self._get_caps() + if caps is None: raise RuntimeError(f"Need a caps dict to request a Cap like {cap_or_url}") - if cap_or_url not in self._caps: + if cap_or_url not in caps: raise KeyError(f"{cap_or_url} is not a full URL and not a Cap") - cap_or_url = self._caps[cap_or_url] - # Stupid hack for proxy multidicts that contain a tuple of `(cap_type, cap_url)` - if isinstance(cap_or_url, tuple): - cap_or_url = cap_or_url[-1] + cap_or_url = caps[cap_or_url] if path: cap_or_url += path diff --git a/hippolyzer/lib/base/transfer_manager.py b/hippolyzer/lib/base/transfer_manager.py index 8fbf2c8..d5755ed 100644 --- a/hippolyzer/lib/base/transfer_manager.py +++ b/hippolyzer/lib/base/transfer_manager.py @@ -8,9 +8,8 @@ import dataclasses from typing import * from hippolyzer.lib.base.datatypes import UUID -from hippolyzer.lib.base.message.circuit import Circuit from hippolyzer.lib.base.message.message import Block, Message -from hippolyzer.lib.base.message.message_handler import MessageHandler +from hippolyzer.lib.base.message.circuit import ConnectionHolder from hippolyzer.lib.base.templates import ( TransferRequestParamsBase, TransferChannelType, @@ -71,13 +70,11 @@ class Transfer: class TransferManager: def __init__( self, - message_handler: MessageHandler[Message], - circuit: Circuit, + connection_holder: ConnectionHolder, agent_id: Optional[UUID] = None, session_id: Optional[UUID] = None, ): - self._message_handler = message_handler - self._circuit = circuit + self._connection_holder = connection_holder self._agent_id = agent_id self._session_id = session_id @@ -97,7 +94,7 @@ class TransferManager: if params_dict.get("SessionID", dataclasses.MISSING) is None: params.SessionID = self._session_id - self._circuit.send_message(Message( + self._connection_holder.circuit.send_message(Message( 'TransferRequest', Block( 'TransferInfo', @@ -114,7 +111,7 @@ class TransferManager: async def _pump_transfer_replies(self, transfer: Transfer): # Subscribe to message related to our transfer while we're in this block - with self._message_handler.subscribe_async( + with self._connection_holder.message_handler.subscribe_async( _TRANSFER_MESSAGES, predicate=transfer.is_our_message, ) as get_msg: diff --git a/hippolyzer/lib/base/xfer_manager.py b/hippolyzer/lib/base/xfer_manager.py index f2a9cea..7e05627 100644 --- a/hippolyzer/lib/base/xfer_manager.py +++ b/hippolyzer/lib/base/xfer_manager.py @@ -9,12 +9,11 @@ import random from typing import * from hippolyzer.lib.base.datatypes import UUID, RawBytes -from hippolyzer.lib.base.message.circuit import Circuit from hippolyzer.lib.base.message.data_packer import TemplateDataPacker from hippolyzer.lib.base.message.message import Block, Message -from hippolyzer.lib.base.message.message_handler import MessageHandler from hippolyzer.lib.base.message.msgtypes import MsgType from hippolyzer.lib.base.network.transport import Direction +from hippolyzer.lib.base.message.circuit import ConnectionHolder from hippolyzer.lib.base.templates import XferPacket, XferFilePath, AssetType, XferError _XFER_MESSAGES = {"AbortXfer", "ConfirmXferPacket", "RequestXfer", "SendXferPacket"} @@ -93,12 +92,10 @@ class UploadStrategy(enum.IntEnum): class XferManager: def __init__( self, - message_handler: MessageHandler[Message], - circuit: Circuit, + connection_holder: ConnectionHolder, secure_session_id: Optional[UUID] = None, ): - self._message_handler = message_handler - self._circuit = circuit + self._connection_holder = connection_holder self._secure_session_id = secure_session_id def request( @@ -113,7 +110,7 @@ class XferManager: direction: Direction = Direction.OUT, ) -> Xfer: xfer_id = xfer_id if xfer_id is not None else random.getrandbits(64) - self._circuit.send_message(Message( + self._connection_holder.circuit.send_message(Message( 'RequestXfer', Block( 'XferID', @@ -132,7 +129,7 @@ class XferManager: return xfer async def _pump_xfer_replies(self, xfer: Xfer): - with self._message_handler.subscribe_async( + with self._connection_holder.message_handler.subscribe_async( _XFER_MESSAGES, predicate=xfer.is_our_message, ) as get_msg: @@ -177,7 +174,7 @@ class XferManager: to_ack = range(xfer.next_ackable, ack_max) xfer.next_ackable = ack_max for ack_id in to_ack: - self._circuit.send_message(Message( + self._connection_holder.circuit.send_message(Message( "ConfirmXferPacket", Block("XferID", ID=xfer.xfer_id, Packet=ack_id), direction=xfer.direction, @@ -219,7 +216,7 @@ class XferManager: else: inline_data = data - self._circuit.send_message(Message( + self._connection_holder.circuit.send_message(Message( "AssetUploadRequest", Block( "AssetBlock", @@ -235,7 +232,7 @@ class XferManager: return fut async def _pump_asset_upload(self, xfer: Optional[Xfer], transaction_id: UUID, fut: asyncio.Future): - message_handler = self._message_handler + message_handler = self._connection_holder.message_handler # We'll receive an Xfer request for the asset we're uploading. # asset ID is determined by hashing secure session ID with chosen transaction ID. asset_id: UUID = UUID.combine(transaction_id, self._secure_session_id) @@ -264,7 +261,7 @@ class XferManager: request_predicate: Callable[[Message], bool], wait_for_confirm: bool = True ): - message_handler = self._message_handler + message_handler = self._connection_holder.message_handler request_msg = await message_handler.wait_for( 'RequestXfer', predicate=request_predicate, timeout=5.0) xfer.xfer_id = request_msg["XferID"]["ID"] @@ -275,7 +272,7 @@ class XferManager: chunk = xfer.chunks.pop(packet_id) # EOF if there are no chunks left packet_val = XferPacket(PacketID=packet_id, IsEOF=not bool(xfer.chunks)) - self._circuit.send_message(Message( + self._connection_holder.circuit.send_message(Message( "SendXferPacket", Block("XferID", ID=xfer.xfer_id, Packet_=packet_val), Block("DataPacket", Data=chunk), diff --git a/hippolyzer/lib/client/__init__.py b/hippolyzer/lib/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hippolyzer/lib/client/namecache.py b/hippolyzer/lib/client/namecache.py new file mode 100644 index 0000000..ecc8f38 --- /dev/null +++ b/hippolyzer/lib/client/namecache.py @@ -0,0 +1,78 @@ +import dataclasses +from typing import * + +from hippolyzer.lib.base.datatypes import UUID +from hippolyzer.lib.base.message.message import Message +from hippolyzer.lib.base.message.message_handler import MessageHandler + + +@dataclasses.dataclass +class NameCacheEntry: + full_id: UUID + first_name: Optional[str] = None + last_name: Optional[str] = None + display_name: Optional[str] = None + + def __str__(self): + if self.display_name: + return f"{self.display_name} ({self.legacy_name})" + if self.legacy_name: + return self.legacy_name + return f"(???) ({self.full_id})" + + @property + def legacy_name(self) -> Optional[str]: + if self.first_name is None: + return None + return f"{self.first_name} {self.last_name}" + + @property + def preferred_name(self) -> Optional[str]: + if self.display_name: + return self.display_name + return self.legacy_name + + +class NameCache: + def __init__(self): + self._cache: Dict[UUID, NameCacheEntry] = {} + + def create_subscriptions( + self, + message_handler: MessageHandler[Message], + ): + message_handler.subscribe("UUIDNameReply", self._handle_uuid_name_reply) + + def lookup(self, uuid: UUID) -> Optional[NameCacheEntry]: + return self._cache.get(uuid) + + def update(self, full_id: UUID, vals: dict): + # upsert the cache entry + entry = self._cache.get(full_id) or NameCacheEntry(full_id=full_id) + if "FirstName" in vals: + entry.first_name = vals["FirstName"] + if "LastName" in vals: + entry.last_name = vals["LastName"] + if "DisplayName" in vals: + entry.display_name = vals["DisplayName"] if vals["DisplayName"] else None + self._cache[full_id] = entry + + def _handle_uuid_name_reply(self, msg: Message): + for block in msg.blocks["UUIDNameBlock"]: + self.update(block["ID"], { + "FirstName": block["FirstName"], + "LastName": block["LastName"], + }) + + def _process_display_names_response(self, parsed: dict): + """Handle the response from the GetDisplayNames cap""" + for agent in parsed["agents"]: + # Don't set display name if they just have the default + display_name = None + if not agent["is_display_name_default"]: + display_name = agent["display_name"] + self.update(agent["id"], { + "FirstName": agent["legacy_first_name"], + "LastName": agent["legacy_last_name"], + "DisplayName": display_name, + }) diff --git a/hippolyzer/lib/proxy/objects.py b/hippolyzer/lib/client/object_manager.py similarity index 87% rename from hippolyzer/lib/proxy/objects.py rename to hippolyzer/lib/client/object_manager.py index 6fc7710..cbbe886 100644 --- a/hippolyzer/lib/proxy/objects.py +++ b/hippolyzer/lib/client/object_manager.py @@ -6,38 +6,30 @@ import enum import itertools import logging import math -import typing import weakref from typing import * -from hippolyzer.lib.base import llsd from hippolyzer.lib.base.datatypes import UUID, Vector3 from hippolyzer.lib.base.helpers import proxify from hippolyzer.lib.base.message.message import Block, Message from hippolyzer.lib.base.objects import ( - handle_to_global_pos, normalize_object_update, normalize_terse_object_update, normalize_object_update_compressed_data, normalize_object_update_compressed, - Object, + Object, handle_to_global_pos, ) -from hippolyzer.lib.proxy.addons import AddonManager -from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow -from hippolyzer.lib.proxy.namecache import NameCache, NameCacheEntry +from hippolyzer.lib.client.namecache import NameCache, NameCacheEntry +from hippolyzer.lib.client.state import BaseClientSession, BaseClientRegion from hippolyzer.lib.base.templates import PCode, ObjectStateSerializer -from hippolyzer.lib.proxy.vocache import RegionViewerObjectCacheChain -if TYPE_CHECKING: - from hippolyzer.lib.proxy.region import ProxiedRegion - from hippolyzer.lib.proxy.sessions import Session LOG = logging.getLogger(__name__) class OrphanManager: def __init__(self): - self._orphans: typing.Dict[int, typing.List[int]] = collections.defaultdict(list) + self._orphans: Dict[int, List[int]] = collections.defaultdict(list) def clear(self): return self._orphans.clear() @@ -55,7 +47,7 @@ class OrphanManager: del self._orphans[parent_id] return removed - def collect_orphans(self, parent_localid: int) -> typing.Sequence[int]: + def collect_orphans(self, parent_localid: int) -> Sequence[int]: return self._orphans.pop(parent_localid, []) def track_orphan(self, obj: Object): @@ -67,7 +59,7 @@ class OrphanManager: self._orphans[parent_id].append(local_id) -OBJECT_OR_LOCAL = typing.Union[Object, int] +OBJECT_OR_LOCAL = Union[Object, int] class LocationType(enum.IntEnum): @@ -129,31 +121,27 @@ class Avatar: return self._resolved_name.preferred_name -class ObjectManager: +class ClientObjectManager: """ Object manager for a specific region """ - def __init__(self, region: ProxiedRegion, use_vo_cache: bool = False): - self._region: ProxiedRegion = proxify(region) - self.use_vo_cache = use_vo_cache - self.cache_loaded: bool = False - self.object_cache: RegionViewerObjectCacheChain = RegionViewerObjectCacheChain([]) - self._localid_lookup: typing.Dict[int, Object] = {} - self._fullid_lookup: typing.Dict[UUID, int] = {} - self._coarse_locations: typing.Dict[UUID, Vector3] = {} - self._object_futures: typing.Dict[Tuple[int, int], List[asyncio.Future]] = {} + def __init__(self, region: BaseClientRegion, name_cache: Optional[NameCache]): + self._region: BaseClientRegion = region + self._localid_lookup: Dict[int, Object] = {} + self._fullid_lookup: Dict[UUID, int] = {} + self._coarse_locations: Dict[UUID, Vector3] = {} + self._object_futures: Dict[Tuple[int, int], List[asyncio.Future]] = {} + self._orphan_manager = OrphanManager() + self.name_cache = name_cache or NameCache() # Objects that we've seen references to but don't have data for self.missing_locals = set() - self._orphan_manager = OrphanManager() - self._world_objects: WorldObjectManager = region.session().objects - self.name_cache: NameCache = region.session().session_manager.name_cache + + self._world_objects: ClientWorldObjectManager = proxify(region.session().objects) message_handler = region.message_handler message_handler.subscribe("CoarseLocationUpdate", self._handle_coarse_location_update) - region.http_message_handler.subscribe("GetObjectCost", - self._handle_get_object_cost) message_handler.subscribe("KillObject", self._handle_kill_object) message_handler.subscribe("ObjectProperties", @@ -168,6 +156,15 @@ class ObjectManager: def all_objects(self) -> Iterable[Object]: return self._localid_lookup.values() + def lookup_localid(self, localid: int) -> Optional[Object]: + return self._localid_lookup.get(localid, None) + + def lookup_fullid(self, fullid: UUID) -> Optional[Object]: + local_id = self._fullid_lookup.get(fullid, None) + if local_id is None: + return None + return self.lookup_localid(local_id) + @property def all_avatars(self) -> Iterable[Avatar]: av_objects = {o.FullID: o for o in self.all_objects if o.PCode == PCode.AVATAR} @@ -187,16 +184,7 @@ class ObjectManager: )) return avatars - def lookup_localid(self, localid: int) -> typing.Optional[Object]: - return self._localid_lookup.get(localid, None) - - def lookup_fullid(self, fullid: UUID) -> typing.Optional[Object]: - local_id = self._fullid_lookup.get(fullid, None) - if local_id is None: - return None - return self.lookup_localid(local_id) - - def lookup_avatar(self, fullid: UUID) -> typing.Optional[Avatar]: + def lookup_avatar(self, fullid: UUID) -> Optional[Avatar]: for avatar in self.all_avatars: if avatar.FullID == fullid: return avatar @@ -250,6 +238,12 @@ class ObjectManager: self.run_object_update_hooks(obj, set(obj.to_dict().keys()), UpdateType.OBJECT_UPDATE) def track_object(self, obj: Object): + obj_same_localid = self._localid_lookup.get(obj.LocalID) + if obj_same_localid: + LOG.error(f"Clobbering existing object with LocalID {obj.LocalID}! " + f"{obj.to_dict()} clobbered {obj_same_localid.to_dict()}") + # Clear the clobbered object out of the FullID lookup + self._fullid_lookup.pop(obj_same_localid.FullID, None) self._localid_lookup[obj.LocalID] = obj self._fullid_lookup[obj.FullID] = obj.LocalID # If it was missing, it's not missing anymore. @@ -324,14 +318,6 @@ class ObjectManager: else: LOG.debug(f"Changing parent of {obj.LocalID}, but couldn't find old parent") - def _cancel_futures(self, local_id: int): - # Object was killed, so need to kill any pending futures. - for fut_key, futs in self._object_futures.items(): - if fut_key[0] == local_id: - for fut in futs: - fut.cancel() - break - def _kill_object_by_local_id(self, local_id: int): obj = self.lookup_localid(local_id) self.missing_locals -= {local_id} @@ -339,7 +325,7 @@ class ObjectManager: self._cancel_futures(local_id) if obj: - AddonManager.handle_object_killed(self._region.session(), self._region, obj) + self.run_kill_object_hooks(obj) child_ids = obj.ChildIDs else: LOG.debug(f"Tried to kill unknown object {local_id}") @@ -397,23 +383,27 @@ class ObjectManager: packet.meta["ObjectUpdateIDs"] = tuple(seen_locals) + # noinspection PyUnusedLocal + def _lookup_cache_entry(self, local_id: int, crc: int) -> Optional[bytes]: + return None + def handle_object_update_cached(self, packet: Message): seen_locals = [] + missing_locals = set() for block in packet['ObjectData']: seen_locals.append(block["ID"]) update_flags = block.deserialize_var("UpdateFlags", make_copy=False) # Check if we already know about the object obj = self.lookup_localid(block["ID"]) - if obj is not None: + if obj is not None and obj.CRC == block["CRC"]: self._update_existing_object(obj, { "UpdateFlags": update_flags, "RegionHandle": self._region.handle, }, UpdateType.OBJECT_UPDATE) continue - # Check if the object is in a viewer's VOCache - cached_obj_data = self.object_cache.lookup_object_data(block["ID"], block["CRC"]) + cached_obj_data = self._lookup_cache_entry(block["ID"], block["CRC"]) if cached_obj_data is not None: cached_obj = normalize_object_update_compressed_data(cached_obj_data) cached_obj["UpdateFlags"] = update_flags @@ -422,9 +412,14 @@ class ObjectManager: continue # Don't know about it and wasn't cached. - self.missing_locals.add(block["ID"]) + missing_locals.add(block["ID"]) + self.missing_locals.update(missing_locals) + self._handle_object_update_cached_misses(missing_locals) packet.meta["ObjectUpdateIDs"] = tuple(seen_locals) + def _handle_object_update_cached_misses(self, local_ids: Set[int]): + self.request_objects(local_ids) + def handle_object_update_compressed(self, packet: Message): seen_locals = [] for block in packet['ObjectData']: @@ -453,18 +448,6 @@ class ObjectManager: LOG.debug(f"Received {packet.name} for unknown {block['ObjectID']}") packet.meta["ObjectUpdateIDs"] = tuple(seen_locals) - def _handle_get_object_cost(self, flow: HippoHTTPFlow): - parsed = llsd.parse_xml(flow.response.content) - if "error" in parsed: - return - for object_id, object_costs in parsed.items(): - obj = self.lookup_fullid(UUID(object_id)) - if not obj: - LOG.debug(f"Received ObjectCost for unknown {object_id}") - continue - obj.ObjectCosts.update(object_costs) - self.run_object_update_hooks(obj, {"ObjectCosts"}, UpdateType.COSTS) - def _handle_kill_object(self, packet: Message): seen_locals = [] for block in packet["ObjectData"]: @@ -480,7 +463,7 @@ class ObjectManager: # and always use the newest one containing a particular avatar ID? self._coarse_locations.clear() - coarse_locations: typing.Dict[UUID, Vector3] = {} + coarse_locations: Dict[UUID, Vector3] = {} for agent_block, location_block in zip(packet["AgentData"], packet["Location"]): x, y, z = location_block["X"], location_block["Y"], location_block["Z"] coarse_locations[agent_block["AgentID"]] = Vector3( @@ -495,25 +478,6 @@ class ObjectManager: self._coarse_locations.update(coarse_locations) - def run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType): - if obj.PCode == PCode.AVATAR and "NameValue" in updated_props: - if obj.NameValue: - self.name_cache.update(obj.FullID, obj.NameValue.to_dict()) - futures = self._object_futures.get((obj.LocalID, update_type), []) - for fut in futures[:]: - fut.set_result(obj) - AddonManager.handle_object_updated(self._region.session(), self._region, obj, updated_props) - - def load_cache(self): - if not self.use_vo_cache or self.cache_loaded: - return - handle = self._region.handle - if not handle: - LOG.warning(f"Tried to load cache for {self._region} without a handle") - return - self.cache_loaded = True - self.object_cache = RegionViewerObjectCacheChain.for_region(handle, self._region.cache_id) - def clear(self): for obj in self._localid_lookup.values(): self._world_objects.handle_object_gone(obj) @@ -525,24 +489,55 @@ class ObjectManager: for fut in tuple(itertools.chain(*self._object_futures.values())): fut.cancel() self._object_futures.clear() - self.object_cache = RegionViewerObjectCacheChain([]) - self.cache_loaded = False - def request_object_properties(self, objects: typing.Union[OBJECT_OR_LOCAL, typing.Sequence[OBJECT_OR_LOCAL]])\ + # noinspection PyUnusedLocal + def _is_localid_selected(self, local_id: int): + return False + + def _process_get_object_cost_response(self, parsed: dict): + if "error" in parsed: + return + for object_id, object_costs in parsed.items(): + obj = self.lookup_fullid(UUID(object_id)) + if not obj: + LOG.debug(f"Received ObjectCost for unknown {object_id}") + continue + obj.ObjectCosts.update(object_costs) + self.run_object_update_hooks(obj, {"ObjectCosts"}, UpdateType.COSTS) + + def run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType): + futures = self._object_futures.get((obj.LocalID, update_type), []) + for fut in futures[:]: + fut.set_result(obj) + if obj.PCode == PCode.AVATAR and "NameValue" in updated_props: + if obj.NameValue: + self.name_cache.update(obj.FullID, obj.NameValue.to_dict()) + + def run_kill_object_hooks(self, obj: Object): + pass + + def _cancel_futures(self, local_id: int): + # Object went away, so need to kill any pending futures. + for fut_key, futs in self._object_futures.items(): + if fut_key[0] == local_id: + for fut in futs: + fut.cancel() + break + + def request_object_properties(self, objects: Union[OBJECT_OR_LOCAL, Sequence[OBJECT_OR_LOCAL]])\ -> List[asyncio.Future[Object]]: if isinstance(objects, (Object, int)): objects = (objects,) if not objects: return [] - session = self._region.session() - local_ids = tuple((o.LocalID if isinstance(o, Object) else o) for o in objects) # Don't mess with already selected objects - unselected_ids = tuple(local for local in local_ids if local not in session.selected.object_locals) + unselected_ids = tuple(local for local in local_ids if not self._is_localid_selected(local)) ids_to_req = unselected_ids + session = self._region.session() while ids_to_req: blocks = [ Block("AgentData", AgentID=session.agent_id, SessionID=session.id), @@ -572,7 +567,7 @@ class ObjectManager: def request_missing_objects(self) -> List[asyncio.Future[Object]]: return self.request_objects(self.missing_locals) - def request_objects(self, local_ids) -> List[asyncio.Future[Object]]: + def request_objects(self, local_ids: Union[int, Iterable[int]]) -> List[asyncio.Future[Object]]: """ Request object local IDs, returning a list of awaitable handles for the objects @@ -605,10 +600,10 @@ class ObjectManager: return futures -class WorldObjectManager: +class ClientWorldObjectManager: """Manages Objects for a session's whole world""" - def __init__(self, session: Session): - self._session: Session = proxify(session) + def __init__(self, session: BaseClientSession): + self._session: BaseClientSession = session self._fullid_lookup: Dict[UUID, Object] = {} message_handler = self._session.message_handler message_handler.subscribe("ObjectUpdate", self._handle_object_update) @@ -619,52 +614,19 @@ class WorldObjectManager: message_handler.subscribe("ObjectUpdateCached", self._handle_object_update_cached) - def _wrap_region_update_handler(self, handler: Callable, message: Message): - """ - Dispatch an ObjectUpdate to a region's handler based on RegionHandle - - Indra doesn't care what region actually sent the message, just what - region handle is in the message, so we need a global message handler - plus dispatch. - """ - region = self._session.region_by_handle(message["RegionData"]["RegionHandle"]) - if not region: - return - return handler(region.objects, message) - - def _handle_object_update(self, message: Message): - self._wrap_region_update_handler(ObjectManager.handle_object_update, message) - - def _handle_terse_object_update(self, message: Message): - self._wrap_region_update_handler(ObjectManager.handle_terse_object_update, message) - - def _handle_object_update_compressed(self, message: Message): - self._wrap_region_update_handler(ObjectManager.handle_object_update_compressed, message) - - def _handle_object_update_cached(self, message: Message): - self._wrap_region_update_handler(ObjectManager.handle_object_update_cached, message) - - def handle_new_object(self, obj: Object): - """Called by a region's ObjectManager when a new Object is tracked""" - self._fullid_lookup[obj.FullID] = obj - - def handle_object_gone(self, obj: Object): - """Called by a region's ObjectManager on KillObject or region going away""" - self._fullid_lookup.pop(obj.FullID, None) - def lookup_fullid(self, full_id: UUID) -> Optional[Object]: return self._fullid_lookup.get(full_id, None) - def lookup_avatar(self, full_id: UUID) -> Optional[Avatar]: - return {a.FullID: a for a in self.all_avatars}.get(full_id, None) - - def __len__(self): - return len(self._fullid_lookup) - @property def all_objects(self) -> Iterable[Object]: return self._fullid_lookup.values() + # TODO: all Avatars should be owned by the WorldObjectManager to deal with the + # possibility of duplicate avatars in different regions due to CoarseLocationUpdate. + # would also allow us to have persistent Avatar instances. + def lookup_avatar(self, full_id: UUID) -> Optional[Avatar]: + return {a.FullID: a for a in self.all_avatars}.get(full_id, None) + @property def all_avatars(self) -> Iterable[Avatar]: chained = itertools.chain(*(r.objects.all_avatars for r in self._session.regions)) @@ -673,13 +635,51 @@ class WorldObjectManager: vals = {a.FullID: a for a in sorted(chained, key=lambda x: x.Object is not None)} return vals.values() + def __len__(self): + return len(self._fullid_lookup) + + def _wrap_region_update_handler(self, handler: Callable, message: Message): + """ + Dispatch an ObjectUpdate to a region's handler based on RegionHandle + + Indra doesn't care what region actually sent the message, just what + region handle is in the message, so we need a global message handler + plus dispatch. + """ + handle = message["RegionData"]["RegionHandle"] + region = self._session.region_by_handle(handle) + if not region: + LOG.warning(f"Got {message.name} for unknown region {handle}") + return + return handler(region.objects, message) + + def _handle_object_update(self, message: Message): + self._wrap_region_update_handler(ClientObjectManager.handle_object_update, message) + + def _handle_terse_object_update(self, message: Message): + self._wrap_region_update_handler(ClientObjectManager.handle_terse_object_update, message) + + def _handle_object_update_compressed(self, message: Message): + self._wrap_region_update_handler(ClientObjectManager.handle_object_update_compressed, message) + + def _handle_object_update_cached(self, message: Message): + self._wrap_region_update_handler(ClientObjectManager.handle_object_update_cached, message) + + def handle_new_object(self, obj: Object): + """Called by a region's ObjectManager when a new Object is tracked""" + self._fullid_lookup[obj.FullID] = obj + + def handle_object_gone(self, obj: Object): + """Called by a region's ObjectManager on KillObject or region going away""" + self._fullid_lookup.pop(obj.FullID, None) + def request_missing_objects(self) -> List[asyncio.Future[Object]]: futs = [] for region in self._session.regions: futs.extend(region.objects.request_missing_objects()) return futs - def request_object_properties(self, objects: typing.Union[Object, typing.Sequence[Object]]) \ + def request_object_properties(self, objects: Union[Object, Sequence[Object]]) \ -> List[asyncio.Future[Object]]: # Doesn't accept local ID unlike ObjectManager because they're ambiguous here. if isinstance(objects, Object): @@ -710,3 +710,6 @@ class WorldObjectManager: if obj.Parent is None: await asyncio.wait_for(region.objects.request_objects(obj.ParentID)[0], wait_time) obj = obj.Parent + + def clear(self): + self._fullid_lookup.clear() diff --git a/hippolyzer/lib/client/state.py b/hippolyzer/lib/client/state.py new file mode 100644 index 0000000..76a6998 --- /dev/null +++ b/hippolyzer/lib/client/state.py @@ -0,0 +1,34 @@ +""" +Base classes for common session-related state shared between clients and proxies +""" +from __future__ import annotations + +import abc +from typing import * + +from hippolyzer.lib.base.datatypes import UUID +from hippolyzer.lib.base.message.circuit import ConnectionHolder +from hippolyzer.lib.base.message.message import Message +from hippolyzer.lib.base.message.message_handler import MessageHandler + +if TYPE_CHECKING: + from hippolyzer.lib.client.object_manager import ClientObjectManager, ClientWorldObjectManager + + +class BaseClientRegion(ConnectionHolder, abc.ABC): + """Represents a client's view of a remote region""" + # Actually a weakref + handle: Optional[int] + session: Callable[[], BaseClientSession] + objects: ClientObjectManager + + +class BaseClientSession(abc.ABC): + """Represents a client's view of a remote session""" + id: UUID + agent_id: UUID + secure_session_id: UUID + message_handler: MessageHandler[Message] + regions: Sequence[BaseClientRegion] + region_by_handle: Callable[[int], Optional[BaseClientRegion]] + objects: ClientWorldObjectManager diff --git a/hippolyzer/lib/proxy/caps_client.py b/hippolyzer/lib/proxy/caps_client.py index 91fda36..bd70cae 100644 --- a/hippolyzer/lib/proxy/caps_client.py +++ b/hippolyzer/lib/proxy/caps_client.py @@ -5,10 +5,22 @@ import re import sys from typing import * -from hippolyzer.lib.base.network.caps_client import CapsClient +from hippolyzer.lib.base.network.caps_client import CapsClient, CAPS_DICT + +if TYPE_CHECKING: + from hippolyzer.lib.proxy.region import ProxiedRegion class ProxyCapsClient(CapsClient): + def __init__(self, region: Optional[ProxiedRegion] = None): + super().__init__(None) + self._region = region + + def _get_caps(self) -> Optional[CAPS_DICT]: + if not self._region: + return None + return self._region.caps + def _request_fixups(self, cap_or_url: str, headers: Dict, proxy: Optional[bool], ssl: Any): # We want to proxy this through Hippolyzer if proxy is None: diff --git a/hippolyzer/lib/proxy/namecache.py b/hippolyzer/lib/proxy/namecache.py index 55f69d3..b284168 100644 --- a/hippolyzer/lib/proxy/namecache.py +++ b/hippolyzer/lib/proxy/namecache.py @@ -1,52 +1,28 @@ from __future__ import annotations -import dataclasses import logging from typing import * from hippolyzer.lib.base import llsd from hippolyzer.lib.base.datatypes import UUID +from hippolyzer.lib.base.message.message import Message from hippolyzer.lib.base.message.message_handler import MessageHandler +from hippolyzer.lib.client.namecache import NameCache from hippolyzer.lib.proxy.viewer_settings import iter_viewer_cache_dirs if TYPE_CHECKING: from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow - from hippolyzer.lib.base.message.message import Message -@dataclasses.dataclass -class NameCacheEntry: - first_name: Optional[str] = None - last_name: Optional[str] = None - display_name: Optional[str] = None - - def __str__(self): - if self.display_name: - return f"{self.display_name} ({self.legacy_name})" - return self.legacy_name - - @property - def legacy_name(self): - return f"{self.first_name} {self.last_name}" - - @property - def preferred_name(self): - if self.display_name: - return self.display_name - return self.legacy_name - - -class NameCache: - def __init__(self): - self._cache: Dict[UUID, NameCacheEntry] = {} - +class ProxyNameCache(NameCache): def create_subscriptions( self, message_handler: MessageHandler[Message], - http_message_handler: MessageHandler[HippoHTTPFlow], + http_message_handler: Optional[MessageHandler[HippoHTTPFlow]] = None, ): - message_handler.subscribe("UUIDNameReply", self._handle_uuid_name_reply) - http_message_handler.subscribe("GetDisplayNames", self._handle_get_display_names) + super().create_subscriptions(message_handler) + if http_message_handler is not None: + http_message_handler.subscribe("GetDisplayNames", self._handle_get_display_names) def load_viewer_caches(self): for cache_dir in iter_viewer_cache_dirs(): @@ -69,38 +45,7 @@ class NameCache: except: logging.exception(f"Failed to load namecache from {cache_dir}") - def lookup(self, uuid: UUID) -> Optional[NameCacheEntry]: - return self._cache.get(uuid) - - def update(self, uuid: UUID, vals: dict): - # upsert the cache entry - entry = self._cache.get(uuid) or NameCacheEntry() - if "FirstName" in vals: - entry.first_name = vals["FirstName"] - if "LastName" in vals: - entry.last_name = vals["LastName"] - if "DisplayName" in vals: - entry.display_name = vals["DisplayName"] if vals["DisplayName"] else None - self._cache[uuid] = entry - - def _handle_uuid_name_reply(self, msg: Message): - for block in msg.blocks["UUIDNameBlock"]: - self.update(block["ID"], { - "FirstName": block["FirstName"], - "LastName": block["LastName"], - }) - def _handle_get_display_names(self, flow: HippoHTTPFlow): if flow.response.status_code != 200: return - parsed = llsd.parse_xml(flow.response.content) - for agent in parsed["agents"]: - # Don't set display name if they just have the default - display_name = None - if not agent["is_display_name_default"]: - display_name = agent["display_name"] - self.update(agent["id"], { - "FirstName": agent["legacy_first_name"], - "LastName": agent["legacy_last_name"], - "DisplayName": display_name, - }) + self._process_display_names_response(llsd.parse_xml(flow.response.content)) diff --git a/hippolyzer/lib/proxy/object_manager.py b/hippolyzer/lib/proxy/object_manager.py new file mode 100644 index 0000000..a5be028 --- /dev/null +++ b/hippolyzer/lib/proxy/object_manager.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging +from typing import * + +from hippolyzer.lib.base import llsd +from hippolyzer.lib.client.object_manager import ( + ClientObjectManager, + UpdateType, +) + +from hippolyzer.lib.base.objects import Object +from hippolyzer.lib.proxy.addons import AddonManager +from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow +from hippolyzer.lib.proxy.namecache import ProxyNameCache +from hippolyzer.lib.proxy.vocache import RegionViewerObjectCacheChain + +if TYPE_CHECKING: + from hippolyzer.lib.proxy.region import ProxiedRegion + +LOG = logging.getLogger(__name__) + + +class ProxyObjectManager(ClientObjectManager): + """ + Object manager for a specific region + """ + _region: ProxiedRegion + + def __init__( + self, + region: ProxiedRegion, + name_cache: Optional[ProxyNameCache] = None, + use_vo_cache: bool = False + ): + super().__init__(region, name_cache) + self.use_vo_cache = use_vo_cache + self.cache_loaded = False + self.object_cache = RegionViewerObjectCacheChain([]) + + region.http_message_handler.subscribe("GetObjectCost", + self._handle_get_object_cost) + + def _handle_get_object_cost(self, flow: HippoHTTPFlow): + parsed = llsd.parse_xml(flow.response.content) + self._process_get_object_cost_response(parsed) + + def run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType): + super().run_object_update_hooks(obj, updated_props, update_type) + AddonManager.handle_object_updated(self._region.session(), self._region, obj, updated_props) + + def run_kill_object_hooks(self, obj: Object): + AddonManager.handle_object_killed(self._region.session(), self._region, obj) + + def _lookup_cache_entry(self, local_id: int, crc: int) -> Optional[bytes]: + return self.object_cache.lookup_object_data(local_id, crc) + + def load_cache(self): + if not self.use_vo_cache or self.cache_loaded: + return + handle = self._region.handle + if not handle: + LOG.warning(f"Tried to load cache for {self._region} without a handle") + return + self.cache_loaded = True + self.object_cache = RegionViewerObjectCacheChain.for_region(handle, self._region.cache_id) + + def _handle_object_update_cached_misses(self, local_ids: Set[int]): + # Don't do anything automatically. People have to manually ask for + # missed objects to be fetched. + pass + + def clear(self): + super().clear() + self.object_cache = RegionViewerObjectCacheChain([]) + self.cache_loaded = False + + def _is_localid_selected(self, localid: int): + return localid in self._region.session().selected.object_locals diff --git a/hippolyzer/lib/proxy/region.py b/hippolyzer/lib/proxy/region.py index c589754..ab9e796 100644 --- a/hippolyzer/lib/proxy/region.py +++ b/hippolyzer/lib/proxy/region.py @@ -11,12 +11,14 @@ import urllib.parse import multidict from hippolyzer.lib.base.datatypes import Vector3, UUID +from hippolyzer.lib.base.helpers import proxify from hippolyzer.lib.base.message.message import Message from hippolyzer.lib.base.message.message_handler import MessageHandler from hippolyzer.lib.base.objects import handle_to_global_pos +from hippolyzer.lib.client.state import BaseClientRegion from hippolyzer.lib.proxy.caps_client import ProxyCapsClient from hippolyzer.lib.proxy.circuit import ProxiedCircuit -from hippolyzer.lib.proxy.objects import ObjectManager +from hippolyzer.lib.proxy.object_manager import ProxyObjectManager from hippolyzer.lib.base.transfer_manager import TransferManager from hippolyzer.lib.base.xfer_manager import XferManager @@ -33,6 +35,10 @@ class CapType(enum.Enum): class CapsMultiDict(multidict.MultiDict[Tuple[CapType, str]]): + # TODO: Make a view object for this that's just name -> URL + # deriving from MultiMapping[_T] so we don't have to do + # so many copies for consumers that aren't expecting the + # CapType tag. def add(self, key, value) -> None: # Prepend rather than append when adding caps. # Necessary so the most recent for a region URI is returned @@ -42,7 +48,7 @@ class CapsMultiDict(multidict.MultiDict[Tuple[CapType, str]]): super().add(key, val) -class ProxiedRegion: +class ProxiedRegion(BaseClientRegion): def __init__(self, circuit_addr, seed_cap: str, session, handle=None): # A client may make a Seed request twice, and may get back two (valid!) sets of # Cap URIs. We need to be able to look up both, so MultiDict is necessary. @@ -56,27 +62,17 @@ class ProxiedRegion: self._caps_url_lookup: Dict[str, Tuple[CapType, str]] = {} if seed_cap: self._caps["Seed"] = (CapType.NORMAL, seed_cap) - self.session: Optional[Callable[[], Session]] = weakref.ref(session) + self.session: Callable[[], Session] = weakref.ref(session) self.message_handler: MessageHandler[Message] = MessageHandler() self.http_message_handler: MessageHandler[HippoHTTPFlow] = MessageHandler() self.eq_manager = EventQueueManager(self) - self.caps_client = ProxyCapsClient(self._caps) - self.objects = ObjectManager(self, use_vo_cache=True) + self.caps_client = ProxyCapsClient(proxify(self)) + name_cache = session.session_manager.name_cache + self.objects: ProxyObjectManager = ProxyObjectManager(self, name_cache, use_vo_cache=True) + self.xfer_manager = XferManager(proxify(self), self.session().secure_session_id) + self.transfer_manager = TransferManager(proxify(self), session.agent_id, session.id) self._recalc_caps() - @property - def xfer_manager(self) -> XferManager: - return XferManager(self.message_handler, self.circuit, self.session().secure_session_id) - - @property - def transfer_manager(self) -> TransferManager: - return TransferManager( - self.message_handler, - self.circuit, - self.session().agent_id, - self.session().session_id, - ) - @property def name(self): if self._name: diff --git a/hippolyzer/lib/proxy/sessions.py b/hippolyzer/lib/proxy/sessions.py index ec6d08f..f32ecd3 100644 --- a/hippolyzer/lib/proxy/sessions.py +++ b/hippolyzer/lib/proxy/sessions.py @@ -10,21 +10,22 @@ from typing import * from weakref import ref from hippolyzer.lib.base.datatypes import UUID +from hippolyzer.lib.base.message.message import Message from hippolyzer.lib.base.message.message_handler import MessageHandler +from hippolyzer.lib.client.object_manager import ClientWorldObjectManager +from hippolyzer.lib.client.state import BaseClientSession from hippolyzer.lib.proxy.circuit import ProxiedCircuit from hippolyzer.lib.proxy.http_asset_repo import HTTPAssetRepo from hippolyzer.lib.proxy.http_proxy import HTTPFlowContext, is_asset_server_cap_name, SerializedCapData -from hippolyzer.lib.proxy.namecache import NameCache -from hippolyzer.lib.proxy.objects import WorldObjectManager +from hippolyzer.lib.proxy.namecache import ProxyNameCache from hippolyzer.lib.proxy.region import ProxiedRegion, CapType if TYPE_CHECKING: from hippolyzer.lib.proxy.message_logger import BaseMessageLogger from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow - from hippolyzer.lib.base.message.message import Message -class Session: +class Session(BaseClientSession): def __init__(self, session_id, secure_session_id, agent_id, circuit_code, login_data=None, session_manager=None): self.login_data = login_data or {} @@ -42,7 +43,7 @@ class Session: self.started_at = datetime.datetime.now() self.message_handler: MessageHandler[Message] = MessageHandler() self.http_message_handler: MessageHandler[HippoHTTPFlow] = MessageHandler() - self.objects = WorldObjectManager(self) + self.objects = ClientWorldObjectManager(self) self._main_region = None @property @@ -174,7 +175,7 @@ class SessionManager: self.asset_repo = HTTPAssetRepo() self.message_logger: Optional[BaseMessageLogger] = None self.addon_ctx: Dict[str, Any] = {} - self.name_cache = NameCache() + self.name_cache = ProxyNameCache() self.use_viewer_object_cache: bool = False def create_session(self, login_data) -> Session: @@ -197,6 +198,7 @@ class SessionManager: def close_session(self, session: Session): logging.info("Closed %r" % session) + session.objects.clear() self.sessions.remove(session) def resolve_cap(self, url: str) -> Optional["CapData"]: diff --git a/tests/base/test_xfer_transfer.py b/tests/base/test_xfer_transfer.py index a33a3cc..6574500 100644 --- a/tests/base/test_xfer_transfer.py +++ b/tests/base/test_xfer_transfer.py @@ -6,6 +6,7 @@ from typing import * from hippolyzer.lib.base.datatypes import UUID from hippolyzer.lib.base.message.message import Block, Message from hippolyzer.lib.base.message.message_handler import MessageHandler +from hippolyzer.lib.base.message.circuit import ConnectionHolder from hippolyzer.lib.base.templates import ( AssetType, EstateAssetType, @@ -30,6 +31,12 @@ class MockHandlingCircuit(ProxiedCircuit): asyncio.get_event_loop().call_soon(self.handler.handle, message) +class MockConnectionHolder(ConnectionHolder): + def __init__(self, circuit, message_handler): + self.circuit = circuit + self.message_handler = message_handler + + class BaseTransferTests(unittest.IsolatedAsyncioTestCase): SMALL_PAYLOAD = b"foobar" LARGE_PAYLOAD = b"foobar" * 500 @@ -41,13 +48,15 @@ class BaseTransferTests(unittest.IsolatedAsyncioTestCase): # and vice-versa self.client_circuit = MockHandlingCircuit(self.server_message_handler) self.server_circuit = MockHandlingCircuit(self.client_message_handler) + self.server_connection = MockConnectionHolder(self.server_circuit, self.server_message_handler) + self.client_connection = MockConnectionHolder(self.client_circuit, self.client_message_handler) class XferManagerTests(BaseTransferTests): def setUp(self) -> None: super().setUp() self.secure_session_id = UUID.random() - self.xfer_manager = XferManager(self.client_message_handler, self.client_circuit, self.secure_session_id) + self.xfer_manager = XferManager(self.client_connection, self.secure_session_id) self.received_bytes: Optional[bytes] = None async def _handle_vfile_upload(self): @@ -58,7 +67,7 @@ class XferManagerTests(BaseTransferTests): if asset_block["AssetData"]: self.received_bytes = asset_block["AssetData"] else: - manager = XferManager(self.server_message_handler, self.server_circuit) + manager = XferManager(self.server_connection) xfer = await manager.request(vfile_id=asset_id, vfile_type=AssetType.BODYPART) self.received_bytes = xfer.reassemble_chunks() self.server_circuit.send_message(Message( @@ -87,8 +96,7 @@ class TestTransferManager(BaseTransferTests): def setUp(self) -> None: super().setUp() self.transfer_manager = TransferManager( - self.client_message_handler, - self.client_circuit, + self.client_connection, UUID.random(), UUID.random(), )