Move tons more things to lib.base and lib.client

Put an abstract session and region implementation in client so things
that could be logically shared between client/proxy can be.

ObjectManager moved to client with proxy-specific details in
ProxyObjectManager.
This commit is contained in:
Salad Dais
2021-06-04 02:50:24 +00:00
parent f2ee6f789f
commit 49f7ba960f
14 changed files with 423 additions and 256 deletions

View File

@@ -1,14 +1,16 @@
from __future__ import annotations
import abc
import datetime as dt
import logging
from typing import *
from typing import Optional
from .message_handler import MessageHandler
from ..network.transport import AbstractUDPTransport, UDPPacket, Direction, ADDR_TUPLE
from .message import Block
from .message import Block, Message
from .msgtypes import PacketFlags
from .udpserializer import UDPMessageSerializer
from .message import Message
class Circuit:
@@ -63,3 +65,16 @@ class Circuit:
def __repr__(self):
return "<%s %r : %r>" % (self.__class__.__name__, self.near_host, self.host)
class ConnectionHolder(abc.ABC):
"""
Any object that has both a circuit and a message handler
Preferred to explicitly passing around a circuit, message handler pair
because generally a ConnectionHolder represents a region or a client.
The same region or client may have multiple different circuits across the
lifetime of a session (due to region restarts, etc.)
"""
circuit: Optional[Circuit]
message_handler: MessageHandler[Message]

View File

