diff --git a/hippolyzer/apps/proxy_gui.py b/hippolyzer/apps/proxy_gui.py index 550e7b6..4eff290 100644 --- a/hippolyzer/apps/proxy_gui.py +++ b/hippolyzer/apps/proxy_gui.py @@ -790,7 +790,6 @@ def gui_main(): window = ProxyGUI() timer = QtCore.QTimer(app) timer.timeout.connect(window.sessionManager.checkRegions) - timer.timeout.connect(window.model.append_queued_entries) timer.start(100) signal.signal(signal.SIGINT, lambda *args: QtWidgets.QApplication.quit()) window.show() diff --git a/hippolyzer/lib/base/message/message_handler.py b/hippolyzer/lib/base/message/message_handler.py index 6bf2ad7..27b0d69 100644 --- a/hippolyzer/lib/base/message/message_handler.py +++ b/hippolyzer/lib/base/message/message_handler.py @@ -79,8 +79,15 @@ class MessageHandler(Generic[_T]): notifiers = self._subscribe_all(message_names, _handler_wrapper, predicate=predicate) + async def _get_wrapper(): + try: + return await msg_queue.get() + finally: + # Consumption is completion + msg_queue.task_done() + try: - yield msg_queue.get + yield _get_wrapper finally: for n in notifiers: n.unsubscribe(_handler_wrapper) diff --git a/hippolyzer/lib/proxy/message_logger.py b/hippolyzer/lib/proxy/message_logger.py index 562cd38..cee562b 100644 --- a/hippolyzer/lib/proxy/message_logger.py +++ b/hippolyzer/lib/proxy/message_logger.py @@ -6,7 +6,6 @@ import fnmatch import io import logging import pickle -import queue import re import typing import weakref @@ -42,10 +41,9 @@ class FilteringMessageLogger(BaseMessageLogger): def __init__(self): BaseMessageLogger.__init__(self) self._raw_entries = collections.deque(maxlen=2000) - self._queued_entries = queue.Queue() - self._filtered_entries = [] + self._filtered_entries: typing.List[AbstractMessageLogEntry] = [] self._paused = False - self.filter: typing.Optional[BaseFilterNode] = None + self.filter: BaseFilterNode = compile_filter("") def set_filter(self, filter_str: str): self.filter = compile_filter(filter_str) @@ -65,7 +63,7 @@ class FilteringMessageLogger(BaseMessageLogger): def log_lludp_message(self, session: Session, region: ProxiedRegion, message: ProxiedMessage): if self._paused: return - self.queue_log_entry(LLUDPMessageLogEntry(message, region, session)) + self._add_log_entry(LLUDPMessageLogEntry(message, region, session)) def log_http_response(self, flow: HippoHTTPFlow): if self._paused: @@ -73,12 +71,12 @@ class FilteringMessageLogger(BaseMessageLogger): # These are huge, let's not log them for now. if flow.cap_data and flow.cap_data.asset_server_cap: return - self.queue_log_entry(HTTPMessageLogEntry(flow)) + self._add_log_entry(HTTPMessageLogEntry(flow)) def log_eq_event(self, session: Session, region: ProxiedRegion, event: dict): if self._paused: return - self.queue_log_entry(EQMessageLogEntry(event, region, session)) + self._add_log_entry(EQMessageLogEntry(event, region, session)) # Hooks that Qt models will want to implement def _begin_insert(self, insert_idx: int): @@ -93,36 +91,29 @@ class FilteringMessageLogger(BaseMessageLogger): def _end_reset(self): pass - def append_queued_entries(self): - while not self._queued_entries.empty(): - entry: AbstractMessageLogEntry = self._queued_entries.get(block=False) + def _add_log_entry(self, entry: AbstractMessageLogEntry): + try: # Paused, throw it away. if self._paused: - continue + return self._raw_entries.append(entry) - try: - if self.filter.match(entry): - next_idx = len(self._filtered_entries) - self._begin_insert(next_idx) - self._filtered_entries.append(entry) - self._end_insert() + if self.filter.match(entry): + next_idx = len(self._filtered_entries) + self._begin_insert(next_idx) + self._filtered_entries.append(entry) + self._end_insert() - entry.cache_summary() - # In the common case we don't need to keep around the serialization - # caches anymore. If the filter changes, the caches will be repopulated - # as necessary. - entry.freeze() - except Exception: - LOG.exception("Failed to filter queued message") - - def queue_log_entry(self, entry: AbstractMessageLogEntry): - self._queued_entries.put(entry, block=False) + entry.cache_summary() + # In the common case we don't need to keep around the serialization + # caches anymore. If the filter changes, the caches will be repopulated + # as necessary. + entry.freeze() + except Exception: + LOG.exception("Failed to filter queued message") def clear(self): self._begin_reset() self._filtered_entries.clear() - while not self._queued_entries.empty(): - self._queued_entries.get(block=False) self._raw_entries.clear() self._end_reset() diff --git a/tests/proxy/integration/test_lludp.py b/tests/proxy/integration/test_lludp.py index 1297a3b..51088a5 100644 --- a/tests/proxy/integration/test_lludp.py +++ b/tests/proxy/integration/test_lludp.py @@ -13,6 +13,7 @@ from hippolyzer.lib.base.objects import Object from hippolyzer.lib.proxy.addon_utils import BaseAddon from hippolyzer.lib.proxy.addons import AddonManager from hippolyzer.lib.proxy.message import ProxiedMessage +from hippolyzer.lib.proxy.message_logger import FilteringMessageLogger from hippolyzer.lib.proxy.packets import ProxiedUDPPacket, Direction from hippolyzer.lib.proxy.region import ProxiedRegion from hippolyzer.lib.proxy.sessions import Session @@ -35,6 +36,12 @@ class MockAddon(BaseAddon): self.events.append(("object_update", session.id, region.circuit_addr, obj.LocalID)) +class SimpleMessageLogger(FilteringMessageLogger): + @property + def entries(self): + return self._filtered_entries + + class LLUDPIntegrationTests(BaseIntegrationTest): def setUp(self) -> None: super().setUp() @@ -169,3 +176,14 @@ class LLUDPIntegrationTests(BaseIntegrationTest): obj = self.session.regions[0].objects.lookup_localid(1234) self.assertIsInstance(obj.TextureEntry, lazy_object_proxy.Proxy) self.assertEqual(obj.TextureEntry.Textures[None], UUID("89556747-24cb-43ed-920b-47caed15465f")) + + async def test_message_logger(self): + message_logger = SimpleMessageLogger() + self.session_manager.message_logger = message_logger + self._setup_circuit() + obj_update = self._make_objectupdate_compressed(1234) + self.protocol.datagram_received(obj_update, self.region_addr) + await self._wait_drained() + entries = message_logger.entries + self.assertEqual(len(entries), 1) + self.assertEqual(entries[0].name, "ObjectUpdateCompressed")