Add support for LLSD inventory representations

This commit is contained in:
Salad Dais
2021-12-03 05:38:40 +00:00
parent 74c3287cc0
commit 5de3ed0d5e
6 changed files with 165 additions and 13 deletions

View File

@@ -3,7 +3,7 @@ Example of how to request a Transfer
"""
from typing import *
from hippolyzer.lib.base.legacy_inv import InventoryModel, InventoryItem
from hippolyzer.lib.base.inventory import InventoryModel, InventoryItem
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.templates import (
AssetType,

View File

@@ -2,7 +2,7 @@
Example of how to request an Xfer
"""
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.legacy_inv import InventoryModel
from hippolyzer.lib.base.inventory import InventoryModel
from hippolyzer.lib.base.templates import XferFilePath, AssetType, InventoryType, WearableType
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.proxy.addon_utils import BaseAddon, show_message

View File

@@ -9,6 +9,7 @@ import dataclasses
import datetime as dt
import itertools
import logging
import struct
import weakref
from io import StringIO
from typing import *
@@ -33,6 +34,17 @@ LOG = logging.getLogger(__name__)
_T = TypeVar("_T")
class SchemaFlagField(SchemaHexInt):
"""Like a hex int, but must be serialized as bytes in LLSD due to being a U32"""
@classmethod
def from_llsd(cls, val: Any) -> int:
return struct.unpack("!I", val)[0]
@classmethod
def to_llsd(cls, val: int) -> Any:
return struct.pack("!I", val)
def _yield_schema_tokens(reader: StringIO):
in_bracket = False
# empty str == EOF in Python
@@ -76,7 +88,7 @@ class InventoryBase(SchemaBase):
if schema_name != cls.SCHEMA_NAME:
raise ValueError(f"Expected schema name {schema_name!r} to be {cls.SCHEMA_NAME!r}")
fields = cls._fields_dict()
fields = cls._get_fields_dict()
obj_dict = {}
for key, val in tok_iter:
if key in fields:
@@ -100,7 +112,7 @@ class InventoryBase(SchemaBase):
def to_writer(self, writer: StringIO):
writer.write(f"\t{self.SCHEMA_NAME}\t0\n")
writer.write("\t{\n")
for field_name, field in self._fields_dict().items():
for field_name, field in self._get_fields_dict().items():
spec = field.metadata.get("spec")
# Not meant to be serialized
if not spec:
@@ -147,12 +159,38 @@ class InventoryModel(InventoryBase):
model.reparent_nodes()
return model
@classmethod
def from_llsd(cls, llsd_val: List[Dict]) -> InventoryModel:
model = cls()
for obj_dict in llsd_val:
if InventoryCategory.ID_ATTR in obj_dict:
if (obj := InventoryCategory.from_llsd(obj_dict)) is not None:
model.add_container(obj)
elif InventoryObject.ID_ATTR in obj_dict:
if (obj := InventoryObject.from_llsd(obj_dict)) is not None:
model.add_container(obj)
elif InventoryItem.ID_ATTR in obj_dict:
if (obj := InventoryItem.from_llsd(obj_dict)) is not None:
model.add_item(obj)
else:
LOG.warning(f"Unknown object type {obj_dict!r}")
model.reparent_nodes()
return model
def to_writer(self, writer: StringIO):
for container in self.containers.values():
container.to_writer(writer)
for item in self.items.values():
item.to_writer(writer)
def to_llsd(self):
vals = []
for container in self.containers.values():
vals.append(container.to_llsd())
for item in self.items.values():
vals.append(item.to_llsd())
return vals
def add_container(self, container: InventoryContainerBase):
self.containers[container.node_id] = container
container.model = weakref.proxy(self)
@@ -246,7 +284,7 @@ class InventoryCategory(InventoryContainerBase):
SCHEMA_NAME: ClassVar[str] = "inv_object"
cat_id: UUID = schema_field(SchemaUUID)
pref_type: str = schema_field(SchemaStr)
pref_type: str = schema_field(SchemaStr, llsd_name="preferred_type")
owner_id: UUID = schema_field(SchemaUUID)
version: int = schema_field(SchemaInt)
@@ -259,10 +297,10 @@ class InventoryItem(InventoryNodeBase):
item_id: UUID = schema_field(SchemaUUID)
type: str = schema_field(SchemaStr)
inv_type: str = schema_field(SchemaStr)
flags: int = schema_field(SchemaHexInt)
flags: int = schema_field(SchemaFlagField)
name: str = schema_field(SchemaMultilineStr)
desc: str = schema_field(SchemaMultilineStr)
creation_date: dt.datetime = schema_field(SchemaDate)
creation_date: dt.datetime = schema_field(SchemaDate, llsd_name="created_at")
permissions: InventoryPermissions = schema_field(InventoryPermissions)
sale_info: InventorySaleInfo = schema_field(InventorySaleInfo)
asset_id: Optional[UUID] = schema_field(SchemaUUID, default=None)

