Skip to content

Commit

Permalink
fix infinity reconnect at log.io
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Mar 3, 2024
1 parent 8aa1685 commit c8a5327
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 70 deletions.
75 changes: 39 additions & 36 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ func (cr *Cluster) apiV0LogIO(rw http.ResponseWriter, req *http.Request) {
for {
clear(data)
if err := conn.ReadJSON(&data); err != nil {
log.Errorf("[log.io]: Cannot read from peer:", err)

Check failure on line 432 in api.go

View workflow job for this annotation

GitHub Actions / build

github.com/LiterMC/go-openbmclapi/log.Errorf call has arguments but no formatting directives
return
}
typ, ok := data["type"].(string)
Expand Down Expand Up @@ -493,48 +494,50 @@ func (cr *Cluster) apiV0LogIO(rw http.ResponseWriter, req *http.Request) {
defer forceSendTimer.Stop()

batchMsg := make([]any, 0, 64)
select {
case v := <-sendMsgCh:
batchMsg = append(batchMsg, v)
if !forceSendTimer.Stop() {
<-forceSendTimer.C
}
forceSendTimer.Reset(time.Second)
WAIT_MORE:
for {
select {
case v := <-sendMsgCh:
batchMsg = append(batchMsg, v)
case <-time.After(time.Millisecond * 20):
break WAIT_MORE
case <-forceSendTimer.C:
break WAIT_MORE
for {
select {
case v := <-sendMsgCh:
batchMsg = append(batchMsg, v)
if !forceSendTimer.Stop() {
<-forceSendTimer.C
}
}
if len(batchMsg) == 1 {
if err := conn.WriteJSON(batchMsg[0]); err != nil {
return
forceSendTimer.Reset(time.Second)
WAIT_MORE:
for {
select {
case v := <-sendMsgCh:
batchMsg = append(batchMsg, v)
case <-time.After(time.Millisecond * 20):
break WAIT_MORE
case <-forceSendTimer.C:
break WAIT_MORE
}
}
} else {
if err := conn.WriteJSON(batchMsg); err != nil {
if len(batchMsg) == 1 {
if err := conn.WriteJSON(batchMsg[0]); err != nil {
return
}
} else {
if err := conn.WriteJSON(batchMsg); err != nil {
return
}
}
// release objects
for i, _ := range batchMsg {
batchMsg[i] = nil
}
batchMsg = batchMsg[:0]
case <-pingTicker.C:
if err := conn.WriteJSON(Map{
"type": "ping",
"data": time.Now().UnixMilli(),
}); err != nil {
log.Errorf("[log.io]: Error when sending ping packet: %v", err)
return
}
}
// release objects
for i, _ := range batchMsg {
batchMsg[i] = nil
}
batchMsg = batchMsg[:0]
case <-pingTicker.C:
if err := conn.WriteJSON(Map{
"type": "ping",
"data": time.Now().UnixMilli(),
}); err != nil {
log.Errorf("[log.io]: Error when sending ping packet: %v", err)
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}

Expand Down
4 changes: 1 addition & 3 deletions dashboard/src/api/log.io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ export class LogIO {
}

static async dial(token: string): Promise<LogIO> {
const wsTarget = `${httpToWs(window.location.protocol)}//${
window.location.host
}/api/v0/log.io?level=debug`
const wsTarget = `${httpToWs(window.location.protocol)}//${window.location.host}/api/v0/log.io`
const ws = new WebSocket(wsTarget)

var connTimeout: ReturnType<typeof setTimeout>
Expand Down
83 changes: 61 additions & 22 deletions dashboard/src/components/LogBlock.vue
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<script setup lang="ts">
import { ref, reactive, watch, nextTick, onMounted, onBeforeUnmount } from 'vue'
import { ref, reactive, nextTick, getCurrentInstance, onMounted, onBeforeUnmount } from 'vue'
import { RingBuffer } from '@/utils/ring'
interface Log {
_inc?: number
Expand All @@ -8,21 +9,74 @@ interface Log {
log: string
}
const MAX_LOG_LENGTH = 1024 * 2
const box = ref<HTMLElement>()
const logs = reactive<Log[]>([])
const MAX_LOG_LENGTH = 1024 * 4
const logs = reactive(RingBuffer.create<Log>(MAX_LOG_LENGTH))
var logInc = 0
var focusLastLog = true
var justSplicedLog = false
var logDelayWatcher: Promise<void> | null = null
var scrollYTarget: number | null = null
function registerAnimationFrame(callback: (dt: number) => void | boolean): () => boolean {
var n: ReturnType<typeof window.requestAnimationFrame> | null = null
var last: number = document.timeline.currentTime as number
const step = (ts: number) => {
n = null
const dt = ts - last
last = ts
if (callback(dt) === false) {
return
}
n = window.requestAnimationFrame(step)
}
n = window.requestAnimationFrame(step)
return () => {
if (n === null) {
return true
}
window.cancelAnimationFrame(n)
n = null
return false
}
}
function activeScroller(): void {
const MIN_SCROLL_SPEED = 100 // 100px per second
registerAnimationFrame((dt: number): boolean => {
if (!focusLastLog || !scrollYTarget || document.hidden || !box.value) {
return false
}
const diff = scrollYTarget - (box.value.scrollTop + box.value.clientHeight)
const minDist = (dt / 1000) * MIN_SCROLL_SPEED
if (diff <= minDist) {
box.value.scrollTop = scrollYTarget - box.value.clientHeight
return false
}
box.value.scrollTop = scrollYTarget - box.value.clientHeight - diff * 0.9
return true
})
}
function pushLog(log: Log): void {
log._inc = logInc = (logInc + 1) % 65536
logs.push(log)
const minI = logs.length - MAX_LOG_LENGTH
if (minI > 0) {
logs.splice(0, minI)
justSplicedLog = true
if (!logDelayWatcher) {
logDelayWatcher = nextTick().then(() => {
if (document.hidden || !box.value) {
return
}
const diff = box.value.scrollHeight - (box.value.scrollTop + box.value.clientHeight)
focusLastLog ||= diff < 5
if (focusLastLog && !document.hidden) {
scrollYTarget = box.value.scrollHeight
activeScroller()
}
})
}
}
Expand Down Expand Up @@ -53,21 +107,6 @@ function onVisibilityChange(): void {
})
}
watch(logs, async (logs: Log[]) => {
if (document.hidden || !box.value) {
return
}
const diff = box.value.scrollHeight - (box.value.scrollTop + box.value.clientHeight)
focusLastLog ||= diff < 5
if (focusLastLog && !document.hidden) {
await nextTick()
box.value.scroll({
top: box.value.scrollHeight,
behavior: diff < box.value.clientHeight ? 'smooth' : 'auto',
})
}
})
function formatDate(date: Date): string {
const pad2 = (n: number) => n.toString().padStart(2, '0')
return `${pad2(date.getHours())}:${pad2(date.getMinutes())}:${pad2(date.getSeconds())}`
Expand Down
2 changes: 1 addition & 1 deletion dashboard/src/lang/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export function setLang(lang: Lang | string): Lang | null {
}

export function tr(key: string, ...values: unknown[]): string {
console.debug('translating:', key)
// console.debug('translating:', key)
const item = currentLang.value
let cur: string | LangMap | null = currentTr.value
if (!cur || (key && typeof cur === 'string')) {
Expand Down
146 changes: 141 additions & 5 deletions dashboard/src/utils/ring.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,148 @@

export class RingBuffer<T> {
private readonly arr: T[]
export class RingBuffer<T> extends Array<T> {
private start: number
private end: number

constructor(size: number) {
this.arr = Array(size)
private constructor(size: number) {
super(size + 1)
this.start = 0
this.end = 0
}

get _realLength(): number {
return this.length
}

_realGet(i: number): T {
return this[i]
}

_realSet(i: number, v: T): void {
this[i] = v
}

_realReplace(start: number, items: T[]): void {
for (let i = 0; i < items.length; i++) {
this._realSet(start + i, items[i])
}
// this.splice(start, items.length, ...items)
}

get size(): number {
return this._realLength - 1
}

get avaliable(): number {
if (this.start <= this.end) {
return this.end - this.start
}
return this.length - this.start + this.end
}

at(i: number): T {
return this._realGet((this.start + i) % this.length)
}

set(i: number, v: T): void {
this._realSet((this.start + i) % this.length, v)
}

push(...items: T[]): number {
if (items.length >= this.size) {
this._realReplace(0, items.slice(-this.size))
this.start = 0
this.end = this.size
return this.avaliable
}
const adding = items.length
const right = this._realLength - this.end
if (adding > right) {
this._realReplace(this.end, items.slice(0, right))
this._realReplace(0, items.slice(right))
} else {
this._realReplace(this.end, items)
}
const oldend = this.end
this.end = (oldend + adding) % this._realLength
if (
(this.start <= this.end && this.end < oldend) ||
(oldend < this.start && this.start <= this.end)
) {
this.start = (this.end + 1) % this._realLength
}
return this.avaliable
}

*keys(): IterableIterator<number> {
const start = this.start
const end = this.end
const length = this._realLength
if (start <= end) {
for (let i = start; i < end; i++) {
yield i
}
} else {
for (let i = start; i < length; i++) {
yield i
}
for (let i = 0; i < end; i++) {
yield i
}
}
}

*values(): IterableIterator<T> {
for (let i of this.keys()) {
yield this._realGet(i)
}
}

[Symbol.iterator](): IterableIterator<T> {
return this.values()
}

static create<T>(size: number): T[] {
const ring = new this<T>(size)
return new Proxy(ring, {
get(target: RingBuffer<T>, key: string | symbol): any {
if (typeof key === 'string') {
const index = parseInt(key, 10)
if (!isNaN(index)) {
return target.at(index)
}
}
switch (key) {
case 'length':
return target.avaliable
case '_realLength':
return target.length
case '_realGet':
// case '_realSet':
// case '_realReplace':
return Reflect.get(target, key).bind(target)
}
return Reflect.get(target, key)
},
set(
target: RingBuffer<T>,
key: string | symbol,
value: any,
recvier: RingBuffer<T>,
): boolean {
// if (typeof key === 'string' && /^[1-9][0-9]*$/.test(key)) {
// recvier.set(parseInt(key, 10), value)
// return true
// }
return Reflect.set(target, key, value)
},
ownKeys(target: RingBuffer<T>): (string | symbol)[] {
const keys: (string | symbol)[] = Array.of(...target.keys()).map((n) => n.toString())
for (let k of Reflect.ownKeys(target)) {
if (typeof k !== 'string' || !/^[1-9][0-9]*$/.test(k)) {
keys.push(k)
}
}
return keys
},
})
}
}
Loading

0 comments on commit c8a5327

Please sign in to comment.