Files
copyparty/tests/test_httpcli.py

255 lines
8.7 KiB
Python
Raw Normal View History

2021-09-16 00:28:38 +02:00
#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals
import io
import os
import pprint
2023-11-30 17:33:07 +00:00
import shutil
import tarfile
2021-06-01 05:49:41 +02:00
import tempfile
2023-11-30 17:33:07 +00:00
import time
import unittest
2024-07-30 22:44:29 +00:00
import zipfile
2021-06-04 19:35:08 +02:00
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
2023-11-30 17:33:07 +00:00
from tests import util as tu
from tests.util import Cfg, eprint, pfind2ls
def hdr(query):
    """Return the raw bytes of a minimal GET request for ``/<query>``.

    The request carries the cookie ``cppwd=o`` and ``Connection: close``,
    and ends with the blank line that terminates the header section.
    ``query`` is inserted verbatim after the leading slash.
    """
    template = "GET /{} HTTP/1.1\r\nCookie: cppwd=o\r\nConnection: close\r\n\r\n"
    request = template.format(query)
    return request.encode("utf-8")
class TestHttpCli(unittest.TestCase):
    """Permission matrix test driven through an in-process ``HttpCli``.

    A three-level tree of folders is created on disk, where every folder
    name is a two-letter code: a permission (``r``, ``w``, or ``a`` for
    both) followed by a user (``a`` for everyone, ``o``, or ``x``).
    Every folder becomes its own volume, and every request is sent with
    the credential ``o``.  For each file the test checks that download,
    html/json listings, PROPFIND, tar/zip archives and PUT uploads are
    allowed or denied exactly as the folder names predict.
    """

    def setUp(self):
        """Obtain a scratch directory and lift the assertion-diff size cap."""
        self.td = tu.get_ramdisk()
        self.maxDiff = 99999

    def tearDown(self):
        """Leave the scratch directory and delete it."""
        # test() chdir()s into the tree, so step out before removing it
        os.chdir(tempfile.gettempdir())
        shutil.rmtree(self.td)

    def test(self):
        """Build the folder tree, then verify every file under every top folder."""
        test_tar = True
        test_zip = True

        td = os.path.join(self.td, "vfs")
        os.mkdir(td)
        os.chdir(td)

        # "perm+user"; r/w/a (a=rw) for user a/o/x (a=all)
        self.dtypes = ["ra", "ro", "rx", "wa", "wo", "wx", "aa", "ao", "ax"]
        self.can_read = ["ra", "ro", "aa", "ao"]
        self.can_write = ["wa", "wo", "aa", "ao"]
        self.fn = "g{:x}g".format(int(time.time() * 3))

        allfiles, allvols = self._mktree()

        # one server configuration (and one connection) per top-level folder
        for top in self.dtypes:
            vcfg = self._volcfg(top, allvols)
            pprint.pprint(vcfg)

            self.args = Cfg(v=vcfg, a=["o:o", "x:x"])
            self.asrv = AuthSrv(self.args, self.log)
            self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")
            vfiles = [x for x in allfiles if x.startswith(top)]
            for fp in vfiles:
                self._chk_file(top, fp, vfiles, test_tar, test_zip)

            self.conn.shutdown()

    def _mktree(self):
        """Create the folder tree and test files below the current directory.

        Returns ``(allfiles, allvols)``: the relative paths of every file
        written and of every folder that is to become a volume.
        """
        allfiles = []
        allvols = []
        for top in self.dtypes:
            allvols.append(top)
            allfiles.append("/".join([top, self.fn]))
            for s1 in self.dtypes:
                p = "/".join([top, s1])
                allvols.append(p)
                allfiles.append(p + "/" + self.fn)
                # "n" is a plain folder between two volumes, not a volume itself
                allfiles.append(p + "/n/" + self.fn)
                for s2 in self.dtypes:
                    p = "/".join([top, s1, "n", s2])
                    # creating the deepest folder creates all of its parents too
                    os.makedirs(p)
                    allvols.append(p)
                    allfiles.append(p + "/" + self.fn)

        # each file contains its own path so a download can be recognized
        for fp in allfiles:
            with open(fp, "w") as f:
                f.write("ok {}\n".format(fp))

        return allfiles, allvols

    def _volcfg(self, top, allvols):
        """Return the volume definitions for every volume below ``top``.

        Each entry is ``<top>/<sub>:<sub>:<mode>,<user>``, where mode and
        user are decoded from the last two characters of the volume path.
        NOTE(review): presumably this is copyparty's ``src:dst:perm,user``
        volume syntax -- confirm against ``Cfg`` / ``AuthSrv``.
        """
        vcfg = []
        for vol in allvols:
            if not vol.startswith(top):
                continue

            mode = vol[-2].replace("a", "rw")
            usr = vol[-1]
            if usr == "a":
                # an empty username stands for "everyone" in the volume definition
                usr = ""

            if "/" not in vol:
                # top-level folder: give it an empty sub-path
                vol += "/"

            vtop, sub = vol.split("/", 1)
            vcfg.append("{0}/{1}:{1}:{2},{3}".format(vtop, sub, mode, usr))

        return vcfg

    def _chk_file(self, top, fp, vfiles, test_tar, test_zip):
        """Run every access check for the file ``fp`` below ``top``.

        ``vfiles`` lists all files below ``top``; it is the pool from which
        the expected archive contents are selected.
        """
        rok, wok = self.can_rw(fp)
        # URLs are relative to the top-level folder
        furl = fp.split("/", 1)[1]
        durl = furl.rsplit("/", 1)[0] if "/" in furl else ""

        # file download
        label = "[{}] {} {}".format(fp, rok, wok)
        self._chk_body(furl, "ok " + fp, rok, label)

        # file browser: html
        self._chk_body(durl, "'{}'".format(self.fn), rok)

        # file browser: json
        self._chk_body(durl + "?ls", '"{}"'.format(self.fn), rok)

        # expected files in archives
        if rok:
            zs = top + "/" + durl
            ref = sorted(x for x in vfiles if self.in_dive(zs, x))
        else:
            ref = []

        # with Depth 1 an unreadable folder must list no files only when it
        # is not writable either; with Depth 0 being unreadable is sufficient
        self._chk_propfind(durl, 1, ref, not rok and not wok)
        self._chk_propfind(durl, 0, ref, not rok)

        if test_tar:
            names = self._archive_names(durl + "?tar", "tar", self._tar_names)
            self._chk_archive(names, top, durl, ref)

        if test_zip:
            names = self._archive_names(durl + "?zip", "zip", self._zip_names)
            self._chk_archive(names, top, durl, ref)

        # stash
        h, _ = self.put(durl)
        res = h.startswith("HTTP/1.1 201 ")
        self.assertEqual(res, wok)
        if wok:
            # resolve the reported upload location to a filesystem path and
            # delete the file again (presumably so that it cannot show up in
            # the listings and archives checked afterwards)
            vp = h.split("\nLocation: http://a:1/")[1].split("\r")[0]
            vn, rem = self.asrv.vfs.get(vp, "*", False, False)
            ap = os.path.join(vn.realpath, rem)
            os.unlink(ap)

    def _chk_body(self, url, needle, expected, label=None):
        """GET ``url`` and fail unless ``needle in body`` equals ``expected``.

        The outcome is printed, prefixed with ``label`` when one is given.
        """
        _, ret = self.curl(url)
        res = needle in ret
        print(res if label is None else "{} = {}".format(label, res))
        if expected != res:
            eprint("\033[33m{}\n# {}\033[0m".format(ret, url))
            self.fail()

    def _chk_propfind(self, url, depth, ref, must_be_empty):
        """PROPFIND ``url`` at ``depth`` and check the shape of the reply.

        When ``ref`` is non-empty the reply must contain a propstat element;
        otherwise it must list no files if ``must_be_empty`` is set, and must
        at least be a multistatus document if it is not.
        """
        _, b = self.propfind(url, depth)
        fns = [x for x in pfind2ls(b) if not x.endswith("/")]
        if ref:
            self.assertIn("<D:propstat>", b)
        elif must_be_empty:
            self.assertListEqual([], fns)
        else:
            self.assertIn("<D:multistatus", b)

    @staticmethod
    def _tar_names(b):
        """Return the member names of the tar stream held in the bytes ``b``."""
        with tarfile.open(fileobj=io.BytesIO(b), mode="r|") as tf:
            return tf.getnames()

    @staticmethod
    def _zip_names(b):
        """Return the member names of the zip file held in the bytes ``b``."""
        with zipfile.ZipFile(io.BytesIO(b), "r") as zf:
            return [x.filename for x in zf.infolist()]

    def _archive_names(self, url, kind, reader):
        """Download ``url`` and return the member names found by ``reader``.

        A body that cannot be parsed counts as an empty archive when the
        response is a 403 or the body is exactly ``b"\\nJ2EOT"``; any other
        parse failure is reported through ``eprint`` and re-raised.
        ``kind`` only labels that report.
        """
        h, b = self.curl(url, True)
        try:
            return reader(b)
        except Exception:
            # deliberately not a bare except: Ctrl-C and SystemExit must
            # never be mistaken for an access-denied response
            if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT":
                eprint("bad {}?".format(kind), url, h, b)
                raise
            return []

    def _chk_archive(self, names, top, durl, ref):
        """Assert that an archive holds exactly the files in ``ref``.

        ``names`` are the raw member names.  Their first path component is
        dropped (presumably the archive's root folder -- confirm), and the
        rest is re-anchored below ``top``/``durl`` so it can be compared
        with the on-disk paths in ``ref``.
        """
        names = [x.split("/", 1)[1] for x in names]
        names = ["/".join([y for y in [top, durl, x] if y]) for x in names]
        perms = [[x] + self.can_rw(x) for x in names]
        ok = sorted(x[0] for x in perms if x[1])
        ng = [x[0] for x in perms if not x[1]]
        self.assertEqual(ref, ok)
        # nothing unreadable may have been included
        self.assertEqual([], ng)

    def can_rw(self, fp):
        """Return ``[readable, writable]`` expected for the file path ``fp``.

        The verdict comes from the name of the closest parent folder that
        is not the neutral folder ``n``.  When there is no such folder,
        both values are False.
        """
        # lowest non-neutral folder declares permissions
        parents = fp.split("/")[:-1]
        owner = next((x for x in reversed(parents) if x != "n"), None)
        return [owner in self.can_read, owner in self.can_write]

    def in_dive(self, top, fp):
        """Tell whether an archive of folder ``top`` should contain ``fp``.

        ``fp`` must be below ``top``, and every folder between the two must
        either be readable or be the neutral folder ``n``.
        """
        # archiver bails at first inaccessible subvolume
        root = top.strip("/").split("/")
        parts = fp.split("/")
        if any(f1 != f2 for f1, f2 in zip(root, parts)):
            return False

        for f in parts[len(root) :]:
            if f == self.fn:
                return True
            if f not in self.can_read and f != "n":
                return False

        return True

    def _roundtrip(self, buf):
        """Feed the raw request ``buf`` to a fresh ``HttpCli``; return the raw reply."""
        conn = self.conn.setbuf(buf)
        HttpCli(conn).run()
        return conn.s._reply

    def put(self, url):
        """PUT a small body to ``url``; return the reply split as ``[headers, body]``.

        The uploaded body is ``"ok <url>\\n"``.
        """
        body = "ok {}\n".format(url).encode("utf-8")
        head = "PUT /{} HTTP/1.1\r\nCookie: cppwd=o\r\nConnection: close\r\nContent-Length: {}\r\n\r\n"
        # Content-Length counts encoded bytes, which differs from the
        # character count as soon as the url holds non-ASCII characters
        buf = head.format(url, len(body)).encode("utf-8") + body
        print("PUT -->", buf)
        ret = self._roundtrip(buf).decode("utf-8").split("\r\n\r\n", 1)
        print("PUT <--", ret)
        return ret

    def curl(self, url, binary=False):
        """GET ``url``; return the reply split as ``[headers, body]``.

        The headers are always text.  The body is left as bytes when
        ``binary`` is set and is decoded as UTF-8 otherwise.
        """
        raw = self._roundtrip(hdr(url))
        if binary:
            h, b = raw.split(b"\r\n\r\n", 1)
            return [h.decode("utf-8"), b]

        return raw.decode("utf-8").split("\r\n\r\n", 1)

    def propfind(self, url, depth=1):
        """Send PROPFIND for ``url`` with the given Depth; return ``[headers, body]``.

        Unlike ``curl`` and ``put``, the credential is sent in a ``PW``
        header rather than a cookie.
        """
        zs = "PROPFIND /%s HTTP/1.1\r\nDepth: %d\r\nPW: o\r\nConnection: close\r\n\r\n"
        buf = zs % (url, depth)
        return self._roundtrip(buf.encode("utf-8")).decode("utf-8").split("\r\n\r\n", 1)

    def log(self, src, msg, c=0):
        """Logger callback handed to the server objects; prints ``msg`` only."""
        print(msg)