Make sure asyncio.Tasks always have their exceptiosn logged

This commit is contained in:
Salad Dais
2024-01-15 22:24:16 +00:00
parent 3500212da0
commit a6bbd97b98
11 changed files with 60 additions and 27 deletions

View File

@@ -24,7 +24,7 @@ from hippolyzer.apps.model import MessageLogModel, MessageLogHeader, RegionListM
from hippolyzer.apps.proxy import start_proxy
from hippolyzer.lib.base import llsd
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.helpers import bytes_unescape, bytes_escape, get_resource_filename
from hippolyzer.lib.base.helpers import bytes_unescape, bytes_escape, get_resource_filename, create_logged_task
from hippolyzer.lib.base.message.llsd_msg_serializer import LLSDMessageSerializer
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.message_formatting import (
@@ -826,7 +826,7 @@ class MessageBuilderWindow(QtWidgets.QMainWindow):
# enough for the full response to pass through the proxy
await resp.read()
asyncio.create_task(_send_request())
create_logged_task(_send_request(), "Send HTTP Request")
class AddonDialog(QtWidgets.QDialog):

View File

@@ -21,6 +21,8 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import asyncio
import logging
from hippolyzer.lib.base.helpers import create_logged_task
LOG = logging.getLogger(__name__)
@@ -67,8 +69,7 @@ class Event:
unsubscribe = await handler(args, *inner_args, **kwargs)
if unsubscribe:
_ = self.unsubscribe(handler, *inner_args, **kwargs)
task = asyncio.create_task(_run_handler_wrapper())
task.add_done_callback(self._log_task_failures)
create_logged_task(_run_handler_wrapper(), self.name, LOG)
else:
try:
if handler(args, *inner_args, **kwargs) and not one_shot:
@@ -77,10 +78,6 @@ class Event:
# One handler failing shouldn't prevent notification of other handlers.
LOG.exception(f"Failed in handler for {self.name}")
def _log_task_failures(self, task: asyncio.Task):
if task.exception():
LOG.exception(f"Failed in handler for {self.name}", exc_info=task.exception())
def __len__(self):
return len(self.subscribers)

View File

@@ -1,7 +1,9 @@
from __future__ import annotations
import asyncio
import codecs
import functools
import logging
import os
import lazy_object_proxy
@@ -165,3 +167,31 @@ def get_mtime(path):
return os.stat(path).st_mtime
except:
return None
def fut_logger(name: str, logger: logging.Logger, fut: asyncio.Future, *args) -> None:
"""Callback suitable for exception logging in `Future.add_done_callback()`"""
if fut.exception():
if isinstance(fut.exception(), asyncio.CancelledError):
# Don't really care if the task was just cancelled
return
logger.exception(f"Failed in task for {name}", exc_info=fut.exception())
def add_future_logger(
fut: asyncio.Future,
name: Optional[str] = None,
logger: Optional[logging.Logger] = None,
):
"""Add a logger to Futures that will never be directly `await`ed, logging exceptions"""
fut.add_done_callback(functools.partial(fut_logger, name, logger or logging.getLogger()))
def create_logged_task(
coro: Coroutine,
name: Optional[str] = None,
logger: Optional[logging.Logger] = None,
) -> asyncio.Task:
task = asyncio.create_task(coro, name=name)
add_future_logger(task, name, logger)
return task

View File

@@ -8,6 +8,7 @@ import dataclasses
from typing import *
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.helpers import create_logged_task
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.circuit import ConnectionHolder
from hippolyzer.lib.base.message.msgtypes import PacketFlags
@@ -108,7 +109,7 @@ class TransferManager:
flags=PacketFlags.RELIABLE,
))
transfer = Transfer(transfer_id)
asyncio.create_task(self._pump_transfer_replies(transfer))
create_logged_task(self._pump_transfer_replies(transfer), "Transfer Pump")
return transfer
async def _pump_transfer_replies(self, transfer: Transfer):

View File

