Files
copyparty/tests/test_dots.py

385 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals
import io
2023-12-17 22:30:22 +00:00
import json
import os
import shutil
import tarfile
import tempfile
import unittest
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
from copyparty.u2idx import U2idx
2024-02-10 23:50:17 +00:00
from copyparty.up2k import Up2k
from tests import util as tu
from tests.util import Cfg
2024-11-22 22:21:43 +00:00
try:
from typing import Optional
except:
pass
def hdr(query, uname):
h = "GET /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\n\r\n"
return (h % (query, uname)).encode("utf-8")
class TestDots(unittest.TestCase):
def __init__(self, *a, **ka):
super(TestDots, self).__init__(*a, **ka)
self.is_dut = True
def setUp(self):
2024-11-22 22:21:43 +00:00
self.conn: Optional[tu.VHttpConn] = None
self.td = tu.get_ramdisk()
def tearDown(self):
2024-11-22 22:21:43 +00:00
if self.conn:
self.conn.shutdown()
os.chdir(tempfile.gettempdir())
shutil.rmtree(self.td)
2024-11-22 22:21:43 +00:00
def cinit(self):
if self.conn:
self.conn.shutdown()
self.conn = None
self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")
def test_dots(self):
td = os.path.join(self.td, "vfs")
os.mkdir(td)
os.chdir(td)
# topDir volA volA/*dirA .volB .volB/*dirB
spaths = " t .t a a/da a/.da .b .b/db .b/.db"
for n, dirpath in enumerate(spaths.split(" ")):
if dirpath:
os.makedirs(dirpath)
for pfx in "f", ".f":
filepath = pfx + str(n)
if dirpath:
filepath = os.path.join(dirpath, filepath)
with open(filepath, "wb") as f:
f.write(filepath.encode("utf-8"))
2024-03-21 18:51:23 +00:00
vcfg = [".::r,u1:r.,u2", "a:a:r,u1:r,u2", ".b:.b:r.,u1:r,u2"]
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], e2dsa=True)
self.asrv = AuthSrv(self.args, self.log)
2024-11-22 22:21:43 +00:00
self.cinit()
self.assertEqual(self.tardir("", "u1"), "f0 t/f1 a/f3 a/da/f4")
self.assertEqual(self.tardir(".t", "u1"), "f2")
self.assertEqual(self.tardir(".b", "u1"), ".f6 f6 .db/.f8 .db/f8 db/.f7 db/f7")
zs = ".f0 f0 .t/.f2 .t/f2 t/.f1 t/f1 .b/f6 .b/db/f7 a/f3 a/da/f4"
self.assertEqual(self.tardir("", "u2"), zs)
self.assertEqual(self.curl("?tar", "x")[1][:17], "\nJ2EOT")
2023-12-17 22:30:22 +00:00
##
## search
up2k = Up2k(self)
u2idx = U2idx(self)
allvols = list(self.asrv.vfs.all_vols.values())
x = u2idx.search("u1", allvols, "", 999)
x = " ".join(sorted([x["rp"] for x in x[0]]))
# u1 can see dotfiles in volB so they should be included
xe = ".b/.db/.f8 .b/.db/f8 .b/.f6 .b/db/.f7 .b/db/f7 .b/f6 a/da/f4 a/f3 f0 t/f1"
self.assertEqual(x, xe)
x = u2idx.search("u2", allvols, "", 999)
x = " ".join(sorted([x["rp"] for x in x[0]]))
self.assertEqual(x, ".f0 .t/.f2 .t/f2 a/da/f4 a/f3 f0 t/.f1 t/f1")
2024-04-25 22:25:38 +00:00
u2idx.shutdown()
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], dotsrch=False, e2d=True)
self.asrv = AuthSrv(self.args, self.log)
u2idx = U2idx(self)
2024-11-22 22:21:43 +00:00
self.cinit()
x = u2idx.search("u1", self.asrv.vfs.all_vols.values(), "", 999)
x = " ".join(sorted([x["rp"] for x in x[0]]))
# u1 can see dotfiles in volB so they should be included
xe = "a/da/f4 a/f3 f0 t/f1"
self.assertEqual(x, xe)
2024-04-25 22:25:38 +00:00
u2idx.shutdown()
up2k.shutdown()
2023-12-17 22:30:22 +00:00
##
## dirkeys
os.mkdir("v")
with open("v/f1.txt", "wb") as f:
f.write(b"a")
os.rename("a", "v/a")
os.rename(".b", "v/.b")
vcfg = [
2024-10-02 21:59:53 +00:00
".::r.,u1:g,u2:c,dks",
"v/a:v/a:r.,u1:g,u2:c,dks",
"v/.b:v/.b:r.,u1:g,u2:c,dks",
2023-12-17 22:30:22 +00:00
]
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
self.asrv = AuthSrv(self.args, self.log)
2024-11-22 22:21:43 +00:00
self.cinit()
2023-12-17 22:30:22 +00:00
zj = json.loads(self.curl("?ls", "u1")[1])
url = "?k=" + zj["dk"]
# should descend into folders, but not other volumes:
self.assertEqual(self.tardir(url, "u2"), "f0 t/f1 v/f1.txt")
zj = json.loads(self.curl("v?ls", "u1")[1])
url = "v?k=" + zj["dk"]
self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt")
shutil.rmtree("v")
def test_dk_fk(self):
# python3 -m unittest tests.test_dots.TestDots.test_dk_fk
td = os.path.join(self.td, "vfs")
os.mkdir(td)
os.chdir(td)
vcfg = []
for k in "dk dks dky fk fka dk,fk dks,fk".split():
vcfg += ["{0}:{0}:r.,u1:g,u2:c,{0}".format(k)]
zs = "%s/s1/s2" % (k,)
os.makedirs(zs)
with open("%s/f.t1" % (k,), "wb") as f:
f.write(b"f1")
with open("%s/s1/f.t2" % (k,), "wb") as f:
f.write(b"f2")
with open("%s/s1/s2/f.t3" % (k,), "wb") as f:
f.write(b"f3")
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
self.asrv = AuthSrv(self.args, self.log)
2024-11-22 22:21:43 +00:00
self.cinit()
dk = {}
for d in "dk dks dk,fk dks,fk".split():
zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1])
dk[d] = zj["dk"]
##
## dk
# should not be able to access dk with wrong dirkey,
zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 0)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"], "f.t1")
##
## dk thumbs
self.assertIn('">folder</text>', self.curl("dk?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dk?th=x", "u2")[1])
zs = "dk?th=x&k=%s" % (dk["dks"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dk?th=x&k=%s" % (dk["dk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk not enabled, so this should work
self.assertIn('">t1</text>', self.curl("dk/f.t1?th=x", "u2")[1])
self.assertIn('">t2</text>', self.curl("dk/s1/f.t2?th=x", "u2")[1])
##
## dks
# should not be able to access dks with wrong dirkey,
zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 1)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"], "f.t1")
# dks should return correct dirkey of subfolders;
s1 = zj["dirs"][0]["href"]
self.assertEqual(s1.split("/")[0], "s1")
zs = self.curl("dks/%s&ls" % (s1), "u2")[1]
self.assertIn('"s2/?k=', zs)
##
## dks thumbs
self.assertIn('">folder</text>', self.curl("dks?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dks?th=x", "u2")[1])
zs = "dks?th=x&k=%s" % (dk["dk"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dks?th=x&k=%s" % (dk["dks"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk not enabled, so this should work
self.assertIn('">t1</text>', self.curl("dks/f.t1?th=x", "u2")[1])
self.assertIn('">t2</text>', self.curl("dks/s1/f.t2?th=x", "u2")[1])
##
## dky
# doesn't care about keys
zs = self.curl("dky?ls&k=ok", "u2")[1]
self.assertEqual(zs, self.curl("dky?ls", "u2")[1])
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 0)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"], "f.t1")
##
## dky thumbs
self.assertIn('">folder</text>', self.curl("dky?th=x", "u1")[1])
self.assertIn('">folder</text>', self.curl("dky?th=x", "u2")[1])
zs = "dky?th=x&k=%s" % (dk["dk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk not enabled, so this should work
self.assertIn('">t1</text>', self.curl("dky/f.t1?th=x", "u2")[1])
self.assertIn('">t2</text>', self.curl("dky/s1/f.t2?th=x", "u2")[1])
##
## dk+fk
# should not be able to access dk with wrong dirkey,
zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 0)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")
##
## dk+fk thumbs
self.assertIn('">folder</text>', self.curl("dk,fk?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dk,fk?th=x", "u2")[1])
zs = "dk,fk?th=x&k=%s" % (dk["dk"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk enabled, so this should fail
self.assertIn('">e403</text>', self.curl("dk,fk/f.t1?th=x", "u2")[1])
self.assertIn('">e403</text>', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1])
# but dk should return correct filekeys, so try that
zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"])
self.assertIn('">t1</text>', self.curl(zs, "u2")[1])
##
## dks+fk
# should not be able to access dk with wrong dirkey,
zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 1)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=")
self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")
##
## dks+fk thumbs
self.assertIn('">folder</text>', self.curl("dks,fk?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dks,fk?th=x", "u2")[1])
zs = "dks,fk?th=x&k=%s" % (dk["dk"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# subdir s1 without key
zs = "dks,fk/s1/?th=x"
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
# subdir s1 with bad key
zs = "dks,fk/s1/?th=x&k=no"
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
# subdir s1 with correct key
zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk enabled, so this should fail
self.assertIn('">e403</text>', self.curl("dks,fk/f.t1?th=x", "u2")[1])
self.assertIn('">e403</text>', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1])
# but dk should return correct filekeys, so try that
zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"])
self.assertIn('">t1</text>', self.curl(zs, "u2")[1])
# subdir
self.assertIn('">e403</text>', self.curl("dks,fk/s1/?th=x", "u2")[1])
self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1])
zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"])
zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
zj = json.loads(zs)
url = "dks,fk/%s" % zj["dirs"][0]["href"]
self.assertIn('"files"', self.curl(url + "&ls", "u2")[1])
self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1])
def tardir(self, url, uname):
2023-12-17 22:30:22 +00:00
top = url.split("?")[0]
top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/"
url += ("&" if "?" in url else "?") + "tar"
h, b = self.curl(url, uname, True)
tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames()
2023-12-17 22:30:22 +00:00
if len(tar) != len([x for x in tar if x.startswith(top)]):
raise Exception("bad-prefix:", tar)
2024-03-21 18:51:23 +00:00
return " ".join([x[len(top) :] for x in tar])
2023-12-17 22:30:22 +00:00
def tarsel(self, url, uname, sel):
url += ("&" if "?" in url else "?") + "tar"
zs = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nzip\r\n--XD\r\nContent-Disposition: form-data; name="files"\r\n\r\n'
zs += "\r\n".join(sel) + "\r\n--XD--\r\n"
2023-12-17 22:30:22 +00:00
zb = zs.encode("utf-8")
hdr = "POST /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n"
req = (hdr % (url, uname, len(zb))).encode("utf-8") + zb
h, b = self.curl("/" + url, uname, True, req)
tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames()
return " ".join(tar)
def curl(self, url, uname, binary=False, req=b""):
req = req or hdr(url.replace("th=x", "th=j"), uname)
2024-11-22 22:21:43 +00:00
conn = self.conn.setbuf(req)
HttpCli(conn).run()
if binary:
h, b = conn.s._reply.split(b"\r\n\r\n", 1)
return [h.decode("utf-8"), b]
return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
def log(self, src, msg, c=0):
2024-04-25 22:25:38 +00:00
print(msg, "\033[0m")