Skip to content

Commit

Permalink
better dbs managing
Browse files Browse the repository at this point in the history
  • Loading branch information
LbP22 committed Feb 28, 2024
1 parent ab2f161 commit 0aab629
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 51 deletions.
18 changes: 8 additions & 10 deletions application/backend/app/containerdb/containerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
Expand Down Expand Up @@ -58,22 +59,22 @@ func PutLogMessage(db *leveldb.DB, host string, container string, message_item [
}

err := db.Put([]byte(message_item[0]), []byte(message_item[1]), nil)
if err != nil {
tries := 0
for err != nil && tries < 10 {
db = util.GetDB(host, container, "logs")
err = db.Put([]byte(message_item[0]), []byte(message_item[1]), nil)
if err != nil {
panic(err)
}
time.Sleep(10 * time.Millisecond)
tries++
}
if err != nil {
panic(err)
}
return err
}

func GetLogsByStatus(host string, container string, message string, status string, limit int, startWith string, getPrev bool, include bool, caseSensetivity bool) [][]string {
logs_db := util.GetDB(host, container, "logs")
db := util.GetDB(host, container, "statuses")
if host != util.GetHost() || vars.ActiveDBs[container] == nil {
defer logs_db.Close()
}

iter := db.NewIterator(nil, nil)
defer iter.Release()
Expand Down Expand Up @@ -146,9 +147,6 @@ func GetLogsByStatus(host string, container string, message string, status strin

func GetLogs(getPrev bool, include bool, host string, container string, message string, limit int, startWith string, caseSensetivity bool) [][]string {
db := util.GetDB(host, container, "logs")
if host != util.GetHost() || vars.ActiveDBs[container] == nil {
defer db.Close()
}

iter := db.NewIterator(nil, nil)
defer iter.Release()
Expand Down
6 changes: 4 additions & 2 deletions application/backend/app/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func closeActiveStream(containerName string) {
newDaemonStreams = append(newDaemonStreams, stream)
}
}
vars.ActiveDBs[containerName].Close()
if vars.ActiveDBs[containerName] != nil {
vars.ActiveDBs[containerName].Close()
}
vars.ActiveDBs[containerName] = nil
vars.Active_Daemon_Streams = newDaemonStreams
}
Expand Down Expand Up @@ -109,8 +111,8 @@ func CreateDaemonToDBStream(containerName string) {
reader := bufio.NewReader(connection)
readHeader(*reader)

current_db := vars.ActiveDBs[containerName]
host := util.GetHost()
current_db := util.GetDB(host, containerName, "logs")
createLogMessage(current_db, host, containerName, "ONLOGS: Container listening started!")

defer current_db.Close()
Expand Down
2 changes: 0 additions & 2 deletions application/backend/app/db/db.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package db

import (
"fmt"
"time"

"github.com/devforth/OnLogs/app/util"
Expand All @@ -22,7 +21,6 @@ func CreateOnLogsToken() string {
panic(err)
}
}
fmt.Println("created token " + token)
return token
}

Expand Down
1 change: 1 addition & 0 deletions application/backend/app/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func AddHost(w http.ResponseWriter, req *http.Request) {
}

vars.AgentsActiveContainers[addReq.Hostname] = addReq.Services
fmt.Println("New host added: " + addReq.Hostname)
for _, container := range addReq.Services {
os.MkdirAll("leveldb/hosts/"+addReq.Hostname+"/containers/"+container, 0700)
}
Expand Down
18 changes: 5 additions & 13 deletions application/backend/app/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
"github.com/syndtr/goleveldb/leveldb"
)

func restartStats(host string, container string, current_db *leveldb.DB) {
func restartStats(host string, container string) {
current_db := util.GetDB(host, container, "statistics")

var used_storage map[string]map[string]uint64
var location string
if container == "" {
Expand Down Expand Up @@ -41,15 +42,9 @@ func restartStats(host string, container string, current_db *leveldb.DB) {
func RunStatisticForContainer(host string, container string) {
location := host + "/" + container
vars.Counters_For_Containers_Last_30_Min[location] = map[string]uint64{"error": 0, "debug": 0, "info": 0, "warn": 0, "meta": 0, "other": 0}
if vars.Stat_Containers_DBs[location] == nil {
current_db := util.GetDB(host, container, "statistics")
// defer current_db.Close()
vars.Stat_Containers_DBs[location] = current_db
}
defer delete(vars.Stat_Containers_DBs, location)
defer restartStats(host, container, vars.Stat_Containers_DBs[location])
defer restartStats(host, container)
for {
restartStats(host, container, vars.Stat_Containers_DBs[location])
restartStats(host, container)
time.Sleep(30 * time.Minute)
}
}
Expand All @@ -72,9 +67,6 @@ func GetStatisticsByService(host string, service string, value int) map[string]u
searchTo := time.Now().Add(-(time.Hour * time.Duration(value/2))).UTC()
var tmp_stats map[string]uint64
current_db := util.GetDB(host, service, "statistics")
if vars.Stat_Containers_DBs[location] == nil {
defer current_db.Close()
}
iter := current_db.NewIterator(nil, nil)
defer iter.Release()
iter.Last()
Expand Down
17 changes: 0 additions & 17 deletions application/backend/app/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,14 @@ import (
"github.com/devforth/OnLogs/app/statistics"
"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
"github.com/syndtr/goleveldb/leveldb"
)

func createStreams(containers []string) {
for _, container := range vars.DockerContainers {
if !util.Contains(container, vars.Active_Daemon_Streams) {
go statistics.RunStatisticForContainer(util.GetHost(), container)
newDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/logs", nil)
if err != nil {
fmt.Println("ERROR: " + container + ": " + err.Error())
newDB, err = leveldb.RecoverFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/logs", nil)
fmt.Println("INFO: " + container + ": recovering db...")
if err == nil {
fmt.Println("INFO: " + container + ": db recovered!")
} else {
fmt.Println("ERROR: " + container + ": " + err.Error())
}
}
if vars.Statuses_DBs[util.GetHost()+"/"+container] == nil {
vars.Statuses_DBs[util.GetHost()+"/"+container] = util.GetDB(util.GetHost(), container, "/statuses")
}
vars.ActiveDBs[container] = newDB
vars.Active_Daemon_Streams = append(vars.Active_Daemon_Streams, container)
if os.Getenv("AGENT") != "" {
vars.BrokenLogs_DBs[container] = util.GetDB(util.GetHost(), container, "/brokenlogs")
go daemon.CreateDaemonToHostStream(container)
} else {
go daemon.CreateDaemonToDBStream(container)
Expand Down
25 changes: 18 additions & 7 deletions application/backend/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,27 @@ func GetDB(host string, container string, dbType string) *leveldb.DB {
res_db = vars.Statuses_DBs[host+"/"+container]
} else if dbType == "statistics" {
res_db = vars.Stat_Containers_DBs[host+"/"+container]
} else if dbType == "brokenlogs" {
res_db = vars.BrokenLogs_DBs[container]
}

if res_db != nil {
return res_db
}

var err error
if res_db == nil {
path := "leveldb/hosts/" + host + "/containers/" + container + "/" + dbType
res_db, err = leveldb.OpenFile(path, nil)
if err != nil {
res_db, err = leveldb.RecoverFile(path, nil)
}
tries := 0
path := "leveldb/hosts/" + host + "/containers/" + container + "/" + dbType
res_db, err = leveldb.OpenFile(path, nil)
for (err != nil && res_db == nil) && tries < 10 {
res_db, err = leveldb.RecoverFile(path, nil)
fmt.Println(path, err)
time.Sleep(10 * time.Millisecond)
tries++
}

if err != nil {
fmt.Println("ERROR: unable to open db for "+host+"/"+container+"/"+dbType, err)
panic("ERROR: unable to open db for " + host + "/" + container + "/" + dbType + "\n" + err.Error())
}

if dbType == "logs" {
Expand All @@ -108,6 +117,8 @@ func GetDB(host string, container string, dbType string) *leveldb.DB {
vars.Statuses_DBs[host+"/"+container] = res_db
} else if dbType == "statistics" {
vars.Stat_Containers_DBs[host+"/"+container] = res_db
} else if dbType == "brokenlogs" {
vars.BrokenLogs_DBs[container] = res_db
}

return res_db
Expand Down

0 comments on commit 0aab629

Please sign in to comment.