re-org files into packages. update copyright. accept multiple ports.

This commit is contained in:
Shizun Ge
2024-01-16 21:06:03 -08:00
parent df4cd39c57
commit fee1f1a67d
5 changed files with 234 additions and 176 deletions

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2021 Shizun Ge // Copyright (C) 2021-2024 Shizun Ge
// //
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by // it under the terms of the GNU General Public License as published by
@@ -14,11 +14,12 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
// //
package main package client
import ( import (
"math/rand" "math/rand"
"net" "net"
"strconv"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -65,10 +66,14 @@ func NewClient(conn net.Conn, interval time.Duration, maxClients int64) *Client
} }
} }
func (c *Client) IpAddr() string { func (c *Client) RemoteIpAddr() string {
return c.conn.RemoteAddr().(*net.TCPAddr).IP.String() return c.conn.RemoteAddr().(*net.TCPAddr).IP.String()
} }
func (c *Client) LocalPort() string {
return strconv.Itoa(c.conn.LocalAddr().(*net.TCPAddr).Port)
}
func (c *Client) Send(bannerMaxLength int64) (int, error) { func (c *Client) Send(bannerMaxLength int64) (int, error) {
if time.Now().Before(c.next) { if time.Now().Before(c.next) {
time.Sleep(c.next.Sub(time.Now())) time.Sleep(c.next.Sub(time.Now()))

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 Shizun Ge // Copyright (C) 2023-2024 Shizun Ge
// //
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by // it under the terms of the GNU General Public License as published by
@@ -14,16 +14,16 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
// //
package coordinates package geoip
// Map country's ISO to their capital's latitude and longitude. // Map country's ISO to their capital's latitude and longitude.
// Country's ISO see https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 // Country's ISO see https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2
type Location struct { type location struct {
Latitude float64 Latitude float64
Longitude float64 Longitude float64
} }
var Country = map[string]Location{ var countryToLocation = map[string]location{
"AD": {42.5, 1.5}, "AD": {42.5, 1.5},
"AE": {24.4511, 54.3969}, "AE": {24.4511, 54.3969},
"AF": {34.5328, 69.1658}, "AF": {34.5328, 69.1658},

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2021-2023 Shizun Ge // Copyright (C) 2021-2024 Shizun Ge
// //
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by // it under the terms of the GNU General Public License as published by
@@ -14,7 +14,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
// //
package main package geoip
import ( import (
"encoding/json" "encoding/json"
@@ -24,8 +24,6 @@ import (
"net/http" "net/http"
"strings" "strings"
"endlessh-go/coordinates"
"github.com/oschwald/geoip2-golang" "github.com/oschwald/geoip2-golang"
"github.com/pierrre/geohash" "github.com/pierrre/geohash"
) )
@@ -35,8 +33,6 @@ type GeoOption struct {
MaxMindDbFileName string MaxMindDbFileName string
} }
var ()
func composeLocation(country string, region string, city string) string { func composeLocation(country string, region string, city string) string {
var locations []string var locations []string
for _, s := range []string{country, region, city} { for _, s := range []string{country, region, city} {
@@ -120,7 +116,7 @@ func geohashAndLocationFromMaxMindDb(ipAddr, maxMindDbFileName string) (string,
iso := cityRecord.Country.IsoCode iso := cityRecord.Country.IsoCode
if latitude == 0 && longitude == 0 { if latitude == 0 && longitude == 0 {
// In case of using Country DB, city is not available. // In case of using Country DB, city is not available.
loc, ok := coordinates.Country[iso] loc, ok := countryToLocation[iso]
if ok { if ok {
latitude = loc.Latitude latitude = loc.Latitude
longitude = loc.Longitude longitude = loc.Longitude
@@ -138,7 +134,7 @@ func geohashAndLocationFromMaxMindDb(ipAddr, maxMindDbFileName string) (string,
return gh, country, location, nil return gh, country, location, nil
} }
func geohashAndLocation(ipAddr string, option GeoOption) (string, string, string, error) { func GeohashAndLocation(ipAddr string, option GeoOption) (string, string, string, error) {
switch option.GeoipSupplier { switch option.GeoipSupplier {
case "off": case "off":
return "s000", "Geohash off", "Geohash off", nil return "s000", "Geohash off", "Geohash off", nil

197
main.go
View File

@@ -1,4 +1,4 @@
// Copyright (C) 2021-2023 Shizun Ge // Copyright (C) 2021-2024 Shizun Ge
// //
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by // it under the terms of the GNU General Public License as published by
@@ -17,144 +17,21 @@
package main package main
import ( import (
"endlessh-go/client"
"endlessh-go/geoip"
"endlessh-go/metrics"
"flag" "flag"
"fmt" "fmt"
"net" "net"
"net/http"
"os" "os"
"sync/atomic" "strings"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
var ( func startSending(maxClients int64, bannerMaxLength int64, records chan<- metrics.RecordEntry) chan *client.Client {
numTotalClients int64 clients := make(chan *client.Client, maxClients)
numTotalClientsClosed int64
numTotalBytes int64
numTotalMilliseconds int64
totalClients prometheus.CounterFunc
totalClientsClosed prometheus.CounterFunc
totalBytes prometheus.CounterFunc
totalSeconds prometheus.CounterFunc
clientIP *prometheus.CounterVec
clientSeconds *prometheus.CounterVec
)
func initPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
totalClients = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_client_open_count_total",
Help: "Total number of clients that tried to connect to this host.",
}, func() float64 {
return float64(numTotalClients)
},
)
totalClientsClosed = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_client_closed_count_total",
Help: "Total number of clients that stopped connecting to this host.",
}, func() float64 {
return float64(numTotalClientsClosed)
},
)
totalBytes = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_sent_bytes_total",
Help: "Total bytes sent to clients that tried to connect to this host.",
}, func() float64 {
return float64(numTotalBytes)
},
)
totalSeconds = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_trapped_time_seconds_total",
Help: "Total seconds clients spent on endlessh.",
}, func() float64 {
return float64(numTotalMilliseconds) / 1000
},
)
clientIP = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "endlessh_client_open_count",
Help: "Number of connections of clients.",
},
[]string{"ip", "geohash", "country", "location"},
)
clientSeconds = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "endlessh_client_trapped_time_seconds",
Help: "Seconds a client spends on endlessh.",
},
[]string{"ip"},
)
promReg := prometheus.NewRegistry()
promReg.MustRegister(totalClients)
promReg.MustRegister(totalClientsClosed)
promReg.MustRegister(totalBytes)
promReg.MustRegister(totalSeconds)
promReg.MustRegister(clientIP)
promReg.MustRegister(clientSeconds)
handler := promhttp.HandlerFor(promReg, promhttp.HandlerOpts{EnableOpenMetrics: true})
http.Handle("/"+prometheusEntry, handler)
go func() {
glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry)
http.ListenAndServe(prometheusHost+":"+prometheusPort, nil)
}()
}
const (
recordTypeStart = iota
recordTypeSend = iota
recordTypeStop = iota
)
type recordEntry struct {
RecordType int
IpAddr string
BytesSent int
MillisecondsSpent int64
}
func startRecording(maxClients int64, prometheusEnabled bool, geoOption GeoOption) chan recordEntry {
records := make(chan recordEntry, maxClients)
go func() {
for {
r, more := <-records
if !more {
return
}
if !prometheusEnabled {
continue
}
switch r.RecordType {
case recordTypeStart:
geohash, country, location, err := geohashAndLocation(r.IpAddr, geoOption)
if err != nil {
glog.Warningf("Failed to obatin the geohash of %v: %v.", r.IpAddr, err)
}
clientIP.With(prometheus.Labels{
"ip": r.IpAddr,
"geohash": geohash,
"country": country,
"location": location}).Inc()
atomic.AddInt64(&numTotalClients, 1)
case recordTypeSend:
clientSeconds.With(prometheus.Labels{"ip": r.IpAddr}).Add(float64(r.MillisecondsSpent) / 1000)
atomic.AddInt64(&numTotalBytes, int64(r.BytesSent))
atomic.AddInt64(&numTotalMilliseconds, r.MillisecondsSpent)
case recordTypeStop:
atomic.AddInt64(&numTotalClientsClosed, 1)
}
}
}()
return records
}
func startSending(maxClients int64, bannerMaxLength int64, records chan<- recordEntry) chan *Client {
clients := make(chan *Client, maxClients)
go func() { go func() {
for { for {
c, more := <-clients c, more := <-clients
@@ -163,20 +40,23 @@ func startSending(maxClients int64, bannerMaxLength int64, records chan<- record
} }
go func() { go func() {
bytesSent, err := c.Send(bannerMaxLength) bytesSent, err := c.Send(bannerMaxLength)
ipAddr := c.IpAddr() remoteIpAddr := c.RemoteIpAddr()
localPort := c.LocalPort()
if err != nil { if err != nil {
c.Close() c.Close()
records <- recordEntry{ records <- metrics.RecordEntry{
RecordType: recordTypeStop, RecordType: metrics.RecordEntryTypeStop,
IpAddr: ipAddr, IpAddr: remoteIpAddr,
LocalPort: localPort,
} }
return return
} }
millisecondsSpent := c.MillisecondsSinceLast() millisecondsSpent := c.MillisecondsSinceLast()
clients <- c clients <- c
records <- recordEntry{ records <- metrics.RecordEntry{
RecordType: recordTypeSend, RecordType: metrics.RecordEntryTypeSend,
IpAddr: ipAddr, IpAddr: remoteIpAddr,
LocalPort: localPort,
BytesSent: bytesSent, BytesSent: bytesSent,
MillisecondsSpent: millisecondsSpent, MillisecondsSpent: millisecondsSpent,
} }
@@ -186,7 +66,8 @@ func startSending(maxClients int64, bannerMaxLength int64, records chan<- record
return clients return clients
} }
func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, clients chan<- *Client, records chan<- recordEntry) { func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, clients chan<- *client.Client, records chan<- metrics.RecordEntry) {
go func() {
l, err := net.Listen(connType, connHost+":"+connPort) l, err := net.Listen(connType, connHost+":"+connPort)
if err != nil { if err != nil {
glog.Errorf("Error listening: %v", err) glog.Errorf("Error listening: %v", err)
@@ -202,23 +83,40 @@ func startAccepting(maxClients int64, connType, connHost, connPort string, inter
glog.Errorf("Error accepting connection from port %v: %v", connPort, err) glog.Errorf("Error accepting connection from port %v: %v", connPort, err)
os.Exit(1) os.Exit(1)
} }
c := NewClient(conn, interval, maxClients) c := client.NewClient(conn, interval, maxClients)
ipAddr := c.IpAddr() remoteIpAddr := c.RemoteIpAddr()
records <- recordEntry{ records <- metrics.RecordEntry{
RecordType: recordTypeStart, RecordType: metrics.RecordEntryTypeStart,
IpAddr: ipAddr, IpAddr: remoteIpAddr,
LocalPort: connPort,
} }
clients <- c clients <- c
} }
}()
} }
type arrayStrings []string
func (a *arrayStrings) String() string {
return strings.Join(*a, ", ")
}
func (a *arrayStrings) Set(value string) error {
*a = append(*a, value)
return nil
}
const defaultPort = "2222"
var connPorts arrayStrings
func main() { func main() {
intervalMs := flag.Int("interval_ms", 1000, "Message millisecond delay") intervalMs := flag.Int("interval_ms", 1000, "Message millisecond delay")
bannerMaxLength := flag.Int64("line_length", 32, "Maximum banner line length") bannerMaxLength := flag.Int64("line_length", 32, "Maximum banner line length")
maxClients := flag.Int64("max_clients", 4096, "Maximum number of clients") maxClients := flag.Int64("max_clients", 4096, "Maximum number of clients")
connType := flag.String("conn_type", "tcp", "Connection type. Possible values are tcp, tcp4, tcp6") connType := flag.String("conn_type", "tcp", "Connection type. Possible values are tcp, tcp4, tcp6")
connHost := flag.String("host", "0.0.0.0", "SSH listening address") connHost := flag.String("host", "0.0.0.0", "SSH listening address")
connPort := flag.String("port", "2222", "SSH listening port") flag.Var(&connPorts, "port", "SSH listening port")
prometheusEnabled := flag.Bool("enable_prometheus", false, "Enable prometheus") prometheusEnabled := flag.Bool("enable_prometheus", false, "Enable prometheus")
prometheusHost := flag.String("prometheus_host", "0.0.0.0", "The address for prometheus") prometheusHost := flag.String("prometheus_host", "0.0.0.0", "The address for prometheus")
prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus") prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus")
@@ -236,10 +134,10 @@ func main() {
if *connType == "tcp6" && *prometheusHost == "0.0.0.0" { if *connType == "tcp6" && *prometheusHost == "0.0.0.0" {
*prometheusHost = "[::]" *prometheusHost = "[::]"
} }
initPrometheus(*prometheusHost, *prometheusPort, *prometheusEntry) metrics.InitPrometheus(*prometheusHost, *prometheusPort, *prometheusEntry)
} }
records := startRecording(*maxClients, *prometheusEnabled, GeoOption{ records := metrics.StartRecording(*maxClients, *prometheusEnabled, geoip.GeoOption{
GeoipSupplier: *geoipSupplier, GeoipSupplier: *geoipSupplier,
MaxMindDbFileName: *maxMindDbFileName, MaxMindDbFileName: *maxMindDbFileName,
}) })
@@ -250,7 +148,12 @@ func main() {
if *connType == "tcp6" && *connHost == "0.0.0.0" { if *connType == "tcp6" && *connHost == "0.0.0.0" {
*connHost = "[::]" *connHost = "[::]"
} }
go startAccepting(*maxClients, *connType, *connHost, *connPort, interval, clients, records) if len(connPorts) == 0 {
connPorts = append(connPorts, defaultPort)
}
for _, connPort := range connPorts {
startAccepting(*maxClients, *connType, *connHost, connPort, interval, clients, records)
}
for { for {
time.Sleep(time.Duration(1<<63 - 1)) time.Sleep(time.Duration(1<<63 - 1))
} }

154
metrics/metrics.go Normal file
View File

@@ -0,0 +1,154 @@
// Copyright (C) 2024 Shizun Ge
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
package metrics
import (
"endlessh-go/geoip"
"net/http"
"sync/atomic"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
numTotalClients int64
numTotalClientsClosed int64
numTotalBytes int64
numTotalMilliseconds int64
totalClients prometheus.CounterFunc
totalClientsClosed prometheus.CounterFunc
totalBytes prometheus.CounterFunc
totalSeconds prometheus.CounterFunc
clientIP *prometheus.CounterVec
clientSeconds *prometheus.CounterVec
)
func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
totalClients = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_client_open_count_total",
Help: "Total number of clients that tried to connect to this host.",
}, func() float64 {
return float64(numTotalClients)
},
)
totalClientsClosed = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_client_closed_count_total",
Help: "Total number of clients that stopped connecting to this host.",
}, func() float64 {
return float64(numTotalClientsClosed)
},
)
totalBytes = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_sent_bytes_total",
Help: "Total bytes sent to clients that tried to connect to this host.",
}, func() float64 {
return float64(numTotalBytes)
},
)
totalSeconds = prometheus.NewCounterFunc(
prometheus.CounterOpts{
Name: "endlessh_trapped_time_seconds_total",
Help: "Total seconds clients spent on endlessh.",
}, func() float64 {
return float64(numTotalMilliseconds) / 1000
},
)
clientIP = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "endlessh_client_open_count",
Help: "Number of connections of clients.",
},
[]string{"ip", "local_port", "geohash", "country", "location"},
)
clientSeconds = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "endlessh_client_trapped_time_seconds",
Help: "Seconds a client spends on endlessh.",
},
[]string{"ip", "local_port"},
)
promReg := prometheus.NewRegistry()
promReg.MustRegister(totalClients)
promReg.MustRegister(totalClientsClosed)
promReg.MustRegister(totalBytes)
promReg.MustRegister(totalSeconds)
promReg.MustRegister(clientIP)
promReg.MustRegister(clientSeconds)
handler := promhttp.HandlerFor(promReg, promhttp.HandlerOpts{EnableOpenMetrics: true})
http.Handle("/"+prometheusEntry, handler)
go func() {
glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry)
http.ListenAndServe(prometheusHost+":"+prometheusPort, nil)
}()
}
const (
RecordEntryTypeStart = iota
RecordEntryTypeSend = iota
RecordEntryTypeStop = iota
)
type RecordEntry struct {
RecordType int
IpAddr string
LocalPort string
BytesSent int
MillisecondsSpent int64
}
func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.GeoOption) chan RecordEntry {
records := make(chan RecordEntry, maxClients)
go func() {
for {
r, more := <-records
if !more {
return
}
if !prometheusEnabled {
continue
}
switch r.RecordType {
case RecordEntryTypeStart:
geohash, country, location, err := geoip.GeohashAndLocation(r.IpAddr, geoOption)
if err != nil {
glog.Warningf("Failed to obatin the geohash of %v: %v.", r.IpAddr, err)
}
clientIP.With(prometheus.Labels{
"ip": r.IpAddr,
"local_port": r.LocalPort,
"geohash": geohash,
"country": country,
"location": location}).Inc()
atomic.AddInt64(&numTotalClients, 1)
case RecordEntryTypeSend:
clientSeconds.With(prometheus.Labels{
"ip": r.IpAddr,
"local_port": r.LocalPort}).Add(float64(r.MillisecondsSpent) / 1000)
atomic.AddInt64(&numTotalBytes, int64(r.BytesSent))
atomic.AddInt64(&numTotalMilliseconds, r.MillisecondsSpent)
case RecordEntryTypeStop:
atomic.AddInt64(&numTotalClientsClosed, 1)
}
}
}()
return records
}