diff --git a/hippolyzer/lib/proxy/caps_client.py b/hippolyzer/lib/proxy/caps_client.py index 3847fc8..29a6dc0 100644 --- a/hippolyzer/lib/proxy/caps_client.py +++ b/hippolyzer/lib/proxy/caps_client.py @@ -28,7 +28,8 @@ class ProxyCapsClient(CapsClient): # We go through the proxy by default, tack on a header letting mitmproxy know the # request came from us so we can tag the request as injected. The header will be popped # off before passing through to the server. - headers["X-Hippo-Injected"] = "1" + if "X-Hippo-Injected" not in headers: + headers["X-Hippo-Injected"] = "1" proxy_port = self._settings.HTTP_PROXY_PORT proxy = f"http://127.0.0.1:{proxy_port}" # TODO: set up the SSLContext to validate mitmproxy's cert diff --git a/hippolyzer/lib/proxy/http_flow.py b/hippolyzer/lib/proxy/http_flow.py index f0fada8..2d90233 100644 --- a/hippolyzer/lib/proxy/http_flow.py +++ b/hippolyzer/lib/proxy/http_flow.py @@ -111,6 +111,12 @@ class HippoHTTPFlow: self.resumed = True self.callback_queue().put(("callback", self.flow.id, self.get_state())) + def preempt(self): + # Must be some flow that we previously resumed, we're racing + # the result from the server end. + assert not self.taken and self.resumed + self.callback_queue().put(("preempt", self.flow.id, self.get_state())) + @property def is_replay(self) -> bool: return bool(self.flow.is_replay) diff --git a/hippolyzer/lib/proxy/http_proxy.py b/hippolyzer/lib/proxy/http_proxy.py index cad59cb..e693661 100644 --- a/hippolyzer/lib/proxy/http_proxy.py +++ b/hippolyzer/lib/proxy/http_proxy.py @@ -7,6 +7,7 @@ import sys import queue import typing import uuid +import weakref import mitmproxy.certs import mitmproxy.ctx @@ -99,7 +100,7 @@ class IPCInterceptionAddon: """ def __init__(self, flow_context: HTTPFlowContext): self.mitmproxy_ready = flow_context.mitmproxy_ready - self.intercepted_flows: typing.Dict[str, HTTPFlow] = {} + self.flows: weakref.WeakValueDictionary[str, HTTPFlow] = weakref.WeakValueDictionary() self.from_proxy_queue: multiprocessing.Queue = flow_context.from_proxy_queue self.to_proxy_queue: multiprocessing.Queue = flow_context.to_proxy_queue self.shutdown_signal: multiprocessing.Event = flow_context.shutdown_signal @@ -134,8 +135,13 @@ class IPCInterceptionAddon: await asyncio.sleep(0.001) continue if event_type == "callback": - orig_flow = self.intercepted_flows.pop(flow_id) + orig_flow = self.flows[flow_id] orig_flow.set_state(flow_state) + elif event_type == "preempt": + orig_flow = self.flows.get(flow_id) + if orig_flow: + orig_flow.intercept() + orig_flow.set_state(flow_state) elif event_type == "replay": flow: HTTPFlow = HTTPFlow.from_state(flow_state) # mitmproxy won't replay intercepted flows, this is an old flow so @@ -157,8 +163,8 @@ class IPCInterceptionAddon: from_browser = "Mozilla" in flow.request.headers.get("User-Agent", "") flow.metadata["from_browser"] = from_browser # Only trust the "injected" header if not from a browser - was_injected = flow.request.headers.pop("X-Hippo-Injected", False) - if was_injected and not from_browser: + was_injected = flow.request.headers.pop("X-Hippo-Injected", "") + if was_injected == "1" and not from_browser: flow.metadata["request_injected"] = True # Does this request need the stupid hack around aiohttp's windows proactor bug @@ -169,7 +175,7 @@ class IPCInterceptionAddon: def _queue_flow_interception(self, event_type: str, flow: HTTPFlow): flow.intercept() - self.intercepted_flows[flow.id] = flow + self.flows[flow.id] = flow self.from_proxy_queue.put((event_type, flow.get_state()), True) def responseheaders(self, flow: HTTPFlow): diff --git a/hippolyzer/lib/proxy/region.py b/hippolyzer/lib/proxy/region.py index 0546d19..626271c 100644 --- a/hippolyzer/lib/proxy/region.py +++ b/hippolyzer/lib/proxy/region.py @@ -11,7 +11,7 @@ import multidict from hippolyzer.lib.base.datatypes import Vector3, UUID from hippolyzer.lib.base.helpers import proxify -from hippolyzer.lib.base.message.message import Message +from hippolyzer.lib.base.message.message import Message, Block from hippolyzer.lib.base.message.message_handler import MessageHandler from hippolyzer.lib.base.objects import handle_to_global_pos from hippolyzer.lib.client.state import BaseClientRegion @@ -171,6 +171,19 @@ class EventQueueManager: def inject_event(self, event: dict): self._queued_events.append(event) + if self._region: + circuit: ProxiedCircuit = self._region.circuit + session: Session = self._region.session() + # Inject an outbound PlacesQuery message so we can trigger an inbound PlacesReply + # over the EQ. That will allow us to shove our own event onto the response once it comes in, + # otherwise we have to wait until the EQ legitimately returns 200 due to a new event. + # May or may not work in OpenSim. + circuit.send_message(Message( + 'PlacesQuery', + Block('AgentData', AgentID=session.agent_id, SessionID=session.id, QueryID=UUID()), + Block('TransactionData', TransactionID=UUID()), + Block('QueryData', QueryText=b'', QueryFlags=64, Category=-1, SimName=b''), + )) def take_injected_events(self): events = self._queued_events