Add support for asset upload via xfer

Still needed for shapes.
This commit is contained in:
Salad Dais
2021-05-14 03:56:11 +00:00
parent 37e8f8a20e
commit 5eae956750
3 changed files with 77 additions and 3 deletions

View File

@@ -8,7 +8,7 @@ from hippolyzer.lib.proxy.commands import handle_command
from hippolyzer.lib.proxy.message import ProxiedMessage
from hippolyzer.lib.proxy.region import ProxiedRegion
from hippolyzer.lib.proxy.sessions import Session
from hippolyzer.lib.proxy.templates import XferFilePath
from hippolyzer.lib.proxy.templates import XferFilePath, AssetType
class XferExampleAddon(BaseAddon):

View File

@@ -40,7 +40,7 @@ class Block:
"""
__slots__ = ('name', 'size', 'vars', 'message_name', '_ser_cache', 'fill_missing',)
def __init__(self, name, /, fill_missing=False, **kwargs):
def __init__(self, name, *, /, fill_missing=False, **kwargs):
self.name = name
self.size = 0
self.message_name: Optional[str] = None

View File

@@ -25,7 +25,7 @@ _XFER_MESSAGES = {"AbortXfer", "ConfirmXferPacket", "RequestXfer", "SendXferPack
class Xfer:
def __init__(self, xfer_id: int):
def __init__(self, xfer_id: Optional[int] = None):
super().__init__()
self.xfer_id: Optional[int] = xfer_id
self.chunks: Dict[int, bytes] = {}
@@ -144,3 +144,77 @@ class XferManager:
xfer.chunks[packet_id.PacketID] = packet_data
if packet_id.IsEOF and not xfer.done():
xfer.mark_done()
def upload_asset(
self,
asset_type: AssetType,
data: bytes,
store_local: bool = False,
temp_file: bool = False,
transaction_id: Optional[UUID] = None,
):
"""Upload an asset through the Xfer upload path"""
if not transaction_id:
transaction_id = UUID.random()
xfer = Xfer()
chunk_num = 0
# prepend the length expected length
data = TemplateDataPacker.pack(len(data), MsgType.MVT_S32) + data
while data:
xfer.chunks[chunk_num] = data[:1200]
data = data[1200:]
self._region.circuit.send_message(ProxiedMessage(
"AssetUploadRequest",
Block(
"AssetBlock",
TransactionID=transaction_id,
Type=asset_type,
Tempfile=temp_file,
StoreLocal=store_local,
# Can shove in here for assets smaller than 1200 bytes, always
# use the long, Xfer path for now.
AssetData=b'',
)
))
fut = asyncio.Future()
asyncio.create_task(self._pump_asset_upload(xfer, transaction_id, fut))
return fut
async def _pump_asset_upload(self, xfer: Xfer, transaction_id: UUID, fut: asyncio.Future):
message_handler = self._region.message_handler
# We'll receive an Xfer request for the asset we're uploading.
# asset ID is determined by hashing secure session ID with chosen transaction ID.
asset_id: UUID = self._region.session().tid_to_assetid(transaction_id)
try:
def request_predicate(request_msg: ProxiedMessage):
return request_msg["XferID"]["VFileID"] == asset_id
msg = await message_handler.wait_for(
'RequestXfer', predicate=request_predicate, timeout=5000)
xfer.xfer_id = msg["XferID"]["ID"]
packet_id = 0
# TODO: No resend yet. If it's lost, it's lost.
while xfer.chunks:
chunk = xfer.chunks.pop(packet_id)
# EOF if there are no chunks left
packet_val = XferPacket(PacketID=packet_id, IsEOF=not bool(xfer.chunks))
self._region.circuit.send_message(ProxiedMessage(
"SendXferPacket",
Block("XferID", ID=xfer.xfer_id, Packet_=packet_val),
Block("DataPacket", Data=chunk),
))
# Don't care about the value, just want to know it was confirmed.
await message_handler.wait_for(
"ConfirmXferPacket", predicate=xfer.is_our_message, timeout=5000)
packet_id += 1
def complete_predicate(complete_msg: ProxiedMessage):
return complete_msg["AssetBlock"]["UUID"] == asset_id
msg = await message_handler.wait_for('AssetUploadComplete', predicate=complete_predicate)
if msg["AssetBlock"]["Success"] == 1:
fut.set_result(asset_id)
else:
fut.set_exception(RuntimeError(f"Xfer for transaction {transaction_id} failed"))
except asyncio.TimeoutError as e:
fut.set_exception(e)