Remove LEAP-related code
It lives in https://github.com/SaladDais/outleap now. Hippolyzer-specific integration will be added back in later.
This commit is contained in:
@@ -1,31 +0,0 @@
|
||||
"""
|
||||
Stub for forwarding LEAP stdin/stdout to hippolyzer over TCP using netcat.
|
||||
|
||||
To be replaced with a nicer thing later.
|
||||
|
||||
Really not much use to anyone but me until viewers correctly un-gate LEAP access :)
|
||||
Hint: uncomment https://vcs.firestormviewer.org/phoenix-firestorm/files/cf85e854/indra/newview/llappviewer.cpp#L1398-1420
|
||||
|
||||
Usage: While hippolyzer-leapreceiver is running
|
||||
./firestorm --leap hippolyzer-leapagent
|
||||
"""
|
||||
import multiprocessing
|
||||
import os
|
||||
import shutil
|
||||
|
||||
|
||||
def agent_main():
|
||||
nc_exe = None
|
||||
for possible_cat in ["nc", "ncat", "netcat"]:
|
||||
if cat_path := shutil.which(possible_cat):
|
||||
nc_exe = cat_path
|
||||
|
||||
if not nc_exe:
|
||||
raise ValueError("Couldn't find an acceptable netcat in PATH!")
|
||||
|
||||
os.execv(nc_exe, [nc_exe, "127.0.0.1", "9063"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
multiprocessing.freeze_support()
|
||||
agent_main()
|
||||
@@ -1,118 +0,0 @@
|
||||
"""
|
||||
Simple stub for testing receiving inbound LEAP connections over TCP
|
||||
|
||||
To be removed at some point once this is supported by the proxy itself.
|
||||
For now it's a bunch of example uses of the APIs.
|
||||
"""
|
||||
|
||||
from typing import *
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import multiprocessing
|
||||
import pprint
|
||||
|
||||
from hippolyzer.lib.proxy.leap import (
|
||||
LEAPBridgeServer,
|
||||
LEAPClient,
|
||||
UIPath,
|
||||
LLCommandDispatcherWrapper,
|
||||
LLWindowWrapper,
|
||||
LLUIWrapper,
|
||||
LLViewerControlWrapper,
|
||||
)
|
||||
|
||||
|
||||
async def client_connected(client: LEAPClient):
|
||||
# Kick off a request to get ops for each API supported by the viewer
|
||||
# Won't wait for a response from the viewer between each send
|
||||
api_futs: Dict[Awaitable, str] = {}
|
||||
for api_name in (await client.sys_command("getAPIs")).keys():
|
||||
api_fut = client.sys_command("getAPI", {"api": api_name})
|
||||
api_futs[api_fut] = api_name
|
||||
|
||||
# Wait for all of our getAPI commands to complete in parallel
|
||||
for fut in (await asyncio.wait(api_futs.keys()))[0]:
|
||||
# Print out which API this future even relates to
|
||||
print("=" * 5, api_futs[fut], "=" * 5)
|
||||
# List supported ops for this api
|
||||
pprint.pprint(await fut)
|
||||
|
||||
# Subscribe to StartupState events within this scope
|
||||
async with client.listen_scoped("StartupState") as get_event:
|
||||
# Get a single StartupState event then continue
|
||||
pprint.pprint(await get_event())
|
||||
|
||||
# More manual version of above that gives you a Queue you can pass around
|
||||
# A None gets posted to the mainloop every time the viewer restarts the main loop,
|
||||
# so we can rely on _something_ being published to this.
|
||||
llapp_queue = await client.listen("mainloop")
|
||||
try:
|
||||
pprint.pprint(await llapp_queue.get())
|
||||
llapp_queue.task_done()
|
||||
finally:
|
||||
await client.stop_listening(llapp_queue)
|
||||
|
||||
# A simple command with a reply
|
||||
pprint.pprint(await client.command("LLFloaterReg", "getBuildMap"))
|
||||
|
||||
# A simple command that has no reply, or has a reply we don't care about.
|
||||
client.void_command("LLFloaterReg", "showInstance", {"name": "preferences"})
|
||||
|
||||
# Some commands must be executed against the dynamically assigned command
|
||||
# pump that's specific to our LEAP listener. `sys_command()` is the same as
|
||||
# `command()` except it internally addresses whatever the system command pump is.
|
||||
await client.sys_command("ping")
|
||||
|
||||
# Print out all the commands supported by LLCommandDispatcher
|
||||
cmd_dispatcher_api = LLCommandDispatcherWrapper(client)
|
||||
pprint.pprint(await cmd_dispatcher_api.enumerate())
|
||||
|
||||
# Spawn the test textbox floater
|
||||
client.void_command("LLFloaterReg", "showInstance", {"name": "test_textbox"})
|
||||
|
||||
# LEAP allows addressing UI elements by "path". We expose that through a pathlib-like interface
|
||||
# to allow composing UI element paths.
|
||||
textbox_path = UIPath.for_floater("floater_test_textbox") / "long_text_editor"
|
||||
# Click the "long_text_editor" in the test textbox floater.
|
||||
window_api = LLWindowWrapper(client)
|
||||
await window_api.mouse_click(button="LEFT", path=textbox_path)
|
||||
|
||||
# Clear out the textbox, note that this does _not_ work when path is specified!
|
||||
# TODO: clearing a textbox isn't so nice. CTL+A doesn't work as expected even without a path,
|
||||
# it leaves a capital "A" in the text editor. We get rid of it by doing backspace right after.
|
||||
window_api.key_press(mask=["CTL"], keysym="a")
|
||||
window_api.key_press(keysym="Backsp")
|
||||
|
||||
# Type some text
|
||||
window_api.text_input("Also I can type in here pretty good.")
|
||||
|
||||
# Print out the value of the textbox we just typed in
|
||||
ui_api = LLUIWrapper(client)
|
||||
pprint.pprint(await ui_api.get_value(textbox_path))
|
||||
|
||||
# But you don't need to explicitly give input focus like above, you can send keypresses
|
||||
# directly to a path.
|
||||
monospace_path = UIPath.for_floater("floater_test_textbox") / "monospace_text_editor"
|
||||
window_api.text_input("I typed in here by path.", path=monospace_path)
|
||||
|
||||
# We can also access the viewer config to reason about viewer state.
|
||||
viewer_control_api = LLViewerControlWrapper(client)
|
||||
pprint.pprint(await viewer_control_api.get("Global", "StatsPilotFile"))
|
||||
# Print the first ten vars in the "Global" group
|
||||
pprint.pprint((await viewer_control_api.vars("Global"))[:10])
|
||||
|
||||
|
||||
def receiver_main():
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
server = LEAPBridgeServer(client_connected)
|
||||
coro = asyncio.start_server(server.handle_connection, "127.0.0.1", 9063)
|
||||
loop.run_until_complete(coro)
|
||||
loop.run_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
multiprocessing.freeze_support()
|
||||
receiver_main()
|
||||
@@ -1,541 +0,0 @@
|
||||
"""
|
||||
Tooling for working with the SL viewer's LEAP integration: now with 100% less eventlet
|
||||
|
||||
TODO: split this out into its own package
|
||||
TODO: Support playback of VITA event recording format snippets? Does anyone actually use those?
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import *
|
||||
|
||||
import abc
|
||||
import asyncio
|
||||
import collections
|
||||
import contextlib
|
||||
import dataclasses
|
||||
import enum
|
||||
import logging
|
||||
import pathlib
|
||||
import posixpath
|
||||
import uuid
|
||||
import weakref
|
||||
|
||||
from hippolyzer.lib.base import llsd
|
||||
|
||||
|
||||
class LEAPClient:
|
||||
"""Client for script -> viewer communication over the LEAP protocol"""
|
||||
def __init__(self, protocol: AbstractLEAPProtocol):
|
||||
self._protocol = protocol
|
||||
# Pump used for receiving replies
|
||||
self._reply_pump: Optional[str] = None
|
||||
# Pump used for sending leap meta-commands to the viewer (getAPIs, etc.)
|
||||
self.cmd_pump: Optional[str] = None
|
||||
# Map of req id -> future held by requester to send responses to
|
||||
self._reply_futs: Dict[uuid.UUID, asyncio.Future] = {}
|
||||
self._pump_listeners: Dict[str, ListenerDetails] = collections.defaultdict(ListenerDetails)
|
||||
self._connection_status = ConnectionStatus.READY
|
||||
|
||||
@property
|
||||
def connected(self) -> bool:
|
||||
return self._connection_status == ConnectionStatus.CONNECTED
|
||||
|
||||
async def connect(self) -> None:
|
||||
"""Receive the "hello" message from the viewer and start the message pump"""
|
||||
assert self._connection_status == ConnectionStatus.READY
|
||||
self._connection_status = ConnectionStatus.CONNECTING
|
||||
|
||||
try:
|
||||
welcome_message = await self._protocol.read_message()
|
||||
self._reply_pump = welcome_message['pump']
|
||||
self.cmd_pump = welcome_message['data']['command']
|
||||
|
||||
self._connection_status = ConnectionStatus.CONNECTED
|
||||
self._start_message_pump()
|
||||
except:
|
||||
self.disconnect()
|
||||
raise
|
||||
|
||||
def _start_message_pump(self) -> None:
|
||||
"""Read and handle inbound messages in a background task"""
|
||||
async def _pump_messages_forever():
|
||||
try:
|
||||
while not self._protocol.closed:
|
||||
self.handle_message(await self._protocol.read_message())
|
||||
except asyncio.IncompleteReadError:
|
||||
pass
|
||||
finally:
|
||||
self.disconnect()
|
||||
|
||||
# Should naturally stop on its own when disconnect is called by virtue of
|
||||
# the incomplete read.
|
||||
asyncio.get_event_loop().create_task(_pump_messages_forever())
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Close the connection and clean up any pending request futures"""
|
||||
if self.connected:
|
||||
logging.info('closing LEAP connection')
|
||||
self._connection_status = ConnectionStatus.DISCONNECTED
|
||||
self._protocol.close()
|
||||
|
||||
# Clean up any pending request futures
|
||||
for fut in list(self._reply_futs.values()):
|
||||
if not fut.done():
|
||||
fut.cancel()
|
||||
self._reply_futs.clear()
|
||||
# TODO: Give anything that cares about disconnects a signal that it's happened
|
||||
# keep around Task handles and cancel those instead?
|
||||
self._pump_listeners.clear()
|
||||
|
||||
def sys_command(self, op: str, data: Optional[Dict] = None) -> Optional[Awaitable]:
|
||||
"""Make a request to an internal LEAP method over the command pump"""
|
||||
return self.command(self.cmd_pump, op, data)
|
||||
|
||||
def command(self, pump: str, op: str, data: Optional[Dict] = None) -> Optional[Awaitable]:
|
||||
"""Make a request to an internal LEAP method using the standard command form (op in data)"""
|
||||
data = data.copy() if data else {}
|
||||
data['op'] = op
|
||||
return self.post(pump, data, expect_reply=True)
|
||||
|
||||
def void_command(self, pump: str, op: str, data: Optional[Dict] = None) -> None:
|
||||
"""Like `command()`, but we don't expect a reply."""
|
||||
data = data.copy() if data else {}
|
||||
data['op'] = op
|
||||
self.post(pump, data, expect_reply=False)
|
||||
|
||||
def post(self, pump: str, data: Any, expect_reply: bool) -> Optional[Awaitable]:
|
||||
"""
|
||||
Post an event to the other side's `pump`.
|
||||
|
||||
Post the event is done synchronously, only waiting for the reply is done async.
|
||||
"""
|
||||
assert self.connected
|
||||
fut = None
|
||||
# If we expect a reply to this event, we need to do some extra bookkeeping.
|
||||
# There are apparently some commands for which we can never expect to get a reply.
|
||||
# Don't add a reqid or reply fut map entry in that case, since it will never be resolved.
|
||||
if expect_reply:
|
||||
# If you don't pass in a dict for data, we have nowhere to stuff `reqid`.
|
||||
# That means no reply tracking, meaning no future.
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(f"Must send a dict in `data` if you want a reply, you sent {data!r}")
|
||||
# We need to mutate the dict, make a copy so that we don't mess with the caller's version.
|
||||
data = data.copy()
|
||||
# Tell the viewer the pump to send replies to
|
||||
data["reply"] = self._reply_pump
|
||||
|
||||
req_id = uuid.uuid4()
|
||||
data["reqid"] = req_id
|
||||
|
||||
fut = asyncio.Future()
|
||||
# The future will be cleaned up when the Future is done.
|
||||
fut.add_done_callback(self._cleanup_request_future)
|
||||
self._reply_futs[req_id] = fut
|
||||
|
||||
self._protocol.write_message(pump, data)
|
||||
return fut
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def listen_scoped(self, source_pump: str) -> AsyncContextManager[Callable[[], Awaitable[Any]]]:
|
||||
"""Subscribe to events published on source_pump, allow awaiting them"""
|
||||
msg_queue = await self.listen(source_pump)
|
||||
|
||||
async def _get_wrapper():
|
||||
# TODO: handle disconnection while awaiting new Queue message
|
||||
msg = await msg_queue.get()
|
||||
|
||||
# Consumption is completion
|
||||
msg_queue.task_done()
|
||||
return msg
|
||||
|
||||
try:
|
||||
yield _get_wrapper
|
||||
finally:
|
||||
try:
|
||||
await self.stop_listening(msg_queue)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
async def listen(self, source_pump: str) -> asyncio.Queue:
|
||||
"""Start listening to `source_pump`, placing its messages in the returned asyncio Queue"""
|
||||
assert self.connected
|
||||
|
||||
msg_queue = asyncio.Queue()
|
||||
|
||||
listener_details = self._pump_listeners[source_pump]
|
||||
had_listeners = bool(listener_details.queues)
|
||||
listener_details.queues.add(msg_queue)
|
||||
|
||||
if not had_listeners:
|
||||
# Nothing was listening to this before, need to ask for its events to be
|
||||
# sent over LEAP.
|
||||
await self.sys_command("listen", {
|
||||
"listener": listener_details.listener,
|
||||
"source": source_pump,
|
||||
})
|
||||
return msg_queue
|
||||
|
||||
async def stop_listening(self, msg_queue: asyncio.Queue) -> None:
|
||||
"""Stop sending a pump's messages to msg_queue, potentially removing the listen on the pump"""
|
||||
for source_pump, listener_details in self._pump_listeners.items():
|
||||
queues = listener_details.queues
|
||||
if msg_queue in queues:
|
||||
listener_details.queues.remove(msg_queue)
|
||||
if self.connected and not queues:
|
||||
# Nobody cares about these events anymore, ask LEAP to stop sending them
|
||||
await self.sys_command("stoplistening", {
|
||||
"listener": listener_details.listener,
|
||||
"source": source_pump,
|
||||
})
|
||||
return
|
||||
raise KeyError(f"Couldn't find {msg_queue!r} in pump listeners")
|
||||
|
||||
def handle_message(self, message: Any) -> bool:
|
||||
"""Handle an inbound message and try to route it to the right recipient"""
|
||||
if not isinstance(message, dict):
|
||||
logging.warning(f"Received a non-map message: {message!r}")
|
||||
return False
|
||||
|
||||
pump = message.get("pump")
|
||||
data = message.get("data")
|
||||
if pump == self._reply_pump:
|
||||
# This is a reply for a request
|
||||
if not isinstance(data, dict):
|
||||
logging.warning(f"Received a non-map reply over the reply pump: {message!r}")
|
||||
return False
|
||||
|
||||
# reqid can tell us what future needs to be resolved, if any.
|
||||
reqid = data.get("reqid")
|
||||
fut = self._reply_futs.get(reqid)
|
||||
if not fut:
|
||||
logging.warning(f"Received a reply over the reply pump with no reqid or future: {message!r}")
|
||||
return False
|
||||
# We don't actually care about the reqid, pop it off
|
||||
data.pop("reqid")
|
||||
# Notify anyone awaiting the response
|
||||
fut.set_result(data)
|
||||
|
||||
# Might be related to a listener we registered
|
||||
# Don't warn if we get a message with an empty listener_details.queues because
|
||||
# We may still be receiving messages from before we stopped listening
|
||||
# The main concerning case if is we receive a message for something we _never_
|
||||
# registered a listener for.
|
||||
elif (listener_details := self._pump_listeners.get(pump)) is not None:
|
||||
for queue in listener_details.queues:
|
||||
queue.put_nowait(data)
|
||||
else:
|
||||
logging.warning(f"Received a message for unknown pump: {message!r}")
|
||||
return True
|
||||
|
||||
def _cleanup_request_future(self, req_fut: asyncio.Future) -> None:
|
||||
"""Remove a completed future from the reply map"""
|
||||
for key, value in self._reply_futs.items():
|
||||
if value == req_fut:
|
||||
del self._reply_futs[key]
|
||||
return
|
||||
|
||||
|
||||
class ConnectionStatus(enum.Enum):
|
||||
READY = enum.auto()
|
||||
CONNECTING = enum.auto()
|
||||
CONNECTED = enum.auto()
|
||||
DISCONNECTED = enum.auto()
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class ListenerDetails:
|
||||
# We can only have one listener with a given name active at a time. Give each listener a unique name.
|
||||
listener: Optional[str] = dataclasses.field(default_factory=lambda: "PythonListener-%s" % uuid.uuid4())
|
||||
queues: Set[asyncio.Queue] = dataclasses.field(default_factory=set)
|
||||
|
||||
|
||||
class AbstractLEAPProtocol(abc.ABC):
|
||||
"""Interface for a class representing communication with a LEAP peer"""
|
||||
closed: bool
|
||||
|
||||
@abc.abstractmethod
|
||||
def close(self) -> None:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def write_message(self, pump: str, data: Any) -> None:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def read_message(self) -> Dict:
|
||||
pass
|
||||
|
||||
|
||||
class LEAPProtocol(AbstractLEAPProtocol):
|
||||
"""Wrapper for communication with a LEAP peer over an asyncio reader/writer pair"""
|
||||
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||
self._reader = reader
|
||||
self._writer = writer
|
||||
self._parser = llsd.LLSDNotationParser()
|
||||
self._formatter = llsd.LLSDNotationFormatter()
|
||||
self._drain_task = None
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
return self._writer.is_closing() or self._reader.at_eof()
|
||||
|
||||
def close(self):
|
||||
if not self._writer.is_closing():
|
||||
self._writer.write_eof()
|
||||
self._writer.close()
|
||||
|
||||
def write_message(self, pump: str, data: Any) -> None:
|
||||
assert not self._writer.is_closing()
|
||||
ser = self._formatter.format({"pump": pump, "data": data})
|
||||
payload = bytearray(str(len(ser)).encode("utf8"))
|
||||
payload.extend(b":")
|
||||
payload.extend(ser)
|
||||
self._writer.write(payload)
|
||||
# We're in sync context, we need to schedule draining the socket, which is async.
|
||||
# If a drain is already scheduled then we don't need to reschedule.
|
||||
if not self._drain_task:
|
||||
self._drain_task = asyncio.create_task(self._drain_soon())
|
||||
|
||||
async def _drain_soon(self) -> None:
|
||||
self._drain_task = None
|
||||
await self._writer.drain()
|
||||
|
||||
async def read_message(self) -> Dict:
|
||||
assert not self._reader.at_eof()
|
||||
|
||||
# Length is everything up until the first colon we see, stripping the colon off.
|
||||
length = int((await self._reader.readuntil(b':'))[:-1].decode("utf8"))
|
||||
if length > 0xffFFff:
|
||||
raise ValueError(f"Unreasonable LEAP payload length of {length}")
|
||||
# Everything after the colon is LLSD
|
||||
parsed = self._parser.parse(await self._reader.readexactly(length))
|
||||
if not isinstance(parsed, dict):
|
||||
raise ValueError(f"Expected LEAP message to be a dict, got {parsed!r}")
|
||||
return parsed
|
||||
|
||||
|
||||
class LEAPAPIWrapper(abc.ABC):
|
||||
"""Base class for classes wrapping specific LEAP APIs"""
|
||||
PUMP_NAME: Optional[str] = None
|
||||
|
||||
def __init__(self, client: LEAPClient, pump_name: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._client = client
|
||||
self._pump_name = pump_name or self.PUMP_NAME
|
||||
assert self._pump_name
|
||||
|
||||
|
||||
class UIPath(pathlib.PurePosixPath):
|
||||
__slots__ = []
|
||||
|
||||
@classmethod
|
||||
def for_floater(cls, floater_name: str) -> UIPath:
|
||||
return cls("/main_view/menu_stack/world_panel/Floater View") / floater_name
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Like the base __str__ except ".." and "." segments will be resolved."""
|
||||
return posixpath.normpath(super().__str__())
|
||||
|
||||
|
||||
async def _data_unwrapper(data_fut: Awaitable[Dict], inner_elem: str) -> Any:
|
||||
"""Unwraps part of the data future while allowing the request itself to remain synchronous"""
|
||||
# We want the request to be sent immediately, without requiring the request to be `await`ed first,
|
||||
# but that means that we have to return a `Coroutine` that will pull the value out of the dict
|
||||
# rather than directly returning the `Future`.
|
||||
return (await data_fut)[inner_elem]
|
||||
|
||||
|
||||
UI_PATH_TYPE = Optional[Union[str, UIPath]]
|
||||
|
||||
|
||||
class LLWindowWrapper(LEAPAPIWrapper):
|
||||
PUMP_NAME = "LLWindow"
|
||||
|
||||
MASK_TYPE = Optional[Collection[str]]
|
||||
KEYCODE_TYPE = Optional[int]
|
||||
KEYSYM_TYPE = Optional[str]
|
||||
CHAR_TYPE = Optional[str]
|
||||
MOUSE_COORD_TYPE: Optional[int]
|
||||
|
||||
def _convert_key_payload(self, /, *, keycode: KEYCODE_TYPE, keysym: KEYSYM_TYPE, char: CHAR_TYPE,
|
||||
mask: MASK_TYPE, path: UI_PATH_TYPE) -> Dict:
|
||||
if keycode is not None:
|
||||
payload = {"keycode": keycode}
|
||||
elif keysym is not None:
|
||||
payload = {"keysym": keysym}
|
||||
elif char is not None:
|
||||
payload = {"char": char}
|
||||
else:
|
||||
raise ValueError("Didn't have one of keycode, keysym or char")
|
||||
|
||||
if path:
|
||||
payload["path"] = str(path)
|
||||
if mask:
|
||||
payload["mask"] = mask
|
||||
|
||||
return payload
|
||||
|
||||
def key_down(self, /, *, mask: MASK_TYPE = None, keycode: KEYCODE_TYPE = None,
|
||||
keysym: KEYSYM_TYPE = None, char: CHAR_TYPE = None, path: UI_PATH_TYPE = None) -> None:
|
||||
"""Simulate a key being pressed down"""
|
||||
payload = self._convert_key_payload(keysym=keysym, keycode=keycode, char=char, mask=mask, path=path)
|
||||
self._client.void_command(self._pump_name, "keyDown", payload)
|
||||
|
||||
def key_up(self, /, *, mask: MASK_TYPE = None, keycode: KEYCODE_TYPE = None,
|
||||
keysym: KEYSYM_TYPE = None, char: CHAR_TYPE = None, path: UI_PATH_TYPE = None) -> None:
|
||||
"""Simulate a key being released"""
|
||||
payload = self._convert_key_payload(keysym=keysym, keycode=keycode, char=char, mask=mask, path=path)
|
||||
self._client.void_command(self._pump_name, "keyUp", payload)
|
||||
|
||||
def key_press(self, /, *, mask: MASK_TYPE = None, keycode: KEYCODE_TYPE = None,
|
||||
keysym: KEYSYM_TYPE = None, char: CHAR_TYPE = None, path: UI_PATH_TYPE = None) -> None:
|
||||
"""Simulate a key being pressed down and immediately released"""
|
||||
self.key_down(mask=mask, keycode=keycode, keysym=keysym, char=char, path=path)
|
||||
self.key_up(mask=mask, keycode=keycode, keysym=keysym, char=char, path=path)
|
||||
|
||||
def text_input(self, text_input: str, path: UI_PATH_TYPE = None) -> None:
|
||||
"""Simulate a user typing a string of text"""
|
||||
# TODO: Uhhhhh I can't see how the key* APIs could possibly handle i18n correctly,
|
||||
# what with all the U8s. Maybe I'm just dumb?
|
||||
for char in text_input:
|
||||
self.key_press(char=char, path=path)
|
||||
|
||||
async def get_paths(self, under: UI_PATH_TYPE = "") -> Dict:
|
||||
"""Get all UI paths under the root, or under a path if specified"""
|
||||
return await self._client.command(self._pump_name, "getPaths", {"under": str(under)})
|
||||
|
||||
async def get_info(self, path: UI_PATH_TYPE) -> Dict:
|
||||
"""Get info about an element specified by path"""
|
||||
return await self._client.command(self._pump_name, "getInfo", {"path": str(path)})
|
||||
|
||||
def _build_mouse_payload(self, /, *, x: MOUSE_COORD_TYPE, y: MOUSE_COORD_TYPE, path: UI_PATH_TYPE,
|
||||
mask: MASK_TYPE, button: str = None) -> Dict:
|
||||
if path is not None:
|
||||
payload = {"path": str(path)}
|
||||
elif x is not None and y is not None:
|
||||
payload = {"x": x, "y": y}
|
||||
else:
|
||||
raise ValueError("Didn't have one of x + y or path")
|
||||
|
||||
if mask:
|
||||
payload["mask"] = mask
|
||||
if button:
|
||||
payload["button"] = button
|
||||
|
||||
return payload
|
||||
|
||||
def mouse_down(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None,
|
||||
path: UI_PATH_TYPE = None, mask: MASK_TYPE = None, button: str) -> Awaitable[Dict]:
|
||||
"""Simulate a mouse down event occurring at a coordinate or UI element path"""
|
||||
payload = self._build_mouse_payload(x=x, y=y, path=path, mask=mask, button=button)
|
||||
return self._client.command(self._pump_name, "mouseDown", payload)
|
||||
|
||||
def mouse_up(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None,
|
||||
path: UI_PATH_TYPE = None, mask: MASK_TYPE = None, button: str) -> Awaitable[Dict]:
|
||||
"""Simulate a mouse up event occurring at a coordinate or UI element path"""
|
||||
payload = self._build_mouse_payload(x=x, y=y, path=path, mask=mask, button=button)
|
||||
return self._client.command(self._pump_name, "mouseUp", payload)
|
||||
|
||||
def mouse_click(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None,
|
||||
path: UI_PATH_TYPE = None, mask: MASK_TYPE = None, button: str) -> Awaitable[Dict]:
|
||||
"""Simulate a mouse down and immediately following mouse up event"""
|
||||
# We're going to ignore the mouseDown response, so use void_command instead.
|
||||
# Most side effects are actually executed on mouseUp.
|
||||
self._client.void_command(self._pump_name, "mouseDown",
|
||||
self._build_mouse_payload(x=x, y=y, path=path, mask=mask, button=button))
|
||||
return self.mouse_up(x=x, y=y, path=path, mask=mask, button=button)
|
||||
|
||||
def mouse_move(self, /, *, x: MOUSE_COORD_TYPE = None, y: MOUSE_COORD_TYPE = None,
|
||||
path: UI_PATH_TYPE = None) -> Awaitable[Dict]:
|
||||
"""Move the mouse to the coordinates or path specified"""
|
||||
payload = self._build_mouse_payload(x=x, y=y, path=path, mask=None)
|
||||
return self._client.command(self._pump_name, "mouseMove", payload)
|
||||
|
||||
def mouse_scroll(self, clicks: int) -> None:
|
||||
"""Act as if the scroll wheel has been moved `clicks` amount. May be negative"""
|
||||
self._client.command(self._pump_name, "mouseScroll", {"clicks": clicks})
|
||||
|
||||
|
||||
class LLUIWrapper(LEAPAPIWrapper):
|
||||
PUMP_NAME = "UI"
|
||||
|
||||
def call(self, function: str, parameter: Any = None) -> None:
|
||||
"""
|
||||
Invoke the `function` operation as if from a menu or button click, passing `parameter`
|
||||
|
||||
Can call most things registered through `LLUICtrl::CommitCallbackRegistry`.
|
||||
"""
|
||||
self._client.void_command(self._pump_name, "call", {"function": function, "parameter": parameter})
|
||||
|
||||
def get_value(self, path: UI_PATH_TYPE) -> Awaitable[Any]:
|
||||
"""For the UI control identified by `path`, return the current value in `value`"""
|
||||
resp_fut = self._client.command(self._pump_name, "getValue", {"path": str(path)})
|
||||
return _data_unwrapper(resp_fut, "value")
|
||||
|
||||
|
||||
class LLCommandDispatcherWrapper(LEAPAPIWrapper):
|
||||
PUMP_NAME = "LLCommandDispatcher"
|
||||
|
||||
def dispatch(
|
||||
self,
|
||||
cmd: str,
|
||||
/,
|
||||
*,
|
||||
params: Optional[Collection[str]] = None,
|
||||
query: Optional[Dict[str, str]] = None,
|
||||
trusted: bool = True,
|
||||
) -> None:
|
||||
"""Execute a command registered as an LLCommandHandler"""
|
||||
return self._client.void_command(self._pump_name, "dispatch", {
|
||||
"cmd": cmd,
|
||||
"params": params or [],
|
||||
"query": query or {},
|
||||
"trusted": trusted,
|
||||
})
|
||||
|
||||
def enumerate(self) -> Awaitable[Dict]:
|
||||
"""Get map of registered LLCommandHandlers, containing name, key, and (e.g.) untrusted flag"""
|
||||
return self._client.command(self._pump_name, "enumerate")
|
||||
|
||||
|
||||
class LLViewerControlWrapper(LEAPAPIWrapper):
|
||||
PUMP_NAME = "LLViewerControl"
|
||||
|
||||
def get(self, group: str, key: str) -> Awaitable[Any]:
|
||||
"""
|
||||
Get value of a Control (config) key
|
||||
|
||||
`group` can be one of "CrashSettings", "Global", "PerAccount", "Warnings".
|
||||
"""
|
||||
return self._client.command(self._pump_name, "get", {"key": key, "group": group})
|
||||
|
||||
def vars(self, group: str) -> Awaitable[List[Dict]]:
|
||||
"""Return a list of dicts of controls in `group`"""
|
||||
resp_fut = self._client.command(self._pump_name, "vars", {"group": group})
|
||||
return _data_unwrapper(resp_fut, "vars")
|
||||
|
||||
def set(self, group: str, key: str, value: Any) -> None:
|
||||
"""
|
||||
Set a configuration value
|
||||
|
||||
TODO: error handling based on "error" field in resp?
|
||||
"""
|
||||
self._client.void_command(self._pump_name, "set", {"key": key, "group": group, "value": value})
|
||||
|
||||
|
||||
class LEAPBridgeServer:
|
||||
"""LEAP Bridge TCP server to use with asyncio.start_server()"""
|
||||
|
||||
def __init__(self, client_connected_cb: Optional[Callable[[LEAPClient]], Awaitable[Any]] = None):
|
||||
self.clients: weakref.WeakSet[LEAPClient] = weakref.WeakSet()
|
||||
self._client_connected_cb = client_connected_cb
|
||||
|
||||
async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||
client = LEAPClient(LEAPProtocol(reader, writer))
|
||||
logging.info('Accepting LEAP connection from %r' % (writer.get_extra_info("peername", None),))
|
||||
await client.connect()
|
||||
|
||||
self.clients.add(client)
|
||||
if self._client_connected_cb:
|
||||
await self._client_connected_cb(client)
|
||||
2
setup.py
2
setup.py
@@ -76,8 +76,6 @@ setup(
|
||||
'console_scripts': {
|
||||
'hippolyzer-gui = hippolyzer.apps.proxy_gui:gui_main',
|
||||
'hippolyzer-cli = hippolyzer.apps.proxy:main',
|
||||
'hippolyzer-leapagent = hippolyzer.apps.proxy_leapagent:agent_main',
|
||||
'hippolyzer-leapreceiver = hippolyzer.apps.proxy_leapreceiver:receiver_main',
|
||||
}
|
||||
},
|
||||
zip_safe=False,
|
||||
|
||||
Reference in New Issue
Block a user