Allow awaiting object update / property requests
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import enum
|
||||
import itertools
|
||||
import logging
|
||||
import math
|
||||
import typing
|
||||
@@ -72,6 +74,13 @@ class LocationType(enum.IntEnum):
|
||||
EXACT = enum.auto()
|
||||
|
||||
|
||||
class UpdateType(enum.IntEnum):
|
||||
OBJECT_UPDATE = enum.auto()
|
||||
PROPERTIES = enum.auto()
|
||||
FAMILY = enum.auto()
|
||||
COSTS = enum.auto()
|
||||
|
||||
|
||||
class Avatar:
|
||||
"""Wrapper for an avatar known through ObjectUpdate or CoarseLocationUpdate"""
|
||||
def __init__(
|
||||
@@ -131,12 +140,14 @@ class ObjectManager:
|
||||
self._localid_lookup: typing.Dict[int, Object] = {}
|
||||
self._fullid_lookup: typing.Dict[UUID, int] = {}
|
||||
self._coarse_locations: typing.Dict[UUID, Vector3] = {}
|
||||
self._update_futures: typing.Dict[int, List[asyncio.Future]] = collections.defaultdict(list)
|
||||
self._property_futures: typing.Dict[int, List[asyncio.Future]] = collections.defaultdict(list)
|
||||
# Objects that we've seen references to but don't have data for
|
||||
self.missing_locals = set()
|
||||
self._orphan_manager = OrphanManager()
|
||||
name_cache = None
|
||||
session = self._region.session()
|
||||
if session:
|
||||
if session and session.session_manager:
|
||||
name_cache = session.session_manager.name_cache
|
||||
# Use a local namecache if we don't have a session manager
|
||||
self.name_cache: Optional[NameCache] = name_cache or NameCache()
|
||||
@@ -219,7 +230,7 @@ class ObjectManager:
|
||||
self._parent_object(child_obj)
|
||||
|
||||
if notify:
|
||||
self._run_object_update_hooks(obj, set(obj.to_dict().keys()))
|
||||
self._run_object_update_hooks(obj, set(obj.to_dict().keys()), UpdateType.OBJECT_UPDATE)
|
||||
|
||||
def _untrack_object(self, obj: Object):
|
||||
former_child_ids = obj.ChildIDs[:]
|
||||
@@ -279,7 +290,7 @@ class ObjectManager:
|
||||
else:
|
||||
LOG.debug(f"Changing parent of {obj.LocalID}, but couldn't find old parent")
|
||||
|
||||
def _update_existing_object(self, obj: Object, new_properties):
|
||||
def _update_existing_object(self, obj: Object, new_properties: dict, update_type: UpdateType):
|
||||
new_parent_id = new_properties.get("ParentID", obj.ParentID)
|
||||
|
||||
actually_updated_props = set()
|
||||
@@ -310,7 +321,7 @@ class ObjectManager:
|
||||
# Common case where this may be falsy is if we get an ObjectUpdateCached
|
||||
# that didn't have a changed UpdateFlags field.
|
||||
if actually_updated_props:
|
||||
self._run_object_update_hooks(obj, actually_updated_props)
|
||||
self._run_object_update_hooks(obj, actually_updated_props, update_type)
|
||||
|
||||
def _handle_object_update(self, packet: ProxiedMessage):
|
||||
seen_locals = []
|
||||
@@ -320,7 +331,7 @@ class ObjectManager:
|
||||
seen_locals.append(object_data["LocalID"])
|
||||
obj = self.lookup_fullid(object_data["FullID"])
|
||||
if obj:
|
||||
self._update_existing_object(obj, object_data)
|
||||
self._update_existing_object(obj, object_data, UpdateType.OBJECT_UPDATE)
|
||||
else:
|
||||
obj = Object(**object_data)
|
||||
self._track_object(obj)
|
||||
@@ -339,7 +350,7 @@ class ObjectManager:
|
||||
|
||||
seen_locals.append(object_data["LocalID"])
|
||||
if obj:
|
||||
self._update_existing_object(obj, object_data)
|
||||
self._update_existing_object(obj, object_data, UpdateType.OBJECT_UPDATE)
|
||||
else:
|
||||
self.missing_locals.add(object_data["LocalID"])
|
||||
LOG.debug(f"Received terse update for unknown object {object_data['LocalID']}")
|
||||
@@ -357,7 +368,7 @@ class ObjectManager:
|
||||
if obj is not None:
|
||||
self._update_existing_object(obj, {
|
||||
"UpdateFlags": update_flags,
|
||||
})
|
||||
}, UpdateType.OBJECT_UPDATE)
|
||||
continue
|
||||
|
||||
# Check if the object is in a viewer's VOCache
|
||||
@@ -379,7 +390,7 @@ class ObjectManager:
|
||||
seen_locals.append(object_data["LocalID"])
|
||||
obj = self.lookup_localid(object_data["LocalID"])
|
||||
if obj:
|
||||
self._update_existing_object(obj, object_data)
|
||||
self._update_existing_object(obj, object_data, UpdateType.OBJECT_UPDATE)
|
||||
else:
|
||||
obj = Object(**object_data)
|
||||
self._track_object(obj)
|
||||
@@ -395,7 +406,7 @@ class ObjectManager:
|
||||
obj = self.lookup_fullid(block["ObjectID"])
|
||||
if obj:
|
||||
seen_locals.append(obj.LocalID)
|
||||
self._update_existing_object(obj, object_properties)
|
||||
self._update_existing_object(obj, object_properties, UpdateType.PROPERTIES)
|
||||
else:
|
||||
LOG.debug(f"Received {packet.name} for unknown {block['ObjectID']}")
|
||||
packet.meta["ObjectUpdateIDs"] = tuple(seen_locals)
|
||||
@@ -445,7 +456,7 @@ class ObjectManager:
|
||||
LOG.debug(f"Received ObjectCost for unknown {object_id}")
|
||||
continue
|
||||
obj.ObjectCosts.update(object_costs)
|
||||
self._run_object_update_hooks(obj, {"ObjectCosts"})
|
||||
self._run_object_update_hooks(obj, {"ObjectCosts"}, UpdateType.COSTS)
|
||||
|
||||
def _handle_coarse_location_update(self, packet: ProxiedMessage):
|
||||
self._coarse_locations.clear()
|
||||
@@ -465,10 +476,20 @@ class ObjectManager:
|
||||
|
||||
self._coarse_locations.update(coarse_locations)
|
||||
|
||||
def _run_object_update_hooks(self, obj: Object, updated_props: Set[str]):
|
||||
def _run_object_update_hooks(self, obj: Object, updated_props: Set[str], update_type: UpdateType):
|
||||
if obj.PCode == PCode.AVATAR and "NameValue" in updated_props:
|
||||
if obj.NameValue:
|
||||
self.name_cache.update(obj.FullID, obj.NameValue.to_dict())
|
||||
if update_type == UpdateType.OBJECT_UPDATE:
|
||||
update_futures = self._update_futures[obj.LocalID]
|
||||
for fut in update_futures[:]:
|
||||
fut.set_result(obj)
|
||||
update_futures.remove(fut)
|
||||
elif update_type == UpdateType.PROPERTIES:
|
||||
property_futures = self._property_futures[obj.LocalID]
|
||||
for fut in property_futures[:]:
|
||||
fut.set_result(obj)
|
||||
property_futures.remove(fut)
|
||||
AddonManager.handle_object_updated(self._region.session(), self._region, obj, updated_props)
|
||||
|
||||
def load_cache(self):
|
||||
@@ -487,46 +508,81 @@ class ObjectManager:
|
||||
self._coarse_locations.clear()
|
||||
self._orphan_manager.clear()
|
||||
self.missing_locals.clear()
|
||||
self._clear_futures(self._update_futures)
|
||||
self._clear_futures(self._property_futures)
|
||||
self.object_cache = RegionViewerObjectCacheChain([])
|
||||
self.cache_loaded = False
|
||||
|
||||
def request_object_properties(self, objects: typing.Union[OBJECT_OR_LOCAL, typing.Sequence[OBJECT_OR_LOCAL]]):
|
||||
@staticmethod
|
||||
def _clear_futures(future_dict: dict):
|
||||
for future in itertools.chain(*future_dict.values()):
|
||||
future.cancel()
|
||||
future_dict.clear()
|
||||
|
||||
def request_object_properties(self, objects: typing.Union[OBJECT_OR_LOCAL, typing.Sequence[OBJECT_OR_LOCAL]])\
|
||||
-> List[asyncio.Future[Object]]:
|
||||
if isinstance(objects, (Object, int)):
|
||||
objects = (objects,)
|
||||
if not objects:
|
||||
return
|
||||
return []
|
||||
|
||||
session = self._region.session()
|
||||
|
||||
local_ids = tuple((o.LocalID if isinstance(o, Object) else o) for o in objects)
|
||||
|
||||
# Don't mess with already selected objects
|
||||
local_ids = tuple(local for local in local_ids if local not in session.selected.object_locals)
|
||||
unselected_ids = tuple(local for local in local_ids if local not in session.selected.object_locals)
|
||||
ids_to_req = unselected_ids
|
||||
|
||||
while local_ids:
|
||||
while ids_to_req:
|
||||
blocks = [
|
||||
Block("AgentData", AgentID=session.agent_id, SessionID=session.id),
|
||||
*[Block("ObjectData", ObjectLocalID=x) for x in local_ids[:100]],
|
||||
*[Block("ObjectData", ObjectLocalID=x) for x in ids_to_req[:100]],
|
||||
]
|
||||
# Selecting causes ObjectProperties to be sent
|
||||
self._region.circuit.send_message(ProxiedMessage("ObjectSelect", blocks))
|
||||
self._region.circuit.send_message(ProxiedMessage("ObjectDeselect", blocks))
|
||||
local_ids = local_ids[100:]
|
||||
ids_to_req = ids_to_req[100:]
|
||||
|
||||
def request_missing_objects(self):
|
||||
self.request_objects(self.missing_locals)
|
||||
futures = []
|
||||
for local_id in local_ids:
|
||||
fut = asyncio.Future()
|
||||
if local_id in unselected_ids:
|
||||
# Need to wait until we get our reply
|
||||
self._property_futures[local_id].append(fut)
|
||||
else:
|
||||
# This was selected so we should already have up to date info
|
||||
fut.set_result(self.lookup_localid(local_id))
|
||||
futures.append(fut)
|
||||
return futures
|
||||
|
||||
def request_objects(self, local_ids):
|
||||
def request_missing_objects(self) -> List[Awaitable[Object]]:
|
||||
return self.request_objects(self.missing_locals)
|
||||
|
||||
def request_objects(self, local_ids) -> List[Awaitable[Object]]:
|
||||
"""
|
||||
Request object local IDs, returning a list of awaitable handles for the objects
|
||||
|
||||
Some may never be resolved, so use `asyncio.wait()` or `asyncio.wait_for()`.
|
||||
"""
|
||||
if isinstance(local_ids, int):
|
||||
local_ids = (local_ids,)
|
||||
if isinstance(local_ids, set):
|
||||
local_ids = tuple(local_ids)
|
||||
|
||||
session = self._region.session()
|
||||
while local_ids:
|
||||
ids_to_req = local_ids
|
||||
while ids_to_req:
|
||||
self._region.circuit.send_message(ProxiedMessage(
|
||||
"RequestMultipleObjects",
|
||||
Block("AgentData", AgentID=session.agent_id, SessionID=session.id),
|
||||
*[Block("ObjectData", CacheMissType=0, ID=x) for x in local_ids[:100]],
|
||||
*[Block("ObjectData", CacheMissType=0, ID=x) for x in ids_to_req[:100]],
|
||||
))
|
||||
local_ids = local_ids[100:]
|
||||
ids_to_req = ids_to_req[100:]
|
||||
|
||||
futures = []
|
||||
for local_id in local_ids:
|
||||
fut = asyncio.Future()
|
||||
self._update_futures[local_id].append(fut)
|
||||
futures.append(fut)
|
||||
return futures
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import math
|
||||
import random
|
||||
import unittest
|
||||
@@ -33,10 +34,18 @@ OBJECT_UPDATE_COMPRESSED_DATA = (
|
||||
)
|
||||
|
||||
|
||||
class MockSession:
|
||||
def __init__(self):
|
||||
self.id = UUID.random()
|
||||
self.agent_id = UUID.random()
|
||||
self.session_manager = None
|
||||
|
||||
|
||||
class MockRegion:
|
||||
def __init__(self, message_handler: MessageHandler):
|
||||
self.session = lambda: None
|
||||
self.session = lambda: MockSession()
|
||||
self.handle = 123
|
||||
self.circuit = mock.MagicMock()
|
||||
self.cache_id = UUID.random()
|
||||
self.message_handler = message_handler
|
||||
self.http_message_handler = MessageHandler()
|
||||
@@ -54,7 +63,7 @@ class ObjectTrackingAddon(BaseAddon):
|
||||
self.events.append(("kill", obj))
|
||||
|
||||
|
||||
class ObjectManagerTests(unittest.TestCase):
|
||||
class ObjectManagerTestMixin(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.message_handler = MessageHandler()
|
||||
self.region = MockRegion(self.message_handler)
|
||||
@@ -136,6 +145,8 @@ class ObjectManagerTests(unittest.TestCase):
|
||||
def _get_avatar_positions(self) -> Dict[UUID, Vector3]:
|
||||
return {av.FullID: av.RegionPosition for av in self.object_manager.all_avatars}
|
||||
|
||||
|
||||
class ObjectManagerTests(ObjectManagerTestMixin, unittest.TestCase):
|
||||
def test_basic_tracking(self):
|
||||
"""Does creating an object result in it being tracked?"""
|
||||
msg = self._create_object_update()
|
||||
@@ -429,3 +440,24 @@ class ObjectManagerTests(unittest.TestCase):
|
||||
self.assertEqual(obj.FullID, UUID('121210bf-1658-427e-8fb4-fb001acd9be5'))
|
||||
# Flags from the ObjectUpdateCached should have been merged in
|
||||
self.assertEqual(obj.UpdateFlags, 4321)
|
||||
|
||||
|
||||
class AsyncObjectManagerTests(ObjectManagerTestMixin, unittest.IsolatedAsyncioTestCase):
|
||||
async def test_request_objects(self):
|
||||
# request three objects, one of which won't receive an ObjectUpdate
|
||||
futures = self.object_manager.request_objects((1234, 1235, 1236))
|
||||
self._create_object(1234)
|
||||
self._create_object(1235)
|
||||
done, pending = await asyncio.wait(futures, timeout=0.0001)
|
||||
objects = await asyncio.gather(*done)
|
||||
# wait() returns unordered results, so use a set.
|
||||
self.assertEqual(set(o.LocalID for o in objects), {1234, 1235})
|
||||
pending = list(pending)
|
||||
self.assertEqual(len(pending), 1)
|
||||
# The other futures being resolved should have removed them from the dict
|
||||
pending_futures = sum(len(x) for x in self.object_manager._update_futures.values())
|
||||
self.assertEqual(pending_futures, 1)
|
||||
|
||||
self.assertFalse(pending[0].cancelled())
|
||||
self.object_manager.clear()
|
||||
self.assertTrue(pending[0].cancelled())
|
||||
|
||||
Reference in New Issue
Block a user