View File

@@ -31,6 +31,14 @@ class SchemaFieldSerializer(abc.ABC, Generic[_T]):
def serialize(cls, val: _T) -> str:
pass
@classmethod
def from_llsd(cls, val: Any) -> _T:
return val
@classmethod
def to_llsd(cls, val: _T) -> Any:
return val
class SchemaDate(SchemaFieldSerializer[dt.datetime]):
@classmethod
@@ -41,6 +49,14 @@ class SchemaDate(SchemaFieldSerializer[dt.datetime]):
def serialize(cls, val: dt.datetime) -> str:
return str(calendar.timegm(val.utctimetuple()))
@classmethod
def from_llsd(cls, val: Any) -> dt.datetime:
return dt.datetime.utcfromtimestamp(val)
@classmethod
def to_llsd(cls, val: dt.datetime):
return calendar.timegm(val.utctimetuple())
class SchemaHexInt(SchemaFieldSerializer[int]):
@classmethod
@@ -95,10 +111,11 @@ class SchemaUUID(SchemaFieldSerializer[UUID]):
def schema_field(spec: Type[Union[SchemaBase, SchemaFieldSerializer]], *, default=dataclasses.MISSING, init=True,
repr=True, hash=None, compare=True) -> dataclasses.Field: # noqa
repr=True, hash=None, compare=True, llsd_name=None) -> dataclasses.Field: # noqa
"""Describe a field in the inventory schema and the shape of its value"""
return dataclasses.field(
metadata={"spec": spec}, default=default, init=init, repr=repr, hash=hash, compare=compare
metadata={"spec": spec, "llsd_name": llsd_name}, default=default,
init=init, repr=repr, hash=hash, compare=compare,
)
@@ -121,8 +138,14 @@ def parse_schema_line(line: str):
@dataclasses.dataclass
class SchemaBase(abc.ABC):
@classmethod
def _fields_dict(cls):
return {f.name: f for f in dataclasses.fields(cls)}
def _get_fields_dict(cls, llsd=False):
fields_dict = {}
for field in dataclasses.fields(cls):
field_name = field.name
if llsd:
field_name = field.metadata.get("llsd_name") or field_name
fields_dict[field_name] = field
return fields_dict
@classmethod
def from_str(cls, text: str):
@@ -137,6 +160,30 @@ class SchemaBase(abc.ABC):
def from_bytes(cls, data: bytes):
return cls.from_str(data.decode("utf8"))
@classmethod
def from_llsd(cls, inv_dict: Dict):
fields = cls._get_fields_dict(llsd=True)
obj_dict = {}
for key, val in inv_dict.items():
if key in fields:
field: dataclasses.Field = fields[key]
key = field.name
spec = field.metadata.get("spec")
# Not a real key, an internal var on our dataclass
if not spec:
LOG.warning(f"Internal key {key!r}")
continue
# some kind of nested structure like sale_info
if issubclass(spec, SchemaBase):
obj_dict[key] = spec.from_llsd(val)
elif issubclass(spec, SchemaFieldSerializer):
obj_dict[key] = spec.from_llsd(val)
else:
raise ValueError(f"Unsupported spec for {key!r}, {spec!r}")
else:
LOG.warning(f"Unknown key {key!r}")
return cls._obj_from_dict(obj_dict)
def to_bytes(self) -> bytes:
return self.to_str().encode("utf8")
@@ -146,6 +193,28 @@ class SchemaBase(abc.ABC):
writer.seek(0)
return writer.read()
def to_llsd(self):
obj_dict = {}
for field_name, field in self._get_fields_dict(llsd=True).items():
spec = field.metadata.get("spec")
# Not meant to be serialized
if not spec:
continue
val = getattr(self, field.name)
if val is None:
continue
# Some kind of nested structure like sale_info
if isinstance(val, SchemaBase):
val = val.to_llsd()
elif issubclass(spec, SchemaFieldSerializer):
val = spec.to_llsd(val)
else:
raise ValueError(f"Bad inventory spec {spec!r}")
obj_dict[field_name] = val
return obj_dict
@abc.abstractmethod
def to_writer(self, writer: StringIO):
pass