@@ -9,6 +9,7 @@ import random
from typing import *
from hippolyzer.lib.base.datatypes import UUID, RawBytes
from hippolyzer.lib.base.helpers import create_logged_task
from hippolyzer.lib.base.message.data_packer import TemplateDataPacker
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.msgtypes import MsgType, PacketFlags
@@ -125,7 +126,7 @@ class XferManager:
direction=direction,
))
xfer = Xfer(xfer_id, direction=direction, turbo=turbo)
asyncio.create_task(self._pump_xfer_replies(xfer))
create_logged_task(self._pump_xfer_replies(xfer), "Xfer Pump")
return xfer
async def _pump_xfer_replies(self, xfer: Xfer):

View File

@@ -13,7 +13,7 @@ import aiohttp
import multidict
from hippolyzer.lib.base.datatypes import Vector3, StringEnum
from hippolyzer.lib.base.helpers import proxify, get_resource_filename
from hippolyzer.lib.base.helpers import proxify, get_resource_filename, create_logged_task
from hippolyzer.lib.base.message.circuit import Circuit
from hippolyzer.lib.base.message.llsd_msg_serializer import LLSDMessageSerializer
from hippolyzer.lib.base.message.message import Message, Block
@@ -231,13 +231,13 @@ class HippoClientRegion(BaseClientRegion):
seed_resp.raise_for_status()
self.update_caps(await seed_resp.read_llsd())
self._eq_task = asyncio.create_task(self._poll_event_queue())
self._eq_task = create_logged_task(self._poll_event_queue(), "EQ Poll")
settings = self.session().session_manager.settings
if settings.AUTO_REQUEST_PARCELS:
_ = asyncio.create_task(self.parcel_manager.request_dirty_parcels())
_ = create_logged_task(self.parcel_manager.request_dirty_parcels(), "Parcel Request")
if settings.AUTO_REQUEST_MATERIALS:
_ = asyncio.create_task(self.objects.request_all_materials())
_ = create_logged_task(self.objects.request_all_materials(), "Request All Materials")
except Exception as e:
# Let consumers who were `await`ing the connected signal know there was an error
@@ -391,10 +391,10 @@ class HippoClientSession(BaseClientSession):
need_connect = (region.circuit and region.circuit.is_alive) or moving_to_region
self.open_circuit(sim_addr)
if need_connect:
asyncio.create_task(region.connect(main_region=moving_to_region))
create_logged_task(region.connect(main_region=moving_to_region), "Region Connect")
elif moving_to_region:
# No need to connect, but we do need to complete agent movement.
asyncio.create_task(region.complete_agent_movement())
create_logged_task(region.complete_agent_movement(), "CompleteAgentMovement")
class HippoClient(BaseClientSessionManager):
@@ -660,7 +660,7 @@ class HippoClient(BaseClientSessionManager):
self.session = HippoClientSession.from_login_data(login_data, self)
self.session.transport, self.session.protocol = await self._create_transport()
self._resend_task = asyncio.create_task(self._attempt_resends())
self._resend_task = create_logged_task(self._attempt_resends(), "Circuit Resend")
self.session.message_handler.subscribe("AgentDataUpdate", self._handle_agent_data_update)
self.session.message_handler.subscribe("AgentGroupDataUpdate", self._handle_agent_group_data_update)
@@ -742,7 +742,7 @@ class HippoClient(BaseClientSessionManager):
return
teleport_fut.set_result(None)
asyncio.create_task(_handle_teleport())
create_logged_task(_handle_teleport(), "Teleport")
return teleport_fut

View File

@@ -24,7 +24,7 @@ from mitmproxy.http import HTTPFlow
from mitmproxy.proxy.layers import tls
import OpenSSL
from hippolyzer.lib.base.helpers import get_resource_filename
from hippolyzer.lib.base.helpers import get_resource_filename, create_logged_task
from hippolyzer.lib.base.multiprocessing_utils import ParentProcessWatcher
from hippolyzer.lib.proxy.caps import SerializedCapData
@@ -130,7 +130,7 @@ class IPCInterceptionAddon:
def running(self):
# register to pump the events or something here
asyncio.create_task(self._pump_callbacks())
create_logged_task(self._pump_callbacks(), "Pump HTTP proxy callbacks")
# Tell the main process mitmproxy is ready to handle requests
self.mitmproxy_ready.set()

View File

