Files
copyparty/tests/test_httpcli.py

192 lines
6.2 KiB
Python
Raw Normal View History

2021-09-16 00:28:38 +02:00
#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals
import io
import os
import time
import shutil
import pprint
import tarfile
2021-06-01 05:49:41 +02:00
import tempfile
import unittest
from tests import util as tu
2022-07-27 00:15:49 +02:00
from tests.util import Cfg
2021-06-04 19:35:08 +02:00
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
def hdr(query):
h = "GET /{} HTTP/1.1\r\nCookie: cppwd=o\r\nConnection: close\r\n\r\n"
return h.format(query).encode("utf-8")
class TestHttpCli(unittest.TestCase):
2021-06-01 05:49:41 +02:00
def setUp(self):
self.td = tu.get_ramdisk()
def tearDown(self):
os.chdir(tempfile.gettempdir())
shutil.rmtree(self.td)
2021-06-01 05:49:41 +02:00
def test(self):
td = os.path.join(self.td, "vfs")
os.mkdir(td)
os.chdir(td)
self.dtypes = ["ra", "ro", "rx", "wa", "wo", "wx", "aa", "ao", "ax"]
self.can_read = ["ra", "ro", "aa", "ao"]
self.can_write = ["wa", "wo", "aa", "ao"]
self.fn = "g{:x}g".format(int(time.time() * 3))
allfiles = []
allvols = []
for top in self.dtypes:
allvols.append(top)
allfiles.append("/".join([top, self.fn]))
for s1 in self.dtypes:
p = "/".join([top, s1])
allvols.append(p)
allfiles.append(p + "/" + self.fn)
allfiles.append(p + "/n/" + self.fn)
for s2 in self.dtypes:
p = "/".join([top, s1, "n", s2])
os.makedirs(p)
allvols.append(p)
allfiles.append(p + "/" + self.fn)
for fp in allfiles:
with open(fp, "w") as f:
f.write("ok {}\n".format(fp))
for top in self.dtypes:
vcfg = []
for vol in allvols:
if not vol.startswith(top):
continue
2021-09-15 01:01:20 +02:00
mode = vol[-2].replace("a", "rw")
usr = vol[-1]
if usr == "a":
usr = ""
if "/" not in vol:
vol += "/"
top, sub = vol.split("/", 1)
2021-07-22 23:48:29 +02:00
vcfg.append("{0}/{1}:{1}:{2},{3}".format(top, sub, mode, usr))
pprint.pprint(vcfg)
self.args = Cfg(v=vcfg, a=["o:o", "x:x"])
2021-06-11 23:01:13 +02:00
self.asrv = AuthSrv(self.args, self.log)
vfiles = [x for x in allfiles if x.startswith(top)]
for fp in vfiles:
rok, wok = self.can_rw(fp)
furl = fp.split("/", 1)[1]
durl = furl.rsplit("/", 1)[0] if "/" in furl else ""
# file download
2021-04-24 03:35:58 +02:00
h, ret = self.curl(furl)
res = "ok " + fp in ret
print("[{}] {} {} = {}".format(fp, rok, wok, res))
if rok != res:
print("\033[33m{}\n# {}\033[0m".format(ret, furl))
self.fail()
# file browser: html
2021-04-24 03:35:58 +02:00
h, ret = self.curl(durl)
res = "'{}'".format(self.fn) in ret
print(res)
if rok != res:
print("\033[33m{}\n# {}\033[0m".format(ret, durl))
self.fail()
# file browser: json
url = durl + "?ls"
2021-04-24 03:35:58 +02:00
h, ret = self.curl(url)
res = '"{}"'.format(self.fn) in ret
print(res)
if rok != res:
print("\033[33m{}\n# {}\033[0m".format(ret, url))
self.fail()
# tar
url = durl + "?tar"
h, b = self.curl(url, True)
# with open(os.path.join(td, "tar"), "wb") as f:
# f.write(b)
try:
tar = tarfile.open(fileobj=io.BytesIO(b)).getnames()
except:
tar = []
2021-09-08 00:16:08 +02:00
tar = [x[4:] if x.startswith("top/") else x for x in tar]
tar = ["/".join([y for y in [top, durl, x] if y]) for x in tar]
tar = [[x] + self.can_rw(x) for x in tar]
tar_ok = [x[0] for x in tar if x[1]]
tar_ng = [x[0] for x in tar if not x[1]]
self.assertEqual([], tar_ng)
2021-04-24 03:35:58 +02:00
if durl.split("/")[-1] in self.can_read:
ref = [x for x in vfiles if self.in_dive(top + "/" + durl, x)]
for f in ref:
print("{}: {}".format("ok" if f in tar_ok else "NG", f))
ref.sort()
tar_ok.sort()
self.assertEqual(ref, tar_ok)
2021-04-24 03:35:58 +02:00
# stash
h, ret = self.put(url)
res = h.startswith("HTTP/1.1 201 ")
2021-04-24 03:35:58 +02:00
self.assertEqual(res, wok)
def can_rw(self, fp):
# lowest non-neutral folder declares permissions
expect = fp.split("/")[:-1]
for x in reversed(expect):
if x != "n":
expect = x
break
return [expect in self.can_read, expect in self.can_write]
def in_dive(self, top, fp):
# archiver bails at first inaccessible subvolume
top = top.strip("/").split("/")
fp = fp.split("/")
for f1, f2 in zip(top, fp):
if f1 != f2:
return False
for f in fp[len(top) :]:
if f == self.fn:
return True
if f not in self.can_read and f != "n":
return False
return True
2021-04-24 03:35:58 +02:00
def put(self, url):
buf = "PUT /{0} HTTP/1.1\r\nCookie: cppwd=o\r\nConnection: close\r\nContent-Length: {1}\r\n\r\nok {0}\n"
buf = buf.format(url, len(url) + 4).encode("utf-8")
print("PUT -->", buf)
2021-06-11 23:01:13 +02:00
conn = tu.VHttpConn(self.args, self.asrv, self.log, buf)
2021-04-24 03:35:58 +02:00
HttpCli(conn).run()
ret = conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
print("PUT <--", ret)
return ret
2021-04-24 03:35:58 +02:00
def curl(self, url, binary=False):
2021-06-11 23:01:13 +02:00
conn = tu.VHttpConn(self.args, self.asrv, self.log, hdr(url))
HttpCli(conn).run()
if binary:
h, b = conn.s._reply.split(b"\r\n\r\n", 1)
2021-04-24 03:35:58 +02:00
return [h.decode("utf-8"), b]
2021-04-24 03:35:58 +02:00
return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
def log(self, src, msg, c=0):
print(msg)