@@ -77,7 +77,7 @@ class _HippoSessionRequestContextManager:
CAPS_DICT = Union[
Mapping[str, str],
multidict.MultiDict[Tuple[Any, str]]
multidict.MultiDict[str],
]
@@ -88,6 +88,9 @@ class CapsClient:
def _request_fixups(self, cap_or_url: str, headers: Dict, proxy: Optional[bool], ssl: Any):
return cap_or_url, headers, proxy, ssl
def _get_caps(self) -> Optional[CAPS_DICT]:
return self._caps
def request(self, method: str, cap_or_url: str, *, path: str = "", data: Any = None,
headers: Optional[Dict] = None, session: Optional[aiohttp.ClientSession] = None,
llsd: Any = dataclasses.MISSING, params: Optional[Dict[str, Any]] = None,
@@ -97,14 +100,12 @@ class CapsClient:
if path:
raise ValueError("Specifying both path and a full URL not supported")
else:
if self._caps is None:
caps = self._get_caps()
if caps is None:
raise RuntimeError(f"Need a caps dict to request a Cap like {cap_or_url}")
if cap_or_url not in self._caps:
if cap_or_url not in caps:
raise KeyError(f"{cap_or_url} is not a full URL and not a Cap")
cap_or_url = self._caps[cap_or_url]
# Stupid hack for proxy multidicts that contain a tuple of `(cap_type, cap_url)`
if isinstance(cap_or_url, tuple):
cap_or_url = cap_or_url[-1]
cap_or_url = caps[cap_or_url]
if path:
cap_or_url += path

View File

@@ -8,9 +8,8 @@ import dataclasses
from typing import *
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.circuit import Circuit
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.message.circuit import ConnectionHolder
from hippolyzer.lib.base.templates import (
TransferRequestParamsBase,
TransferChannelType,
@@ -71,13 +70,11 @@ class Transfer:
class TransferManager:
def __init__(
self,
message_handler: MessageHandler[Message],
circuit: Circuit,
connection_holder: ConnectionHolder,
agent_id: Optional[UUID] = None,
session_id: Optional[UUID] = None,
):
self._message_handler = message_handler
self._circuit = circuit
self._connection_holder = connection_holder
self._agent_id = agent_id
self._session_id = session_id
@@ -97,7 +94,7 @@ class TransferManager:
if params_dict.get("SessionID", dataclasses.MISSING) is None:
params.SessionID = self._session_id
self._circuit.send_message(Message(
self._connection_holder.circuit.send_message(Message(
'TransferRequest',
Block(
'TransferInfo',
@@ -114,7 +111,7 @@ class TransferManager:
async def _pump_transfer_replies(self, transfer: Transfer):
# Subscribe to message related to our transfer while we're in this block
with self._message_handler.subscribe_async(
with self._connection_holder.message_handler.subscribe_async(
_TRANSFER_MESSAGES,
predicate=transfer.is_our_message,
) as get_msg:

View File

@@ -9,12 +9,11 @@ import random
from typing import *
from hippolyzer.lib.base.datatypes import UUID, RawBytes
from hippolyzer.lib.base.message.circuit import Circuit
from hippolyzer.lib.base.message.data_packer import TemplateDataPacker
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.message.msgtypes import MsgType
from hippolyzer.lib.base.network.transport import Direction
from hippolyzer.lib.base.message.circuit import ConnectionHolder
from hippolyzer.lib.base.templates import XferPacket, XferFilePath, AssetType, XferError
_XFER_MESSAGES = {"AbortXfer", "ConfirmXferPacket", "RequestXfer", "SendXferPacket"}
@@ -93,12 +92,10 @@ class UploadStrategy(enum.IntEnum):
class XferManager:
def __init__(
self,
message_handler: MessageHandler[Message],
circuit: Circuit,
connection_holder: ConnectionHolder,
secure_session_id: Optional[UUID] = None,
):
self._message_handler = message_handler
self._circuit = circuit
self._connection_holder = connection_holder
self._secure_session_id = secure_session_id
def request(
@@ -113,7 +110,7 @@ class XferManager:
direction: Direction = Direction.OUT,
) -> Xfer:
xfer_id = xfer_id if xfer_id is not None else random.getrandbits(64)
self._circuit.send_message(Message(
self._connection_holder.circuit.send_message(Message(
'RequestXfer',
Block(
'XferID',
@@ -132,7 +129,7 @@ class XferManager:
return xfer
async def _pump_xfer_replies(self, xfer: Xfer):
with self._message_handler.subscribe_async(
with self._connection_holder.message_handler.subscribe_async(
_XFER_MESSAGES,
predicate=xfer.is_our_message,
) as get_msg:
@@ -177,7 +174,7 @@ class XferManager:
to_ack = range(xfer.next_ackable, ack_max)
xfer.next_ackable = ack_max
for ack_id in to_ack:
self._circuit.send_message(Message(
self._connection_holder.circuit.send_message(Message(
"ConfirmXferPacket",
Block("XferID", ID=xfer.xfer_id, Packet=ack_id),
direction=xfer.direction,
@@ -219,7 +216,7 @@ class XferManager:
else:
inline_data = data
self._circuit.send_message(Message(
self._connection_holder.circuit.send_message(Message(
"AssetUploadRequest",
Block(
"AssetBlock",
@@ -235,7 +232,7 @@ class XferManager:
return fut
async def _pump_asset_upload(self, xfer: Optional[Xfer], transaction_id: UUID, fut: asyncio.Future):
message_handler = self._message_handler
message_handler = self._connection_holder.message_handler
# We'll receive an Xfer request for the asset we're uploading.
# asset ID is determined by hashing secure session ID with chosen transaction ID.
asset_id: UUID = UUID.combine(transaction_id, self._secure_session_id)
@@ -264,7 +261,7 @@ class XferManager:
request_predicate: Callable[[Message], bool],
wait_for_confirm: bool = True
):
message_handler = self._message_handler
message_handler = self._connection_holder.message_handler
request_msg = await message_handler.wait_for(
'RequestXfer', predicate=request_predicate, timeout=5.0)
xfer.xfer_id = request_msg["XferID"]["ID"]
@@ -275,7 +272,7 @@ class XferManager:
chunk = xfer.chunks.pop(packet_id)
# EOF if there are no chunks left
packet_val = XferPacket(PacketID=packet_id, IsEOF=not bool(xfer.chunks))
self._circuit.send_message(Message(
self._connection_holder.circuit.send_message(Message(
"SendXferPacket",
Block("XferID", ID=xfer.xfer_id, Packet_=packet_val),
Block("DataPacket", Data=chunk),

View File

View File

@@ -0,0 +1,78 @@
import dataclasses
from typing import *
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
@dataclasses.dataclass
class NameCacheEntry:
full_id: UUID
first_name: Optional[str] = None
last_name: Optional[str] = None
display_name: Optional[str] = None
def __str__(self):
if self.display_name:
return f"{self.display_name} ({self.legacy_name})"
if self.legacy_name:
return self.legacy_name
return f"(???) ({self.full_id})"
@property
def legacy_name(self) -> Optional[str]:
if self.first_name is None:
return None
return f"{self.first_name} {self.last_name}"
@property
def preferred_name(self) -> Optional[str]:
if self.display_name:
return self.display_name
return self.legacy_name
class NameCache:
def __init__(self):
self._cache: Dict[UUID, NameCacheEntry] = {}
def create_subscriptions(
self,
message_handler: MessageHandler[Message],
):
message_handler.subscribe("UUIDNameReply", self._handle_uuid_name_reply)
def lookup(self, uuid: UUID) -> Optional[NameCacheEntry]:
return self._cache.get(uuid)
def update(self, full_id: UUID, vals: dict):
# upsert the cache entry
entry = self._cache.get(full_id) or NameCacheEntry(full_id=full_id)
if "FirstName" in vals:
entry.first_name = vals["FirstName"]
if "LastName" in vals:
entry.last_name = vals["LastName"]
if "DisplayName" in vals:
entry.display_name = vals["DisplayName"] if vals["DisplayName"] else None
self._cache[full_id] = entry
def _handle_uuid_name_reply(self, msg: Message):
for block in msg.blocks["UUIDNameBlock"]:
self.update(block["ID"], {
"FirstName": block["FirstName"],
"LastName": block["LastName"],
})
def _process_display_names_response(self, parsed: dict):
"""Handle the response from the GetDisplayNames cap"""
for agent in parsed["agents"]:
# Don't set display name if they just have the default
display_name = None
if not agent["is_display_name_default"]:
display_name = agent["display_name"]
self.update(agent["id"], {
"FirstName": agent["legacy_first_name"],
"LastName": agent["legacy_last_name"],
"DisplayName": display_name,
})

View File

@@ -6,38 +6,30 @@ import enum
import itertools
import logging
import math
import typing
import weakref
from typing import *
from hippolyzer.lib.base import llsd
from hippolyzer.lib.base.datatypes import UUID, Vector3
from hippolyzer.lib.base.helpers import proxify
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.objects import (
handle_to_global_pos,
normalize_object_update,
normalize_terse_object_update,
normalize_object_update_compressed_data,
normalize_object_update_compressed,
Object,
Object, handle_to_global_pos,
)
from hippolyzer.lib.proxy.addons import AddonManager
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
from hippolyzer.lib.proxy.namecache import NameCache, NameCacheEntry
from hippolyzer.lib.client.namecache import NameCache, NameCacheEntry
from hippolyzer.lib.client.state import BaseClientSession, BaseClientRegion
from hippolyzer.lib.base.templates import PCode, ObjectStateSerializer
from hippolyzer.lib.proxy.vocache import RegionViewerObjectCacheChain
if TYPE_CHECKING:
from hippolyzer.lib.proxy.region import ProxiedRegion
from hippolyzer.lib.proxy.sessions import Session
LOG = logging.getLogger(__name__)
class OrphanManager:
def __init__(self):
self._orphans: typing.Dict[int, typing.List[int]] = collections.defaultdict(list)
self._orphans: Dict[int, List[int]] = collections.defaultdict(list)
def clear(self):
return self._orphans.clear()
@@ -55,7 +47,7 @@ class OrphanManager:
del self._orphans[parent_id]
return removed
def collect_orphans(self, parent_localid: int) -> typing.Sequence[int]:
def collect_orphans(self, parent_localid: int) -> Sequence[int]:
return self._orphans.pop(parent_localid, [])
def track_orphan(self, obj: Object):
@@ -67,7 +59,7 @@ class OrphanManager:
self._orphans[parent_id].append(local_id)
OBJECT_OR_LOCAL = typing.Union[Object, int]
OBJECT_OR_LOCAL = Union[Object, int]
class LocationType(enum.IntEnum):
@@ -129,31 +121,27 @@ class Avatar:
return self._resolved_name.preferred_name
class ObjectManager:
class ClientObjectManager:
"""
Object manager for a specific region
"""
def __init__(self, region: ProxiedRegion, use_vo_cache: bool = False):
self._region: ProxiedRegion = proxify(region)
self.use_vo_cache = use_vo_cache
self.cache_loaded: bool = False
self.object_cache: RegionViewerObjectCacheChain = RegionViewerObjectCacheChain([])
self._localid_lookup: typing.Dict[int, Object] = {}
self._fullid_lookup: typing.Dict[UUID, int] = {}
self._coarse_locations: typing.Dict[UUID, Vector3] = {}
self._object_futures: typing.Dict[Tuple[int, int], List[asyncio.Future]] = {}
def __init__(self, region: BaseClientRegion, name_cache: Optional[NameCache]):
self._region: BaseClientRegion = region
self._localid_lookup: Dict[int, Object] = {}
self._fullid_lookup: Dict[UUID, int] = {}
self._coarse_locations: Dict[UUID, Vector3] = {}
self._object_futures: Dict[Tuple[int, int], List[asyncio.Future]] = {}
self._orphan_manager = OrphanManager()
self.name_cache = name_cache or NameCache()
# Objects that we've seen references to but don't have data for
self.missing_locals = set()
self._orphan_manager = OrphanManager()
self._world_objects: WorldObjectManager = region.session().objects
self.name_cache: NameCache = region.session().session_manager.name_cache
self._world_objects: ClientWorldObjectManager = proxify(region.session().objects)
message_handler = region.message_handler
message_handler.subscribe("CoarseLocationUpdate",
self._handle_coarse_location_update)
region.http_message_handler.subscribe("GetObjectCost",
self._handle_get_object_cost)
message_handler.subscribe("KillObject",
self._handle_kill_object)
message_handler.subscribe("ObjectProperties",
@@ -168,6 +156,15 @@ class ObjectManager:
def all_objects(self) -> Iterable[Object]:
return self._localid_lookup.values()
def lookup_localid(self, localid: int) -> Optional[Object]:
return self._localid_lookup.get(localid, None)
def lookup_fullid(self, fullid: UUID) -> Optional[Object]:
local_id = self._fullid_lookup.get(fullid, None)
if local_id is None:
return None
return self.lookup_localid(local_id)
@property
def all_avatars(self) -> Iterable[Avatar]:
av_objects = {o.FullID: o for o in self.all_objects if o.PCode == PCode.AVATAR}
@@ -187,16 +184,7 @@ class ObjectManager:
))
return avatars
def lookup_localid(self, localid: int) -> typing.Optional[Object]:
return self._localid_lookup.get(localid, None)
def lookup_fullid(self, fullid: UUID) -> typing.Optional[Object]:
local_id = self._fullid_lookup.get(fullid, None)
if local_id is None:
return None
return self.lookup_localid(local_id)
def lookup_avatar(self, fullid: UUID) -> typing.Optional[Avatar]:
def lookup_avatar(self, fullid: UUID) -> Optional[Avatar]:
for avatar in self.all_avatars:
if avatar.FullID == fullid:
return avatar
@@ -250,6 +238,12 @@ class ObjectManager:
self.run_object_update_hooks(obj, set(obj.to_dict().keys()), UpdateType.OBJECT_UPDATE)
def track_object(self, obj: Object):
obj_same_localid = self._localid_lookup.get(obj.LocalID)
if obj_same_localid:
LOG.error(f"Clobbering existing object with LocalID {obj.LocalID}! "
f"{obj.to_dict()} clobbered {obj_same_localid.to_dict()}")
# Clear the clobbered object out of the FullID lookup
self._fullid_lookup.pop(obj_same_localid.FullID, None)
self._localid_lookup[obj.LocalID] = obj
self._fullid_lookup[obj.FullID] = obj.LocalID
# If it was missing, it's not missing anymore.
@@ -324,14 +318,6 @@ class ObjectManager:
else:
LOG.debug(f"Changing parent of {obj.LocalID}, but couldn't find old parent")
def _cancel_futures(self, local_id: int):
# Object was killed, so need to kill any pending futures.
for fut_key, futs in self._object_futures.items():
if fut_key[0] == local_id:
for fut in futs:
fut.cancel()
break
def _kill_object_by_local_id(self, local_id: int):
obj = self.lookup_localid(local_id)
self.missing_locals -= {local_id}
@@ -339,7 +325,7 @@ class ObjectManager:
self._cancel_futures(local_id)
if obj:
AddonManager.handle_object_killed(self._region.session(), self._region, obj)
self.run_kill_object_hooks(obj)
child_ids = obj.ChildIDs
else:
LOG.debug(f"Tried to kill unknown object {local_id}")
@@ -397,23 +383,27 @@ class ObjectManager:
packet.meta["ObjectUpdateIDs"] = tuple(seen_locals)
# noinspection PyUnusedLocal
def _lookup_cache_entry(self, local_id: int, crc: int) -> Optional[bytes]:
return None
def handle_object_update_cached(self, packet: Message):
seen_locals = []
missing_locals = set()
for block in packet['ObjectData']:
seen_locals.append(block["ID"])
update_flags = block.deserialize_var("UpdateFlags", make_copy=False)
# Check if we already know about the object
obj = self.lookup_localid(block["ID"])
if obj is not None:
if obj is not None and obj.CRC == block["CRC"]:
self._update_existing_object(obj, {
"UpdateFlags": update_flags,
"RegionHandle": self._region.handle,
}, UpdateType.OBJECT_UPDATE)
continue
# Check if the object is in a viewer's VOCache
cached_obj_data = self.object_cache.lookup_object_data(block["ID"], block["CRC"])
cached_obj_data = self._lookup_cache_entry(block["ID"], block["CRC"])
if cached_obj_data is not None:
cached_obj = normalize_object_update_compressed_data(cached_obj_data)
cached_obj["UpdateFlags"] = update_flags
@@ -422,9 +412,14 @@ class ObjectManager:
continue
# Don't know about it and wasn't cached.
self.missing_locals.add(block["ID"])
missing_locals.add(block["ID"])
self.missing_locals.update(missing_locals)
self._handle_object_update_cached_misses(missing_locals)
packet.meta["ObjectUpdateIDs"] = tuple(seen_locals)
def _handle_object_update_cached_misses(self, local_ids: Set[int]):
self.request_objects(local_ids)
def handle_object_update_compressed(self, packet: Message):
seen_locals = []
for block in packet['ObjectData']:
@@ -453,18 +448,6 @@ class ObjectManager:
LOG.debug(f"Received {packet.name} for unknown {block['ObjectID']}")
packet.meta["ObjectUpdateIDs"] = tuple(seen_locals)
def _handle_get_object_cost(self, flow: HippoHTTPFlow):
parsed = llsd.parse_xml(flow.response.content)
if "error" in parsed:
return
for object_id, object_costs in parsed.items():
obj = self.lookup_fullid(UUID(object_id))
if not obj:
LOG.debug(f"Received ObjectCost for unknown {object_id}")
continue
obj.ObjectCosts.update(object_costs)
self.run_object_update_hooks(obj, {"ObjectCosts"}, UpdateType.COSTS)
def _handle_kill_object(self, packet: Message):
seen_locals = []
for block in packet["ObjectData"]:
@@ -480,7 +463,7 @@ class ObjectManager:
# and always use the newest one containing a particular avatar ID?
self._coarse_locations.clear()
coarse_locations: typing.Dict[UUID, Vector3] = {}
coarse_locations: Dict[UUID, Vector3] = {}
for agent_block, location_block in zip(packet["AgentData"], packet["Location"]):
x, y, z = location_block["X"], location_block["Y"], location_block["Z"]
coarse_locations[agent_block["AgentID"]] = Vector3(
@@ -495,25 +478,6 @@ class ObjectManager:
self._coarse_locations.update(coarse_locations)
def run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType):
if obj.PCode == PCode.AVATAR and "NameValue" in updated_props:
if obj.NameValue:
self.name_cache.update(obj.FullID, obj.NameValue.to_dict())
futures = self._object_futures.get((obj.LocalID, update_type), [])
for fut in futures[:]:
fut.set_result(obj)
AddonManager.handle_object_updated(self._region.session(), self._region, obj, updated_props)
def load_cache(self):
if not self.use_vo_cache or self.cache_loaded:
return
handle = self._region.handle
if not handle:
LOG.warning(f"Tried to load cache for {self._region} without a handle")
return
self.cache_loaded = True
self.object_cache = RegionViewerObjectCacheChain.for_region(handle, self._region.cache_id)
def clear(self):
for obj in self._localid_lookup.values():
self._world_objects.handle_object_gone(obj)
@@ -525,24 +489,55 @@ class ObjectManager:
for fut in tuple(itertools.chain(*self._object_futures.values())):
fut.cancel()
self._object_futures.clear()
self.object_cache = RegionViewerObjectCacheChain([])
self.cache_loaded = False
def request_object_properties(self, objects: typing.Union[OBJECT_OR_LOCAL, typing.Sequence[OBJECT_OR_LOCAL]])\
# noinspection PyUnusedLocal
def _is_localid_selected(self, local_id: int):
return False
def _process_get_object_cost_response(self, parsed: dict):
if "error" in parsed:
return
for object_id, object_costs in parsed.items():
obj = self.lookup_fullid(UUID(object_id))
if not obj:
LOG.debug(f"Received ObjectCost for unknown {object_id}")
continue
obj.ObjectCosts.update(object_costs)
self.run_object_update_hooks(obj, {"ObjectCosts"}, UpdateType.COSTS)
def run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType):
futures = self._object_futures.get((obj.LocalID, update_type), [])
for fut in futures[:]:
fut.set_result(obj)
if obj.PCode == PCode.AVATAR and "NameValue" in updated_props:
if obj.NameValue:
self.name_cache.update(obj.FullID, obj.NameValue.to_dict())
def run_kill_object_hooks(self, obj: Object):
pass
def _cancel_futures(self, local_id: int):
# Object went away, so need to kill any pending futures.
for fut_key, futs in self._object_futures.items():
if fut_key[0] == local_id:
for fut in futs:
fut.cancel()
break
def request_object_properties(self, objects: Union[OBJECT_OR_LOCAL, Sequence[OBJECT_OR_LOCAL]])\
-> List[asyncio.Future[Object]]:
if isinstance(objects, (Object, int)):
objects = (objects,)
if not objects:
return []
session = self._region.session()
local_ids = tuple((o.LocalID if isinstance(o, Object) else o) for o in objects)
# Don't mess with already selected objects
unselected_ids = tuple(local for local in local_ids if local not in session.selected.object_locals)
unselected_ids = tuple(local for local in local_ids if not self._is_localid_selected(local))
ids_to_req = unselected_ids
session = self._region.session()
while ids_to_req:
blocks = [
Block("AgentData", AgentID=session.agent_id, SessionID=session.id),
@@ -572,7 +567,7 @@ class ObjectManager:
def request_missing_objects(self) -> List[asyncio.Future[Object]]:
return self.request_objects(self.missing_locals)
def request_objects(self, local_ids) -> List[asyncio.Future[Object]]:
def request_objects(self, local_ids: Union[int, Iterable[int]]) -> List[asyncio.Future[Object]]:
"""
Request object local IDs, returning a list of awaitable handles for the objects
@@ -605,10 +600,10 @@ class ObjectManager:
return futures
class WorldObjectManager:
class ClientWorldObjectManager:
"""Manages Objects for a session's whole world"""
def __init__(self, session: Session):
self._session: Session = proxify(session)
def __init__(self, session: BaseClientSession):
self._session: BaseClientSession = session
self._fullid_lookup: Dict[UUID, Object] = {}
message_handler = self._session.message_handler
message_handler.subscribe("ObjectUpdate", self._handle_object_update)
@@ -619,52 +614,19 @@ class WorldObjectManager:
message_handler.subscribe("ObjectUpdateCached",
self._handle_object_update_cached)
def _wrap_region_update_handler(self, handler: Callable, message: Message):
"""
Dispatch an ObjectUpdate to a region's handler based on RegionHandle
Indra doesn't care what region actually sent the message, just what
region handle is in the message, so we need a global message handler
plus dispatch.
"""
region = self._session.region_by_handle(message["RegionData"]["RegionHandle"])
if not region:
return
return handler(region.objects, message)
def _handle_object_update(self, message: Message):
self._wrap_region_update_handler(ObjectManager.handle_object_update, message)
def _handle_terse_object_update(self, message: Message):
self._wrap_region_update_handler(ObjectManager.handle_terse_object_update, message)
def _handle_object_update_compressed(self, message: Message):
self._wrap_region_update_handler(ObjectManager.handle_object_update_compressed, message)
def _handle_object_update_cached(self, message: Message):
self._wrap_region_update_handler(ObjectManager.handle_object_update_cached, message)
def handle_new_object(self, obj: Object):
"""Called by a region's ObjectManager when a new Object is tracked"""
self._fullid_lookup[obj.FullID] = obj
def handle_object_gone(self, obj: Object):
"""Called by a region's ObjectManager on KillObject or region going away"""
self._fullid_lookup.pop(obj.FullID, None)
def lookup_fullid(self, full_id: UUID) -> Optional[Object]:
return self._fullid_lookup.get(full_id, None)
def lookup_avatar(self, full_id: UUID) -> Optional[Avatar]:
return {a.FullID: a for a in self.all_avatars}.get(full_id, None)
def __len__(self):
return len(self._fullid_lookup)
@property
def all_objects(self) -> Iterable[Object]:
return self._fullid_lookup.values()
# TODO: all Avatars should be owned by the WorldObjectManager to deal with the
# possibility of duplicate avatars in different regions due to CoarseLocationUpdate.
# would also allow us to have persistent Avatar instances.
def lookup_avatar(self, full_id: UUID) -> Optional[Avatar]:
return {a.FullID: a for a in self.all_avatars}.get(full_id, None)
@property
def all_avatars(self) -> Iterable[Avatar]:
chained = itertools.chain(*(r.objects.all_avatars for r in self._session.regions))
@@ -673,13 +635,51 @@ class WorldObjectManager:
vals = {a.FullID: a for a in sorted(chained, key=lambda x: x.Object is not None)}
return vals.values()
def __len__(self):
return len(self._fullid_lookup)
def _wrap_region_update_handler(self, handler: Callable, message: Message):
"""
Dispatch an ObjectUpdate to a region's handler based on RegionHandle
Indra doesn't care what region actually sent the message, just what
region handle is in the message, so we need a global message handler
plus dispatch.
"""
handle = message["RegionData"]["RegionHandle"]
region = self._session.region_by_handle(handle)
if not region:
LOG.warning(f"Got {message.name} for unknown region {handle}")
return
return handler(region.objects, message)
def _handle_object_update(self, message: Message):
self._wrap_region_update_handler(ClientObjectManager.handle_object_update, message)
def _handle_terse_object_update(self, message: Message):
self._wrap_region_update_handler(ClientObjectManager.handle_terse_object_update, message)
def _handle_object_update_compressed(self, message: Message):
self._wrap_region_update_handler(ClientObjectManager.handle_object_update_compressed, message)
def _handle_object_update_cached(self, message: Message):
self._wrap_region_update_handler(ClientObjectManager.handle_object_update_cached, message)
def handle_new_object(self, obj: Object):
"""Called by a region's ObjectManager when a new Object is tracked"""
self._fullid_lookup[obj.FullID] = obj
def handle_object_gone(self, obj: Object):
"""Called by a region's ObjectManager on KillObject or region going away"""
self._fullid_lookup.pop(obj.FullID, None)
def request_missing_objects(self) -> List[asyncio.Future[Object]]:
futs = []
for region in self._session.regions:
futs.extend(region.objects.request_missing_objects())
return futs
def request_object_properties(self, objects: typing.Union[Object, typing.Sequence[Object]]) \
def request_object_properties(self, objects: Union[Object, Sequence[Object]]) \
-> List[asyncio.Future[Object]]:
# Doesn't accept local ID unlike ObjectManager because they're ambiguous here.
if isinstance(objects, Object):
@@ -710,3 +710,6 @@ class WorldObjectManager:
if obj.Parent is None:
await asyncio.wait_for(region.objects.request_objects(obj.ParentID)[0], wait_time)
obj = obj.Parent
def clear(self):
self._fullid_lookup.clear()

View File

@@ -0,0 +1,34 @@
"""
Base classes for common session-related state shared between clients and proxies
"""
from __future__ import annotations
import abc
from typing import *
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.circuit import ConnectionHolder
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
if TYPE_CHECKING:
from hippolyzer.lib.client.object_manager import ClientObjectManager, ClientWorldObjectManager
class BaseClientRegion(ConnectionHolder, abc.ABC):
"""Represents a client's view of a remote region"""
# Actually a weakref
handle: Optional[int]
session: Callable[[], BaseClientSession]
objects: ClientObjectManager
class BaseClientSession(abc.ABC):
"""Represents a client's view of a remote session"""
id: UUID
agent_id: UUID
secure_session_id: UUID
message_handler: MessageHandler[Message]
regions: Sequence[BaseClientRegion]
region_by_handle: Callable[[int], Optional[BaseClientRegion]]
objects: ClientWorldObjectManager

View File

@@ -5,10 +5,22 @@ import re
import sys
from typing import *
from hippolyzer.lib.base.network.caps_client import CapsClient
from hippolyzer.lib.base.network.caps_client import CapsClient, CAPS_DICT
if TYPE_CHECKING:
from hippolyzer.lib.proxy.region import ProxiedRegion
class ProxyCapsClient(CapsClient):
def __init__(self, region: Optional[ProxiedRegion] = None):
super().__init__(None)
self._region = region
def _get_caps(self) -> Optional[CAPS_DICT]:
if not self._region:
return None
return self._region.caps
def _request_fixups(self, cap_or_url: str, headers: Dict, proxy: Optional[bool], ssl: Any):
# We want to proxy this through Hippolyzer
if proxy is None:

View File

@@ -1,52 +1,28 @@
from __future__ import annotations
import dataclasses
import logging
from typing import *
from hippolyzer.lib.base import llsd
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.client.namecache import NameCache
from hippolyzer.lib.proxy.viewer_settings import iter_viewer_cache_dirs
if TYPE_CHECKING:
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
from hippolyzer.lib.base.message.message import Message
@dataclasses.dataclass
class NameCacheEntry:
first_name: Optional[str] = None
last_name: Optional[str] = None
display_name: Optional[str] = None
def __str__(self):
if self.display_name:
return f"{self.display_name} ({self.legacy_name})"
return self.legacy_name
@property
def legacy_name(self):
return f"{self.first_name} {self.last_name}"
@property
def preferred_name(self):
if self.display_name:
return self.display_name
return self.legacy_name
class NameCache:
def __init__(self):
self._cache: Dict[UUID, NameCacheEntry] = {}
class ProxyNameCache(NameCache):
def create_subscriptions(
self,
message_handler: MessageHandler[Message],
http_message_handler: MessageHandler[HippoHTTPFlow],
http_message_handler: Optional[MessageHandler[HippoHTTPFlow]] = None,
):
message_handler.subscribe("UUIDNameReply", self._handle_uuid_name_reply)
http_message_handler.subscribe("GetDisplayNames", self._handle_get_display_names)
super().create_subscriptions(message_handler)
if http_message_handler is not None:
http_message_handler.subscribe("GetDisplayNames", self._handle_get_display_names)
def load_viewer_caches(self):
for cache_dir in iter_viewer_cache_dirs():
@@ -69,38 +45,7 @@ class NameCache:
except:
logging.exception(f"Failed to load namecache from {cache_dir}")
def lookup(self, uuid: UUID) -> Optional[NameCacheEntry]:
return self._cache.get(uuid)
def update(self, uuid: UUID, vals: dict):
# upsert the cache entry
entry = self._cache.get(uuid) or NameCacheEntry()
if "FirstName" in vals:
entry.first_name = vals["FirstName"]
if "LastName" in vals:
entry.last_name = vals["LastName"]
if "DisplayName" in vals:
entry.display_name = vals["DisplayName"] if vals["DisplayName"] else None
self._cache[uuid] = entry
def _handle_uuid_name_reply(self, msg: Message):
for block in msg.blocks["UUIDNameBlock"]:
self.update(block["ID"], {
"FirstName": block["FirstName"],
"LastName": block["LastName"],
})
def _handle_get_display_names(self, flow: HippoHTTPFlow):
if flow.response.status_code != 200:
return
parsed = llsd.parse_xml(flow.response.content)
for agent in parsed["agents"]:
# Don't set display name if they just have the default
display_name = None
if not agent["is_display_name_default"]:
display_name = agent["display_name"]
self.update(agent["id"], {
"FirstName": agent["legacy_first_name"],
"LastName": agent["legacy_last_name"],
"DisplayName": display_name,
})
self._process_display_names_response(llsd.parse_xml(flow.response.content))

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
import logging
from typing import *
from hippolyzer.lib.base import llsd
from hippolyzer.lib.client.object_manager import (
ClientObjectManager,
UpdateType,
)
from hippolyzer.lib.base.objects import Object
from hippolyzer.lib.proxy.addons import AddonManager
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
from hippolyzer.lib.proxy.namecache import ProxyNameCache
from hippolyzer.lib.proxy.vocache import RegionViewerObjectCacheChain
if TYPE_CHECKING:
from hippolyzer.lib.proxy.region import ProxiedRegion
LOG = logging.getLogger(__name__)
class ProxyObjectManager(ClientObjectManager):
"""
Object manager for a specific region
"""
_region: ProxiedRegion
def __init__(
self,
region: ProxiedRegion,
name_cache: Optional[ProxyNameCache] = None,
use_vo_cache: bool = False
):
super().__init__(region, name_cache)
self.use_vo_cache = use_vo_cache
self.cache_loaded = False
self.object_cache = RegionViewerObjectCacheChain([])
region.http_message_handler.subscribe("GetObjectCost",
self._handle_get_object_cost)
def _handle_get_object_cost(self, flow: HippoHTTPFlow):
parsed = llsd.parse_xml(flow.response.content)
self._process_get_object_cost_response(parsed)
def run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType):
super().run_object_update_hooks(obj, updated_props, update_type)
AddonManager.handle_object_updated(self._region.session(), self._region, obj, updated_props)
def run_kill_object_hooks(self, obj: Object):
AddonManager.handle_object_killed(self._region.session(), self._region, obj)
def _lookup_cache_entry(self, local_id: int, crc: int) -> Optional[bytes]:
return self.object_cache.lookup_object_data(local_id, crc)
def load_cache(self):
if not self.use_vo_cache or self.cache_loaded:
return
handle = self._region.handle
if not handle:
LOG.warning(f"Tried to load cache for {self._region} without a handle")
return
self.cache_loaded = True
self.object_cache = RegionViewerObjectCacheChain.for_region(handle, self._region.cache_id)
def _handle_object_update_cached_misses(self, local_ids: Set[int]):
# Don't do anything automatically. People have to manually ask for
# missed objects to be fetched.
pass
def clear(self):
super().clear()
self.object_cache = RegionViewerObjectCacheChain([])
self.cache_loaded = False
def _is_localid_selected(self, localid: int):
return localid in self._region.session().selected.object_locals

View File

@@ -11,12 +11,14 @@ import urllib.parse
import multidict
from hippolyzer.lib.base.datatypes import Vector3, UUID
from hippolyzer.lib.base.helpers import proxify
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.objects import handle_to_global_pos
from hippolyzer.lib.client.state import BaseClientRegion
from hippolyzer.lib.proxy.caps_client import ProxyCapsClient
from hippolyzer.lib.proxy.circuit import ProxiedCircuit
from hippolyzer.lib.proxy.objects import ObjectManager
from hippolyzer.lib.proxy.object_manager import ProxyObjectManager
from hippolyzer.lib.base.transfer_manager import TransferManager
from hippolyzer.lib.base.xfer_manager import XferManager
@@ -33,6 +35,10 @@ class CapType(enum.Enum):
class CapsMultiDict(multidict.MultiDict[Tuple[CapType, str]]):
# TODO: Make a view object for this that's just name -> URL
# deriving from MultiMapping[_T] so we don't have to do
# so many copies for consumers that aren't expecting the
# CapType tag.
def add(self, key, value) -> None:
# Prepend rather than append when adding caps.
# Necessary so the most recent for a region URI is returned
@@ -42,7 +48,7 @@ class CapsMultiDict(multidict.MultiDict[Tuple[CapType, str]]):
super().add(key, val)
class ProxiedRegion:
class ProxiedRegion(BaseClientRegion):
def __init__(self, circuit_addr, seed_cap: str, session, handle=None):
# A client may make a Seed request twice, and may get back two (valid!) sets of
# Cap URIs. We need to be able to look up both, so MultiDict is necessary.
@@ -56,27 +62,17 @@ class ProxiedRegion:
self._caps_url_lookup: Dict[str, Tuple[CapType, str]] = {}
if seed_cap:
self._caps["Seed"] = (CapType.NORMAL, seed_cap)
self.session: Optional[Callable[[], Session]] = weakref.ref(session)
self.session: Callable[[], Session] = weakref.ref(session)
self.message_handler: MessageHandler[Message] = MessageHandler()
self.http_message_handler: MessageHandler[HippoHTTPFlow] = MessageHandler()
self.eq_manager = EventQueueManager(self)
self.caps_client = ProxyCapsClient(self._caps)
self.objects = ObjectManager(self, use_vo_cache=True)
self.caps_client = ProxyCapsClient(proxify(self))
name_cache = session.session_manager.name_cache
self.objects: ProxyObjectManager = ProxyObjectManager(self, name_cache, use_vo_cache=True)
self.xfer_manager = XferManager(proxify(self), self.session().secure_session_id)
self.transfer_manager = TransferManager(proxify(self), session.agent_id, session.id)
self._recalc_caps()
@property
def xfer_manager(self) -> XferManager:
return XferManager(self.message_handler, self.circuit, self.session().secure_session_id)
@property
def transfer_manager(self) -> TransferManager:
return TransferManager(
self.message_handler,
self.circuit,
self.session().agent_id,
self.session().session_id,
)
@property
def name(self):
if self._name:

View File

@@ -10,21 +10,22 @@ from typing import *
from weakref import ref
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.client.object_manager import ClientWorldObjectManager
from hippolyzer.lib.client.state import BaseClientSession
from hippolyzer.lib.proxy.circuit import ProxiedCircuit
from hippolyzer.lib.proxy.http_asset_repo import HTTPAssetRepo
from hippolyzer.lib.proxy.http_proxy import HTTPFlowContext, is_asset_server_cap_name, SerializedCapData
from hippolyzer.lib.proxy.namecache import NameCache
from hippolyzer.lib.proxy.objects import WorldObjectManager
from hippolyzer.lib.proxy.namecache import ProxyNameCache
from hippolyzer.lib.proxy.region import ProxiedRegion, CapType
if TYPE_CHECKING:
from hippolyzer.lib.proxy.message_logger import BaseMessageLogger
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
from hippolyzer.lib.base.message.message import Message
class Session:
class Session(BaseClientSession):
def __init__(self, session_id, secure_session_id, agent_id, circuit_code,
login_data=None, session_manager=None):
self.login_data = login_data or {}
@@ -42,7 +43,7 @@ class Session:
self.started_at = datetime.datetime.now()
self.message_handler: MessageHandler[Message] = MessageHandler()
self.http_message_handler: MessageHandler[HippoHTTPFlow] = MessageHandler()
self.objects = WorldObjectManager(self)
self.objects = ClientWorldObjectManager(self)
self._main_region = None
@property
@@ -174,7 +175,7 @@ class SessionManager:
self.asset_repo = HTTPAssetRepo()
self.message_logger: Optional[BaseMessageLogger] = None
self.addon_ctx: Dict[str, Any] = {}
self.name_cache = NameCache()
self.name_cache = ProxyNameCache()
self.use_viewer_object_cache: bool = False
def create_session(self, login_data) -> Session:
@@ -197,6 +198,7 @@ class SessionManager:
def close_session(self, session: Session):
logging.info("Closed %r" % session)
session.objects.clear()
self.sessions.remove(session)
def resolve_cap(self, url: str) -> Optional["CapData"]:

View File

@@ -6,6 +6,7 @@ from typing import *
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.message.circuit import ConnectionHolder
from hippolyzer.lib.base.templates import (
AssetType,
EstateAssetType,
@@ -30,6 +31,12 @@ class MockHandlingCircuit(ProxiedCircuit):
asyncio.get_event_loop().call_soon(self.handler.handle, message)
class MockConnectionHolder(ConnectionHolder):
def __init__(self, circuit, message_handler):
self.circuit = circuit
self.message_handler = message_handler
class BaseTransferTests(unittest.IsolatedAsyncioTestCase):
SMALL_PAYLOAD = b"foobar"
LARGE_PAYLOAD = b"foobar" * 500
@@ -41,13 +48,15 @@ class BaseTransferTests(unittest.IsolatedAsyncioTestCase):
# and vice-versa
self.client_circuit = MockHandlingCircuit(self.server_message_handler)
self.server_circuit = MockHandlingCircuit(self.client_message_handler)
self.server_connection = MockConnectionHolder(self.server_circuit, self.server_message_handler)
self.client_connection = MockConnectionHolder(self.client_circuit, self.client_message_handler)
class XferManagerTests(BaseTransferTests):
def setUp(self) -> None:
super().setUp()
self.secure_session_id = UUID.random()
self.xfer_manager = XferManager(self.client_message_handler, self.client_circuit, self.secure_session_id)
self.xfer_manager = XferManager(self.client_connection, self.secure_session_id)
self.received_bytes: Optional[bytes] = None
async def _handle_vfile_upload(self):
@@ -58,7 +67,7 @@ class XferManagerTests(BaseTransferTests):
if asset_block["AssetData"]:
self.received_bytes = asset_block["AssetData"]
else:
manager = XferManager(self.server_message_handler, self.server_circuit)
manager = XferManager(self.server_connection)
xfer = await manager.request(vfile_id=asset_id, vfile_type=AssetType.BODYPART)
self.received_bytes = xfer.reassemble_chunks()
self.server_circuit.send_message(Message(
@@ -87,8 +96,7 @@ class TestTransferManager(BaseTransferTests):
def setUp(self) -> None:
super().setUp()
self.transfer_manager = TransferManager(
self.client_message_handler,
self.client_circuit,
self.client_connection,
UUID.random(),
UUID.random(),
)