From f3c8015366cab9f3395f5fd20465789517a3ba07 Mon Sep 17 00:00:00 2001 From: Salad Dais Date: Fri, 8 Jul 2022 22:04:59 +0000 Subject: [PATCH] Support mutable InventoryModels --- hippolyzer/lib/base/datatypes.py | 6 +++ hippolyzer/lib/base/inventory.py | 88 +++++++++++++++++++++----------- tests/base/test_legacy_schema.py | 62 +++++++++++++++++----- 3 files changed, 112 insertions(+), 44 deletions(-) diff --git a/hippolyzer/lib/base/datatypes.py b/hippolyzer/lib/base/datatypes.py index 21fe98c..022f7f7 100644 --- a/hippolyzer/lib/base/datatypes.py +++ b/hippolyzer/lib/base/datatypes.py @@ -18,6 +18,8 @@ You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. """ +from __future__ import annotations + import ast import enum import hashlib @@ -247,6 +249,7 @@ class Quaternion(TupleCoord): class UUID(uuid.UUID): _NULL_UUID_STR = '00000000-0000-0000-0000-000000000000' + ZERO: UUID __slots__ = () def __init__(self, val: Union[uuid.UUID, str, None] = None, bytes=None, int=None): @@ -271,6 +274,9 @@ class UUID(uuid.UUID): return self.__class__(int=self.int ^ other.int) +UUID.ZERO = UUID() + + class JankStringyBytes(bytes): """ Treat bytes as UTF8 if used in string context diff --git a/hippolyzer/lib/base/inventory.py b/hippolyzer/lib/base/inventory.py index d727ec8..07c9f60 100644 --- a/hippolyzer/lib/base/inventory.py +++ b/hippolyzer/lib/base/inventory.py @@ -138,6 +138,11 @@ class InventoryModel(InventoryBase): self.items: Dict[UUID, InventoryItem] = {} self.root: Optional[InventoryContainerBase] = None + def __eq__(self, other): + if not isinstance(other, InventoryModel): + return False + return tuple(self.all_nodes) == tuple(other.all_nodes) + @classmethod def from_reader(cls, reader: StringIO, read_header=False) -> InventoryModel: model = cls() @@ -145,18 +150,17 @@ class InventoryModel(InventoryBase): if key == "inv_object": obj = InventoryObject.from_reader(reader) if obj is not None: - model.add_container(obj) + model.add(obj) elif key == "inv_category": cat = InventoryCategory.from_reader(reader) if cat is not None: - model.add_container(cat) + model.add(cat) elif key == "inv_item": item = InventoryItem.from_reader(reader) if item is not None: - model.add_item(item) + model.add(item) else: LOG.warning("Unknown key {0}".format(key)) - model.reparent_nodes() return model @classmethod @@ -165,16 +169,15 @@ class InventoryModel(InventoryBase): for obj_dict in llsd_val: if InventoryCategory.ID_ATTR in obj_dict: if (obj := InventoryCategory.from_llsd(obj_dict)) is not None: - model.add_container(obj) + model.add(obj) elif InventoryObject.ID_ATTR in obj_dict: if (obj := InventoryObject.from_llsd(obj_dict)) is not None: - model.add_container(obj) + model.add(obj) elif InventoryItem.ID_ATTR in obj_dict: if (obj := InventoryItem.from_llsd(obj_dict)) is not None: - model.add_item(obj) + model.add(obj) else: LOG.warning(f"Unknown object type {obj_dict!r}") - model.reparent_nodes() return model def to_writer(self, writer: StringIO): @@ -191,28 +194,40 @@ class InventoryModel(InventoryBase): vals.append(item.to_llsd()) return vals - def add_container(self, container: InventoryContainerBase): - self.containers[container.node_id] = container - container.model = weakref.proxy(self) + def add(self, node: InventoryNodeBase): + if node.node_id in self.items or node.node_id in self.containers: + raise KeyError(f"{node.node_id} already exists in the inventory model") - def add_item(self, item: InventoryItem): - self.items[item.item_id] = item - item.model = weakref.proxy(self) + if isinstance(node, InventoryContainerBase): + self.containers[node.node_id] = node + if node.parent_id == UUID.ZERO: + self.root = node + elif isinstance(node, InventoryItem): + self.items[node.node_id] = node + else: + raise ValueError(f"Unknown node type for {node!r}") + node.model = weakref.proxy(self) - def reparent_nodes(self): - self.root = None + def unlink(self, node: InventoryNodeBase) -> Sequence[InventoryNodeBase]: + """Unlink a node and its descendants from the tree, returning the removed nodes""" + assert node.model == self + if node == self.root: + self.root = None + unlinked = [node] + if isinstance(node, InventoryContainerBase): + for child in node.children: + unlinked.extend(self.unlink(child)) + self.items.pop(node.node_id, None) + self.containers.pop(node.node_id, None) + node.model = None + return unlinked + + @property + def all_nodes(self) -> Iterable[InventoryNodeBase]: for container in self.containers.values(): - container.children.clear() - if container.parent_id == UUID(): - self.root = container - for obj in itertools.chain(self.items.values(), self.containers.values()): - if not obj.parent_id or obj.parent_id == UUID(): - continue - parent_container = self.containers.get(obj.parent_id) - if not parent_container: - LOG.warning("{0} had an invalid parent {1}".format(obj, obj.parent_id)) - continue - parent_container.children.append(obj) + yield container + for item in self.items.values(): + yield item @dataclasses.dataclass @@ -243,16 +258,21 @@ class InventoryNodeBase(InventoryBase): ID_ATTR: ClassVar[str] parent_id: Optional[UUID] = schema_field(SchemaUUID) - model: Optional[InventoryModel] = dataclasses.field(default=None, init=False) + model: Optional[InventoryModel] = dataclasses.field( + default=None, init=False, hash=False, compare=False, repr=False + ) @property def node_id(self) -> UUID: return getattr(self, self.ID_ATTR) @property - def parent(self): + def parent(self) -> Optional[InventoryContainerBase]: return self.model.containers.get(self.parent_id) + def unlink(self) -> Sequence[InventoryNodeBase]: + return self.model.unlink(self) + @classmethod def _obj_from_dict(cls, obj_dict): # Bad entry, ignore @@ -267,7 +287,15 @@ class InventoryNodeBase(InventoryBase): class InventoryContainerBase(InventoryNodeBase): type: str = schema_field(SchemaStr) name: str = schema_field(SchemaMultilineStr) - children: List[InventoryNodeBase] = dataclasses.field(default_factory=list, init=False) + + @property + def children(self) -> Sequence[InventoryNodeBase]: + return tuple( + x for x in ( + itertools.chain(self.model.containers.values(), self.model.items.values()) + ) + if x.parent_id == self.node_id + ) @dataclasses.dataclass diff --git a/tests/base/test_legacy_schema.py b/tests/base/test_legacy_schema.py index 1db7269..16b3f50 100644 --- a/tests/base/test_legacy_schema.py +++ b/tests/base/test_legacy_schema.py @@ -1,3 +1,4 @@ +import copy import unittest from hippolyzer.lib.base.datatypes import * @@ -44,27 +45,59 @@ SIMPLE_INV = """\tinv_object\t0 class TestLegacyInv(unittest.TestCase): + def setUp(self) -> None: + self.model = InventoryModel.from_str(SIMPLE_INV) + def test_parse(self): - model = InventoryModel.from_str(SIMPLE_INV) - self.assertTrue(UUID('f4d91477-def1-487a-b4f3-6fa201c17376') in model.containers) - self.assertIsNotNone(model.root) + self.assertTrue(UUID('f4d91477-def1-487a-b4f3-6fa201c17376') in self.model.containers) + self.assertIsNotNone(self.model.root) def test_serialize(self): - model = InventoryModel.from_str(SIMPLE_INV) - new_model = InventoryModel.from_str(model.to_str()) - self.assertEqual(model, new_model) + self.model = InventoryModel.from_str(SIMPLE_INV) + new_model = InventoryModel.from_str(self.model.to_str()) + self.assertEqual(self.model, new_model) def test_item_access(self): - model = InventoryModel.from_str(SIMPLE_INV) - item = model.items[UUID('dd163122-946b-44df-99f6-a6030e2b9597')] + + item = self.model.items[UUID('dd163122-946b-44df-99f6-a6030e2b9597')] self.assertEqual(item.name, "New Script") self.assertEqual(item.sale_info.sale_type, "not") - self.assertEqual(item.model, model) + self.assertEqual(item.model, self.model) + + def test_access_children(self): + root = self.model.root + item = tuple(self.model.items.values())[0] + self.assertEqual((item,), root.children) + + def test_access_parent(self): + root = self.model.root + item = tuple(self.model.items.values())[0] + self.assertEqual(root, item.parent) + self.assertEqual(None, root.parent) + + def test_unlink(self): + self.assertEqual(1, len(self.model.root.children)) + item = tuple(self.model.items.values())[0] + self.assertEqual([item], item.unlink()) + self.assertEqual(0, len(self.model.root.children)) + self.assertEqual(None, item.model) + + def test_relink(self): + item = tuple(self.model.items.values())[0] + for unlinked in item.unlink(): + self.model.add(unlinked) + self.assertEqual(self.model, item.model) + self.assertEqual(1, len(self.model.root.children)) + + def test_eq_excludes_model(self): + item = tuple(self.model.items.values())[0] + item_copy = copy.copy(item) + item_copy.model = None + self.assertEqual(item, item_copy) def test_llsd_serialization(self): - model = InventoryModel.from_str(SIMPLE_INV) self.assertEqual( - model.to_llsd(), + self.model.to_llsd(), [ { 'name': 'Contents', @@ -102,9 +135,10 @@ class TestLegacyInv(unittest.TestCase): ) def test_llsd_legacy_equality(self): - model = InventoryModel.from_str(SIMPLE_INV) - new_model = InventoryModel.from_llsd(model.to_llsd()) - self.assertEqual(model, new_model) + new_model = InventoryModel.from_llsd(self.model.to_llsd()) + self.assertEqual(self.model, new_model) + tuple(new_model.containers.values())[0].name = "foo" + self.assertNotEqual(self.model, new_model) GIRL_NEXT_DOOR_SHAPE = """LLWearable version 22