Hippolyzer/hippolyzer/lib/proxy/inventory_manager.py

import asyncio
import datetime as dt
import functools
import logging
from typing import *
from hippolyzer.lib.base import llsd
from hippolyzer.lib.base.helpers import get_mtime, create_logged_task
from hippolyzer.lib.client.inventory_manager import InventoryManager
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
from hippolyzer.lib.proxy.viewer_settings import iter_viewer_cache_dirs

if TYPE_CHECKING:
    from hippolyzer.lib.proxy.sessions import Session

LOG = logging.getLogger(__name__)


class ProxyInventoryManager(InventoryManager):
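    """Inventory manager for a proxied viewer session.

    Seeds its model from the newest on-disk viewer inventory cache it can find,
    then keeps it up to date from inventory update messages and InventoryAPIv3
    (AISv3) HTTP responses observed by the proxy.
    """
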
_session: "Session"

    def __init__(self, session: "Session"):
        # These handlers all need their processing deferred until the cache has been loaded.
        # Since the cache is loaded asynchronously, the viewer may get ahead of us (it parses
        # its cache faster) and start requesting inventory details we can't do anything with yet.
self._handle_update_create_inventory_item = self._wrap_with_cache_defer(
self._handle_update_create_inventory_item
)
self._handle_remove_inventory_item = self._wrap_with_cache_defer(
self._handle_remove_inventory_item
)
self._handle_remove_inventory_folder = self._wrap_with_cache_defer(
self._handle_remove_inventory_folder
)
self._handle_bulk_update_inventory = self._wrap_with_cache_defer(
self._handle_bulk_update_inventory
)
self._handle_move_inventory_item = self._wrap_with_cache_defer(
self._handle_move_inventory_item
)
self.process_aisv3_response = self._wrap_with_cache_defer(
self.process_aisv3_response
)
        # Base constructor comes after, because it registers handlers against specific methods,
        # which need to be wrapped before they're registered. Handlers are registered by method
        # reference, not by name!
super().__init__(session)
session.http_message_handler.subscribe("InventoryAPIv3", self._handle_aisv3_flow)
newest_cache = None
newest_timestamp = dt.datetime(year=1970, month=1, day=1, tzinfo=dt.timezone.utc)
# So consumers know when the inventory should be complete
self.cache_loaded: asyncio.Event = asyncio.Event()
self._cache_deferred_calls: List[Tuple[Callable[..., None], Tuple]] = []
# Look for the newest version of the cached inventory and use that.
# Not foolproof, but close enough if we're not sure what viewer is being used.
for cache_dir in iter_viewer_cache_dirs():
inv_cache_path = cache_dir / (str(session.agent_id) + ".inv.llsd.gz")
if inv_cache_path.exists():
mod = get_mtime(inv_cache_path)
if not mod:
continue
mod_ts = dt.datetime.fromtimestamp(mod, dt.timezone.utc)
                if mod_ts <= newest_timestamp:
                    continue
                newest_timestamp = mod_ts
                newest_cache = inv_cache_path
if newest_cache:
cache_load_fut = asyncio.ensure_future(asyncio.to_thread(self.load_cache, newest_cache))
# Meh. Don't care if it fails.
cache_load_fut.add_done_callback(lambda *args: self.cache_loaded.set())
create_logged_task(self._apply_deferred_after_loaded(), "Apply deferred inventory", LOG)
else:
self.cache_loaded.set()

    async def _apply_deferred_after_loaded(self):
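        """Replay handler calls that were deferred while the inventory cache was still loading."""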
await self.cache_loaded.wait()
LOG.info("Applying deferred inventory calls")
deferred_calls = self._cache_deferred_calls[:]
self._cache_deferred_calls.clear()
for func, args in deferred_calls:
try:
func(*args)
            except Exception:
LOG.exception("Failed to apply deferred inventory call")

    def _wrap_with_cache_defer(self, func: Callable[..., None]):
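        """Wrap `func` so calls made before the cache has loaded are queued for later replay."""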
@functools.wraps(func)
def wrapped(*inner_args):
if not self.cache_loaded.is_set():
self._cache_deferred_calls.append((func, inner_args))
else:
func(*inner_args)
return wrapped

    def _handle_aisv3_flow(self, flow: HippoHTTPFlow):
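        """Feed successful LLSD responses from the InventoryAPIv3 (AISv3) endpoint into the model."""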
        if flow.response.status_code < 200 or flow.response.status_code >= 300:
# Probably not a success
return
content_type = flow.response.headers.get("Content-Type", "")
if "llsd" not in content_type:
# Okay, probably still some kind of error...
return
# Try and add anything from the response into the model
self.process_aisv3_response(llsd.parse(flow.response.content))
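

# Illustrative helper, not part of the original module: a minimal sketch of how a
# consumer can use the `cache_loaded` event to wait until the inventory model has
# been seeded from the on-disk cache before reading it.
async def _example_wait_for_inventory(inv_manager: ProxyInventoryManager) -> ProxyInventoryManager:
    # Blocks until the cache load finished, failed, or was skipped entirely; after
    # that, deferred handler calls are replayed by _apply_deferred_after_loaded().
    await inv_manager.cache_loaded.wait()
    return inv_manager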