Files
copyparty/tests/util.py

185 lines
4.5 KiB
Python
Raw Normal View History

import os
2021-06-04 19:35:08 +02:00
import sys
import time
2021-06-01 05:49:41 +02:00
import shutil
import jinja2
2021-10-03 19:59:47 +02:00
import threading
import tempfile
2021-06-04 19:35:08 +02:00
import platform
import subprocess as sp
2022-07-27 00:15:49 +02:00
from argparse import Namespace
2021-06-04 19:35:08 +02:00
WINDOWS = platform.system() == "Windows"
ANYWIN = WINDOWS or sys.platform in ["msys"]
MACOS = platform.system() == "Darwin"
J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader)
J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}")
2021-06-04 19:35:08 +02:00
def nah(*a, **ka):
return False
if MACOS:
import posixpath
posixpath.islink = nah
os.path.islink = nah
# 25% faster; until any tests do symlink stuff
2021-10-03 19:59:47 +02:00
from copyparty.util import Unrecv, FHC
2021-06-04 19:35:08 +02:00
2021-07-28 01:18:38 +02:00
def runcmd(argv):
p = sp.Popen(argv, stdout=sp.PIPE, stderr=sp.PIPE)
stdout, stderr = p.communicate()
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
return [p.returncode, stdout, stderr]
2021-07-28 01:18:38 +02:00
def chkcmd(argv):
ok, sout, serr = runcmd(argv)
if ok != 0:
raise Exception(serr)
return sout, serr
def get_ramdisk():
2021-06-01 05:49:41 +02:00
def subdir(top):
ret = os.path.join(top, "cptd-{}".format(os.getpid()))
shutil.rmtree(ret, True)
os.mkdir(ret)
return ret
for vol in ["/dev/shm", "/Volumes/cptd"]: # nosec (singleton test)
if os.path.exists(vol):
2021-06-01 05:49:41 +02:00
return subdir(vol)
if os.path.exists("/Volumes"):
2021-06-01 05:49:41 +02:00
# hdiutil eject /Volumes/cptd/
2021-07-28 01:18:38 +02:00
devname, _ = chkcmd("hdiutil attach -nomount ram://131072".split())
devname = devname.strip()
print("devname: [{}]".format(devname))
for _ in range(10):
try:
2021-07-28 01:18:38 +02:00
_, _ = chkcmd(["diskutil", "eraseVolume", "HFS+", "cptd", devname])
2021-07-17 16:43:22 +02:00
with open("/Volumes/cptd/.metadata_never_index", "w") as f:
f.write("orz")
2021-07-17 16:45:25 +02:00
try:
shutil.rmtree("/Volumes/cptd/.fseventsd")
except:
pass
2021-06-01 05:49:41 +02:00
return subdir("/Volumes/cptd")
except Exception as ex:
print(repr(ex))
time.sleep(0.25)
raise Exception("ramdisk creation failed")
ret = os.path.join(tempfile.gettempdir(), "copyparty-test")
try:
os.mkdir(ret)
finally:
2021-06-01 05:49:41 +02:00
return subdir(ret)
2022-07-27 00:15:49 +02:00
class Cfg(Namespace):
def __init__(self, a=None, v=None, c=None):
ka = {}
2022-08-04 00:39:37 +02:00
ex = "e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp xdev xvol ed emp force_js ihead no_acode no_athumb no_del no_logues no_mv no_readme no_robots no_scandir no_thumb no_vthumb no_zip nid nih nw"
2022-07-27 00:15:49 +02:00
ka.update(**{k: False for k in ex.split()})
ex = "no_rescan no_sendfile no_voldump"
ka.update(**{k: True for k in ex.split()})
2022-09-07 23:06:12 +02:00
ex = "css_browser hist js_browser no_hash no_idx no_forget"
2022-07-27 00:15:49 +02:00
ka.update(**{k: None for k in ex.split()})
ex = "re_maxage rproxy rsp_slp s_wr_slp theme themes turbo df"
ka.update(**{k: 0 for k in ex.split()})
2022-08-02 20:26:51 +02:00
ex = "doctitle favico html_head mth textfiles log_fk"
2022-07-27 00:15:49 +02:00
ka.update(**{k: "" for k in ex.split()})
super(Cfg, self).__init__(
a=a or [],
v=v or [],
c=c,
s_wr_sz=512 * 1024,
unpost=600,
u2sort="s",
mtp=[],
mte="a",
lang="eng",
logout=573,
**ka
)
2021-04-24 03:35:58 +02:00
class NullBroker(object):
2022-06-16 01:07:15 +02:00
def say(*args):
pass
def ask(*args):
2021-04-24 03:35:58 +02:00
pass
class VSock(object):
def __init__(self, buf):
self._query = buf
self._reply = b""
self.sendall = self.send
def recv(self, sz):
ret = self._query[:sz]
self._query = self._query[sz:]
return ret
def send(self, buf):
self._reply += buf
return len(buf)
2022-01-18 22:28:33 +01:00
def getsockname(self):
return ("a", 1)
class VHttpSrv(object):
def __init__(self):
2021-04-24 03:35:58 +02:00
self.broker = NullBroker()
2021-11-06 23:27:21 +01:00
self.prism = None
2021-04-24 03:35:58 +02:00
aliases = ["splash", "browser", "browser2", "msg", "md", "mde"]
self.j2 = {x: J2_FILES for x in aliases}
def cachebuster(self):
return "a"
class VHttpConn(object):
2021-06-11 23:01:13 +02:00
def __init__(self, args, asrv, log, buf):
self.s = VSock(buf)
self.sr = Unrecv(self.s, None)
self.addr = ("127.0.0.1", "42069")
self.args = args
2021-06-11 23:01:13 +02:00
self.asrv = asrv
self.nid = None
self.log_func = log
self.log_src = "a"
self.lf_url = None
self.hsrv = VHttpSrv()
2021-10-03 19:59:47 +02:00
self.u2fh = FHC()
self.mutex = threading.Lock()
2021-06-30 01:00:00 +02:00
self.nreq = 0
self.nbyte = 0
self.ico = None
self.thumbcli = None
2021-10-03 19:59:47 +02:00
self.t0 = time.time()