Skip to content

Commit

Permalink
add connection limits
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Jan 23, 2024
1 parent ef7a615 commit 4f97333
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 73 deletions.
11 changes: 11 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ A: `v<openbmclapi 版本>-<go-openbmclapi 构建计数>`, 例: `v1.6.7-60`
debug: false
# 是否打印访问信息, 默认为否 (这个选项对于压缩日志文件十分有用)
record_serve_info: false
# 是否禁用 https
nohttps: false
# 是否仅从主服务器下载文件
noopen: false
# 实际开放的公网主机名, 同 CLUSTER_IP
public_host: example.com
# 实际开放的公网端口, 同 CLUSTER_PUBLIC_PORT
Expand All @@ -83,6 +86,14 @@ cluster_id: ${CLUSTER_ID}
cluster_secret: ${CLUSTER_SECRET}
# 同步文件时最多打开的连接数量
download_max_conn: 64
# 服务器上行限制
serve_limit:
# 是否启用上行限制
enable: false
# 最大连接数量
max_conn: 16384
# 上行速率限制 (KiB/s), 0 表示无限制
upload_rate: 0

## 特殊要求: 重定向到 OSS
oss:
Expand Down
29 changes: 22 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,15 @@ func (cr *Cluster) Connect(ctx context.Context) bool {
cr.socket.ErrorHandle = func(*Socket) {
connected()
go func() {
logWarn("Reconnecting due to SIO error")
if cr.Disable(ctx) {
if cr.disconnected() {
logWarn("Reconnecting due to SIO error")
if !cr.Connect(ctx) {
logError("Cannot reconnect to server, exit.")
os.Exit(1)
os.Exit(0x08)
}
if err := cr.Enable(ctx); err != nil {
logError("Cannot enable cluster:", err, "; exit.")
os.Exit(1)
os.Exit(0x08)
}
}
}()
Expand Down Expand Up @@ -238,8 +238,6 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
ctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2)
defer cancel()
if !cr.KeepAlive(ctx) {
logError("TODO: Keep alive failed, exit.")
os.Exit(0x80)
if keepaliveCtx.Err() == nil {
logInfo("Reconnecting due to keepalive failed")
cr.Disable(keepaliveCtx)
Expand Down Expand Up @@ -282,6 +280,22 @@ func (cr *Cluster) KeepAlive(ctx context.Context) (ok bool) {
return true
}

func (cr *Cluster) disconnected() bool {
cr.mux.Lock()
defer cr.mux.Unlock()

if !cr.enabled.Swap(false) {
return false
}
if cr.cancelKeepalive != nil {
cr.cancelKeepalive()
cr.cancelKeepalive = nil
}
cr.socket.Close()
cr.socket = nil
return true
}

func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
cr.mux.Lock()
defer cr.mux.Unlock()
Expand All @@ -290,14 +304,14 @@ func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
logDebug("Extra disable")
return false
}
logInfo("Disabling cluster")
if cr.cancelKeepalive != nil {
cr.cancelKeepalive()
cr.cancelKeepalive = nil
}
if cr.socket == nil {
return false
}
logInfo("Disabling cluster")
{
logInfo("Making keepalive before disable")
tctx, cancel := context.WithTimeout(ctx, time.Second*10)
Expand Down Expand Up @@ -326,6 +340,7 @@ func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
logError("Disable failed: ack non true value")
return false
}
logWarn("Cluster disabled")
return true
}

Expand Down
36 changes: 24 additions & 12 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type OSSItem struct {
working atomic.Bool
}

type ServeLimitConfig struct {
Enable bool `yaml:"enable"`
MaxConn int `yaml:"max_conn"`
UploadRate int `yaml:"upload_rate"`
}

type OSSConfig struct {
Enable bool `yaml:"enable"`
List []*OSSItem `yaml:"list"`
Expand All @@ -51,18 +57,19 @@ type HijackConfig struct {
}

type Config struct {
Debug bool `yaml:"debug"`
RecordServeInfo bool `yaml:"record_serve_info"`
Nohttps bool `yaml:"nohttps"`
NoOpen bool `yaml:"noopen"`
PublicHost string `yaml:"public_host"`
PublicPort uint16 `yaml:"public_port"`
Port uint16 `yaml:"port"`
ClusterId string `yaml:"cluster_id"`
ClusterSecret string `yaml:"cluster_secret"`
DownloadMaxConn int `yaml:"download_max_conn"`
Oss OSSConfig `yaml:"oss"`
Hijack HijackConfig `yaml:"hijack_port"`
Debug bool `yaml:"debug"`
RecordServeInfo bool `yaml:"record_serve_info"`
Nohttps bool `yaml:"nohttps"`
NoOpen bool `yaml:"noopen"`
PublicHost string `yaml:"public_host"`
PublicPort uint16 `yaml:"public_port"`
Port uint16 `yaml:"port"`
ClusterId string `yaml:"cluster_id"`
ClusterSecret string `yaml:"cluster_secret"`
DownloadMaxConn int `yaml:"download_max_conn"`
ServeLimit ServeLimitConfig `yaml:"serve_limit"`
Oss OSSConfig `yaml:"oss"`
Hijack HijackConfig `yaml:"hijack_port"`
}

func readConfig() (config Config) {
Expand All @@ -78,6 +85,11 @@ func readConfig() (config Config) {
ClusterId: "${CLUSTER_ID}",
ClusterSecret: "${CLUSTER_SECRET}",
DownloadMaxConn: 64,
ServeLimit: ServeLimitConfig{
Enable: false,
MaxConn: 16384,
UploadRate: 1024 * 12, // 12MB
},

Oss: OSSConfig{
Enable: false,
Expand Down
5 changes: 5 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
debug: false
record_serve_info: false
nohttps: false
noopen: false
public_host: example.com
public_port: 8080
port: 4000
cluster_id: ${CLUSTER_ID}
cluster_secret: ${CLUSTER_SECRET}
download_max_conn: 64
serve_limit:
enable: false
max_conn: 0
upload_rate: 0
oss:
enable: false
list:
Expand Down
40 changes: 27 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,29 @@ func (r *countReader) Read(buf []byte) (n int, err error) {
type statusResponseWriter struct {
http.ResponseWriter
status int
wrote int64
}

func (w *statusResponseWriter) WriteHeader(status int) {
w.status = status
w.ResponseWriter.WriteHeader(status)
}

func (w *statusResponseWriter) Write(buf []byte) (n int, err error) {
n, err = w.ResponseWriter.Write(buf)
w.wrote += (int64)(n)
return
}

func (cr *Cluster) GetHandler() (handler http.Handler) {
cr.handlerAPIv0 = http.StripPrefix("/api/v0", cr.initAPIv0())

handler = cr
{
type record struct {
used float64
ua string
used float64
bytes float64
ua string
}
recordCh := make(chan record, 1024)

Expand All @@ -86,7 +94,10 @@ func (cr *Cluster) GetHandler() (handler http.Handler) {
} else if used > time.Second {
used = used.Truncate(time.Microsecond)
}
logInfof("Serve %d | %12v | %-15s | %s | %-4s %s | %q", srw.status, used, addr, req.Proto, req.Method, req.RequestURI, ua)
logInfof("Serve %d | %12v | %7s | %-15s | %s | %-4s %s | %q",
srw.status, used, bytesToUnit((float64)(srw.wrote)),
addr, req.Proto,
req.Method, req.RequestURI, ua)
}
if srw.status < 200 && 400 <= srw.status {
return
Expand All @@ -96,6 +107,7 @@ func (cr *Cluster) GetHandler() (handler http.Handler) {
}
var rec record
rec.used = used.Seconds()
rec.bytes = (float64)(srw.wrote)
rec.ua, _ = split(ua, '/')
select {
case recordCh <- rec:
Expand All @@ -110,33 +122,35 @@ func (cr *Cluster) GetHandler() (handler http.Handler) {
defer updateTicker.Stop()

var (
total int64
totalUsed float64
uas = make(map[string]int, 10)
total int
totalUsed float64
totalBytes float64
uas = make(map[string]int, 10)
)
for {
select {
case <-updateTicker.C:
cr.stats.mux.Lock()
total = 0
totalUsed = 0

logInfof("Served %d requests, %s, used %.2fs, %s/s", total, bytesToUnit(totalBytes), totalUsed, bytesToUnit(totalBytes/totalUsed))
for ua, v := range uas {
if ua == "" {
ua = "[Unknown]"
}
cr.stats.Accesses[ua] += v
}

total = 0
totalUsed = 0
totalBytes = 0
clear(uas)

cr.stats.mux.Unlock()
case rec := <-recordCh:
total++
totalUsed += rec.used
totalBytes += rec.bytes
uas[rec.ua]++

if total%100 == 0 {
avg := (time.Duration)(totalUsed / (float64)(total) * (float64)(time.Second))
logInfof("Served %d requests, total used %.2fs, avg %v", total, totalUsed, avg)
}
case <-disabled:
return
}
Expand Down
Loading

0 comments on commit 4f97333

Please sign in to comment.