Should help deal with automatic template reloading issues since mtime wasn't changing on `lib.proxy.templates`.
287 lines
11 KiB
Python
287 lines
11 KiB
Python
import ast
|
|
import base64
|
|
import importlib
|
|
import logging
|
|
import math
|
|
import os
|
|
import re
|
|
import uuid
|
|
from typing import *
|
|
|
|
import hippolyzer.lib.base.datatypes
|
|
from hippolyzer.lib.base.datatypes import *
|
|
import hippolyzer.lib.base.serialization as se
|
|
from hippolyzer.lib.base import llsd
|
|
from hippolyzer.lib.base.helpers import HippoPrettyPrinter
|
|
from hippolyzer.lib.base.message.message import Message, Block, PacketFlags
|
|
import hippolyzer.lib.base.templates as templates
|
|
from hippolyzer.lib.base.message.msgtypes import MsgBlockType
|
|
from hippolyzer.lib.base.message.template import MessageTemplate
|
|
from hippolyzer.lib.proxy.packets import Direction
|
|
|
|
_TEMPLATES_MTIME = os.stat(templates.__file__).st_mtime
|
|
|
|
|
|
def _maybe_reload_templates():
|
|
# Templates may be modified at runtime during development, check
|
|
# if they've changed since startup and reload if they have.
|
|
global _TEMPLATES_MTIME
|
|
templates_mtime = os.stat(templates.__file__).st_mtime
|
|
|
|
if _TEMPLATES_MTIME is None or _TEMPLATES_MTIME < templates_mtime:
|
|
print("Reloading templates")
|
|
try:
|
|
importlib.reload(templates) # type: ignore
|
|
_TEMPLATES_MTIME = templates_mtime
|
|
except:
|
|
logging.exception("Failed to reload templates!")
|
|
|
|
|
|
def _trunc_repr(val, max_len):
|
|
if isinstance(val, (uuid.UUID, TupleCoord)):
|
|
val = str(val)
|
|
repr_val = repr(val)
|
|
if isinstance(val, str):
|
|
repr_val = repr_val[1:-1]
|
|
if isinstance(val, bytes):
|
|
repr_val = repr_val[2:-1]
|
|
if len(repr_val) > max_len:
|
|
return repr_val[:max_len] + "…"
|
|
return repr_val
|
|
|
|
|
|
class VerbatimHumanVal(str):
|
|
pass
|
|
|
|
|
|
def _filtered_exports(mod):
|
|
return {k: getattr(mod, k) for k in mod.__all__}
|
|
|
|
|
|
def proxy_eval(eval_str: str, globals_=None, locals_=None):
|
|
return eval(
|
|
eval_str,
|
|
{
|
|
"llsd": llsd,
|
|
"base64": base64,
|
|
"math": math,
|
|
**_filtered_exports(hippolyzer.lib.base.datatypes),
|
|
**(globals_ or {})},
|
|
locals_
|
|
)
|
|
|
|
|
|
TextSpan = Tuple[int, int]
|
|
SpanDict = Dict[Tuple[Union[str, int], ...], TextSpan]
|
|
|
|
|
|
class SpannedString(str):
|
|
spans: SpanDict = {}
|
|
|
|
|
|
class ProxiedMessage(Message):
|
|
__slots__ = ("meta", "injected", "dropped", "direction")
|
|
|
|
def __init__(self, *args, direction=None, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.direction = direction if direction is not None else Direction.OUT
|
|
self.meta = {}
|
|
self.injected = False
|
|
self.dropped = False
|
|
_maybe_reload_templates()
|
|
|
|
def to_human_string(self, replacements=None, beautify=False,
|
|
template: Optional[MessageTemplate] = None) -> SpannedString:
|
|
replacements = replacements or {}
|
|
_maybe_reload_templates()
|
|
spans: SpanDict = {}
|
|
string = ""
|
|
if self.direction is not None:
|
|
string += f'{self.direction.name} '
|
|
string += self.name
|
|
if self.packet_id is not None:
|
|
string += f'\n# {self.packet_id}: {PacketFlags(self.send_flags)!r}'
|
|
string += f'{", DROPPED" if self.dropped else ""}{", INJECTED" if self.injected else ""}'
|
|
if self.extra:
|
|
string += f'\n# EXTRA: {self.extra!r}'
|
|
string += '\n\n'
|
|
|
|
for block_name, block_list in self.blocks.items():
|
|
block_suffix = ""
|
|
if template and template.get_block(block_name).block_type == MsgBlockType.MBT_VARIABLE:
|
|
block_suffix = ' # Variable'
|
|
for block_num, block in enumerate(block_list):
|
|
string += f"[{block_name}]{block_suffix}\n"
|
|
for var_name, val in block.items():
|
|
start_len = len(string)
|
|
string += self._format_var(block, var_name, val, replacements, beautify)
|
|
end_len = len(string)
|
|
# Store the spans for each var so we can highlight specific matches
|
|
spans[(self.name, block_name, block_num, var_name)] = (start_len, end_len)
|
|
string += "\n"
|
|
spanned = SpannedString(string)
|
|
spanned.spans = spans
|
|
return spanned
|
|
|
|
def _format_var(self, block, var_name, var_val, replacements, beautify=False):
|
|
string = ""
|
|
# Check if we have a more human-readable way to present this field
|
|
ser_key = (self.name, block.name, var_name)
|
|
serializer = se.SUBFIELD_SERIALIZERS.get(ser_key)
|
|
field_prefix = ""
|
|
if isinstance(var_val, VerbatimHumanVal):
|
|
var_data = var_val
|
|
elif isinstance(var_val, (uuid.UUID, TupleCoord)):
|
|
var_data = str(var_val)
|
|
elif isinstance(var_val, (str, bytes)) and not serializer:
|
|
var_data = self._multi_line_pformat(var_val)
|
|
else:
|
|
var_data = repr(var_val)
|
|
if serializer and beautify and not isinstance(var_val, VerbatimHumanVal):
|
|
try:
|
|
pretty_data = serializer.deserialize(block, var_val, pod=True)
|
|
if pretty_data is not se.UNSERIALIZABLE:
|
|
string += f" {var_name} =| {self._multi_line_pformat(pretty_data)}"
|
|
if serializer.AS_HEX and isinstance(var_val, int):
|
|
var_data = hex(var_val)
|
|
if serializer.ORIG_INLINE:
|
|
string += f" #{var_data}"
|
|
return string
|
|
else:
|
|
string += "\n"
|
|
# Human-readable version should be used, orig data is commented out
|
|
field_prefix = "#"
|
|
except:
|
|
logging.exception(f"Failed in subfield serializer {ser_key!r}")
|
|
if beautify:
|
|
if block.name == "AgentData":
|
|
if var_name == "AgentID" and var_val == replacements.get("AGENT_ID"):
|
|
var_data = "[[AGENT_ID]]"
|
|
elif var_name == "SessionID" and var_val == replacements.get("SESSION_ID"):
|
|
var_data = "[[SESSION_ID]]"
|
|
if "CircuitCode" in var_name or ("Code" in var_name and "Circuit" in block.name):
|
|
if var_val == replacements.get("CIRCUIT_CODE"):
|
|
var_data = "[[CIRCUIT_CODE]]"
|
|
string += f" {field_prefix}{var_name} = {var_data}"
|
|
return string
|
|
|
|
@staticmethod
|
|
def _multi_line_pformat(val):
|
|
printer = HippoPrettyPrinter(width=100)
|
|
val = printer.pformat(val)
|
|
newstr = ""
|
|
# Now we need to rebuild this to add in the appropriate
|
|
# line continuations.
|
|
lines = list(val.splitlines())
|
|
first_line = True
|
|
while lines:
|
|
line = lines.pop(0)
|
|
prefix = ""
|
|
suffix = ""
|
|
if first_line:
|
|
first_line = False
|
|
else:
|
|
prefix = " "
|
|
|
|
if lines:
|
|
suffix = " \\\n"
|
|
newstr += f"{prefix}{line}{suffix}"
|
|
return newstr
|
|
|
|
def to_summary(self):
|
|
string = ""
|
|
for block_name, block_list in self.blocks.items():
|
|
for block in block_list:
|
|
for var_name, val in block.items():
|
|
if block.name == "AgentData" and var_name in ("AgentID", "SessionID"):
|
|
continue
|
|
if string:
|
|
string += ", "
|
|
string += f"{var_name}={_trunc_repr(val, 10)}"
|
|
return string
|
|
|
|
@classmethod
|
|
def from_human_string(cls, string, replacements=None, env=None, safe=True):
|
|
_maybe_reload_templates()
|
|
replacements = replacements or {}
|
|
env = env or {}
|
|
first_line = True
|
|
cur_block = None
|
|
msg = None
|
|
lines = [x.strip() for x in string.split("\n") if x.strip()]
|
|
while lines:
|
|
line = lines.pop(0)
|
|
# Ignore comment / blank lines
|
|
if re.match(r"^\s*(#.*)?$", line):
|
|
continue
|
|
|
|
if first_line:
|
|
direction, message_name = line.split(" ", 1)
|
|
msg = ProxiedMessage(message_name)
|
|
msg.direction = Direction[direction.upper()]
|
|
first_line = False
|
|
continue
|
|
|
|
if line.startswith("["):
|
|
cur_block = Block(re.search(r"\w+", line).group(0))
|
|
msg.add_block(cur_block)
|
|
else:
|
|
expr_match = re.match(r"^\s*(\w+)\s*(=[|$]*)\s*(.*)$", line)
|
|
var_name, operator, var_val = expr_match.groups()
|
|
# Multiline, eat all the line continuations
|
|
while var_val.endswith("\\"):
|
|
var_val = var_val[:-1].rstrip()
|
|
if lines:
|
|
var_val += lines.pop(0)
|
|
|
|
plain = operator == "="
|
|
packed = "|" in operator
|
|
evaled = "$" in operator
|
|
|
|
if evaled and safe:
|
|
raise ValueError("Can't use eval operator in safe mode")
|
|
|
|
if plain:
|
|
replacement_match = re.match(r"\[\[(\w+)]]", var_val)
|
|
if replacement_match:
|
|
replacement_name = replacement_match.group(1)
|
|
var_val = replacements.get(replacement_name)
|
|
if var_val is None:
|
|
raise ValueError("Tried to use undefined replacement %s" % replacement_name)
|
|
if callable(var_val):
|
|
var_val = var_val()
|
|
# alternate way of specifying a vector or quat
|
|
elif var_val.startswith("<"):
|
|
var_val = re.sub(r"[<>]", "", var_val)
|
|
var_val = tuple(float(x) for x in var_val.split(","))
|
|
# UUID-ish
|
|
elif re.match(r"\A\w+-\w+-.*", var_val):
|
|
var_val = UUID(var_val)
|
|
else:
|
|
var_val = ast.literal_eval(var_val)
|
|
|
|
# Normally gross, but necessary for expressiveness in built messages
|
|
# unless a metalanguage is added.
|
|
if evaled:
|
|
var_val = proxy_eval(
|
|
var_val,
|
|
globals_={**env, **replacements},
|
|
locals_={"block": cur_block}
|
|
)
|
|
# Using an packer specific to this message
|
|
if packed:
|
|
if not evaled:
|
|
var_val = ast.literal_eval(var_val)
|
|
ser_key = (msg.name, cur_block.name, var_name)
|
|
serializer = se.SUBFIELD_SERIALIZERS.get(ser_key)
|
|
if not serializer:
|
|
raise KeyError(f"No subfield serializer for {ser_key!r}")
|
|
var_val = serializer.serialize(cur_block, var_val)
|
|
|
|
cur_block[var_name] = var_val
|
|
return msg
|
|
|
|
def _args_repr(self, pretty=False):
|
|
base = super()._args_repr(pretty=pretty)
|
|
return f"{base}, direction=Direction.{self.direction.name}"
|