196 lines
6.3 KiB
Go
196 lines
6.3 KiB
Go
// Copyright (C) 2024 Shizun Ge
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
//
|
|
|
|
package metrics
|
|
|
|
import (
|
|
"endlessh-go/geoip"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
"strings"
|
|
"net"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
)
|
|
|
|
var (
|
|
pq *UpdatablePriorityQueue
|
|
totalClients *prometheus.CounterVec
|
|
totalClientsClosed *prometheus.CounterVec
|
|
totalBytes *prometheus.CounterVec
|
|
totalSeconds *prometheus.CounterVec
|
|
clientIP *prometheus.CounterVec
|
|
clientSeconds *prometheus.CounterVec
|
|
)
|
|
|
|
func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
|
|
pq = NewUpdatablePriorityQueue()
|
|
totalClients = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endlessh_client_open_count_total",
|
|
Help: "Total number of clients that tried to connect to this host.",
|
|
}, []string{"local_port"},
|
|
)
|
|
totalClientsClosed = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endlessh_client_closed_count_total",
|
|
Help: "Total number of clients that stopped connecting to this host.",
|
|
}, []string{"local_port"},
|
|
)
|
|
totalBytes = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endlessh_sent_bytes_total",
|
|
Help: "Total bytes sent to clients that tried to connect to this host.",
|
|
}, []string{"local_port"},
|
|
)
|
|
totalSeconds = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endlessh_trapped_time_seconds_total",
|
|
Help: "Total seconds clients spent on endlessh.",
|
|
}, []string{"local_port"},
|
|
)
|
|
clientIP = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endlessh_client_open_count",
|
|
Help: "Number of connections of clients.",
|
|
},
|
|
[]string{"ip", "local_port", "geohash", "country", "location"},
|
|
)
|
|
clientSeconds = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endlessh_client_trapped_time_seconds",
|
|
Help: "Seconds a client spends on endlessh.",
|
|
},
|
|
[]string{"ip", "local_port"},
|
|
)
|
|
promReg := prometheus.NewRegistry()
|
|
promReg.MustRegister(totalClients)
|
|
promReg.MustRegister(totalClientsClosed)
|
|
promReg.MustRegister(totalBytes)
|
|
promReg.MustRegister(totalSeconds)
|
|
promReg.MustRegister(clientIP)
|
|
promReg.MustRegister(clientSeconds)
|
|
handler := promhttp.HandlerFor(promReg, promhttp.HandlerOpts{EnableOpenMetrics: true})
|
|
http.Handle("/"+prometheusEntry, handler)
|
|
go func() {
|
|
|
|
if strings.HasPrefix(prometheusHost, "unix:") {
|
|
socketPath := prometheusHost[5:] // trim the "unix:" prefix
|
|
glog.Infof("Starting Prometheus on Unix socket %v, entry point is /%v", socketPath, prometheusEntry)
|
|
serveOnUnixSocket(socketPath)
|
|
} else {
|
|
ipPort := prometheusHost+":"+prometheusPort
|
|
glog.Infof("Starting Prometheus on IP port %v, entry point is /%v", ipPort, prometheusEntry)
|
|
serveOnIpPort(ipPort)
|
|
}
|
|
|
|
}()
|
|
}
|
|
|
|
func serveOnUnixSocket(socketPath string) {
|
|
_ = os.Remove(socketPath) // allow failure
|
|
unixListener, err := net.Listen("unix", socketPath)
|
|
if err != nil {
|
|
glog.Errorf("Error starting Prometheus on socket %v: %v", socketPath, err)
|
|
os.Exit(1)
|
|
}
|
|
if err := http.Serve(unixListener, nil); err != nil {
|
|
glog.Errorf("Error starting Prometheus at socket %v: %v", socketPath, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func serveOnIpPort(ipPort string) {
|
|
if err := http.ListenAndServe(ipPort, nil); err != nil {
|
|
glog.Errorf("Error starting Prometheus at IP port %v: %v", ipPort, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
const (
|
|
RecordEntryTypeStart = iota
|
|
RecordEntryTypeSend = iota
|
|
RecordEntryTypeStop = iota
|
|
RecordEntryTypeClean = iota
|
|
)
|
|
|
|
type RecordEntry struct {
|
|
RecordType int
|
|
IpAddr string
|
|
LocalPort string
|
|
MillisecondsSpent int64
|
|
BytesSent int
|
|
}
|
|
|
|
func StartRecording(maxClients int64, prometheusEnabled bool, prometheusCleanUnseenSeconds int, geoOption geoip.GeoOption) chan RecordEntry {
|
|
records := make(chan RecordEntry, maxClients)
|
|
go func() {
|
|
for {
|
|
r, more := <-records
|
|
if !more {
|
|
return
|
|
}
|
|
if !prometheusEnabled {
|
|
continue
|
|
}
|
|
switch r.RecordType {
|
|
case RecordEntryTypeStart:
|
|
geohash, country, location, err := geoip.GeohashAndLocation(r.IpAddr, geoOption)
|
|
if err != nil {
|
|
glog.Warningf("Failed to obtain the geohash of %v: %v.", r.IpAddr, err)
|
|
}
|
|
clientIP.With(prometheus.Labels{
|
|
"ip": r.IpAddr,
|
|
"local_port": r.LocalPort,
|
|
"geohash": geohash,
|
|
"country": country,
|
|
"location": location}).Inc()
|
|
totalClients.With(prometheus.Labels{"local_port": r.LocalPort}).Inc()
|
|
pq.Update(r.IpAddr, time.Now())
|
|
case RecordEntryTypeSend:
|
|
secondsSpent := float64(r.MillisecondsSpent) / 1000
|
|
clientSeconds.With(prometheus.Labels{
|
|
"ip": r.IpAddr,
|
|
"local_port": r.LocalPort}).Add(secondsSpent)
|
|
totalSeconds.With(prometheus.Labels{"local_port": r.LocalPort}).Add(secondsSpent)
|
|
totalBytes.With(prometheus.Labels{"local_port": r.LocalPort}).Add(float64(r.BytesSent))
|
|
pq.Update(r.IpAddr, time.Now())
|
|
case RecordEntryTypeStop:
|
|
secondsSpent := float64(r.MillisecondsSpent) / 1000
|
|
clientSeconds.With(prometheus.Labels{
|
|
"ip": r.IpAddr,
|
|
"local_port": r.LocalPort}).Add(secondsSpent)
|
|
totalSeconds.With(prometheus.Labels{"local_port": r.LocalPort}).Add(secondsSpent)
|
|
totalClientsClosed.With(prometheus.Labels{"local_port": r.LocalPort}).Inc()
|
|
pq.Update(r.IpAddr, time.Now())
|
|
case RecordEntryTypeClean:
|
|
top := pq.Peek()
|
|
deadline := time.Now().Add(-time.Second * time.Duration(prometheusCleanUnseenSeconds))
|
|
for top != nil && top.Value.Before(deadline) {
|
|
clientIP.DeletePartialMatch(prometheus.Labels{"ip": top.Key})
|
|
clientSeconds.DeletePartialMatch(prometheus.Labels{"ip": top.Key})
|
|
pq.Pop()
|
|
top = pq.Peek()
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return records
|
|
}
|