diff --git a/hippolyzer/apps/proxy_gui.py b/hippolyzer/apps/proxy_gui.py index 1479b03..e5be11f 100644 --- a/hippolyzer/apps/proxy_gui.py +++ b/hippolyzer/apps/proxy_gui.py @@ -24,7 +24,7 @@ from hippolyzer.apps.model import MessageLogModel, MessageLogHeader, RegionListM from hippolyzer.apps.proxy import start_proxy from hippolyzer.lib.base import llsd from hippolyzer.lib.base.datatypes import UUID -from hippolyzer.lib.base.helpers import bytes_unescape, bytes_escape, get_resource_filename +from hippolyzer.lib.base.helpers import bytes_unescape, bytes_escape, get_resource_filename, create_logged_task from hippolyzer.lib.base.message.llsd_msg_serializer import LLSDMessageSerializer from hippolyzer.lib.base.message.message import Block, Message from hippolyzer.lib.base.message.message_formatting import ( @@ -826,7 +826,7 @@ class MessageBuilderWindow(QtWidgets.QMainWindow): # enough for the full response to pass through the proxy await resp.read() - asyncio.create_task(_send_request()) + create_logged_task(_send_request(), "Send HTTP Request") class AddonDialog(QtWidgets.QDialog): diff --git a/hippolyzer/lib/base/events.py b/hippolyzer/lib/base/events.py index 65da140..c4f8b91 100644 --- a/hippolyzer/lib/base/events.py +++ b/hippolyzer/lib/base/events.py @@ -21,6 +21,8 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import asyncio import logging +from hippolyzer.lib.base.helpers import create_logged_task + LOG = logging.getLogger(__name__) @@ -67,8 +69,7 @@ class Event: unsubscribe = await handler(args, *inner_args, **kwargs) if unsubscribe: _ = self.unsubscribe(handler, *inner_args, **kwargs) - task = asyncio.create_task(_run_handler_wrapper()) - task.add_done_callback(self._log_task_failures) + create_logged_task(_run_handler_wrapper(), self.name, LOG) else: try: if handler(args, *inner_args, **kwargs) and not one_shot: @@ -77,10 +78,6 @@ class Event: # One handler failing shouldn't prevent notification of other handlers. LOG.exception(f"Failed in handler for {self.name}") - def _log_task_failures(self, task: asyncio.Task): - if task.exception(): - LOG.exception(f"Failed in handler for {self.name}", exc_info=task.exception()) - def __len__(self): return len(self.subscribers) diff --git a/hippolyzer/lib/base/helpers.py b/hippolyzer/lib/base/helpers.py index d3517a4..2366c11 100644 --- a/hippolyzer/lib/base/helpers.py +++ b/hippolyzer/lib/base/helpers.py @@ -1,7 +1,9 @@ from __future__ import annotations +import asyncio import codecs import functools +import logging import os import lazy_object_proxy @@ -165,3 +167,31 @@ def get_mtime(path): return os.stat(path).st_mtime except: return None + + +def fut_logger(name: str, logger: logging.Logger, fut: asyncio.Future, *args) -> None: + """Callback suitable for exception logging in `Future.add_done_callback()`""" + if fut.exception(): + if isinstance(fut.exception(), asyncio.CancelledError): + # Don't really care if the task was just cancelled + return + logger.exception(f"Failed in task for {name}", exc_info=fut.exception()) + + +def add_future_logger( + fut: asyncio.Future, + name: Optional[str] = None, + logger: Optional[logging.Logger] = None, +): + """Add a logger to Futures that will never be directly `await`ed, logging exceptions""" + fut.add_done_callback(functools.partial(fut_logger, name, logger or logging.getLogger())) + + +def create_logged_task( + coro: Coroutine, + name: Optional[str] = None, + logger: Optional[logging.Logger] = None, +) -> asyncio.Task: + task = asyncio.create_task(coro, name=name) + add_future_logger(task, name, logger) + return task diff --git a/hippolyzer/lib/base/transfer_manager.py b/hippolyzer/lib/base/transfer_manager.py index fd384a3..d164fff 100644 --- a/hippolyzer/lib/base/transfer_manager.py +++ b/hippolyzer/lib/base/transfer_manager.py @@ -8,6 +8,7 @@ import dataclasses from typing import * from hippolyzer.lib.base.datatypes import UUID +from hippolyzer.lib.base.helpers import create_logged_task from hippolyzer.lib.base.message.message import Block, Message from hippolyzer.lib.base.message.circuit import ConnectionHolder from hippolyzer.lib.base.message.msgtypes import PacketFlags @@ -108,7 +109,7 @@ class TransferManager: flags=PacketFlags.RELIABLE, )) transfer = Transfer(transfer_id) - asyncio.create_task(self._pump_transfer_replies(transfer)) + create_logged_task(self._pump_transfer_replies(transfer), "Transfer Pump") return transfer async def _pump_transfer_replies(self, transfer: Transfer): diff --git a/hippolyzer/lib/base/xfer_manager.py b/hippolyzer/lib/base/xfer_manager.py index dd3393f..66784a0 100644 --- a/hippolyzer/lib/base/xfer_manager.py +++ b/hippolyzer/lib/base/xfer_manager.py @@ -9,6 +9,7 @@ import random from typing import * from hippolyzer.lib.base.datatypes import UUID, RawBytes +from hippolyzer.lib.base.helpers import create_logged_task from hippolyzer.lib.base.message.data_packer import TemplateDataPacker from hippolyzer.lib.base.message.message import Block, Message from hippolyzer.lib.base.message.msgtypes import MsgType, PacketFlags @@ -125,7 +126,7 @@ class XferManager: direction=direction, )) xfer = Xfer(xfer_id, direction=direction, turbo=turbo) - asyncio.create_task(self._pump_xfer_replies(xfer)) + create_logged_task(self._pump_xfer_replies(xfer), "Xfer Pump") return xfer async def _pump_xfer_replies(self, xfer: Xfer): diff --git a/hippolyzer/lib/client/hippo_client.py b/hippolyzer/lib/client/hippo_client.py index 8a0a79a..21f1c7f 100644 --- a/hippolyzer/lib/client/hippo_client.py +++ b/hippolyzer/lib/client/hippo_client.py @@ -13,7 +13,7 @@ import aiohttp import multidict from hippolyzer.lib.base.datatypes import Vector3, StringEnum -from hippolyzer.lib.base.helpers import proxify, get_resource_filename +from hippolyzer.lib.base.helpers import proxify, get_resource_filename, create_logged_task from hippolyzer.lib.base.message.circuit import Circuit from hippolyzer.lib.base.message.llsd_msg_serializer import LLSDMessageSerializer from hippolyzer.lib.base.message.message import Message, Block @@ -231,13 +231,13 @@ class HippoClientRegion(BaseClientRegion): seed_resp.raise_for_status() self.update_caps(await seed_resp.read_llsd()) - self._eq_task = asyncio.create_task(self._poll_event_queue()) + self._eq_task = create_logged_task(self._poll_event_queue(), "EQ Poll") settings = self.session().session_manager.settings if settings.AUTO_REQUEST_PARCELS: - _ = asyncio.create_task(self.parcel_manager.request_dirty_parcels()) + _ = create_logged_task(self.parcel_manager.request_dirty_parcels(), "Parcel Request") if settings.AUTO_REQUEST_MATERIALS: - _ = asyncio.create_task(self.objects.request_all_materials()) + _ = create_logged_task(self.objects.request_all_materials(), "Request All Materials") except Exception as e: # Let consumers who were `await`ing the connected signal know there was an error @@ -391,10 +391,10 @@ class HippoClientSession(BaseClientSession): need_connect = (region.circuit and region.circuit.is_alive) or moving_to_region self.open_circuit(sim_addr) if need_connect: - asyncio.create_task(region.connect(main_region=moving_to_region)) + create_logged_task(region.connect(main_region=moving_to_region), "Region Connect") elif moving_to_region: # No need to connect, but we do need to complete agent movement. - asyncio.create_task(region.complete_agent_movement()) + create_logged_task(region.complete_agent_movement(), "CompleteAgentMovement") class HippoClient(BaseClientSessionManager): @@ -660,7 +660,7 @@ class HippoClient(BaseClientSessionManager): self.session = HippoClientSession.from_login_data(login_data, self) self.session.transport, self.session.protocol = await self._create_transport() - self._resend_task = asyncio.create_task(self._attempt_resends()) + self._resend_task = create_logged_task(self._attempt_resends(), "Circuit Resend") self.session.message_handler.subscribe("AgentDataUpdate", self._handle_agent_data_update) self.session.message_handler.subscribe("AgentGroupDataUpdate", self._handle_agent_group_data_update) @@ -742,7 +742,7 @@ class HippoClient(BaseClientSessionManager): return teleport_fut.set_result(None) - asyncio.create_task(_handle_teleport()) + create_logged_task(_handle_teleport(), "Teleport") return teleport_fut diff --git a/hippolyzer/lib/proxy/http_proxy.py b/hippolyzer/lib/proxy/http_proxy.py index b056b45..27227a8 100644 --- a/hippolyzer/lib/proxy/http_proxy.py +++ b/hippolyzer/lib/proxy/http_proxy.py @@ -24,7 +24,7 @@ from mitmproxy.http import HTTPFlow from mitmproxy.proxy.layers import tls import OpenSSL -from hippolyzer.lib.base.helpers import get_resource_filename +from hippolyzer.lib.base.helpers import get_resource_filename, create_logged_task from hippolyzer.lib.base.multiprocessing_utils import ParentProcessWatcher from hippolyzer.lib.proxy.caps import SerializedCapData @@ -130,7 +130,7 @@ class IPCInterceptionAddon: def running(self): # register to pump the events or something here - asyncio.create_task(self._pump_callbacks()) + create_logged_task(self._pump_callbacks(), "Pump HTTP proxy callbacks") # Tell the main process mitmproxy is ready to handle requests self.mitmproxy_ready.set() diff --git a/hippolyzer/lib/voice/client.py b/hippolyzer/lib/voice/client.py index f94593f..18b627a 100644 --- a/hippolyzer/lib/voice/client.py +++ b/hippolyzer/lib/voice/client.py @@ -16,6 +16,7 @@ from hippolyzer.lib.base.events import Event from hippolyzer.lib.base.message.message_handler import MessageHandler from hippolyzer.lib.base.objects import handle_to_gridxy from .connection import VivoxConnection, VivoxMessage +from ..base.helpers import create_logged_task LOG = logging.getLogger(__name__) RESP_LOG = logging.getLogger(__name__ + ".responses") @@ -79,7 +80,7 @@ class VoiceClient: self._pos = Vector3(0, 0, 0) self.vivox_conn: Optional[VivoxConnection] = None - self._poll_task = asyncio.create_task(self._poll_messages()) + self._poll_task = create_logged_task(self._poll_messages(), "Poll Vivox messages") self.event_handler: MessageHandler[VivoxMessage, str] = MessageHandler(take_by_default=False) self.event_handler.subscribe( @@ -352,7 +353,7 @@ class VoiceClient: RESP_LOG.debug("%s %s %s %r" % ("Request", request_id, msg_type, data)) - asyncio.create_task(self.vivox_conn.send_request(request_id, msg_type, data)) + create_logged_task(self.vivox_conn.send_request(request_id, msg_type, data), "Send Vivox message") future = asyncio.Future() self._pending_req_futures[request_id] = future return future diff --git a/tests/base/test_xfer_transfer.py b/tests/base/test_xfer_transfer.py index cd9a14a..c451e61 100644 --- a/tests/base/test_xfer_transfer.py +++ b/tests/base/test_xfer_transfer.py @@ -4,6 +4,7 @@ import unittest from typing import * from hippolyzer.lib.base.datatypes import UUID +from hippolyzer.lib.base.helpers import create_logged_task from hippolyzer.lib.base.message.message import Block, Message from hippolyzer.lib.base.message.message_handler import MessageHandler from hippolyzer.lib.base.templates import ( @@ -61,7 +62,7 @@ class XferManagerTests(BaseTransferTests): )) async def test_small_xfer_upload(self): - asyncio.create_task(self._handle_vfile_upload()) + _ = create_logged_task(self._handle_vfile_upload()) await asyncio.wait_for(self.xfer_manager.upload_asset( AssetType.BODYPART, self.SMALL_PAYLOAD ), timeout=0.1) @@ -69,7 +70,7 @@ class XferManagerTests(BaseTransferTests): async def test_large_xfer_upload(self): # Larger payloads take a different path - asyncio.create_task(self._handle_vfile_upload()) + _ = create_logged_task(self._handle_vfile_upload()) await asyncio.wait_for(self.xfer_manager.upload_asset( AssetType.BODYPART, self.LARGE_PAYLOAD ), timeout=0.1) @@ -125,7 +126,7 @@ class TestTransferManager(BaseTransferTests): packet_num += 1 async def test_simple_transfer(self): - asyncio.create_task(self._handle_covenant_download()) + _ = create_logged_task(self._handle_covenant_download()) transfer: Transfer = await asyncio.wait_for(self.transfer_manager.request( source_type=TransferSourceType.SIM_ESTATE, params=TransferRequestParamsSimEstate( diff --git a/tests/proxy/integration/test_http.py b/tests/proxy/integration/test_http.py index 55bbccb..bdc2d00 100644 --- a/tests/proxy/integration/test_http.py +++ b/tests/proxy/integration/test_http.py @@ -12,6 +12,7 @@ from yarl import URL from hippolyzer.apps.proxy import run_http_proxy_process from hippolyzer.lib.base.datatypes import Vector3 +from hippolyzer.lib.base.helpers import create_logged_task from hippolyzer.lib.proxy.addon_utils import BaseAddon from hippolyzer.lib.proxy.addons import AddonManager from hippolyzer.lib.proxy.http_event_manager import MITMProxyEventManager @@ -164,7 +165,7 @@ class TestMITMProxy(BaseProxyTest): def test_mitmproxy_works(self): async def _request_example_com(): # Pump callbacks from mitmproxy - asyncio.create_task(self.http_event_manager.run()) + _ = create_logged_task(self.http_event_manager.run()) try: async with self.caps_client.get("http://example.com/", timeout=0.5) as resp: self.assertIn(b"Example Domain", await resp.read()) diff --git a/tests/proxy/test_object_manager.py b/tests/proxy/test_object_manager.py index 955da43..0804e26 100644 --- a/tests/proxy/test_object_manager.py +++ b/tests/proxy/test_object_manager.py @@ -6,6 +6,7 @@ from typing import * from unittest import mock from hippolyzer.lib.base.datatypes import * +from hippolyzer.lib.base.helpers import create_logged_task from hippolyzer.lib.base.message.message import Block, Message as Message from hippolyzer.lib.base.message.udpdeserializer import UDPMessageDeserializer from hippolyzer.lib.base.message.udpserializer import UDPMessageSerializer @@ -620,7 +621,7 @@ class SessionObjectManagerTests(ObjectManagerTestMixin, unittest.IsolatedAsyncio async def _create_after(): await asyncio.sleep(0.001) self._create_object(region_handle=123, local_id=child.ParentID) - asyncio.create_task(_create_after()) + _ = create_logged_task(_create_after()) await self.session.objects.load_ancestors(child) await self.session.objects.load_ancestors(parentless)