Start supporting enums in inventory schema
This commit is contained in:
@@ -8,6 +8,8 @@ from __future__ import annotations
|
||||
import abc
|
||||
import dataclasses
|
||||
import datetime as dt
|
||||
import enum
|
||||
import inspect
|
||||
import logging
|
||||
import struct
|
||||
import typing
|
||||
@@ -47,6 +49,39 @@ class SchemaFlagField(SchemaHexInt):
|
||||
return struct.pack("!I", val)
|
||||
|
||||
|
||||
class LegacyIntEnum(enum.IntEnum):
|
||||
_ignore_ = ['_LEGACY_NAMES']
|
||||
|
||||
@classmethod
|
||||
def _get_legacy_names(cls) -> Tuple[str, ...]:
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def from_legacy_name(cls, name: str):
|
||||
return cls(cls._get_legacy_names().index(name))
|
||||
|
||||
def to_legacy_name(self) -> str:
|
||||
return self._get_legacy_names()[self.value]
|
||||
|
||||
|
||||
class SchemaEnumField(SchemaStr, Generic[_T]):
|
||||
def __init__(self, enum_cls: Type[LegacyIntEnum]):
|
||||
super().__init__()
|
||||
self._enum_cls = enum_cls
|
||||
|
||||
def deserialize(self, val: str) -> _T:
|
||||
return self._enum_cls.from_legacy_name(val)
|
||||
|
||||
def serialize(self, val: _T) -> str:
|
||||
return self._enum_cls.to_legacy_name(val)
|
||||
|
||||
def from_llsd(self, val: str) -> _T:
|
||||
return self.deserialize(val)
|
||||
|
||||
def to_llsd(self, val: _T) -> str:
|
||||
return self.serialize(val)
|
||||
|
||||
|
||||
def _yield_schema_tokens(reader: StringIO):
|
||||
in_bracket = False
|
||||
# empty str == EOF in Python
|
||||
@@ -100,10 +135,14 @@ class InventoryBase(SchemaBase):
|
||||
if not spec:
|
||||
LOG.warning(f"Internal key {key!r}")
|
||||
continue
|
||||
|
||||
spec_cls = spec
|
||||
if not inspect.isclass(spec_cls):
|
||||
spec_cls = spec_cls.__class__
|
||||
# some kind of nested structure like sale_info
|
||||
if issubclass(spec, SchemaBase):
|
||||
if issubclass(spec_cls, SchemaBase):
|
||||
obj_dict[key] = spec.from_reader(reader)
|
||||
elif issubclass(spec, SchemaFieldSerializer):
|
||||
elif issubclass(spec_cls, SchemaFieldSerializer):
|
||||
obj_dict[key] = spec.deserialize(val)
|
||||
else:
|
||||
raise ValueError(f"Unsupported spec for {key!r}, {spec!r}")
|
||||
@@ -138,10 +177,13 @@ class InventoryBase(SchemaBase):
|
||||
if val is None and not field.metadata.get("include_none"):
|
||||
continue
|
||||
|
||||
spec_cls = spec
|
||||
if not inspect.isclass(spec_cls):
|
||||
spec_cls = spec_cls.__class__
|
||||
# Some kind of nested structure like sale_info
|
||||
if isinstance(val, SchemaBase):
|
||||
val.to_writer(writer)
|
||||
elif issubclass(spec, SchemaFieldSerializer):
|
||||
elif issubclass(spec_cls, SchemaFieldSerializer):
|
||||
writer.write(f"\t\t{field_name}\t{spec.serialize(val)}\n")
|
||||
else:
|
||||
raise ValueError(f"Bad inventory spec {spec!r}")
|
||||
@@ -296,12 +338,23 @@ class InventoryPermissions(InventoryBase):
|
||||
is_owner_group: Optional[int] = schema_field(SchemaInt, default=None, llsd_only=True)
|
||||
|
||||
|
||||
class SaleType(LegacyIntEnum):
|
||||
NOT = 0
|
||||
ORIGINAL = 1
|
||||
COPY = 2
|
||||
CONTENTS = 3
|
||||
|
||||
@classmethod
|
||||
def _get_legacy_names(cls) -> Tuple[str, ...]:
|
||||
return "not", "orig", "copy", "cntn"
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class InventorySaleInfo(InventoryBase):
|
||||
SCHEMA_NAME: ClassVar[str] = "sale_info"
|
||||
|
||||
# TODO: Not a string in AIS
|
||||
sale_type: str = schema_field(SchemaStr)
|
||||
sale_type: SaleType = schema_field(SchemaEnumField(SaleType))
|
||||
sale_price: int = schema_field(SchemaInt)
|
||||
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import abc
|
||||
import calendar
|
||||
import dataclasses
|
||||
import datetime as dt
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
from io import StringIO
|
||||
@@ -124,7 +125,10 @@ class SchemaLLSD(SchemaFieldSerializer[_T]):
|
||||
return llsd.format_xml(val).split(b">", 1)[1].decode("utf8") + "\n|"
|
||||
|
||||
|
||||
def schema_field(spec: Type[Union[SchemaBase, SchemaFieldSerializer]], *, default=dataclasses.MISSING, init=True,
|
||||
_SCHEMA_SPEC = Union[Type[Union["SchemaBase", SchemaFieldSerializer]], SchemaFieldSerializer]
|
||||
|
||||
|
||||
def schema_field(spec: _SCHEMA_SPEC, *, default=dataclasses.MISSING, init=True,
|
||||
repr=True, hash=None, compare=True, llsd_name=None, llsd_only=False,
|
||||
include_none=False) -> dataclasses.Field: # noqa
|
||||
"""Describe a field in the inventory schema and the shape of its value"""
|
||||
@@ -188,10 +192,15 @@ class SchemaBase(abc.ABC):
|
||||
if not spec:
|
||||
LOG.warning(f"Internal key {key!r}")
|
||||
continue
|
||||
|
||||
spec_cls = spec
|
||||
if not inspect.isclass(spec_cls):
|
||||
spec_cls = spec_cls.__class__
|
||||
|
||||
# some kind of nested structure like sale_info
|
||||
if issubclass(spec, SchemaBase):
|
||||
if issubclass(spec_cls, SchemaBase):
|
||||
obj_dict[key] = spec.from_llsd(val)
|
||||
elif issubclass(spec, SchemaFieldSerializer):
|
||||
elif issubclass(spec_cls, SchemaFieldSerializer):
|
||||
obj_dict[key] = spec.from_llsd(val)
|
||||
else:
|
||||
raise ValueError(f"Unsupported spec for {key!r}, {spec!r}")
|
||||
@@ -220,10 +229,14 @@ class SchemaBase(abc.ABC):
|
||||
if val is None:
|
||||
continue
|
||||
|
||||
spec_cls = spec
|
||||
if not inspect.isclass(spec_cls):
|
||||
spec_cls = spec_cls.__class__
|
||||
|
||||
# Some kind of nested structure like sale_info
|
||||
if isinstance(val, SchemaBase):
|
||||
val = val.to_llsd()
|
||||
elif issubclass(spec, SchemaFieldSerializer):
|
||||
elif issubclass(spec_cls, SchemaFieldSerializer):
|
||||
val = spec.to_llsd(val)
|
||||
else:
|
||||
raise ValueError(f"Bad inventory spec {spec!r}")
|
||||
|
||||
@@ -2,7 +2,7 @@ import copy
|
||||
import unittest
|
||||
|
||||
from hippolyzer.lib.base.datatypes import *
|
||||
from hippolyzer.lib.base.inventory import InventoryModel
|
||||
from hippolyzer.lib.base.inventory import InventoryModel, SaleType
|
||||
from hippolyzer.lib.base.wearables import Wearable, VISUAL_PARAMS
|
||||
|
||||
SIMPLE_INV = """\tinv_object\t0
|
||||
@@ -64,7 +64,7 @@ class TestLegacyInv(unittest.TestCase):
|
||||
def test_item_access(self):
|
||||
item = self.model.nodes[UUID('dd163122-946b-44df-99f6-a6030e2b9597')]
|
||||
self.assertEqual(item.name, "New Script")
|
||||
self.assertEqual(item.sale_info.sale_type, "not")
|
||||
self.assertEqual(item.sale_info.sale_type, SaleType.NOT)
|
||||
self.assertDictEqual(item.metadata, {"experience": UUID("a2e76fcd-9360-4f6d-a924-000000000003")})
|
||||
self.assertEqual(item.model, self.model)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user