Skip to content

Commit

Permalink
[7.17](backport #39392) [Bug] fix high IO after sudden filebeat stop (#…
Browse files Browse the repository at this point in the history
…35893) (#39795)

In case of a corrupted log file (which is likely to happen after a
sudden, unclean system shutdown), we set a flag which causes us to
checkpoint immediately, but we never do anything else besides that. This
causes Filebeat to checkpoint on each log operation (therefore
causing a high IO load on the server and also causing Filebeat to fall
behind).

This change resets the logInvalid flag after a successful checkpointing.

Co-authored-by: Tiago Queiroz <[email protected]>
(cherry picked from commit 217f5a6)

# Conflicts:
#	libbeat/statestore/backend/memlog/diskstore.go

---------

Co-authored-by: emmanueltouzery <[email protected]>
Co-authored-by: Tiago Queiroz <[email protected]>
Co-authored-by: Pierre HILBERT <[email protected]>
  • Loading branch information
4 people authored Jun 24, 2024
1 parent 8cf60ea commit 4cc14a1
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]

*Heartbeat*

*Metricbeat*
Expand Down
19 changes: 16 additions & 3 deletions dev-tools/mage/gotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,12 @@ func GoTestIntegrationForModule(ctx context.Context) error {
return err
}

type moduleErr struct {
name string
err error
}
foundModule := false
failedModules := []string{}
failedModules := []moduleErr{}
for _, fi := range modulesFileInfo {
if !fi.IsDir() {
continue
Expand All @@ -169,14 +173,23 @@ func GoTestIntegrationForModule(ctx context.Context) error {
})
if err != nil {
// err will already be report to stdout, collect failed module to report at end
failedModules = append(failedModules, fi.Name())
failedModules = append(failedModules, moduleErr{
name: fi.Name(),
err: err,
})
}
}
if module != "" && !foundModule {
return fmt.Errorf("no module %s", module)
}
if len(failedModules) > 0 {
return fmt.Errorf("failed modules: %s", strings.Join(failedModules, ", "))
errMsg := strings.Builder{}
names := []string{}
for _, m := range failedModules {
fmt.Fprintf(&errMsg, "Module: %s\nError: %s\n", m.name, m.err.Error())
names = append(names, m.name)
}
return fmt.Errorf("failed modules: %s.\n%s", strings.Join(names, ", "), errMsg.String())
}
return nil
}
Expand Down
35 changes: 22 additions & 13 deletions libbeat/statestore/backend/memlog/diskstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package memlog
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -102,11 +103,11 @@ const (
checkpointTmpFileName = "checkpoint.new"

storeVersion = "1"

keyField = "_key"
)

// newDiskStore initializes the disk store stucture only. The store must have
var ErrCorruptStore = errors.New("corrupted data file")

// newDiskStore initializes the disk store structure only. The store must have
// been opened already. It tries to open the update log file for append
// operations. If opening the update log file fails, it is marked as
// 'corrupted', triggering a checkpoint operation on the first update to the store.
Expand Down Expand Up @@ -180,7 +181,7 @@ func (s *diskstore) tryOpenLog() error {
f.Close()
})

