Initial work to support in-flight EQ response pre-emption

This commit is contained in:
Salad Dais
2022-07-02 21:47:11 +00:00
parent 03a56c9982
commit fde9ddf4d9
4 changed files with 33 additions and 7 deletions

View File

@@ -28,7 +28,8 @@ class ProxyCapsClient(CapsClient):
# We go through the proxy by default, tack on a header letting mitmproxy know the
# request came from us so we can tag the request as injected. The header will be popped
# off before passing through to the server.
headers["X-Hippo-Injected"] = "1"
if "X-Hippo-Injected" not in headers:
headers["X-Hippo-Injected"] = "1"
proxy_port = self._settings.HTTP_PROXY_PORT
proxy = f"http://127.0.0.1:{proxy_port}"
# TODO: set up the SSLContext to validate mitmproxy's cert

View File

@@ -111,6 +111,12 @@ class HippoHTTPFlow:
self.resumed = True
self.callback_queue().put(("callback", self.flow.id, self.get_state()))
def preempt(self):
# Must be some flow that we previously resumed, we're racing
# the result from the server end.
assert not self.taken and self.resumed
self.callback_queue().put(("preempt", self.flow.id, self.get_state()))
@property
def is_replay(self) -> bool:
return bool(self.flow.is_replay)

View File

@@ -7,6 +7,7 @@ import sys
import queue
import typing
import uuid
import weakref
import mitmproxy.certs
import mitmproxy.ctx
@@ -99,7 +100,7 @@ class IPCInterceptionAddon:
"""
def __init__(self, flow_context: HTTPFlowContext):
self.mitmproxy_ready = flow_context.mitmproxy_ready
self.intercepted_flows: typing.Dict[str, HTTPFlow] = {}
self.flows: weakref.WeakValueDictionary[str, HTTPFlow] = weakref.WeakValueDictionary()
self.from_proxy_queue: multiprocessing.Queue = flow_context.from_proxy_queue
self.to_proxy_queue: multiprocessing.Queue = flow_context.to_proxy_queue
self.shutdown_signal: multiprocessing.Event = flow_context.shutdown_signal
@@ -134,8 +135,13 @@ class IPCInterceptionAddon:
await asyncio.sleep(0.001)
continue
if event_type == "callback":
orig_flow = self.intercepted_flows.pop(flow_id)
orig_flow = self.flows[flow_id]
orig_flow.set_state(flow_state)
elif event_type == "preempt":
orig_flow = self.flows.get(flow_id)
if orig_flow:
orig_flow.intercept()
orig_flow.set_state(flow_state)
elif event_type == "replay":
flow: HTTPFlow = HTTPFlow.from_state(flow_state)
# mitmproxy won't replay intercepted flows, this is an old flow so
@@ -157,8 +163,8 @@ class IPCInterceptionAddon:
from_browser = "Mozilla" in flow.request.headers.get("User-Agent", "")
flow.metadata["from_browser"] = from_browser
# Only trust the "injected" header if not from a browser
was_injected = flow.request.headers.pop("X-Hippo-Injected", False)
if was_injected and not from_browser:
was_injected = flow.request.headers.pop("X-Hippo-Injected", "")
if was_injected == "1" and not from_browser:
flow.metadata["request_injected"] = True
# Does this request need the stupid hack around aiohttp's windows proactor bug
@@ -169,7 +175,7 @@ class IPCInterceptionAddon:
def _queue_flow_interception(self, event_type: str, flow: HTTPFlow):
flow.intercept()
self.intercepted_flows[flow.id] = flow
self.flows[flow.id] = flow
self.from_proxy_queue.put((event_type, flow.get_state()), True)
def responseheaders(self, flow: HTTPFlow):

View File

@@ -11,7 +11,7 @@ import multidict
from hippolyzer.lib.base.datatypes import Vector3, UUID
from hippolyzer.lib.base.helpers import proxify
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.message.message import Message, Block
from hippolyzer.lib.base.message.message_handler import MessageHandler
from hippolyzer.lib.base.objects import handle_to_global_pos
from hippolyzer.lib.client.state import BaseClientRegion
@@ -171,6 +171,19 @@ class EventQueueManager:
def inject_event(self, event: dict):
self._queued_events.append(event)
if self._region:
circuit: ProxiedCircuit = self._region.circuit
session: Session = self._region.session()
# Inject an outbound PlacesQuery message so we can trigger an inbound PlacesReply
# over the EQ. That will allow us to shove our own event onto the response once it comes in,
# otherwise we have to wait until the EQ legitimately returns 200 due to a new event.
# May or may not work in OpenSim.
circuit.send_message(Message(
'PlacesQuery',
Block('AgentData', AgentID=session.agent_id, SessionID=session.id, QueryID=UUID()),
Block('TransactionData', TransactionID=UUID()),
Block('QueryData', QueryText=b'', QueryFlags=64, Category=-1, SimName=b''),
))
def take_injected_events(self):
events = self._queued_events