diff --git a/hippolyzer/apps/proxy_leapagent.py b/hippolyzer/apps/proxy_leapagent.py deleted file mode 100644 index 5974306..0000000 --- a/hippolyzer/apps/proxy_leapagent.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Stub for forwarding LEAP stdin/stdout to hippolyzer over TCP using netcat. - -To be replaced with a nicer thing later. - -Really not much use to anyone but me until viewers correctly un-gate LEAP access :) -Hint: uncomment https://vcs.firestormviewer.org/phoenix-firestorm/files/cf85e854/indra/newview/llappviewer.cpp#L1398-1420 - -Usage: While hippolyzer-leapreceiver is running - ./firestorm --leap hippolyzer-leapagent -""" -import multiprocessing -import os -import shutil - - -def agent_main(): - nc_exe = None - for possible_cat in ["nc", "ncat", "netcat"]: - if cat_path := shutil.which(possible_cat): - nc_exe = cat_path - - if not nc_exe: - raise ValueError("Couldn't find an acceptable netcat in PATH!") - - os.execv(nc_exe, [nc_exe, "127.0.0.1", "9063"]) - - -if __name__ == "__main__": - multiprocessing.freeze_support() - agent_main() diff --git a/hippolyzer/apps/proxy_leapreceiver.py b/hippolyzer/apps/proxy_leapreceiver.py deleted file mode 100644 index 0a1d502..0000000 --- a/hippolyzer/apps/proxy_leapreceiver.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Simple stub for testing receiving inbound LEAP connections over TCP - -To be removed at some point once this is supported by the proxy itself. -For now it's a bunch of example uses of the APIs. -""" - -from typing import * - -import asyncio -import logging -import multiprocessing -import pprint - -from hippolyzer.lib.proxy.leap import ( - LEAPBridgeServer, - LEAPClient, - UIPath, - LLCommandDispatcherWrapper, - LLWindowWrapper, - LLUIWrapper, - LLViewerControlWrapper, -) - - -async def client_connected(client: LEAPClient): - # Kick off a request to get ops for each API supported by the viewer - # Won't wait for a response from the viewer between each send - api_futs: Dict[Awaitable, str] = {} - for api_name in (await client.sys_command("getAPIs")).keys(): - api_fut = client.sys_command("getAPI", {"api": api_name}) - api_futs[api_fut] = api_name - - # Wait for all of our getAPI commands to complete in parallel - for fut in (await asyncio.wait(api_futs.keys()))[0]: - # Print out which API this future even relates to - print("=" * 5, api_futs[fut], "=" * 5) - # List supported ops for this api - pprint.pprint(await fut) - - # Subscribe to StartupState events within this scope - async with client.listen_scoped("StartupState") as get_event: - # Get a single StartupState event then continue - pprint.pprint(await get_event()) - - # More manual version of above that gives you a Queue you can pass around - # A None gets posted to the mainloop every time the viewer restarts the main loop, - # so we can rely on _something_ being published to this. - llapp_queue = await client.listen("mainloop") - try: - pprint.pprint(await llapp_queue.get()) - llapp_queue.task_done() - finally: - await client.stop_listening(llapp_queue) - - # A simple command with a reply - pprint.pprint(await client.command("LLFloaterReg", "getBuildMap")) - - # A simple command that has no reply, or has a reply we don't care about. - client.void_command("LLFloaterReg", "showInstance", {"name": "preferences"}) - - # Some commands must be executed against the dynamically assigned command - # pump that's specific to our LEAP listener. `sys_command()` is the same as - # `command()` except it internally addresses whatever the system command pump is. - await client.sys_command("ping") - - # Print out all the commands supported by LLCommandDispatcher - cmd_dispatcher_api = LLCommandDispatcherWrapper(client) - pprint.pprint(await cmd_dispatcher_api.enumerate()) - - # Spawn the test textbox floater - client.void_command("LLFloaterReg", "showInstance", {"name": "test_textbox"}) - - # LEAP allows addressing UI elements by "path". We expose that through a pathlib-like interface - # to allow composing UI element paths. - textbox_path = UIPath.for_floater("floater_test_textbox") / "long_text_editor" - # Click the "long_text_editor" in the test textbox floater. - window_api = LLWindowWrapper(client) - await window_api.mouse_click(button="LEFT", path=textbox_path) - - # Clear out the textbox, note that this does _not_ work when path is specified! - # TODO: clearing a textbox isn't so nice. CTL+A doesn't work as expected even without a path, - # it leaves a capital "A" in the text editor. We get rid of it by doing backspace right after. - window_api.key_press(mask=["CTL"], keysym="a") - window_api.key_press(keysym="Backsp") - - # Type some text - window_api.text_input("Also I can type in here pretty good.") - - # Print out the value of the textbox we just typed in - ui_api = LLUIWrapper(client) - pprint.pprint(await ui_api.get_value(textbox_path)) - - # But you don't need to explicitly give input focus like above, you can send keypresses - # directly to a path. - monospace_path = UIPath.for_floater("floater_test_textbox") / "monospace_text_editor" - window_api.text_input("I typed in here by path.", path=monospace_path) - - # We can also access the viewer config to reason about viewer state. - viewer_control_api = LLViewerControlWrapper(client) - pprint.pprint(await viewer_control_api.get("Global", "StatsPilotFile")) - # Print the first ten vars in the "Global" group - pprint.pprint((await viewer_control_api.vars("Global"))[:10]) - - -def receiver_main(): - logging.basicConfig(level=logging.INFO) - loop = asyncio.get_event_loop() - - server = LEAPBridgeServer(client_connected) - coro = asyncio.start_server(server.handle_connection, "127.0.0.1", 9063) - loop.run_until_complete(coro) - loop.run_forever() - - -if __name__ == "__main__": - multiprocessing.freeze_support() - receiver_main() diff --git a/hippolyzer/lib/proxy/leap.py b/hippolyzer/lib/proxy/leap.py deleted file mode 100644 index c29f978..0000000 --- a/hippolyzer/lib/proxy/leap.py +++ /dev/null @@ -1,541 +0,0 @@ -""" -Tooling for working with the SL viewer's LEAP integration: now with 100% less eventlet - -TODO: split this out into its own package -TODO: Support playback of VITA event recording format snippets? Does anyone actually use those? -""" - -from __future__ import annotations - -from typing import * - -import abc -import asyncio -import collections -import contextlib -import dataclasses -import enum -import logging -import pathlib -import posixpath -import uuid -import weakref - -from hippolyzer.lib.base import llsd - - -class LEAPClient: - """Client for script -> viewer communication over the LEAP protocol""" - def __init__(self, protocol: AbstractLEAPProtocol): - self._protocol = protocol - # Pump used for receiving replies - self._reply_pump: Optional[str] = None - # Pump used for sending leap meta-commands to the viewer (getAPIs, etc.) - self.cmd_pump: Optional[str] = None - # Map of req id -> future held by requester to send responses to - self._reply_futs: Dict[uuid.UUID, asyncio.Future] = {} - self._pump_listeners: Dict[str, ListenerDetails] = collections.defaultdict(ListenerDetails) - self._connection_status = ConnectionStatus.READY - - @property - def connected(self) -> bool: - return self._connection_status == ConnectionStatus.CONNECTED - - async def connect(self) -> None: - """Receive the "hello" message from the viewer and start the message pump""" - assert self._connection_status == ConnectionStatus.READY - self._connection_status = ConnectionStatus.CONNECTING - - try: - welcome_message = await self._protocol.read_message() - self._reply_pump = welcome_message['pump'] - self.cmd_pump = welcome_message['data']['command'] - - self._connection_status = ConnectionStatus.CONNECTED - self._start_message_pump() - except: - self.disconnect() - raise - - def _start_message_pump(self) -> None: - """Read and handle inbound messages in a background task""" - async def _pump_messages_forever(): - try: - while not self._protocol.closed: - self.handle_message(await self._protocol.read_message()) - except asyncio.IncompleteReadError: - pass - finally: - self.disconnect() - - # Should naturally stop on its own when disconnect is called by virtue of - # the incomplete read. - asyncio.get_event_loop().create_task(_pump_messages_forever()) - - def disconnect(self) -> None: - """Close the connection and clean up any pending request futures""" - if self.connected: - logging.info('closing LEAP connection') - self._connection_status = ConnectionStatus.DISCONNECTED - self._protocol.close() - - # Clean up any pending request futures - for fut in list(self._reply_futs.values()): - if not fut.done(): - fut.cancel() - self._reply_futs.clear() - # TODO: Give anything that cares about disconnects a signal that it's happened - # keep around Task handles and cancel those instead? - self._pump_listeners.clear() - - def sys_command(self, op: str, data: Optional[Dict] = None) -> Optional[Awaitable]: - """Make a request to an internal LEAP method over the command pump""" - return self.command(self.cmd_pump, op, data) - - def command(self, pump: str, op: str, data: Optional[Dict] = None) -> Optional[Awaitable]: - """Make a request to an internal LEAP method using the standard command form (op in data)""" - data = data.copy() if data else {} - data['op'] = op - return self.post(pump, data, expect_reply=True) - - def void_command(self, pump: str, op: str, data: Optional[Dict] = None) -> None: - """Like `command()`, but we don't expect a reply.""" - data = data.copy() if data else {} - data['op'] = op - self.post(pump, data, expect_reply=False) - - def post(self, pump: str, data: Any, expect_reply: bool) -> Optional[Awaitable]: - """ - Post an event to the other side's `pump`. - - Post the event is done synchronously, only waiting for the reply is done async. - """ - assert self.connected - fut = None - # If we expect a reply to this event, we need to do some extra bookkeeping. - # There are apparently some commands for which we can never expect to get a reply. - # Don't add a reqid or reply fut map entry in that case, since it will never be resolved. - if expect_reply: - # If you don't pass in a dict for data, we have nowhere to stuff `reqid`. - # That means no reply tracking, meaning no future. - if not isinstance(data, dict): - raise ValueError(f"Must send a dict in `data` if you want a reply, you sent {data!r}") - # We need to mutate the dict, make a copy so that we don't mess with the caller's version. - data = data.copy() - # Tell the viewer the pump to send replies to - data["reply"] = self._reply_pump - - req_id = uuid.uuid4() - data["reqid"] = req_id - - fut = asyncio.Future() - # The future will be cleaned up when the Future is done. - fut.add_done_callback(self._cleanup_request_future) - self._reply_futs[req_id] = fut - - self._protocol.write_message(pump, data) - return fut - - @contextlib.asynccontextmanager - async def listen_scoped(self, source_pump: str) -> AsyncContextManager[Callable[[], Awaitable[Any]]]: - """Subscribe to events published on source_pump, allow awaiting them""" - msg_queue = await self.listen(source_pump) - - async def _get_wrapper(): - # TODO: handle disconnection while awaiting new Queue message - msg = await msg_queue.get() - - # Consumption is completion - msg_queue.task_done() - return msg - - try: - yield _get_wrapper - finally: - try: - await self.stop_listening(msg_queue) - except KeyError: - pass - - async def listen(self, source_pump: str) -> asyncio.Queue: - """Start listening to `source_pump`, placing its messages in the returned asyncio Queue""" - assert self.connected - - msg_queue = asyncio.Queue() - - listener_details = self._pump_listeners[source_pump] - had_listeners = bool(listener_details.queues) - listener_details.queues.add(msg_queue) - - if not had_listeners: - # Nothing was listening to this before, need to ask for its events to be - # sent over LEAP. - await self.sys_command("listen", { - "listener": listener_details.listener, - "source": source_pump, - }) - return msg_queue - - async def stop_listening(self, msg_queue: asyncio.Queue) -> None: - """Stop sending a pump's messages to msg_queue, potentially removing the listen on the pump""" - for source_pump, listener_details in self._pump_listeners.items(): - queues = listener_details.queues - if msg_queue in queues: - listener_details.queues.remove(msg_queue) - if self.connected and not queues: - # Nobody cares about these events anymore, ask LEAP to stop sending them - await self.sys_command("stoplistening", { - "listener": listener_details.listener, - "source": source_pump, - }) - return - raise KeyError(f"Couldn't find {msg_queue!r} in pump listeners") - - def handle_message(self, message: Any) -> bool: - """Handle an inbound message and try to route it to the right recipient""" - if not isinstance(message, dict): - logging.warning(f"Received a non-map message: {message!r}") - return False - - pump = message.get("pump") - data = message.get("data") - if pump == self._reply_pump: - # This is a reply for a request - if not isinstance(data, dict): - logging.warning(f"Received a non-map reply over the reply pump: {message!r}") - return False - - # reqid can tell us what future needs to be resolved, if any. - reqid = data.get("reqid") - fut = self._reply_futs.get(reqid) - if not fut: - logging.warning(f"Received a reply over the reply pump with no reqid or future: {message!r}") - return False - # We don't actually care about the reqid, pop it off - data.pop("reqid") - # Notify anyone awaiting the response - fut.set_result(data) - - # Might be related to a listener we registered - # Don't warn if we get a message with an empty listener_details.queues because - # We may still be receiving messages from before we stopped listening - # The main concerning case if is we receive a message for something we _never_ - # registered a listener for. - elif (listener_details := self._pump_listeners.get(pump)) is not None: - for queue in listener_details.queues: - queue.put_nowait(data) - else: - logging.warning(f"Received a message for unknown pump: {message!r}") - return True - - def _cleanup_request_future(self, req_fut: asyncio.Future) -> None: - """Remove a completed future from the reply map""" - for key, value in self._reply_futs.items(): - if value == req_fut: - del self._reply_futs[key] - return - - -class ConnectionStatus(enum.Enum): - READY = enum.auto() - CONNECTING = enum.auto() - CONNECTED = enum.auto() - DISCONNECTED = enum.auto() - - -@dataclasses.dataclass -class ListenerDetails: - # We can only have one listener with a given name active at a time. Give each listener a unique name. - listener: Optional[str] = dataclasses.field(default_factory=lambda: "PythonListener-%s" % uuid.uuid4()) - queues: Set[asyncio.Queue] = dataclasses.field(default_factory=set) - - -class AbstractLEAPProtocol(abc.ABC): - """Interface for a class representing communication with a LEAP peer""" - closed: bool - - @abc.abstractmethod - def close(self) -> None: - pass - - @abc.abstractmethod - def write_message(self, pump: str, data: Any) -> None: - pass - - @abc.abstractmethod - async def read_message(self) -> Dict: - pass - - -class LEAPProtocol(AbstractLEAPProtocol): - """Wrapper for communication with a LEAP peer over an asyncio reader/writer pair""" - def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - self._reader = reader - self._writer = writer - self._parser = llsd.LLSDNotationParser() - self._formatter = llsd.LLSDNotationFormatter() - self._drain_task = None - - @property - def closed(self) -> bool: - return self._writer.is_closing() or self._reader.at_eof() - - def close(self): - if not self._writer.is_closing(): - self._writer.write_eof() - self._writer.close() - - def write_message(self, pump: str, data: Any) -> None: - assert not self._writer.is_closing() - ser = self._formatter.format({"pump": pump, "data": data}) - payload = bytearray(str(len(ser)).encode("utf8")) - payload.extend(b":") - payload.extend(ser) - self._writer.write(payload) - # We're in sync context, we need to schedule draining the socket, which is async. - # If a drain is already scheduled then we don't need to reschedule. - if not self._drain_task: - self._drain_task = asyncio.create_task(self._drain_soon()) - - async def _drain_soon(self) -> None: - self._drain_task = None - await self._writer.drain() - - async def read_message(self) -> Dict: - assert not self._reader.at_eof() - - # Length is everything up until the first colon we see, stripping the colon off. - length = int((await self._reader.readuntil(b':'))[:-1].decode("utf8")) - if length > 0xffFFff: - raise ValueError(f"Unreasonable LEAP payload length of {length}") - # Everything after the colon is LLSD - parsed = self._parser.parse(await self._reader.readexactly(length)) - if not isinstance(parsed, dict): - raise ValueError(f"Expected LEAP message to be a dict, got {parsed!r}") - return parsed - - -class LEAPAPIWrapper(abc.ABC): - """Base class for classes wrapping specific LEAP APIs""" - PUMP_NAME: Optional[str] = None - - def __init__(self, client: LEAPClient, pump_name: Optional[str] = None): - super().__init__() - self._client = client - self._pump_name = pump_name or self.PUMP_NAME - assert self._pump_name - - -class UIPath(pathlib.PurePosixPath): - __slots__ = [] - - @classmethod - def for_floater(cls, floater_name: str) -> UIPath: - return cls("/main_view/menu_stack/world_panel/Floater View") / floater_name - - def __str__(self) -> str: - """Like the base __str__ except ".." and "." segments will be resolved.""" - return posixpath.normpath(super().__str__()) - - -async def _data_unwrapper(data_fut: Awaitable[Dict], inner_elem: str) -> Any: - """Unwraps part of the data future while allowing the request itself to remain synchronous""" - # We want the request to be sent immediately, without requiring the request to be `await`ed first, - # but that means that we have to return a `Coroutine` that will pull the value out of the dict - # rather than directly returning the `Future`. - return (await data_fut)[inner_elem] - - -UI_PATH_TYPE = Optional[Union[str, UIPath]] - - -class LLWindowWrapper(LEAPAPIWrapper): - PUMP_NAME = "LLWindow" - - MASK_TYPE = Optional[Collection[str]] - KEYCODE_TYPE = Optional[int] - KEYSYM_TYPE = Optional[str] - CHAR_TYPE = Optional[str] - MOUSE_COORD_TYPE: Optional[int] - - def _convert_key_payload(self, /, *, keycode: KEYCODE_TYPE, keysym: KEYSYM_TYPE, char: CHAR_TYPE, - mask: MASK_TYPE, path: UI_PATH_TYPE) -> Dict: - if keycode is not None: - payload = {"keycode": keycode} - elif keysym is not None: - payload = {"keysym": keysym} - elif char is not None: - payload = {"char": char} - else: - raise ValueError("Didn't have one of keycode, keysym or char") - - if path: - payload["path"] = str(path) - if mask: - payload["mask"] = mask - - return payload - - def key_down(self, /, *, mask: MASK_TYPE = None, keycode: KEYCODE_TYPE = None, - keysym: KEYSYM_TYPE = None, char: CHAR_TYPE = None, path: UI_PATH_TYPE = None) -> None: - """Simulate a key being pressed down""" - payload = self._convert_key_payload(keysym=keysym, keycode=keycode, char=char, mask=mask, path=path) - self._client.void_command(self._pump_name, "keyDown", payload) - - def key_up(self, /, *, mask: MASK_TYPE = None, keycode: KEYCODE_TYPE = None, - keysym: KEYSYM_TYPE = None, char: CHAR_TYPE = None, path: UI_PATH_TYPE = None) -> None: - """Simulate a key being released""" - payload = self._convert_key_payload(keysym=keysym, keycode=keycode, char=char, mask=mask, path=path) - self._client.void_command(self._pump_name, "keyUp", payload) - - def key_press(self, /, *, mask: MASK_TYPE = None, keycode: KEYCODE_TYPE = None, - keysym: KEYSYM_TYPE = None, char: CHAR_TYPE = None, path: UI_PATH_TYPE = None) -> None: - """Simulate a key being pressed down and immediately released""" - self.key_down(mask=mask, keycode=keycode, keysym=keysym, char=char, path=path) - self.key_up(mask=mask, keycode=keycode, keysym=keysym, char=char, path=path) - - def text_input(self, text_input: str, path: UI_PATH_TYPE = None) -> None: - """Simulate a user typing a string of text""" - # TODO: Uhhhhh I can't see how the key* APIs could possibly handle i18n correctly, - # what with all the U8s. Maybe I'm just dumb? - for char in text_input: - self.key_press(char=char, path=path) - - async def get_paths(self, under: UI_PATH_TYPE = "") -> Dict: - """Get all UI paths under the root, or under a path if specified""" - return await self._client.command(self._pump_name, "getPaths", {"under": str(under)}) - - async def get_info(self, path: UI_PATH_TYPE) -> Dict: - """Get info about an element specified by path""" - return await self._client.command(self._pump_name, "getInfo", {"path": str(path)}) - - def _build_mouse_payload(self, /, *, x: MOUSE_COORD_TYPE, y: MOUSE_COORD_TYPE, path: UI_PATH_TYPE, - mask: MASK_TYPE, button: str = None) -> Dict: - if path is not None: - payload = {"path": str(path)} - elif x is not None and y is not None: - payload = {"x": x, "y": y} - else: - raise ValueError("Didn't have one of x + y or path") - - if mask: - payload["mask"] = mask - if button: - payload["button"] = button - - return payload - - def mouse_down(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None, - path: UI_PATH_TYPE = None, mask: MASK_TYPE = None, button: str) -> Awaitable[Dict]: - """Simulate a mouse down event occurring at a coordinate or UI element path""" - payload = self._build_mouse_payload(x=x, y=y, path=path, mask=mask, button=button) - return self._client.command(self._pump_name, "mouseDown", payload) - - def mouse_up(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None, - path: UI_PATH_TYPE = None, mask: MASK_TYPE = None, button: str) -> Awaitable[Dict]: - """Simulate a mouse up event occurring at a coordinate or UI element path""" - payload = self._build_mouse_payload(x=x, y=y, path=path, mask=mask, button=button) - return self._client.command(self._pump_name, "mouseUp", payload) - - def mouse_click(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None, - path: UI_PATH_TYPE = None, mask: MASK_TYPE = None, button: str) -> Awaitable[Dict]: - """Simulate a mouse down and immediately following mouse up event""" - # We're going to ignore the mouseDown response, so use void_command instead. - # Most side effects are actually executed on mouseUp. - self._client.void_command(self._pump_name, "mouseDown", - self._build_mouse_payload(x=x, y=y, path=path, mask=mask, button=button)) - return self.mouse_up(x=x, y=y, path=path, mask=mask, button=button) - - def mouse_move(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None, - path: UI_PATH_TYPE = None) -> Awaitable[Dict]: - """Move the mouse to the coordinates or path specified""" - payload = self._build_mouse_payload(x=x, y=y, path=path, mask=None) - return self._client.command(self._pump_name, "mouseMove", payload) - - def mouse_scroll(self, clicks: int) -> None: - """Act as if the scroll wheel has been moved `clicks` amount. May be negative""" - self._client.command(self._pump_name, "mouseScroll", {"clicks": clicks}) - - -class LLUIWrapper(LEAPAPIWrapper): - PUMP_NAME = "UI" - - def call(self, function: str, parameter: Any = None) -> None: - """ - Invoke the `function` operation as if from a menu or button click, passing `parameter` - - Can call most things registered through `LLUICtrl::CommitCallbackRegistry`. - """ - self._client.void_command(self._pump_name, "call", {"function": function, "parameter": parameter}) - - def get_value(self, path: UI_PATH_TYPE) -> Awaitable[Any]: - """For the UI control identified by `path`, return the current value in `value`""" - resp_fut = self._client.command(self._pump_name, "getValue", {"path": str(path)}) - return _data_unwrapper(resp_fut, "value") - - -class LLCommandDispatcherWrapper(LEAPAPIWrapper): - PUMP_NAME = "LLCommandDispatcher" - - def dispatch( - self, - cmd: str, - /, - *, - params: Optional[Collection[str]] = None, - query: Optional[Dict[str, str]] = None, - trusted: bool = True, - ) -> None: - """Execute a command registered as an LLCommandHandler""" - return self._client.void_command(self._pump_name, "dispatch", { - "cmd": cmd, - "params": params or [], - "query": query or {}, - "trusted": trusted, - }) - - def enumerate(self) -> Awaitable[Dict]: - """Get map of registered LLCommandHandlers, containing name, key, and (e.g.) untrusted flag""" - return self._client.command(self._pump_name, "enumerate") - - -class LLViewerControlWrapper(LEAPAPIWrapper): - PUMP_NAME = "LLViewerControl" - - def get(self, group: str, key: str) -> Awaitable[Any]: - """ - Get value of a Control (config) key - - `group` can be one of "CrashSettings", "Global", "PerAccount", "Warnings". - """ - return self._client.command(self._pump_name, "get", {"key": key, "group": group}) - - def vars(self, group: str) -> Awaitable[List[Dict]]: - """Return a list of dicts of controls in `group`""" - resp_fut = self._client.command(self._pump_name, "vars", {"group": group}) - return _data_unwrapper(resp_fut, "vars") - - def set(self, group: str, key: str, value: Any) -> None: - """ - Set a configuration value - - TODO: error handling based on "error" field in resp? - """ - self._client.void_command(self._pump_name, "set", {"key": key, "group": group, "value": value}) - - -class LEAPBridgeServer: - """LEAP Bridge TCP server to use with asyncio.start_server()""" - - def __init__(self, client_connected_cb: Optional[Callable[[LEAPClient]], Awaitable[Any]] = None): - self.clients: weakref.WeakSet[LEAPClient] = weakref.WeakSet() - self._client_connected_cb = client_connected_cb - - async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - client = LEAPClient(LEAPProtocol(reader, writer)) - logging.info('Accepting LEAP connection from %r' % (writer.get_extra_info("peername", None),)) - await client.connect() - - self.clients.add(client) - if self._client_connected_cb: - await self._client_connected_cb(client) diff --git a/setup.py b/setup.py index ce0919b..0a06c2c 100644 --- a/setup.py +++ b/setup.py @@ -76,8 +76,6 @@ setup( 'console_scripts': { 'hippolyzer-gui = hippolyzer.apps.proxy_gui:gui_main', 'hippolyzer-cli = hippolyzer.apps.proxy:main', - 'hippolyzer-leapagent = hippolyzer.apps.proxy_leapagent:agent_main', - 'hippolyzer-leapreceiver = hippolyzer.apps.proxy_leapreceiver:receiver_main', } }, zip_safe=False,