_, err = f.Seek(0, os.SEEK_END)
_, err = f.Seek(0, io.SeekEnd)
if err != nil {
return err
}
Expand Down Expand Up @@ -261,12 +262,16 @@ func (s *diskstore) LogOperation(op op) error {
if err := enc.Encode(logAction{Op: op.name(), ID: s.nextTxID}); err != nil {
return err
}
writer.WriteByte('\n')
if err := writer.WriteByte('\n'); err != nil {
s.log.Errorf("could not write to registry log file: %s", err)
}

if err := enc.Encode(op); err != nil {
return err
}
writer.WriteByte('\n')
if err := writer.WriteByte('\n'); err != nil {
s.log.Errorf("could not write to registry log file: %s", err)
}

if err := writer.Flush(); err != nil {
return err
Expand Down Expand Up @@ -327,7 +332,10 @@ func (s *diskstore) WriteCheckpoint(state map[string]entry) error {
}

// delete old transaction files
updateActiveMarker(s.log, s.home, s.activeDataFile.path)
if err := updateActiveMarker(s.log, s.home, s.activeDataFile.path); err != nil {
s.log.Warnf("could not update active marker: %s", err)
}

s.removeOldDataFiles()

trySyncPath(s.home)
Expand Down Expand Up @@ -399,7 +407,8 @@ func (s *diskstore) checkpointClearLog() {

err := s.logFile.Truncate(0)
if err == nil {
_, err = s.logFile.Seek(0, os.SEEK_SET)
_, err = s.logFile.Seek(0, io.SeekStart)
s.logInvalid = false
}

if err != nil {
Expand Down Expand Up @@ -436,7 +445,7 @@ func updateActiveMarker(log *logp.Logger, homePath, checkpointFilePath string) e
log.Errorf("Failed to remove old temporary active.dat.tmp file: %v", err)
return err
}
if err := ioutil.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil {
if err := os.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil {
log.Errorf("Failed to write temporary pointer file: %v", err)
return err
}
Expand Down Expand Up @@ -534,7 +543,7 @@ func readDataFile(path string, fn func(string, common.MapStr)) error {
var states []map[string]interface{}
dec := json.NewDecoder(f)
if err := dec.Decode(&states); err != nil {
return fmt.Errorf("corrupted data file: %v", err)
return fmt.Errorf("%w: %w", ErrCorruptStore, err)
}

for _, state := range states {
Expand All @@ -555,7 +564,7 @@ func readDataFile(path string, fn func(string, common.MapStr)) error {
// memStore.
// The txid is the transaction ID of the last known valid data file.
// Transactions older then txid will be ignored.
// loadLogFile returns the last commited txid in logTxid and the total number
// loadLogFile returns the last committed txid in logTxid and the total number
// of operations in logCount.
func loadLogFile(
store *memstore,
Expand Down Expand Up @@ -679,7 +688,7 @@ func readMetaFile(home string) (storeMeta, error) {

dec := json.NewDecoder(f)
if err := dec.Decode(&meta); err != nil {
return meta, fmt.Errorf("can not read store meta file: %v", err)
return meta, fmt.Errorf("can not read store meta file: %w", err)
}

return meta, nil
Expand Down
55 changes: 55 additions & 0 deletions libbeat/statestore/backend/memlog/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// TestRecoverFromCorruption verifies that the disk store recovers from a
// corrupted (incomplete) registry log file: opening the store marks the log
// invalid, and the first subsequent log operation triggers a checkpoint,
// clears the logInvalid flag, and truncates the log file. This is a
// regression test for the bug where logInvalid was never reset after a
// successful checkpoint, causing a checkpoint on every operation (high IO).
func TestRecoverFromCorruption(t *testing.T) {
	path := t.TempDir()
	logp.DevelopmentSetup() //nolint: errcheck // it's a test and we don't rely on the logs

	// Seed the temporary store directory with a fixture that contains an
	// incomplete (corrupted) log file.
	if err := copyPath(path, "testdata/1/logfile_incomplete/"); err != nil {
		t.Fatalf("Failed to copy test file to the temporary directory: %v", err)
	}

	store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, false, func(_ uint64) bool {
		return false
	})
	require.NoError(t, err, "openStore must succeed")
	// Opening a store whose log is corrupted must flag the log as invalid so
	// the next update forces a full checkpoint.
	require.True(t, store.disk.logInvalid, "expecting the log file to be invalid")

	// Perform one write; with an invalid log this must checkpoint and then
	// reset the invalid flag (the actual fix under test).
	err = store.logOperation(&opSet{K: "key", V: common.MapStr{
		"field": 42,
	}})
	require.NoError(t, err, "logOperation must succeed")
	require.False(t, store.disk.logInvalid, "log file must be valid")
	// The checkpoint data file name is derived from the fixture's state —
	// presumably its last transaction id; TODO confirm against testdata.
	require.FileExistsf(t, filepath.Join(path, "7.json"), "expecting the checkpoint file to have been created")

	// A successful checkpoint truncates the log file back to zero bytes.
	file, err := os.Stat(filepath.Join(path, "log.json"))
	require.NoError(t, err, "Stat on the log file must succeed")
	require.Equal(t, int64(0), file.Size(), "expecting the log file to be truncated")
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Cronjob that will execute each minute.
# It will print a message and sleep (reporting being active) for 5 seconds

apiVersion: batch/v1beta1
apiVersion: batch/v1
kind: CronJob
metadata:
name: mycronjob
Expand All @@ -19,4 +19,4 @@ spec:

restartPolicy: OnFailure
terminationGracePeriodSeconds: 0
concurrencyPolicy: Allow
concurrencyPolicy: Allow

0 comments on commit 4cc14a1

Please sign in to comment.