Support mutable InventoryModels
This commit is contained in:
@@ -18,6 +18,8 @@ You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program; if not, write to the Free Software Foundation,
|
||||
Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import enum
|
||||
import hashlib
|
||||
@@ -247,6 +249,7 @@ class Quaternion(TupleCoord):
|
||||
|
||||
class UUID(uuid.UUID):
|
||||
_NULL_UUID_STR = '00000000-0000-0000-0000-000000000000'
|
||||
ZERO: UUID
|
||||
__slots__ = ()
|
||||
|
||||
def __init__(self, val: Union[uuid.UUID, str, None] = None, bytes=None, int=None):
|
||||
@@ -271,6 +274,9 @@ class UUID(uuid.UUID):
|
||||
return self.__class__(int=self.int ^ other.int)
|
||||
|
||||
|
||||
UUID.ZERO = UUID()
|
||||
|
||||
|
||||
class JankStringyBytes(bytes):
|
||||
"""
|
||||
Treat bytes as UTF8 if used in string context
|
||||
|
||||
@@ -138,6 +138,11 @@ class InventoryModel(InventoryBase):
|
||||
self.items: Dict[UUID, InventoryItem] = {}
|
||||
self.root: Optional[InventoryContainerBase] = None
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, InventoryModel):
|
||||
return False
|
||||
return tuple(self.all_nodes) == tuple(other.all_nodes)
|
||||
|
||||
@classmethod
|
||||
def from_reader(cls, reader: StringIO, read_header=False) -> InventoryModel:
|
||||
model = cls()
|
||||
@@ -145,18 +150,17 @@ class InventoryModel(InventoryBase):
|
||||
if key == "inv_object":
|
||||
obj = InventoryObject.from_reader(reader)
|
||||
if obj is not None:
|
||||
model.add_container(obj)
|
||||
model.add(obj)
|
||||
elif key == "inv_category":
|
||||
cat = InventoryCategory.from_reader(reader)
|
||||
if cat is not None:
|
||||
model.add_container(cat)
|
||||
model.add(cat)
|
||||
elif key == "inv_item":
|
||||
item = InventoryItem.from_reader(reader)
|
||||
if item is not None:
|
||||
model.add_item(item)
|
||||
model.add(item)
|
||||
else:
|
||||
LOG.warning("Unknown key {0}".format(key))
|
||||
model.reparent_nodes()
|
||||
return model
|
||||
|
||||
@classmethod
|
||||
@@ -165,16 +169,15 @@ class InventoryModel(InventoryBase):
|
||||
for obj_dict in llsd_val:
|
||||
if InventoryCategory.ID_ATTR in obj_dict:
|
||||
if (obj := InventoryCategory.from_llsd(obj_dict)) is not None:
|
||||
model.add_container(obj)
|
||||
model.add(obj)
|
||||
elif InventoryObject.ID_ATTR in obj_dict:
|
||||
if (obj := InventoryObject.from_llsd(obj_dict)) is not None:
|
||||
model.add_container(obj)
|
||||
model.add(obj)
|
||||
elif InventoryItem.ID_ATTR in obj_dict:
|
||||
if (obj := InventoryItem.from_llsd(obj_dict)) is not None:
|
||||
model.add_item(obj)
|
||||
model.add(obj)
|
||||
else:
|
||||
LOG.warning(f"Unknown object type {obj_dict!r}")
|
||||
model.reparent_nodes()
|
||||
return model
|
||||
|
||||
def to_writer(self, writer: StringIO):
|
||||
@@ -191,28 +194,40 @@ class InventoryModel(InventoryBase):
|
||||
vals.append(item.to_llsd())
|
||||
return vals
|
||||
|
||||
def add_container(self, container: InventoryContainerBase):
|
||||
self.containers[container.node_id] = container
|
||||
container.model = weakref.proxy(self)
|
||||
def add(self, node: InventoryNodeBase):
|
||||
if node.node_id in self.items or node.node_id in self.containers:
|
||||
raise KeyError(f"{node.node_id} already exists in the inventory model")
|
||||
|
||||
def add_item(self, item: InventoryItem):
|
||||
self.items[item.item_id] = item
|
||||
item.model = weakref.proxy(self)
|
||||
if isinstance(node, InventoryContainerBase):
|
||||
self.containers[node.node_id] = node
|
||||
if node.parent_id == UUID.ZERO:
|
||||
self.root = node
|
||||
elif isinstance(node, InventoryItem):
|
||||
self.items[node.node_id] = node
|
||||
else:
|
||||
raise ValueError(f"Unknown node type for {node!r}")
|
||||
node.model = weakref.proxy(self)
|
||||
|
||||
def reparent_nodes(self):
|
||||
self.root = None
|
||||
def unlink(self, node: InventoryNodeBase) -> Sequence[InventoryNodeBase]:
|
||||
"""Unlink a node and its descendants from the tree, returning the removed nodes"""
|
||||
assert node.model == self
|
||||
if node == self.root:
|
||||
self.root = None
|
||||
unlinked = [node]
|
||||
if isinstance(node, InventoryContainerBase):
|
||||
for child in node.children:
|
||||
unlinked.extend(self.unlink(child))
|
||||
self.items.pop(node.node_id, None)
|
||||
self.containers.pop(node.node_id, None)
|
||||
node.model = None
|
||||
return unlinked
|
||||
|
||||
@property
|
||||
def all_nodes(self) -> Iterable[InventoryNodeBase]:
|
||||
for container in self.containers.values():
|
||||
container.children.clear()
|
||||
if container.parent_id == UUID():
|
||||
self.root = container
|
||||
for obj in itertools.chain(self.items.values(), self.containers.values()):
|
||||
if not obj.parent_id or obj.parent_id == UUID():
|
||||
continue
|
||||
parent_container = self.containers.get(obj.parent_id)
|
||||
if not parent_container:
|
||||
LOG.warning("{0} had an invalid parent {1}".format(obj, obj.parent_id))
|
||||
continue
|
||||
parent_container.children.append(obj)
|
||||
yield container
|
||||
for item in self.items.values():
|
||||
yield item
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
@@ -243,16 +258,21 @@ class InventoryNodeBase(InventoryBase):
|
||||
ID_ATTR: ClassVar[str]
|
||||
|
||||
parent_id: Optional[UUID] = schema_field(SchemaUUID)
|
||||
model: Optional[InventoryModel] = dataclasses.field(default=None, init=False)
|
||||
model: Optional[InventoryModel] = dataclasses.field(
|
||||
default=None, init=False, hash=False, compare=False, repr=False
|
||||
)
|
||||
|
||||
@property
|
||||
def node_id(self) -> UUID:
|
||||
return getattr(self, self.ID_ATTR)
|
||||
|
||||
@property
|
||||
def parent(self):
|
||||
def parent(self) -> Optional[InventoryContainerBase]:
|
||||
return self.model.containers.get(self.parent_id)
|
||||
|
||||
def unlink(self) -> Sequence[InventoryNodeBase]:
|
||||
return self.model.unlink(self)
|
||||
|
||||
@classmethod
|
||||
def _obj_from_dict(cls, obj_dict):
|
||||
# Bad entry, ignore
|
||||
@@ -267,7 +287,15 @@ class InventoryNodeBase(InventoryBase):
|
||||
class InventoryContainerBase(InventoryNodeBase):
|
||||
type: str = schema_field(SchemaStr)
|
||||
name: str = schema_field(SchemaMultilineStr)
|
||||
children: List[InventoryNodeBase] = dataclasses.field(default_factory=list, init=False)
|
||||
|
||||
@property
|
||||
def children(self) -> Sequence[InventoryNodeBase]:
|
||||
return tuple(
|
||||
x for x in (
|
||||
itertools.chain(self.model.containers.values(), self.model.items.values())
|
||||
)
|
||||
if x.parent_id == self.node_id
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import copy
|
||||
import unittest
|
||||
|
||||
from hippolyzer.lib.base.datatypes import *
|
||||
@@ -44,27 +45,59 @@ SIMPLE_INV = """\tinv_object\t0
|
||||
|
||||
|
||||
class TestLegacyInv(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.model = InventoryModel.from_str(SIMPLE_INV)
|
||||
|
||||
def test_parse(self):
|
||||
model = InventoryModel.from_str(SIMPLE_INV)
|
||||
self.assertTrue(UUID('f4d91477-def1-487a-b4f3-6fa201c17376') in model.containers)
|
||||
self.assertIsNotNone(model.root)
|
||||
self.assertTrue(UUID('f4d91477-def1-487a-b4f3-6fa201c17376') in self.model.containers)
|
||||
self.assertIsNotNone(self.model.root)
|
||||
|
||||
def test_serialize(self):
|
||||
model = InventoryModel.from_str(SIMPLE_INV)
|
||||
new_model = InventoryModel.from_str(model.to_str())
|
||||
self.assertEqual(model, new_model)
|
||||
self.model = InventoryModel.from_str(SIMPLE_INV)
|
||||
new_model = InventoryModel.from_str(self.model.to_str())
|
||||
self.assertEqual(self.model, new_model)
|
||||
|
||||
def test_item_access(self):
|
||||
model = InventoryModel.from_str(SIMPLE_INV)
|
||||
item = model.items[UUID('dd163122-946b-44df-99f6-a6030e2b9597')]
|
||||
|
||||
item = self.model.items[UUID('dd163122-946b-44df-99f6-a6030e2b9597')]
|
||||
self.assertEqual(item.name, "New Script")
|
||||
self.assertEqual(item.sale_info.sale_type, "not")
|
||||
self.assertEqual(item.model, model)
|
||||
self.assertEqual(item.model, self.model)
|
||||
|
||||
def test_access_children(self):
|
||||
root = self.model.root
|
||||
item = tuple(self.model.items.values())[0]
|
||||
self.assertEqual((item,), root.children)
|
||||
|
||||
def test_access_parent(self):
|
||||
root = self.model.root
|
||||
item = tuple(self.model.items.values())[0]
|
||||
self.assertEqual(root, item.parent)
|
||||
self.assertEqual(None, root.parent)
|
||||
|
||||
def test_unlink(self):
|
||||
self.assertEqual(1, len(self.model.root.children))
|
||||
item = tuple(self.model.items.values())[0]
|
||||
self.assertEqual([item], item.unlink())
|
||||
self.assertEqual(0, len(self.model.root.children))
|
||||
self.assertEqual(None, item.model)
|
||||
|
||||
def test_relink(self):
|
||||
item = tuple(self.model.items.values())[0]
|
||||
for unlinked in item.unlink():
|
||||
self.model.add(unlinked)
|
||||
self.assertEqual(self.model, item.model)
|
||||
self.assertEqual(1, len(self.model.root.children))
|
||||
|
||||
def test_eq_excludes_model(self):
|
||||
item = tuple(self.model.items.values())[0]
|
||||
item_copy = copy.copy(item)
|
||||
item_copy.model = None
|
||||
self.assertEqual(item, item_copy)
|
||||
|
||||
def test_llsd_serialization(self):
|
||||
model = InventoryModel.from_str(SIMPLE_INV)
|
||||
self.assertEqual(
|
||||
model.to_llsd(),
|
||||
self.model.to_llsd(),
|
||||
[
|
||||
{
|
||||
'name': 'Contents',
|
||||
@@ -102,9 +135,10 @@ class TestLegacyInv(unittest.TestCase):
|
||||
)
|
||||
|
||||
def test_llsd_legacy_equality(self):
|
||||
model = InventoryModel.from_str(SIMPLE_INV)
|
||||
new_model = InventoryModel.from_llsd(model.to_llsd())
|
||||
self.assertEqual(model, new_model)
|
||||
new_model = InventoryModel.from_llsd(self.model.to_llsd())
|
||||
self.assertEqual(self.model, new_model)
|
||||
tuple(new_model.containers.values())[0].name = "foo"
|
||||
self.assertNotEqual(self.model, new_model)
|
||||
|
||||
|
||||
GIRL_NEXT_DOOR_SHAPE = """LLWearable version 22
|
||||
|
||||
Reference in New Issue
Block a user