View File

@@ -13,7 +13,7 @@ from xml.etree.ElementTree import parse as parse_etree
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.helpers import get_resource_filename
from hippolyzer.lib.base.legacy_inv import InventorySaleInfo, InventoryPermissions
from hippolyzer.lib.base.inventory import InventorySaleInfo, InventoryPermissions
from hippolyzer.lib.base.legacy_schema import SchemaBase, parse_schema_line, SchemaParsingError
from hippolyzer.lib.base.templates import WearableType

View File

@@ -1,7 +1,7 @@
import unittest
from hippolyzer.lib.base.datatypes import *
from hippolyzer.lib.base.legacy_inv import InventoryModel
from hippolyzer.lib.base.inventory import InventoryModel
from hippolyzer.lib.base.wearables import Wearable, VISUAL_PARAMS
SIMPLE_INV = """\tinv_object\t0
@@ -61,6 +61,51 @@ class TestLegacyInv(unittest.TestCase):
self.assertEqual(item.sale_info.sale_type, "not")
self.assertEqual(item.model, model)
def test_llsd_serialization(self):
model = InventoryModel.from_str(SIMPLE_INV)
self.assertEqual(
model.to_llsd(),
[
{
'name': 'Contents',
'obj_id': UUID('f4d91477-def1-487a-b4f3-6fa201c17376'),
'parent_id': UUID('00000000-0000-0000-0000-000000000000'),
'type': 'category'
},
{
'asset_id': UUID('00000000-0000-0000-0000-000000000000'),
'created_at': 1587367239,
'desc': '2020-04-20 04:20:39 lsl2 script',
'flags': b'\x00\x00\x00\x00',
'inv_type': 'script',
'item_id': UUID('dd163122-946b-44df-99f6-a6030e2b9597'),
'name': 'New Script',
'parent_id': UUID('f4d91477-def1-487a-b4f3-6fa201c17376'),
'permissions': {
'base_mask': 2147483647,
'creator_id': UUID('a2e76fcd-9360-4f6d-a924-000000000003'),
'everyone_mask': 0,
'group_id': UUID('00000000-0000-0000-0000-000000000000'),
'group_mask': 0,
'last_owner_id': UUID('a2e76fcd-9360-4f6d-a924-000000000003'),
'next_owner_mask': 581632,
'owner_id': UUID('a2e76fcd-9360-4f6d-a924-000000000003'),
'owner_mask': 2147483647
},
'sale_info': {
'sale_price': 10,
'sale_type': 'not'
},
'type': 'lsltext'
}
]
)
def test_llsd_legacy_equality(self):
model = InventoryModel.from_str(SIMPLE_INV)
new_model = InventoryModel.from_llsd(model.to_llsd())
self.assertEqual(model, new_model)
GIRL_NEXT_DOOR_SHAPE = """LLWearable version 22
Girl Next Door - C2 - med - Adam n Eve