diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2c09e68ebe7..55008478de2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -15,6 +15,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* +- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] + *Heartbeat* *Metricbeat* diff --git a/dev-tools/mage/gotest.go b/dev-tools/mage/gotest.go index c21ac3eaa89..019f624a271 100644 --- a/dev-tools/mage/gotest.go +++ b/dev-tools/mage/gotest.go @@ -142,8 +142,12 @@ func GoTestIntegrationForModule(ctx context.Context) error { return err } + type moduleErr struct { + name string + err error + } foundModule := false - failedModules := []string{} + failedModules := []moduleErr{} for _, fi := range modulesFileInfo { if !fi.IsDir() { continue @@ -169,14 +173,23 @@ func GoTestIntegrationForModule(ctx context.Context) error { }) if err != nil { // err will already be report to stdout, collect failed module to report at end - failedModules = append(failedModules, fi.Name()) + failedModules = append(failedModules, moduleErr{ + name: fi.Name(), + err: err, + }) } } if module != "" && !foundModule { return fmt.Errorf("no module %s", module) } if len(failedModules) > 0 { - return fmt.Errorf("failed modules: %s", strings.Join(failedModules, ", ")) + errMsg := strings.Builder{} + names := []string{} + for _, m := range failedModules { + fmt.Fprintf(&errMsg, "Module: %s\nError: %s\n", m.name, m.err.Error()) + names = append(names, m.name) + } + return fmt.Errorf("failed modules: %s.\n%s", strings.Join(names, ", "), errMsg.String()) } return nil } diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go index 84a398e0bd7..b41a3c9af25 100644 --- a/libbeat/statestore/backend/memlog/diskstore.go +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -20,8 +20,9 @@ package memlog import ( "bufio" "encoding/json" + "errors" "fmt" - "io/ioutil" + "io" "os" "path/filepath" "sort" @@ -102,11 +103,11 @@ const ( checkpointTmpFileName = "checkpoint.new" storeVersion = "1" - - keyField = "_key" ) -// newDiskStore initializes the disk store stucture only. The store must have +var ErrCorruptStore = errors.New("corrupted data file") + +// newDiskStore initializes the disk store structure only. The store must have // been opened already. It tries to open the update log file for append // operations. If opening the update log file fails, it is marked as // 'corrupted', triggering a checkpoint operation on the first update to the store. @@ -180,7 +181,7 @@ func (s *diskstore) tryOpenLog() error { f.Close() }) - _, err = f.Seek(0, os.SEEK_END) + _, err = f.Seek(0, io.SeekEnd) if err != nil { return err } @@ -261,12 +262,16 @@ func (s *diskstore) LogOperation(op op) error { if err := enc.Encode(logAction{Op: op.name(), ID: s.nextTxID}); err != nil { return err } - writer.WriteByte('\n') + if err := writer.WriteByte('\n'); err != nil { + s.log.Errorf("could not write to registry log file: %s", err) + } if err := enc.Encode(op); err != nil { return err } - writer.WriteByte('\n') + if err := writer.WriteByte('\n'); err != nil { + s.log.Errorf("could not write to registry log file: %s", err) + } if err := writer.Flush(); err != nil { return err @@ -327,7 +332,10 @@ func (s *diskstore) WriteCheckpoint(state map[string]entry) error { } // delete old transaction files - updateActiveMarker(s.log, s.home, s.activeDataFile.path) + if err := updateActiveMarker(s.log, s.home, s.activeDataFile.path); err != nil { + s.log.Warnf("could not update active marker: %s", err) + } + s.removeOldDataFiles() trySyncPath(s.home) @@ -399,7 +407,8 @@ func (s *diskstore) checkpointClearLog() { err := s.logFile.Truncate(0) if err == nil { - _, err = s.logFile.Seek(0, os.SEEK_SET) + _, err = s.logFile.Seek(0, io.SeekStart) + s.logInvalid = false } if err != nil { @@ -436,7 +445,7 @@ func updateActiveMarker(log *logp.Logger, homePath, checkpointFilePath string) e log.Errorf("Failed to remove old temporary active.dat.tmp file: %v", err) return err } - if err := ioutil.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil { + if err := os.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil { log.Errorf("Failed to write temporary pointer file: %v", err) return err } @@ -534,7 +543,7 @@ func readDataFile(path string, fn func(string, common.MapStr)) error { var states []map[string]interface{} dec := json.NewDecoder(f) if err := dec.Decode(&states); err != nil { - return fmt.Errorf("corrupted data file: %v", err) + return fmt.Errorf("%w: %w", ErrCorruptStore, err) } for _, state := range states { @@ -555,7 +564,7 @@ func readDataFile(path string, fn func(string, common.MapStr)) error { // memStore. // The txid is the transaction ID of the last known valid data file. // Transactions older then txid will be ignored. -// loadLogFile returns the last commited txid in logTxid and the total number +// loadLogFile returns the last committed txid in logTxid and the total number // of operations in logCount. func loadLogFile( store *memstore, @@ -679,7 +688,7 @@ func readMetaFile(home string) (storeMeta, error) { dec := json.NewDecoder(f) if err := dec.Decode(&meta); err != nil { - return meta, fmt.Errorf("can not read store meta file: %v", err) + return meta, fmt.Errorf("can not read store meta file: %w", err) } return meta, nil diff --git a/libbeat/statestore/backend/memlog/store_test.go b/libbeat/statestore/backend/memlog/store_test.go new file mode 100644 index 00000000000..5230b45b6fc --- /dev/null +++ b/libbeat/statestore/backend/memlog/store_test.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestRecoverFromCorruption(t *testing.T) { + path := t.TempDir() + logp.DevelopmentSetup() //nolint: errcheck // it's a test and we don't rely on the logs + + if err := copyPath(path, "testdata/1/logfile_incomplete/"); err != nil { + t.Fatalf("Failed to copy test file to the temporary directory: %v", err) + } + + store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, false, func(_ uint64) bool { + return false + }) + require.NoError(t, err, "openStore must succeed") + require.True(t, store.disk.logInvalid, "expecting the log file to be invalid") + + err = store.logOperation(&opSet{K: "key", V: common.MapStr{ + "field": 42, + }}) + require.NoError(t, err, "logOperation must succeed") + require.False(t, store.disk.logInvalid, "log file must be valid") + require.FileExistsf(t, filepath.Join(path, "7.json"), "expecting the checkpoint file to have been created") + + file, err := os.Stat(filepath.Join(path, "log.json")) + require.NoError(t, err, "Stat on the log file must succeed") + require.Equal(t, int64(0), file.Size(), "expecting the log file to be truncated") +} diff --git a/metricbeat/module/kubernetes/_meta/test/docs/02_objects/cronjob.yaml b/metricbeat/module/kubernetes/_meta/test/docs/02_objects/cronjob.yaml index eba346cf155..8f334648489 100644 --- a/metricbeat/module/kubernetes/_meta/test/docs/02_objects/cronjob.yaml +++ b/metricbeat/module/kubernetes/_meta/test/docs/02_objects/cronjob.yaml @@ -1,7 +1,7 @@ # Cronjob that will execute each minute. # It will print a message and sleep (reporting being active) for 5 seconds -apiVersion: batch/v1beta1 +apiVersion: batch/v1 kind: CronJob metadata: name: mycronjob @@ -19,4 +19,4 @@ spec: restartPolicy: OnFailure terminationGracePeriodSeconds: 0 - concurrencyPolicy: Allow \ No newline at end of file + concurrencyPolicy: Allow