Add base addon for creating proxy-only caps based on ASGI apps
This commit is contained in:
@@ -7,72 +7,43 @@ while developing the viewer-side pieces of it.
|
||||
Implements a cap that accepts an `obj_id` UUID query parameter and returns
|
||||
the name of the object.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import asgiref.wsgi
|
||||
|
||||
from mitmproxy import http
|
||||
from flask import Flask, Response, request
|
||||
|
||||
from hippolyzer.lib.base.datatypes import UUID
|
||||
from hippolyzer.lib.proxy.addon_utils import BaseAddon
|
||||
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
|
||||
from hippolyzer.lib.proxy.region import ProxiedRegion
|
||||
from hippolyzer.lib.proxy.sessions import Session, SessionManager
|
||||
from hippolyzer.lib.proxy import addon_ctx
|
||||
from hippolyzer.lib.proxy.webapp_cap_addon import WebAppCapAddon
|
||||
|
||||
app = Flask("GetObjectNameCapApp")
|
||||
|
||||
|
||||
class MockProxyCapExampleAddon(BaseAddon):
|
||||
def handle_region_registered(self, session: Session, region: ProxiedRegion):
|
||||
# Register a fake URL for our cap. This will add the cap URL to the Seed
|
||||
# response that gets sent back to the client if that cap name was requested.
|
||||
if "GetObjectNameExample" not in region.cap_urls:
|
||||
region.register_proxy_cap("GetObjectNameExample")
|
||||
@app.route('/')
|
||||
async def get_object_name():
|
||||
# Should always have the current region, the cap handler is bound to one.
|
||||
# Just need to pull it from the `addon_ctx` module's global.
|
||||
obj_mgr = addon_ctx.region.get().objects
|
||||
obj_id = UUID(request.args['obj_id'])
|
||||
obj = obj_mgr.lookup_fullid(obj_id)
|
||||
if not obj:
|
||||
return Response(f"Couldn't find {obj_id!r}", status=404, mimetype="text/plain")
|
||||
|
||||
def handle_http_request(self, session_manager: SessionManager, flow: HippoHTTPFlow):
|
||||
if flow.cap_data.cap_name != "GetObjectNameExample":
|
||||
return
|
||||
if flow.request.method != "GET":
|
||||
return
|
||||
# This request may take a while to generate a response for, take it out of the normal
|
||||
# HTTP handling flow and handle it in a async task.
|
||||
# TODO: Make all HTTP handling hooks async so this isn't necessary
|
||||
self._schedule_task(self._handle_request(flow.take()))
|
||||
try:
|
||||
await asyncio.wait_for(obj_mgr.request_object_properties(obj)[0], 1.0)
|
||||
except asyncio.TimeoutError:
|
||||
return Response(f"Timed out requesting {obj_id!r}'s properties", status=500, mimetype="text/plain")
|
||||
|
||||
async def _handle_request(self, flow: HippoHTTPFlow):
|
||||
try:
|
||||
obj_id = UUID(flow.request.query['obj_id'])
|
||||
obj_mgr = flow.cap_data.region().objects
|
||||
obj = obj_mgr.lookup_fullid(obj_id)
|
||||
if not obj:
|
||||
flow.response = http.Response.make(
|
||||
status_code=404,
|
||||
content=f"Couldn't find {obj_id!r}".encode("utf8"),
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
flow.release()
|
||||
return
|
||||
return Response(obj.Name, mimetype="text/plain")
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(obj_mgr.request_object_properties(obj)[0], 1.0)
|
||||
except asyncio.TimeoutError:
|
||||
flow.response = http.Response.make(
|
||||
status_code=404,
|
||||
content=f"Timed out requesting {obj_id!r}".encode("utf8"),
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
flow.release()
|
||||
return
|
||||
|
||||
flow.response = http.Response.make(
|
||||
content=obj.Name.encode("utf8"),
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
flow.release()
|
||||
except:
|
||||
flow.response = http.Response.make(
|
||||
status_code=500,
|
||||
content=b"The server is on fire",
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
flow.release()
|
||||
class MockProxyCapExampleAddon(WebAppCapAddon):
|
||||
# A cap URL with this name will be tied to each region when
|
||||
# the sim is first connected to. The URL will be returned to the
|
||||
# viewer in the Seed if the viewer requests it by name.
|
||||
CAP_NAME = "GetObjectNameExample"
|
||||
# Any asgi app should be fine.
|
||||
APP = asgiref.wsgi.WsgiToAsgi(app)
|
||||
|
||||
|
||||
addons = [MockProxyCapExampleAddon()]
|
||||
|
||||
@@ -7,6 +7,7 @@ import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import mitmproxy.ctx
|
||||
import mitmproxy.exceptions
|
||||
|
||||
from hippolyzer.lib.base import llsd
|
||||
@@ -131,6 +132,9 @@ def start_proxy(session_manager: SessionManager, extra_addons: Optional[list] =
|
||||
daemon=True,
|
||||
)
|
||||
http_proc.start()
|
||||
# These need to be set for mitmproxy's ASGIApp serving code to work.
|
||||
mitmproxy.ctx.master = None
|
||||
mitmproxy.ctx.log = logging.getLogger("mitmproxy log")
|
||||
|
||||
server = SLSOCKS5Server(session_manager)
|
||||
coro = asyncio.start_server(server.handle_connection, proxy_host, udp_proxy_port)
|
||||
|
||||
@@ -83,8 +83,10 @@ class MITMProxyEventManager:
|
||||
finally:
|
||||
# If someone has taken this request out of the regular callback flow,
|
||||
# they'll manually send a callback at some later time.
|
||||
if not flow.taken:
|
||||
self.to_proxy_queue.put(("callback", flow.id, flow.get_state()))
|
||||
if not flow.taken and not flow.resumed:
|
||||
# Addon hasn't taken ownership of this flow, send it back to mitmproxy
|
||||
# ourselves.
|
||||
flow.resume()
|
||||
|
||||
def _handle_request(self, flow: HippoHTTPFlow):
|
||||
url = flow.request.url
|
||||
|
||||
@@ -22,13 +22,14 @@ class HippoHTTPFlow:
|
||||
Hides the nastiness of writing to flow.metadata so we can pass
|
||||
state back and forth between the two proxies
|
||||
"""
|
||||
__slots__ = ("flow", "callback_queue")
|
||||
__slots__ = ("flow", "callback_queue", "resumed", "taken")
|
||||
|
||||
def __init__(self, flow: HTTPFlow, callback_queue: Optional[multiprocessing.Queue] = None):
|
||||
self.flow: HTTPFlow = flow
|
||||
self.resumed = False
|
||||
self.taken = False
|
||||
self.callback_queue = weakref.ref(callback_queue) if callback_queue else None
|
||||
meta = self.flow.metadata
|
||||
meta.setdefault("taken", False)
|
||||
meta.setdefault("can_stream", True)
|
||||
meta.setdefault("response_injected", False)
|
||||
meta.setdefault("request_injected", False)
|
||||
@@ -98,19 +99,18 @@ class HippoHTTPFlow:
|
||||
# context is kind of janky. The HTTP callback handling code should probably
|
||||
# be made totally async, including the addon hooks. Would coroutine per-callback
|
||||
# be expensive?
|
||||
self.metadata["taken"] = True
|
||||
assert not self.taken and not self.resumed
|
||||
self.taken = True
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
def resume(self):
|
||||
"""Release the HTTP flow back to the normal processing flow"""
|
||||
assert self.taken and self.callback_queue
|
||||
self.metadata["taken"] = False
|
||||
assert self.callback_queue
|
||||
assert not self.resumed
|
||||
self.taken = False
|
||||
self.resumed = True
|
||||
self.callback_queue().put(("callback", self.flow.id, self.get_state()))
|
||||
|
||||
@property
|
||||
def taken(self) -> bool:
|
||||
return self.metadata["taken"]
|
||||
|
||||
@property
|
||||
def is_replay(self) -> bool:
|
||||
return bool(self.flow.is_replay)
|
||||
|
||||
@@ -136,9 +136,6 @@ class IPCInterceptionAddon:
|
||||
if event_type == "callback":
|
||||
orig_flow = self.intercepted_flows.pop(flow_id)
|
||||
orig_flow.set_state(flow_state)
|
||||
# Remove the taken flag from the flow if present, the flow by definition
|
||||
# isn't take()n anymore once it's been passed back to the proxy.
|
||||
orig_flow.metadata.pop("taken", None)
|
||||
elif event_type == "replay":
|
||||
flow: HTTPFlow = HTTPFlow.from_state(flow_state)
|
||||
# mitmproxy won't replay intercepted flows, this is an old flow so
|
||||
|
||||
46
hippolyzer/lib/proxy/webapp_cap_addon.py
Normal file
46
hippolyzer/lib/proxy/webapp_cap_addon.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import abc
|
||||
|
||||
from mitmproxy.addons import asgiapp
|
||||
from mitmproxy.controller import DummyReply
|
||||
|
||||
from hippolyzer.lib.proxy.addon_utils import BaseAddon
|
||||
from hippolyzer.lib.proxy.http_flow import HippoHTTPFlow
|
||||
from hippolyzer.lib.proxy.region import ProxiedRegion
|
||||
from hippolyzer.lib.proxy.sessions import Session, SessionManager
|
||||
|
||||
|
||||
async def serve(app, flow: HippoHTTPFlow):
|
||||
"""Serve a request based on a Hippolyzer HTTP flow using a provided app"""
|
||||
# Shove this on mitmproxy's flow object so asgiapp doesn't explode when it tries
|
||||
# to commit the flow reply. Our take / commit semantics are different than mitmproxy
|
||||
# proper, so we ignore what mitmproxy sets here anyhow.
|
||||
flow.flow.reply = DummyReply()
|
||||
flow.flow.reply.take()
|
||||
await asgiapp.serve(app, flow.flow)
|
||||
flow.flow.reply = None
|
||||
# Send the modified flow object back to mitmproxy
|
||||
flow.resume()
|
||||
|
||||
|
||||
class WebAppCapAddon(BaseAddon, abc.ABC):
|
||||
"""
|
||||
Addon that provides a cap via an ASGI webapp
|
||||
|
||||
Handles all registration of the cap URL and routing of the request.
|
||||
"""
|
||||
CAP_NAME: str
|
||||
APP: any
|
||||
|
||||
def handle_region_registered(self, session: Session, region: ProxiedRegion):
|
||||
# Register a fake URL for our cap. This will add the cap URL to the Seed
|
||||
# response that gets sent back to the client if that cap name was requested.
|
||||
if self.CAP_NAME not in region.cap_urls:
|
||||
region.register_proxy_cap(self.CAP_NAME)
|
||||
|
||||
def handle_http_request(self, session_manager: SessionManager, flow: HippoHTTPFlow):
|
||||
if flow.cap_data.cap_name != self.CAP_NAME:
|
||||
return
|
||||
# This request may take a while to generate a response for, take it out of the normal
|
||||
# HTTP handling flow and handle it in a async task.
|
||||
# TODO: Make all HTTP handling hooks async so this isn't necessary
|
||||
self._schedule_task(serve(self.APP, flow.take()))
|
||||
Reference in New Issue
Block a user