move tcp accept into mp-worker

This commit is contained in:
ed
2021-07-09 15:49:36 +02:00
parent b32d1f8ad3
commit 8fcde2a579
10 changed files with 76 additions and 233 deletions

View File

@@ -48,17 +48,16 @@ class HttpSrv(object):
self.log = broker.log
self.asrv = broker.asrv
self.disconnect_func = None
self.mutex = threading.Lock()
self.stopping = False
self.tp_nthr = 0 # actual
self.tp_ncli = 0 # fading
self.tp_time = None # latest worker collect
self.tp_q = None if self.args.no_htp else queue.LifoQueue()
self.srvs = []
self.clients = {}
self.workload = 0
self.workload_thr_alive = False
self.cb_ts = 0
self.cb_v = 0
@@ -111,11 +110,47 @@ class HttpSrv(object):
if self.tp_nthr > self.tp_ncli + 8:
self.stop_threads(4)
def listen(self, sck):
self.srvs.append(sck)
t = threading.Thread(target=self.thr_listen, args=(sck,))
t.daemon = True
t.start()
def thr_listen(self, srv_sck):
"""listens on a shared tcp server"""
ip, port = srv_sck.getsockname()
fno = srv_sck.fileno()
msg = "subscribed @ {}:{} f{},p{}".format(ip, port, fno, os.getpid())
self.log("httpsrv", msg)
while not self.stopping:
if self.args.log_conn:
self.log("httpsrv", "|%sC-ncli" % ("-" * 1,), c="1;30")
if len(self.clients) >= self.args.nc:
time.sleep(0.1)
continue
if self.args.log_conn:
self.log("httpsrv", "|%sC-acc1" % ("-" * 2,), c="1;30")
try:
sck, addr = srv_sck.accept()
except (OSError, socket.error) as ex:
self.log("httpsrv", "accept({}): {}".format(fno, ex), c=6)
if ex.errno not in [10038, 10054, 107, 57, 49, 9]:
raise
continue
if self.args.log_conn:
m = "|{}C-acc2 \033[0;36m{} \033[3{}m{}".format(
"-" * 3, ip, port % 8, port
)
self.log("%s %s" % addr, m, c="1;30")
self.accept(sck, addr)
def accept(self, sck, addr):
"""takes an incoming tcp connection and creates a thread to handle it"""
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-cthr" % ("-" * 5,), c="1;30")
now = time.time()
if self.tp_time and now - self.tp_time > 300:
@@ -167,6 +202,13 @@ class HttpSrv(object):
return len(self.clients)
def shutdown(self):
self.stopping = True
for srv in self.srvs:
try:
srv.close()
except:
pass
clients = list(self.clients.keys())
for cli in clients:
try:
@@ -184,25 +226,15 @@ class HttpSrv(object):
with self.mutex:
self.clients[cli] = 0
if self.is_mp:
self.workload += 50
if not self.workload_thr_alive:
self.workload_thr_alive = True
thr = threading.Thread(
target=self.thr_workload, name="httpsrv-workload"
)
thr.daemon = True
thr.start()
fno = sck.fileno()
try:
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-crun" % ("-" * 6,), c="1;30")
self.log("%s %s" % addr, "|%sC-crun" % ("-" * 4,), c="1;30")
cli.run()
except (OSError, socket.error) as ex:
if ex.errno not in [10038, 10054, 107, 57, 9]:
if ex.errno not in [10038, 10054, 107, 57, 49, 9]:
self.log(
"%s %s" % addr,
"run({}): {}".format(fno, ex),
@@ -212,7 +244,7 @@ class HttpSrv(object):
finally:
sck = cli.s
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-cdone" % ("-" * 7,), c="1;30")
self.log("%s %s" % addr, "|%sC-cdone" % ("-" * 5,), c="1;30")
try:
fno = sck.fileno()
@@ -237,35 +269,6 @@ class HttpSrv(object):
with self.mutex:
del self.clients[cli]
if self.disconnect_func:
self.disconnect_func(addr) # pylint: disable=not-callable
def thr_workload(self):
"""indicates the python interpreter workload caused by this HttpSrv"""
# avoid locking in extract_filedata by tracking difference here
while True:
time.sleep(0.2)
with self.mutex:
if not self.clients:
# no clients rn, termiante thread
self.workload_thr_alive = False
self.workload = 0
return
total = 0
with self.mutex:
for cli in self.clients.keys():
now = cli.workload
delta = now - self.clients[cli]
if delta < 0:
# was reset in HttpCli to prevent overflow
delta = now
total += delta
self.clients[cli] = now
self.workload = total
def cachebuster(self):
if time.time() - self.cb_ts < 1:
return self.cb_v