diff --git a/application/Dockerfile b/application/Dockerfile index d1cb6d5..1f47cf2 100644 --- a/application/Dockerfile +++ b/application/Dockerfile @@ -12,8 +12,6 @@ RUN npm run build COPY . /code/ FROM alpine -RUN apk add bash curl -# tmp COPY --from=frontbuilder /code/dist/ /backend/dist/ @@ -26,8 +24,6 @@ RUN go mod download \ && go build -o main . FROM alpine -RUN apk add bash curl -# tmp COPY --from=frontbuilder /code/dist/ /dist/ COPY --from=backendbuilder /backend/main /backend/main diff --git a/application/backend/app/containerdb/containerdb.go b/application/backend/app/containerdb/containerdb.go index dc0e964..a7d4931 100644 --- a/application/backend/app/containerdb/containerdb.go +++ b/application/backend/app/containerdb/containerdb.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/devforth/OnLogs/app/util" "github.com/devforth/OnLogs/app/vars" @@ -28,6 +29,9 @@ func PutLogMessage(db *leveldb.DB, host string, container string, message_item [ panic("Host is not mentioned!") } location := host + "/" + container + if vars.Statuses_DBs[location] == nil { + vars.Statuses_DBs[location] = util.GetDB(host, container, "statuses") + } if strings.Contains(message_item[1], "ERROR") || strings.Contains(message_item[1], "ERR") || // const statuses_errors = ["ERROR", "ERR", "Error", "Err"]; strings.Contains(message_item[1], "Error") || strings.Contains(message_item[1], "Err") { @@ -49,21 +53,28 @@ func PutLogMessage(db *leveldb.DB, host string, container string, message_item [ } else if strings.Contains(message_item[1], "ONLOGS") { vars.Counters_For_Containers_Last_30_Min[location]["meta"]++ vars.Statuses_DBs[location].Put([]byte(message_item[0]), []byte("meta"), nil) - } else { vars.Counters_For_Containers_Last_30_Min[location]["other"]++ vars.Statuses_DBs[location].Put([]byte(message_item[0]), []byte("other"), nil) } - return db.Put([]byte(message_item[0]), []byte(message_item[1]), nil) + err := db.Put([]byte(message_item[0]), []byte(message_item[1]), nil) + tries := 0 + for err != nil && tries < 10 { + db = util.GetDB(host, container, "logs") + err = db.Put([]byte(message_item[0]), []byte(message_item[1]), nil) + time.Sleep(10 * time.Millisecond) + tries++ + } + if err != nil { + panic(err) + } + return err } func GetLogsByStatus(host string, container string, message string, status string, limit int, startWith string, getPrev bool, include bool, caseSensetivity bool) [][]string { logs_db := util.GetDB(host, container, "logs") db := util.GetDB(host, container, "statuses") - if host != util.GetHost() || vars.ActiveDBs[container] == nil { - defer logs_db.Close() - } iter := db.NewIterator(nil, nil) defer iter.Release() @@ -136,9 +147,6 @@ func GetLogsByStatus(host string, container string, message string, status strin func GetLogs(getPrev bool, include bool, host string, container string, message string, limit int, startWith string, caseSensetivity bool) [][]string { db := util.GetDB(host, container, "logs") - if host != util.GetHost() || vars.ActiveDBs[container] == nil { - defer db.Close() - } iter := db.NewIterator(nil, nil) defer iter.Release() @@ -215,18 +223,15 @@ func DeleteContainer(host string, container string, fullDelete bool) { if vars.ActiveDBs[container] != nil { vars.ActiveDBs[container].Close() - newActiveDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/logs", nil) - vars.ActiveDBs[container] = newActiveDB + vars.ActiveDBs[container] = util.GetDB(host, container, "active") } if vars.Statuses_DBs[host+"/"+container] != nil { vars.Statuses_DBs[host+"/"+container].Close() - newStatusesDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/statuses", nil) - vars.Statuses_DBs[host+"/"+container] = newStatusesDB + vars.Statuses_DBs[host+"/"+container] = util.GetDB(host, container, "statuses") } if vars.Stat_Containers_DBs[host+"/"+container] != nil { vars.Stat_Containers_DBs[host+"/"+container].Close() - newStatDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/statistics", nil) - vars.Statuses_DBs[host+"/"+container] = newStatDB + vars.Statuses_DBs[host+"/"+container] = util.GetDB(host, container, "statistics") } vars.Counters_For_Containers_Last_30_Min[host+"/"+container] = map[string]uint64{"error": 0, "debug": 0, "info": 0, "warn": 0, "meta": 0, "other": 0} } diff --git a/application/backend/app/daemon/daemon.go b/application/backend/app/daemon/daemon.go index cb1a281..32b4908 100644 --- a/application/backend/app/daemon/daemon.go +++ b/application/backend/app/daemon/daemon.go @@ -66,7 +66,9 @@ func closeActiveStream(containerName string) { newDaemonStreams = append(newDaemonStreams, stream) } } - vars.ActiveDBs[containerName].Close() + if vars.ActiveDBs[containerName] != nil { + vars.ActiveDBs[containerName].Close() + } vars.ActiveDBs[containerName] = nil vars.Active_Daemon_Streams = newDaemonStreams } @@ -109,8 +111,8 @@ func CreateDaemonToDBStream(containerName string) { reader := bufio.NewReader(connection) readHeader(*reader) - current_db := vars.ActiveDBs[containerName] host := util.GetHost() + current_db := util.GetDB(host, containerName, "logs") createLogMessage(current_db, host, containerName, "ONLOGS: Container listening started!") defer current_db.Close() @@ -171,17 +173,23 @@ func GetContainersList() []string { json.Unmarshal([]byte(body), &result) var names []string - containersDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containersMeta", nil) - if err != nil { - panic(err) + + containersMetaDB := vars.ContainersMeta_DBs[util.GetHost()] + if containersMetaDB == nil { + containersMetaDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containersMeta", nil) + if err != nil { + panic(err) + } + vars.ContainersMeta_DBs[util.GetHost()] = containersMetaDB } - defer containersDB.Close() + containersMetaDB = vars.ContainersMeta_DBs[util.GetHost()] + for i := 0; i < len(result); i++ { name := fmt.Sprintf("%v", result[i]["Names"].([]interface{})[0].(string))[1:] id := result[i]["Id"].(string) names = append(names, name) - containersDB.Put([]byte(name), []byte(id), nil) + containersMetaDB.Put([]byte(name), []byte(id), nil) } return names diff --git a/application/backend/app/db/db.go b/application/backend/app/db/db.go index 93686db..2f50a59 100644 --- a/application/backend/app/db/db.go +++ b/application/backend/app/db/db.go @@ -4,47 +4,46 @@ import ( "time" "github.com/devforth/OnLogs/app/util" + "github.com/devforth/OnLogs/app/vars" "github.com/syndtr/goleveldb/leveldb" ) func CreateOnLogsToken() string { - tokensDB, _ := leveldb.OpenFile("leveldb/tokens", nil) - defer tokensDB.Close() - token := util.GenerateJWTSecret() to_put := time.Now().UTC().Add(24 * time.Hour).String() - tokensDB.Put([]byte(token), []byte(to_put), nil) + err := vars.TokensDB.Put([]byte(token), []byte(to_put), nil) + if err != nil { + vars.TokensDB.Close() + vars.TokensDB, vars.TokensDBErr = leveldb.OpenFile("leveldb/tokens", nil) + + err = vars.TokensDB.Put([]byte(token), []byte(to_put), nil) + if err != nil { + panic(err) + } + } return token } func IsTokenExists(token string) bool { - tokensDB, _ := leveldb.OpenFile("leveldb/tokens", nil) - defer tokensDB.Close() - - iter := tokensDB.NewIterator(nil, nil) + iter := vars.TokensDB.NewIterator(nil, nil) defer iter.Release() iter.First() if string(iter.Key()) == token { - tokensDB.Put([]byte(token), []byte("was used"), nil) + vars.TokensDB.Put([]byte(token), []byte("was used"), nil) return true } for iter.Next() { if string(iter.Key()) == token { - tokensDB.Put([]byte(token), []byte("was used"), nil) + vars.TokensDB.Put([]byte(token), []byte("was used"), nil) return true } } - return false } func DeleteUnusedTokens() { for { - db, r := leveldb.OpenFile("leveldb/tokens", nil) - if r != nil { - panic(r) - } - + db := vars.TokensDB iter := db.NewIterator(nil, nil) for iter.Next() { wasUsed := string(iter.Value()) diff --git a/application/backend/app/routes/routes.go b/application/backend/app/routes/routes.go index 086d160..b18730f 100644 --- a/application/backend/app/routes/routes.go +++ b/application/backend/app/routes/routes.go @@ -22,7 +22,6 @@ import ( "github.com/devforth/OnLogs/app/util" "github.com/devforth/OnLogs/app/vars" "github.com/gorilla/websocket" - "github.com/syndtr/goleveldb/leveldb" ) func enableCors(w *http.ResponseWriter) { @@ -128,16 +127,14 @@ func AddLogLine(w http.ResponseWriter, req *http.Request) { if vars.Counters_For_Hosts_Last_30_Min[logItem.Host] == nil { go statistics.RunStatisticForContainer(logItem.Host, logItem.Container) } - location := logItem.Host + "/" + logItem.Container - if vars.Statuses_DBs[location] == nil { - vars.Statuses_DBs[location] = util.GetDB(logItem.Host, logItem.Container, "statuses") + err := containerdb.PutLogMessage(util.GetDB(logItem.Host, logItem.Container, "logs"), logItem.Host, logItem.Container, logItem.LogLine) + if err != nil { + defer w.WriteHeader(http.StatusInternalServerError) + panic(err) } - current_db, _ := leveldb.OpenFile("leveldb/hosts/"+logItem.Host+"/containers/"+logItem.Container+"/logs", nil) - containerdb.PutLogMessage(current_db, logItem.Host, logItem.Container, logItem.LogLine) - defer current_db.Close() to_send, _ := json.Marshal([]string{logItem.LogLine[0], logItem.LogLine[1]}) - for _, c := range vars.Connections[location] { + for _, c := range vars.Connections[logItem.Host+"/"+logItem.Container] { c.WriteMessage(1, to_send) } } @@ -162,6 +159,7 @@ func AddHost(w http.ResponseWriter, req *http.Request) { } vars.AgentsActiveContainers[addReq.Hostname] = addReq.Services + fmt.Println("New host added: " + addReq.Hostname) for _, container := range addReq.Services { os.MkdirAll("leveldb/hosts/"+addReq.Hostname+"/containers/"+container, 0700) } diff --git a/application/backend/app/statistics/statistics.go b/application/backend/app/statistics/statistics.go index 1d237e9..fa9b88b 100644 --- a/application/backend/app/statistics/statistics.go +++ b/application/backend/app/statistics/statistics.go @@ -7,10 +7,11 @@ import ( "github.com/devforth/OnLogs/app/util" "github.com/devforth/OnLogs/app/vars" - "github.com/syndtr/goleveldb/leveldb" ) -func restartStats(host string, container string, current_db *leveldb.DB) { +func restartStats(host string, container string) { + current_db := util.GetDB(host, container, "statistics") + var used_storage map[string]map[string]uint64 var location string if container == "" { @@ -41,15 +42,9 @@ func restartStats(host string, container string, current_db *leveldb.DB) { func RunStatisticForContainer(host string, container string) { location := host + "/" + container vars.Counters_For_Containers_Last_30_Min[location] = map[string]uint64{"error": 0, "debug": 0, "info": 0, "warn": 0, "meta": 0, "other": 0} - if vars.Stat_Containers_DBs[location] == nil { - current_db, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/statistics", nil) - defer current_db.Close() - vars.Stat_Containers_DBs[location] = current_db - } - defer delete(vars.Stat_Containers_DBs, location) - defer restartStats(host, container, vars.Stat_Containers_DBs[location]) + defer restartStats(host, container) for { - restartStats(host, container, vars.Stat_Containers_DBs[location]) + restartStats(host, container) time.Sleep(30 * time.Minute) } } @@ -72,9 +67,6 @@ func GetStatisticsByService(host string, service string, value int) map[string]u searchTo := time.Now().Add(-(time.Hour * time.Duration(value/2))).UTC() var tmp_stats map[string]uint64 current_db := util.GetDB(host, service, "statistics") - if vars.Stat_Containers_DBs[location] == nil { - defer current_db.Close() - } iter := current_db.NewIterator(nil, nil) defer iter.Release() iter.Last() diff --git a/application/backend/app/streamer/streamer.go b/application/backend/app/streamer/streamer.go index b66e0ee..f4358f2 100644 --- a/application/backend/app/streamer/streamer.go +++ b/application/backend/app/streamer/streamer.go @@ -11,32 +11,14 @@ import ( "github.com/devforth/OnLogs/app/statistics" "github.com/devforth/OnLogs/app/util" "github.com/devforth/OnLogs/app/vars" - "github.com/syndtr/goleveldb/leveldb" ) func createStreams(containers []string) { for _, container := range vars.DockerContainers { if !util.Contains(container, vars.Active_Daemon_Streams) { go statistics.RunStatisticForContainer(util.GetHost(), container) - newDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/logs", nil) - if err != nil { - fmt.Println("ERROR: " + container + ": " + err.Error()) - newDB, err = leveldb.RecoverFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/logs", nil) - fmt.Println("INFO: " + container + ": recovering db...") - if err == nil { - fmt.Println("INFO: " + container + ": db recovered!") - } else { - fmt.Println("ERROR: " + container + ": " + err.Error()) - } - } - if vars.Statuses_DBs[util.GetHost()+"/"+container] == nil { - statusesDB, _ := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/statuses", nil) - vars.Statuses_DBs[util.GetHost()+"/"+container] = statusesDB - } - vars.ActiveDBs[container] = newDB vars.Active_Daemon_Streams = append(vars.Active_Daemon_Streams, container) if os.Getenv("AGENT") != "" { - vars.BrokenLogs_DBs[container] = util.GetDB(util.GetHost(), container, "/brokenlogs") go daemon.CreateDaemonToHostStream(container) } else { go daemon.CreateDaemonToDBStream(container) diff --git a/application/backend/app/util/util.go b/application/backend/app/util/util.go index a9e66e4..1f2bc0a 100644 --- a/application/backend/app/util/util.go +++ b/application/backend/app/util/util.go @@ -88,18 +88,37 @@ func GetDB(host string, container string, dbType string) *leveldb.DB { res_db = vars.Statuses_DBs[host+"/"+container] } else if dbType == "statistics" { res_db = vars.Stat_Containers_DBs[host+"/"+container] + } else if dbType == "brokenlogs" { + res_db = vars.BrokenLogs_DBs[container] + } + + if res_db != nil { + return res_db } var err error - if res_db == nil { - path := "leveldb/hosts/" + host + "/containers/" + container + "/" + dbType - res_db, err = leveldb.OpenFile(path, nil) - if err != nil { - res_db, err = leveldb.RecoverFile(path, nil) - } + tries := 0 + path := "leveldb/hosts/" + host + "/containers/" + container + "/" + dbType + res_db, err = leveldb.OpenFile(path, nil) + for (err != nil && res_db == nil) && tries < 10 { + res_db, err = leveldb.RecoverFile(path, nil) + fmt.Println(path, err) + time.Sleep(10 * time.Millisecond) + tries++ } + if err != nil { - fmt.Println("ERROR: unable to open db for "+host+"/"+container+"/"+dbType, err) + panic("ERROR: unable to open db for " + host + "/" + container + "/" + dbType + "\n" + err.Error()) + } + + if dbType == "logs" { + vars.ActiveDBs[container] = res_db + } else if dbType == "statuses" { + vars.Statuses_DBs[host+"/"+container] = res_db + } else if dbType == "statistics" { + vars.Stat_Containers_DBs[host+"/"+container] = res_db + } else if dbType == "brokenlogs" { + vars.BrokenLogs_DBs[container] = res_db } return res_db @@ -185,9 +204,17 @@ func GetDockerContainerID(host string, container string) string { return "" } - idDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containersMeta", nil) - defer idDB.Close() - iter := idDB.NewIterator(nil, nil) + containersMetaDB := vars.ContainersMeta_DBs[host] + if containersMetaDB == nil { + containersMetaDB, err := leveldb.OpenFile("leveldb/hosts/"+host+"/containersMeta", nil) + if err != nil { + panic(err) + } + vars.ContainersMeta_DBs[host] = containersMetaDB + } + containersMetaDB = vars.ContainersMeta_DBs[host] + + iter := containersMetaDB.NewIterator(nil, nil) defer iter.Release() iter.Last() diff --git a/application/backend/app/vars/vars.go b/application/backend/app/vars/vars.go index 16c12ca..47b20dd 100644 --- a/application/backend/app/vars/vars.go +++ b/application/backend/app/vars/vars.go @@ -9,22 +9,30 @@ import ( ) var ( - ActiveDBs = map[string]*leveldb.DB{} - Stat_Containers_DBs = map[string]*leveldb.DB{} - Stat_Hosts_DBs = map[string]*leveldb.DB{} - Statuses_DBs = map[string]*leveldb.DB{} - BrokenLogs_DBs = map[string]*leveldb.DB{} - Active_Daemon_Streams = []string{} - DockerContainers = []string{} - AgentsActiveContainers = map[string][]string{} - ToDelete = map[string][]string{} - Connections = map[string][]websocket.Conn{} + ActiveDBs = map[string]*leveldb.DB{} + Stat_Containers_DBs = map[string]*leveldb.DB{} + Stat_Hosts_DBs = map[string]*leveldb.DB{} + Statuses_DBs = map[string]*leveldb.DB{} + BrokenLogs_DBs = map[string]*leveldb.DB{} + ContainersMeta_DBs = map[string]*leveldb.DB{} + + Active_Daemon_Streams = []string{} + + DockerContainers = []string{} + AgentsActiveContainers = map[string][]string{} + + ToDelete = map[string][]string{} + Connections = map[string][]websocket.Conn{} + Counters_For_Hosts_Last_30_Min = map[string]map[string]uint64{} Counters_For_Containers_Last_30_Min = map[string]map[string]uint64{} - FavsDB, FavsDBErr = leveldb.OpenFile("leveldb/favourites", nil) - StateDB, StateDBErr = leveldb.OpenFile("leveldb/state", nil) - UsersDB, UsersDBErr = leveldb.OpenFile("leveldb/users", nil) // should i ever close it? - Year = strconv.Itoa(time.Now().UTC().Year()) + + FavsDB, FavsDBErr = leveldb.OpenFile("leveldb/favourites", nil) + StateDB, StateDBErr = leveldb.OpenFile("leveldb/state", nil) + UsersDB, UsersDBErr = leveldb.OpenFile("leveldb/users", nil) + TokensDB, TokensDBErr = leveldb.OpenFile("leveldb/tokens", nil) + + Year = strconv.Itoa(time.Now().UTC().Year()) ) type UserData struct { diff --git a/application/build.sh b/application/build.sh index 16d4006..8062213 100644 --- a/application/build.sh +++ b/application/build.sh @@ -1,2 +1,3 @@ docker buildx create --use -docker buildx build --platform=linux/amd64,linux/arm64 --tag "devforth/onlogs:latest" --tag "devforth/onlogs:1.0.4" --push . +# docker buildx build --platform=linux/amd64,linux/arm64 --tag "devforth/onlogs:latest" --tag "devforth/onlogs:1.0.5" --push . +docker buildx build --platform=linux/amd64,linux/arm64 --tag "devforth/onlogs:1.0.4-beta-7" --push .