from __future__ import annotations

import asyncio
import logging
import multiprocessing
import queue
from typing import *
import urllib.parse
import weakref
import xmlrpc.client

import defusedxml.ElementTree
import defusedxml.xmlrpc
import mitmproxy.http

from hippolyzer.lib.base import llsd
from hippolyzer.lib.base.datatypes import UUID
from hippolyzer.lib.base.message.llsd_msg_serializer import LLSDMessageSerializer
from hippolyzer.lib.base.message.message import Message
from hippolyzer.lib.base.network.transport import Direction
from hippolyzer.lib.proxy.addons import AddonManager
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
from hippolyzer.lib.proxy.caps import CapData, CapType
from hippolyzer.lib.proxy.region import ProxiedRegion
from hippolyzer.lib.proxy.sessions import SessionManager, Session
from hippolyzer.lib.proxy.http_proxy import HTTPFlowContext


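# Parse untrusted XML defensively: defusedxml patches xmlrpc's parser and
# replaces llsd's ElementTree.fromstring, so hostile XML-RPC or LLSD payloads
# can't exploit entity-expansion tricks against the proxy.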
def apply_security_monkeypatches():
    defusedxml.xmlrpc.monkey_patch()
    llsd.fromstring = defusedxml.ElementTree.fromstring


apply_security_monkeypatches()

LOG = logging.getLogger(__name__)


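# Rough usage sketch (assumed wiring, not defined in this module): the proxy
# shares an HTTPFlowContext with the mitmproxy process, then pumps events with
# something like
#     manager = MITMProxyEventManager(session_manager, flow_context)
#     asyncio.create_task(manager.run())
# so queued flows get handled on the main event loop.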
class MITMProxyEventManager:
    """
    Handles HTTP request and response events from the mitmproxy process
    """

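    # Caps whose responses hand back a one-shot "uploader" URL; we register that
    # URL as a temporary cap when the response comes through (see below).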
    UPLOAD_CREATING_CAPS = {
        "NewFileAgentInventory", "UpdateGestureAgentInventory", "UpdateGestureTaskInventory",
        "UpdateNotecardAgentInventory", "UpdateNotecardTaskInventory",
        "UpdateScriptAgent", "UpdateScriptTask",
        "UpdateSettingsAgentInventory", "UpdateSettingsTaskInventory",
        "UploadBakedTexture", "UploadAgentProfileImage",
    }

    def __init__(self, session_manager: SessionManager, flow_context: HTTPFlowContext):
        self.session_manager: SessionManager = weakref.proxy(session_manager)
        self.from_proxy_queue: multiprocessing.Queue = flow_context.from_proxy_queue
        self.to_proxy_queue: multiprocessing.Queue = flow_context.to_proxy_queue
        self.shutdown_signal: multiprocessing.Event = flow_context.shutdown_signal
        self.llsd_message_serializer = LLSDMessageSerializer()
        self._asset_server_proxied = False

    async def run(self):
        while not self.shutdown_signal.is_set():
            try:
                await self.pump_proxy_event()
            except:
                LOG.exception("Exploded when handling parsed packets")

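    # mitmproxy runs in a separate process and hands us flow states over a
    # multiprocessing.Queue; poll it without blocking the asyncio event loop.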
    async def pump_proxy_event(self):
        try:
            event_type, flow_state = self.from_proxy_queue.get(False)
        except queue.Empty:
            await asyncio.sleep(0.001)
            return

        flow = HippoHTTPFlow.from_state(flow_state, self.session_manager)
        try:
            if event_type == "request":
                self._handle_request(flow)
                # A response was injected early in the cycle, so we won't get a
                # response callback from mitmproxy. Just log it now.
                message_logger = self.session_manager.message_logger
                if message_logger and flow.response_injected:
                    message_logger.log_http_response(flow)
            elif event_type == "response":
                self._handle_response(flow)
            else:
                raise Exception(f"Unknown mitmproxy event type {event_type}")
        finally:
            # If someone has taken this request out of the regular callback flow,
            # they'll manually send a callback at some later time.
            if not flow.taken and not flow.resumed:
                # Addon hasn't taken ownership of this flow, send it back to mitmproxy
                # ourselves.
                flow.resume()

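    # Request-phase handler: resolve which cap the URL belongs to, give the local
    # asset repo and addons a chance to answer, and special-case wrapper caps,
    # EventQueueGet and Seed requests before they go out to the server.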
    def _handle_request(self, flow: HippoHTTPFlow):
        url = flow.request.url
        cap_data = self.session_manager.resolve_cap(url)
        flow.cap_data = cap_data
        # Don't do anything special with the proxy's own requests unless the requested
        # URL can only be handled by the proxy. Ideally we only pass the request through
        # for logging purposes.
        if flow.request_injected and (not cap_data or not cap_data.type.fake):
            return

        # The local asset repo gets first bite at the apple
        if cap_data and cap_data.asset_server_cap:
            if self.session_manager.asset_repo.try_serve_asset(flow):
                print(f"Served asset for {flow.request.pretty_url}")
                return

        AddonManager.handle_http_request(flow)
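        # Requests to one of our per-region wrapper caps (registered when we
        # rewrite the Seed response below): either bounce the client to the real
        # cap URL or rewrite the URL and proxy the request ourselves.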
        if cap_data and cap_data.cap_name.endswith("ProxyWrapper"):
            orig_cap_name = cap_data.cap_name.rsplit("ProxyWrapper", 1)[0]
            orig_cap_url = cap_data.region().cap_urls[orig_cap_name]
            split_orig_url = urllib.parse.urlsplit(orig_cap_url)
            orig_cap_host = split_orig_url[1]

            # Not a req we want to handle, so redirect to the original, unproxied URL
            split_req_url = list(urllib.parse.urlsplit(flow.request.url))
            split_req_url[1] = orig_cap_host
            redir_url = urllib.parse.urlunsplit(split_req_url)

            # When asset server requests are being proxied anyway, a redirect is
            # pointless because the URL we redirect to will be proxied as well.
            # Don't make the client eat the cost of the redirect too, just rewrite
            # the URL we'll make the request to.
            # We also need to do this if an addon indicated the response shouldn't be streamed
            # because it needs to inspect or mutate it.
            if not flow.can_stream or self._asset_server_proxied:
                flow.request.url = redir_url
            else:
                flow.response = mitmproxy.http.Response.make(
                    307,
                    # Can't provide an explanation in the body because that results in failing
                    # Range requests under mitmproxy that return garbage data. Chances are there
                    # are weird interactions between HTTP/1.x pipelining and Range requests under
                    # mitmproxy that no other applications have hit. If that's a concern then
                    # Connection: close should be used.
                    b"",
                    {
                        "Connection": "keep-alive",
                        "Location": redir_url,
                    }
                )
        elif cap_data and cap_data.asset_server_cap:
            # Both the wrapper request and the actual asset server request went through
            # the proxy. Don't bother trying the redirect strategy anymore.
            self._asset_server_proxied = True
            LOG.warning("noproxy not used, switching to URI rewrite strategy")
        elif cap_data and cap_data.cap_name == "EventQueueGet":
            # HACK: The sim's EQ acking mechanism doesn't seem to actually work.
            # If the client drops the connection due to timeout before we can
            # proxy back the response then it will be lost forever. Keep around
            # the last EQ response we got so we can re-send it if the client repeats
            # its previous request.
            req_ack_id = llsd.parse_xml(flow.request.content)["ack"]
            eq_manager = cap_data.region().eq_manager
            cached_resp = eq_manager.get_cached_poll_response(req_ack_id)
            if cached_resp:
                LOG.warning("Had to serve a cached EventQueueGet due to client desync")
                flow.response = mitmproxy.http.Response.make(
                    200,
                    llsd.format_xml(cached_resp),
                    {
                        "Content-Type": "application/llsd+xml",
                        # So we can differentiate these in the log
                        "X-Hippo-Fake-EQ": "1",
                        "Connection": "close",
                    },
                )
        elif cap_data and cap_data.cap_name == "Seed":
            # Drop any proxy-only caps from the seed request we send to the server,
            # and add those cap names as metadata so we know to send their URLs in the response.
            parsed_seed: List[str] = llsd.parse_xml(flow.request.content)
            flow.metadata['needed_proxy_caps'] = []
            for known_cap_name, (known_cap_type, known_cap_url) in cap_data.region().caps.items():
                if known_cap_type == CapType.PROXY_ONLY and known_cap_name in parsed_seed:
                    parsed_seed.remove(known_cap_name)
                    flow.metadata['needed_proxy_caps'].append(known_cap_name)
            if flow.metadata['needed_proxy_caps']:
                flow.request.content = llsd.format_xml(parsed_seed)
        elif not cap_data:
            if self._is_login_request(flow):
                # Not strictly a Cap, but makes it easier to filter on.
                flow.cap_data = CapData(cap_name="LoginRequest")

        if cap_data and cap_data.type == CapType.PROXY_ONLY:
            # A proxy addon was supposed to respond itself, but it didn't.
            if not flow.taken and not flow.response_injected:
                flow.response = mitmproxy.http.Response.make(
                    500,
                    b"Proxy didn't handle proxy-only Cap correctly",
                    {
                        "Content-Type": "text/plain",
                        "Connection": "close",
                    }
                )

    @classmethod
    def _is_login_request(cls, flow: HippoHTTPFlow):
        """LoginURI-independent login request sniffer"""
        # Probably a request from the internal browser
        if flow.from_browser:
            return False
        # Could it potentially be an XML-RPC POST?
        if flow.request.headers.get("Content-Type") not in ("text/xml", "application/xml"):
            return False
        if flow.request.method != "POST" or not flow.request.content:
            return False
        # Probably SL login; allow without trying to sniff the XML-RPC request body
        if flow.request.pretty_url.endswith("/login.cgi"):
            return True
        if flow.request.content.startswith(
                b'<?xml version="1.0"?><methodCall><methodName>login_to_simulator'):
            return True
        return False

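    # Response-phase handler: log the response, attribute it to a session/region
    # via its cap data, then rewrite Seed and EventQueueGet bodies and register
    # uploader caps before the response reaches the viewer.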
    def _handle_response(self, flow: HippoHTTPFlow):
        message_logger = self.session_manager.message_logger
        if message_logger:
            try:
                message_logger.log_http_response(flow)
            except:
                LOG.exception("Failed while logging HTTP flow")

        # Don't process responses for requests or responses injected by the proxy.
        # We already processed it, it came from us!
        if flow.request_injected or flow.response_injected:
            return

        status = flow.response.status_code
        cap_data: Optional[CapData] = flow.metadata["cap_data"]
        if not cap_data:
            # Make sure there's always cap data attached to the flow, even if it's
            # empty. Some consumers expect it to always be there, when it might not
            # be if the proxy barfed while handling the request.
            cap_data = flow.metadata["cap_data"] = CapData()

        if status == 200 and cap_data and cap_data.cap_name == "FirestormBridge":
            # Fake FirestormBridge cap based on a bridge-like response coming from
            # a non-browser HTTP request. Figure out what session it belongs to
            # so it can be handled in the session and region HTTP MessageHandlers
            agent_id_str = flow.response.headers.get("X-SecondLife-Owner-Key", "")
            if not agent_id_str:
                return
            agent_id = UUID(agent_id_str)
            for session in self.session_manager.sessions:
                if session.pending:
                    continue
                if session.agent_id == agent_id:
                    # Enrich the flow with the session and region info
                    cap_data = CapData(
                        cap_name="FirestormBridge",
                        region=weakref.ref(session.main_region),
                        session=weakref.ref(session),
                    )
                    flow.cap_data = cap_data
                    break

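        # Addons get the response next; a truthy return means an addon took it
        # over and we should skip our own Seed / EQ / uploader handling.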
        if AddonManager.handle_http_response(flow):
            return

        if status != 200 or not cap_data:
            return

        if cap_data.cap_name == "LoginRequest":
            self._handle_login_flow(flow)
            return

        try:
            session = cap_data.session and cap_data.session()
            if not session:
                return
            session.http_message_handler.handle(flow)

            region = cap_data.region and cap_data.region()
            if not region:
                return
            region.http_message_handler.handle(flow)

            if cap_data.cap_name == "Seed":
                parsed = llsd.parse_xml(flow.response.content)
                LOG.debug("Got seed cap for %r : %r" % (cap_data, parsed))
                region.update_caps(parsed)

                # On LL's grid these URIs aren't unique across sessions or regions,
                # so we get request attribution by replacing them with a unique
                # alias URI.
                LOG.debug("Replacing GetMesh caps with wrapped versions")
                wrappable_caps = {"GetMesh2", "GetMesh", "GetTexture", "ViewerAsset"}
                for cap_name in wrappable_caps:
                    if cap_name in parsed:
                        parsed[cap_name] = region.register_wrapper_cap(cap_name)
                # Send the client the URLs for any proxy-only caps it requested
                for cap_name in flow.metadata['needed_proxy_caps']:
                    parsed[cap_name] = region.cap_urls[cap_name]
                flow.response.content = llsd.format_xml(parsed)
            elif cap_data.cap_name == "EventQueueGet":
                parsed_eq_resp = llsd.parse_xml(flow.response.content)
                if parsed_eq_resp:
                    old_events = parsed_eq_resp["events"]
                    new_events = []
                    for event in old_events:
                        if not self._handle_eq_event(cap_data.session(), region, event):
                            new_events.append(event)
                    # Add on any fake events that've been queued by addons
                    eq_manager = cap_data.region().eq_manager
                    new_events.extend(eq_manager.take_injected_events())
                    parsed_eq_resp["events"] = new_events
                    # Empty event list is an error, need to return undef instead.
                    if old_events and not new_events:
                        parsed_eq_resp = None
                    # HACK: see note in above request handler for EventQueueGet
                    req_ack_id = llsd.parse_xml(flow.request.content)["ack"]
                    eq_manager.cache_last_poll_response(req_ack_id, parsed_eq_resp)
                    flow.response.content = llsd.format_xml(parsed_eq_resp)
            elif cap_data.cap_name in self.UPLOAD_CREATING_CAPS:
                if not region:
                    return
                parsed = llsd.parse_xml(flow.response.content)
                if "uploader" in parsed:
                    region.register_cap(cap_data.cap_name + "Uploader", parsed["uploader"], CapType.TEMPORARY)
        except:
            LOG.exception("OOPS, blew up in HTTP proxy!")

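    # The login response is an XML-RPC methodResponse; its first param holds the
    # login fields used to seed a new Session.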
    def _handle_login_flow(self, flow: HippoHTTPFlow):
        resp = xmlrpc.client.loads(flow.response.content)[0][0]  # type: ignore
        sess = self.session_manager.create_session(resp)
        AddonManager.handle_session_init(sess)
        flow.cap_data = CapData("LoginRequest", session=weakref.ref(sess))

    def _handle_eq_event(self, session: Session, region: ProxiedRegion, event: Dict[str, Any]):
        LOG.debug("Event received on %r: %r" % (self, event))
        message_logger = self.session_manager.message_logger
        if message_logger:
            message_logger.log_eq_event(session, region, event)

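        # Some EQ events are just LLSD-serialized versions of LLUDP messages;
        # turn those into proper Message objects, otherwise wrap the raw event.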
        if self.llsd_message_serializer.can_handle(event["message"]):
            msg = self.llsd_message_serializer.deserialize(event)
        else:
            msg = Message.from_eq_event(event)
        msg.sender = region.circuit_addr
        msg.direction = Direction.IN

        try:
            region.message_handler.handle(msg)
        except:
            LOG.exception("Failed while handling EQ message")

        handle_event = AddonManager.handle_eq_event(session, region, event)
        if handle_event is True:
            # Addon handled the event and didn't want it sent to the viewer
            return True

        # Handle events that inform us about new regions
        sim_addr, sim_handle, sim_seed = None, None, None
        # Sim is asking us to talk to a neighbour
        if event["message"] == "EstablishAgentCommunication":
            ip_split = event["body"]["sim-ip-and-port"].split(":")
            sim_addr = (ip_split[0], int(ip_split[1]))
            sim_seed = event["body"]["seed-capability"]
        # We teleported or crossed a region boundary, opening comms to a new sim
        elif msg and msg.name in ("TeleportFinish", "CrossedRegion"):
            sim_block = msg.get_blocks("RegionData", msg.get_blocks("Info"))[0]
            sim_addr = (sim_block["SimIP"], sim_block["SimPort"])
            sim_handle = sim_block["RegionHandle"]
            sim_seed = sim_block["SeedCapability"]
        # Sim telling us about a neighbour
        elif msg and msg.name == "EnableSimulator":
            sim_block = msg["SimulatorInfo"][0]
            sim_addr = (sim_block["IP"], sim_block["Port"])
            sim_handle = sim_block["Handle"]

        # Register a region if this message was telling us about a new one
        if sim_addr is not None:
            session.register_region(sim_addr, handle=sim_handle, seed_url=sim_seed)
        return False