From c8a53278af644916a060dcd579ef0beeb5db825e Mon Sep 17 00:00:00 2001 From: zyxkad Date: Sat, 2 Mar 2024 21:17:27 -0700 Subject: [PATCH] fix infinity reconnect at log.io --- api.go | 75 ++++++------- dashboard/src/api/log.io.ts | 4 +- dashboard/src/components/LogBlock.vue | 83 +++++++++++---- dashboard/src/lang/index.ts | 2 +- dashboard/src/utils/ring.ts | 146 +++++++++++++++++++++++++- dashboard/src/views/HomeView.vue | 6 +- 6 files changed, 246 insertions(+), 70 deletions(-) diff --git a/api.go b/api.go index 0f14e282..a949435b 100644 --- a/api.go +++ b/api.go @@ -429,6 +429,7 @@ func (cr *Cluster) apiV0LogIO(rw http.ResponseWriter, req *http.Request) { for { clear(data) if err := conn.ReadJSON(&data); err != nil { + log.Errorf("[log.io]: Cannot read from peer:", err) return } typ, ok := data["type"].(string) @@ -493,48 +494,50 @@ func (cr *Cluster) apiV0LogIO(rw http.ResponseWriter, req *http.Request) { defer forceSendTimer.Stop() batchMsg := make([]any, 0, 64) - select { - case v := <-sendMsgCh: - batchMsg = append(batchMsg, v) - if !forceSendTimer.Stop() { - <-forceSendTimer.C - } - forceSendTimer.Reset(time.Second) - WAIT_MORE: - for { - select { - case v := <-sendMsgCh: - batchMsg = append(batchMsg, v) - case <-time.After(time.Millisecond * 20): - break WAIT_MORE - case <-forceSendTimer.C: - break WAIT_MORE + for { + select { + case v := <-sendMsgCh: + batchMsg = append(batchMsg, v) + if !forceSendTimer.Stop() { + <-forceSendTimer.C } - } - if len(batchMsg) == 1 { - if err := conn.WriteJSON(batchMsg[0]); err != nil { - return + forceSendTimer.Reset(time.Second) + WAIT_MORE: + for { + select { + case v := <-sendMsgCh: + batchMsg = append(batchMsg, v) + case <-time.After(time.Millisecond * 20): + break WAIT_MORE + case <-forceSendTimer.C: + break WAIT_MORE + } } - } else { - if err := conn.WriteJSON(batchMsg); err != nil { + if len(batchMsg) == 1 { + if err := conn.WriteJSON(batchMsg[0]); err != nil { + return + } + } else { + if err := conn.WriteJSON(batchMsg); err != nil { + return + } + } + // release objects + for i, _ := range batchMsg { + batchMsg[i] = nil + } + batchMsg = batchMsg[:0] + case <-pingTicker.C: + if err := conn.WriteJSON(Map{ + "type": "ping", + "data": time.Now().UnixMilli(), + }); err != nil { + log.Errorf("[log.io]: Error when sending ping packet: %v", err) return } - } - // release objects - for i, _ := range batchMsg { - batchMsg[i] = nil - } - batchMsg = batchMsg[:0] - case <-pingTicker.C: - if err := conn.WriteJSON(Map{ - "type": "ping", - "data": time.Now().UnixMilli(), - }); err != nil { - log.Errorf("[log.io]: Error when sending ping packet: %v", err) + case <-ctx.Done(): return } - case <-ctx.Done(): - return } } diff --git a/dashboard/src/api/log.io.ts b/dashboard/src/api/log.io.ts index a8d11aaf..603c7f7b 100644 --- a/dashboard/src/api/log.io.ts +++ b/dashboard/src/api/log.io.ts @@ -125,9 +125,7 @@ export class LogIO { } static async dial(token: string): Promise { - const wsTarget = `${httpToWs(window.location.protocol)}//${ - window.location.host - }/api/v0/log.io?level=debug` + const wsTarget = `${httpToWs(window.location.protocol)}//${window.location.host}/api/v0/log.io` const ws = new WebSocket(wsTarget) var connTimeout: ReturnType diff --git a/dashboard/src/components/LogBlock.vue b/dashboard/src/components/LogBlock.vue index 87fb5797..5bdc0e03 100644 --- a/dashboard/src/components/LogBlock.vue +++ b/dashboard/src/components/LogBlock.vue @@ -1,5 +1,6 @@