Faster message logging, improved queue usage
@@ -790,7 +790,6 @@ def gui_main():
     window = ProxyGUI()
     timer = QtCore.QTimer(app)
     timer.timeout.connect(window.sessionManager.checkRegions)
-    timer.timeout.connect(window.model.append_queued_entries)
     timer.start(100)
     signal.signal(signal.SIGINT, lambda *args: QtWidgets.QApplication.quit())
     window.show()
@@ -79,8 +79,15 @@ class MessageHandler(Generic[_T]):

         notifiers = self._subscribe_all(message_names, _handler_wrapper, predicate=predicate)

+        async def _get_wrapper():
+            try:
+                return await msg_queue.get()
+            finally:
+                # Consumption is completion
+                msg_queue.task_done()
+
         try:
-            yield msg_queue.get
+            yield _get_wrapper
         finally:
             for n in notifiers:
                 n.unsubscribe(_handler_wrapper)
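A minimal, self-contained sketch (illustrative only, not code from this repository) of the pattern the new _get_wrapper follows: every asyncio.Queue.get() is paired with task_done() as soon as the item is handed to the consumer, so Queue.join() can later be used to wait until everything handed out has been accounted for. The function and variable names below are made up.

import asyncio

async def get_marking_done(queue: asyncio.Queue):
    # Hypothetical stand-in for _get_wrapper: fetch one item and
    # immediately mark it handled, treating consumption as completion.
    try:
        return await queue.get()
    finally:
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    for _ in range(3):
        print(await get_marking_done(queue))
    # Returns immediately: each get() above was matched by a task_done().
    await queue.join()

asyncio.run(main())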
@@ -6,7 +6,6 @@ import fnmatch
 import io
 import logging
 import pickle
-import queue
 import re
 import typing
 import weakref
@@ -42,10 +41,9 @@ class FilteringMessageLogger(BaseMessageLogger):
     def __init__(self):
         BaseMessageLogger.__init__(self)
         self._raw_entries = collections.deque(maxlen=2000)
-        self._queued_entries = queue.Queue()
-        self._filtered_entries = []
+        self._filtered_entries: typing.List[AbstractMessageLogEntry] = []
         self._paused = False
-        self.filter: typing.Optional[BaseFilterNode] = None
+        self.filter: BaseFilterNode = compile_filter("")

     def set_filter(self, filter_str: str):
         self.filter = compile_filter(filter_str)
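Replacing the Optional filter with one compiled from the empty string is a null-object-style default: the logger can call self.filter.match(entry) unconditionally instead of guarding against None. A rough sketch of that idea, under the assumption that compile_filter("") yields a filter that matches everything; the class names below are invented, not the project's filter API.

import typing

class MatchEverything:
    # Stand-in for how a filter compiled from "" is assumed to behave.
    def match(self, entry: typing.Any) -> bool:
        return True

class Sink:
    def __init__(self):
        # Always a real filter object, never None, so no guard is needed.
        self.filter = MatchEverything()
        self.accepted: typing.List[typing.Any] = []

    def add(self, entry: typing.Any):
        if self.filter.match(entry):  # no "if self.filter and ..." check
            self.accepted.append(entry)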
@@ -65,7 +63,7 @@ class FilteringMessageLogger(BaseMessageLogger):
     def log_lludp_message(self, session: Session, region: ProxiedRegion, message: ProxiedMessage):
         if self._paused:
             return
-        self.queue_log_entry(LLUDPMessageLogEntry(message, region, session))
+        self._add_log_entry(LLUDPMessageLogEntry(message, region, session))

     def log_http_response(self, flow: HippoHTTPFlow):
         if self._paused:
@@ -73,12 +71,12 @@ class FilteringMessageLogger(BaseMessageLogger):
         # These are huge, let's not log them for now.
         if flow.cap_data and flow.cap_data.asset_server_cap:
             return
-        self.queue_log_entry(HTTPMessageLogEntry(flow))
+        self._add_log_entry(HTTPMessageLogEntry(flow))

     def log_eq_event(self, session: Session, region: ProxiedRegion, event: dict):
         if self._paused:
             return
-        self.queue_log_entry(EQMessageLogEntry(event, region, session))
+        self._add_log_entry(EQMessageLogEntry(event, region, session))

     # Hooks that Qt models will want to implement
     def _begin_insert(self, insert_idx: int):
@@ -93,36 +91,29 @@ class FilteringMessageLogger(BaseMessageLogger):
     def _end_reset(self):
         pass

-    def append_queued_entries(self):
-        while not self._queued_entries.empty():
-            entry: AbstractMessageLogEntry = self._queued_entries.get(block=False)
+    def _add_log_entry(self, entry: AbstractMessageLogEntry):
+        try:
             # Paused, throw it away.
             if self._paused:
-                continue
+                return
             self._raw_entries.append(entry)
-            try:
-                if self.filter.match(entry):
-                    next_idx = len(self._filtered_entries)
-                    self._begin_insert(next_idx)
-                    self._filtered_entries.append(entry)
-                    self._end_insert()
+            if self.filter.match(entry):
+                next_idx = len(self._filtered_entries)
+                self._begin_insert(next_idx)
+                self._filtered_entries.append(entry)
+                self._end_insert()

-                entry.cache_summary()
-                # In the common case we don't need to keep around the serialization
-                # caches anymore. If the filter changes, the caches will be repopulated
-                # as necessary.
-                entry.freeze()
-            except Exception:
-                LOG.exception("Failed to filter queued message")
-
-    def queue_log_entry(self, entry: AbstractMessageLogEntry):
-        self._queued_entries.put(entry, block=False)
+            entry.cache_summary()
+            # In the common case we don't need to keep around the serialization
+            # caches anymore. If the filter changes, the caches will be repopulated
+            # as necessary.
+            entry.freeze()
+        except Exception:
+            LOG.exception("Failed to filter queued message")

     def clear(self):
         self._begin_reset()
         self._filtered_entries.clear()
-        while not self._queued_entries.empty():
-            self._queued_entries.get(block=False)
         self._raw_entries.clear()
         self._end_reset()
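For reference, a rough sketch (not from the repository; the subclass name is invented) of how the _begin_insert/_end_insert hooks that now run directly inside _add_log_entry map onto a consumer. The hooks bracket each append to _filtered_entries, mirroring the begin/end row-insertion protocol a Qt model would use.

from hippolyzer.lib.proxy.message_logger import FilteringMessageLogger

class RecordingMessageLogger(FilteringMessageLogger):
    # Illustrative subclass: records where each filtered entry lands. A real
    # QAbstractItemModel-backed logger would call beginInsertRows(parent, idx, idx)
    # from _begin_insert and endInsertRows() from _end_insert instead.
    def __init__(self):
        super().__init__()
        self.insert_positions = []

    def _begin_insert(self, insert_idx: int):
        self.insert_positions.append(insert_idx)

    def _end_insert(self):
        pass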
@@ -13,6 +13,7 @@ from hippolyzer.lib.base.objects import Object
 from hippolyzer.lib.proxy.addon_utils import BaseAddon
 from hippolyzer.lib.proxy.addons import AddonManager
 from hippolyzer.lib.proxy.message import ProxiedMessage
+from hippolyzer.lib.proxy.message_logger import FilteringMessageLogger
 from hippolyzer.lib.proxy.packets import ProxiedUDPPacket, Direction
 from hippolyzer.lib.proxy.region import ProxiedRegion
 from hippolyzer.lib.proxy.sessions import Session
@@ -35,6 +36,12 @@ class MockAddon(BaseAddon):
         self.events.append(("object_update", session.id, region.circuit_addr, obj.LocalID))


+class SimpleMessageLogger(FilteringMessageLogger):
+    @property
+    def entries(self):
+        return self._filtered_entries
+
+
 class LLUDPIntegrationTests(BaseIntegrationTest):
     def setUp(self) -> None:
         super().setUp()
@@ -169,3 +176,14 @@ class LLUDPIntegrationTests(BaseIntegrationTest):
         obj = self.session.regions[0].objects.lookup_localid(1234)
         self.assertIsInstance(obj.TextureEntry, lazy_object_proxy.Proxy)
         self.assertEqual(obj.TextureEntry.Textures[None], UUID("89556747-24cb-43ed-920b-47caed15465f"))
+
+    async def test_message_logger(self):
+        message_logger = SimpleMessageLogger()
+        self.session_manager.message_logger = message_logger
+        self._setup_circuit()
+        obj_update = self._make_objectupdate_compressed(1234)
+        self.protocol.datagram_received(obj_update, self.region_addr)
+        await self._wait_drained()
+        entries = message_logger.entries
+        self.assertEqual(len(entries), 1)
+        self.assertEqual(entries[0].name, "ObjectUpdateCompressed")