@@ -16,6 +16,7 @@ from hippolyzer.lib.base.events import Event
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.objects import handle_to_gridxy
from .connection import VivoxConnection, VivoxMessage
from ..base.helpers import create_logged_task
LOG = logging.getLogger(__name__)
RESP_LOG = logging.getLogger(__name__ + ".responses")
@@ -79,7 +80,7 @@ class VoiceClient:
self._pos = Vector3(0, 0, 0)
self.vivox_conn: Optional[VivoxConnection] = None
self._poll_task = asyncio.create_task(self._poll_messages())
self._poll_task = create_logged_task(self._poll_messages(), "Poll Vivox messages")
self.event_handler: MessageHandler[VivoxMessage, str] = MessageHandler(take_by_default=False)
self.event_handler.subscribe(
@@ -352,7 +353,7 @@ class VoiceClient:
RESP_LOG.debug("%s %s %s %r" % ("Request", request_id, msg_type, data))
asyncio.create_task(self.vivox_conn.send_request(request_id, msg_type, data))
create_logged_task(self.vivox_conn.send_request(request_id, msg_type, data), "Send Vivox message")
future = asyncio.Future()
self._pending_req_futures[request_id] = future
return future

View File

@@ -4,6 +4,7 @@ import unittest
from typing import *
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.helpers import create_logged_task
from hippolyzer.lib.base.message.message import Block, Message
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.templates import (
@@ -61,7 +62,7 @@ class XferManagerTests(BaseTransferTests):
))
async def test_small_xfer_upload(self):
asyncio.create_task(self._handle_vfile_upload())
_ = create_logged_task(self._handle_vfile_upload())
await asyncio.wait_for(self.xfer_manager.upload_asset(
AssetType.BODYPART, self.SMALL_PAYLOAD
), timeout=0.1)
@@ -69,7 +70,7 @@ class XferManagerTests(BaseTransferTests):
async def test_large_xfer_upload(self):
# Larger payloads take a different path
asyncio.create_task(self._handle_vfile_upload())
_ = create_logged_task(self._handle_vfile_upload())
await asyncio.wait_for(self.xfer_manager.upload_asset(
AssetType.BODYPART, self.LARGE_PAYLOAD
), timeout=0.1)
@@ -125,7 +126,7 @@ class TestTransferManager(BaseTransferTests):
packet_num += 1
async def test_simple_transfer(self):
asyncio.create_task(self._handle_covenant_download())
_ = create_logged_task(self._handle_covenant_download())
transfer: Transfer = await asyncio.wait_for(self.transfer_manager.request(
source_type=TransferSourceType.SIM_ESTATE,
params=TransferRequestParamsSimEstate(

View File

@@ -12,6 +12,7 @@ from yarl import URL
from hippolyzer.apps.proxy import run_http_proxy_process
from hippolyzer.lib.base.datatypes import Vector3
from hippolyzer.lib.base.helpers import create_logged_task
from hippolyzer.lib.proxy.addon_utils import BaseAddon
from hippolyzer.lib.proxy.addons import AddonManager
from hippolyzer.lib.proxy.http_event_manager import MITMProxyEventManager
@@ -164,7 +165,7 @@ class TestMITMProxy(BaseProxyTest):
def test_mitmproxy_works(self):
async def _request_example_com():
# Pump callbacks from mitmproxy
asyncio.create_task(self.http_event_manager.run())
_ = create_logged_task(self.http_event_manager.run())
try:
async with self.caps_client.get("http://example.com/", timeout=0.5) as resp:
self.assertIn(b"Example Domain", await resp.read())

View File

@@ -6,6 +6,7 @@ from typing import *
from unittest import mock
from hippolyzer.lib.base.datatypes import *
from hippolyzer.lib.base.helpers import create_logged_task
from hippolyzer.lib.base.message.message import Block, Message as Message
from hippolyzer.lib.base.message.udpdeserializer import UDPMessageDeserializer
from hippolyzer.lib.base.message.udpserializer import UDPMessageSerializer
@@ -620,7 +621,7 @@ class SessionObjectManagerTests(ObjectManagerTestMixin, unittest.IsolatedAsyncio
async def _create_after():
await asyncio.sleep(0.001)
self._create_object(region_handle=123, local_id=child.ParentID)
asyncio.create_task(_create_after())
_ = create_logged_task(_create_after())
await self.session.objects.load_ancestors(child)
await self.session.objects.load_ancestors(parentless)