diff --git a/hippolyzer/lib/proxy/objects.py b/hippolyzer/lib/proxy/objects.py index 1bb8309..1140284 100644 --- a/hippolyzer/lib/proxy/objects.py +++ b/hippolyzer/lib/proxy/objects.py @@ -1,7 +1,9 @@ from __future__ import annotations +import asyncio import collections import enum +import itertools import logging import math import typing @@ -72,6 +74,13 @@ class LocationType(enum.IntEnum): EXACT = enum.auto() +class UpdateType(enum.IntEnum): + OBJECT_UPDATE = enum.auto() + PROPERTIES = enum.auto() + FAMILY = enum.auto() + COSTS = enum.auto() + + class Avatar: """Wrapper for an avatar known through ObjectUpdate or CoarseLocationUpdate""" def __init__( @@ -131,12 +140,14 @@ class ObjectManager: self._localid_lookup: typing.Dict[int, Object] = {} self._fullid_lookup: typing.Dict[UUID, int] = {} self._coarse_locations: typing.Dict[UUID, Vector3] = {} + self._update_futures: typing.Dict[int, List[asyncio.Future]] = collections.defaultdict(list) + self._property_futures: typing.Dict[int, List[asyncio.Future]] = collections.defaultdict(list) # Objects that we've seen references to but don't have data for self.missing_locals = set() self._orphan_manager = OrphanManager() name_cache = None session = self._region.session() - if session: + if session and session.session_manager: name_cache = session.session_manager.name_cache # Use a local namecache if we don't have a session manager self.name_cache: Optional[NameCache] = name_cache or NameCache() @@ -219,7 +230,7 @@ class ObjectManager: self._parent_object(child_obj) if notify: - self._run_object_update_hooks(obj, set(obj.to_dict().keys())) + self._run_object_update_hooks(obj, set(obj.to_dict().keys()), UpdateType.OBJECT_UPDATE) def _untrack_object(self, obj: Object): former_child_ids = obj.ChildIDs[:] @@ -279,7 +290,7 @@ class ObjectManager: else: LOG.debug(f"Changing parent of {obj.LocalID}, but couldn't find old parent") - def _update_existing_object(self, obj: Object, new_properties): + def _update_existing_object(self, obj: Object, new_properties: dict, update_type: UpdateType): new_parent_id = new_properties.get("ParentID", obj.ParentID) actually_updated_props = set() @@ -310,7 +321,7 @@ class ObjectManager: # Common case where this may be falsy is if we get an ObjectUpdateCached # that didn't have a changed UpdateFlags field. if actually_updated_props: - self._run_object_update_hooks(obj, actually_updated_props) + self._run_object_update_hooks(obj, actually_updated_props, update_type) def _handle_object_update(self, packet: ProxiedMessage): seen_locals = [] @@ -320,7 +331,7 @@ class ObjectManager: seen_locals.append(object_data["LocalID"]) obj = self.lookup_fullid(object_data["FullID"]) if obj: - self._update_existing_object(obj, object_data) + self._update_existing_object(obj, object_data, UpdateType.OBJECT_UPDATE) else: obj = Object(**object_data) self._track_object(obj) @@ -339,7 +350,7 @@ class ObjectManager: seen_locals.append(object_data["LocalID"]) if obj: - self._update_existing_object(obj, object_data) + self._update_existing_object(obj, object_data, UpdateType.OBJECT_UPDATE) else: self.missing_locals.add(object_data["LocalID"]) LOG.debug(f"Received terse update for unknown object {object_data['LocalID']}") @@ -357,7 +368,7 @@ class ObjectManager: if obj is not None: self._update_existing_object(obj, { "UpdateFlags": update_flags, - }) + }, UpdateType.OBJECT_UPDATE) continue # Check if the object is in a viewer's VOCache @@ -379,7 +390,7 @@ class ObjectManager: seen_locals.append(object_data["LocalID"]) obj = self.lookup_localid(object_data["LocalID"]) if obj: - self._update_existing_object(obj, object_data) + self._update_existing_object(obj, object_data, UpdateType.OBJECT_UPDATE) else: obj = Object(**object_data) self._track_object(obj) @@ -395,7 +406,7 @@ class ObjectManager: obj = self.lookup_fullid(block["ObjectID"]) if obj: seen_locals.append(obj.LocalID) - self._update_existing_object(obj, object_properties) + self._update_existing_object(obj, object_properties, UpdateType.PROPERTIES) else: LOG.debug(f"Received {packet.name} for unknown {block['ObjectID']}") packet.meta["ObjectUpdateIDs"] = tuple(seen_locals) @@ -445,7 +456,7 @@ class ObjectManager: LOG.debug(f"Received ObjectCost for unknown {object_id}") continue obj.ObjectCosts.update(object_costs) - self._run_object_update_hooks(obj, {"ObjectCosts"}) + self._run_object_update_hooks(obj, {"ObjectCosts"}, UpdateType.COSTS) def _handle_coarse_location_update(self, packet: ProxiedMessage): self._coarse_locations.clear() @@ -465,10 +476,20 @@ class ObjectManager: self._coarse_locations.update(coarse_locations) - def _run_object_update_hooks(self, obj: Object, updated_props: Set[str]): + def _run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType): if obj.PCode == PCode.AVATAR and "NameValue" in updated_props: if obj.NameValue: self.name_cache.update(obj.FullID, obj.NameValue.to_dict()) + if update_type == UpdateType.OBJECT_UPDATE: + update_futures = self._update_futures[obj.LocalID] + for fut in update_futures[:]: + fut.set_result(obj) + update_futures.remove(fut) + elif update_type == UpdateType.PROPERTIES: + property_futures = self._property_futures[obj.LocalID] + for fut in property_futures[:]: + fut.set_result(obj) + property_futures.remove(fut) AddonManager.handle_object_updated(self._region.session(), self._region, obj, updated_props) def load_cache(self): @@ -487,46 +508,81 @@ class ObjectManager: self._coarse_locations.clear() self._orphan_manager.clear() self.missing_locals.clear() + self._clear_futures(self._update_futures) + self._clear_futures(self._property_futures) self.object_cache = RegionViewerObjectCacheChain([]) self.cache_loaded = False - def request_object_properties(self, objects: typing.Union[OBJECT_OR_LOCAL, typing.Sequence[OBJECT_OR_LOCAL]]): + @staticmethod + def _clear_futures(future_dict: dict): + for future in itertools.chain(*future_dict.values()): + future.cancel() + future_dict.clear() + + def request_object_properties(self, objects: typing.Union[OBJECT_OR_LOCAL, typing.Sequence[OBJECT_OR_LOCAL]])\ + -> List[asyncio.Future[Object]]: if isinstance(objects, (Object, int)): objects = (objects,) if not objects: - return + return [] session = self._region.session() local_ids = tuple((o.LocalID if isinstance(o, Object) else o) for o in objects) # Don't mess with already selected objects - local_ids = tuple(local for local in local_ids if local not in session.selected.object_locals) + unselected_ids = tuple(local for local in local_ids if local not in session.selected.object_locals) + ids_to_req = unselected_ids - while local_ids: + while ids_to_req: blocks = [ Block("AgentData", AgentID=session.agent_id, SessionID=session.id), - *[Block("ObjectData", ObjectLocalID=x) for x in local_ids[:100]], + *[Block("ObjectData", ObjectLocalID=x) for x in ids_to_req[:100]], ] # Selecting causes ObjectProperties to be sent self._region.circuit.send_message(ProxiedMessage("ObjectSelect", blocks)) self._region.circuit.send_message(ProxiedMessage("ObjectDeselect", blocks)) - local_ids = local_ids[100:] + ids_to_req = ids_to_req[100:] - def request_missing_objects(self): - self.request_objects(self.missing_locals) + futures = [] + for local_id in local_ids: + fut = asyncio.Future() + if local_id in unselected_ids: + # Need to wait until we get our reply + self._property_futures[local_id].append(fut) + else: + # This was selected so we should already have up to date info + fut.set_result(self.lookup_localid(local_id)) + futures.append(fut) + return futures - def request_objects(self, local_ids): + def request_missing_objects(self) -> List[Awaitable[Object]]: + return self.request_objects(self.missing_locals) + + def request_objects(self, local_ids) -> List[Awaitable[Object]]: + """ + Request object local IDs, returning a list of awaitable handles for the objects + + Some may never be resolved, so use `asyncio.wait()` or `asyncio.wait_for()`. + """ if isinstance(local_ids, int): local_ids = (local_ids,) if isinstance(local_ids, set): local_ids = tuple(local_ids) session = self._region.session() - while local_ids: + ids_to_req = local_ids + while ids_to_req: self._region.circuit.send_message(ProxiedMessage( "RequestMultipleObjects", Block("AgentData", AgentID=session.agent_id, SessionID=session.id), - *[Block("ObjectData", CacheMissType=0, ID=x) for x in local_ids[:100]], + *[Block("ObjectData", CacheMissType=0, ID=x) for x in ids_to_req[:100]], )) - local_ids = local_ids[100:] + ids_to_req = ids_to_req[100:] + + futures = [] + for local_id in local_ids: + fut = asyncio.Future() + self._update_futures[local_id].append(fut) + futures.append(fut) + return futures diff --git a/tests/proxy/test_object_manager.py b/tests/proxy/test_object_manager.py index fd88539..3a22861 100644 --- a/tests/proxy/test_object_manager.py +++ b/tests/proxy/test_object_manager.py @@ -1,3 +1,4 @@ +import asyncio import math import random import unittest @@ -33,10 +34,18 @@ OBJECT_UPDATE_COMPRESSED_DATA = ( ) +class MockSession: + def __init__(self): + self.id = UUID.random() + self.agent_id = UUID.random() + self.session_manager = None + + class MockRegion: def __init__(self, message_handler: MessageHandler): - self.session = lambda: None + self.session = lambda: MockSession() self.handle = 123 + self.circuit = mock.MagicMock() self.cache_id = UUID.random() self.message_handler = message_handler self.http_message_handler = MessageHandler() @@ -54,7 +63,7 @@ class ObjectTrackingAddon(BaseAddon): self.events.append(("kill", obj)) -class ObjectManagerTests(unittest.TestCase): +class ObjectManagerTestMixin(unittest.TestCase): def setUp(self) -> None: self.message_handler = MessageHandler() self.region = MockRegion(self.message_handler) @@ -136,6 +145,8 @@ class ObjectManagerTests(unittest.TestCase): def _get_avatar_positions(self) -> Dict[UUID, Vector3]: return {av.FullID: av.RegionPosition for av in self.object_manager.all_avatars} + +class ObjectManagerTests(ObjectManagerTestMixin, unittest.TestCase): def test_basic_tracking(self): """Does creating an object result in it being tracked?""" msg = self._create_object_update() @@ -429,3 +440,24 @@ class ObjectManagerTests(unittest.TestCase): self.assertEqual(obj.FullID, UUID('121210bf-1658-427e-8fb4-fb001acd9be5')) # Flags from the ObjectUpdateCached should have been merged in self.assertEqual(obj.UpdateFlags, 4321) + + +class AsyncObjectManagerTests(ObjectManagerTestMixin, unittest.IsolatedAsyncioTestCase): + async def test_request_objects(self): + # request three objects, one of which won't receive an ObjectUpdate + futures = self.object_manager.request_objects((1234, 1235, 1236)) + self._create_object(1234) + self._create_object(1235) + done, pending = await asyncio.wait(futures, timeout=0.0001) + objects = await asyncio.gather(*done) + # wait() returns unordered results, so use a set. + self.assertEqual(set(o.LocalID for o in objects), {1234, 1235}) + pending = list(pending) + self.assertEqual(len(pending), 1) + # The other futures being resolved should have removed them from the dict + pending_futures = sum(len(x) for x in self.object_manager._update_futures.values()) + self.assertEqual(pending_futures, 1) + + self.assertFalse(pending[0].cancelled()) + self.object_manager.clear() + self.assertTrue(pending[0].cancelled())