Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add tests for tsdb cli tool #673

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
81 changes: 5 additions & 76 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package tsdb
import (
"context"
"encoding/binary"

"errors"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/go-kit/kit/log"

"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
Expand Down Expand Up @@ -57,7 +56,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()

blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
blockDir := CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 1))
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
Expand All @@ -77,7 +76,7 @@ func TestCreateBlock(t *testing.T) {
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
b, err := OpenBlock(nil, CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 10)), nil)
if err == nil {
testutil.Ok(t, b.Close())
}
Expand Down Expand Up @@ -134,7 +133,7 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()

blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
blockDir := CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 1))
files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err)
testutil.Assert(t, len(files) > 0, "No chunk created.")
Expand Down Expand Up @@ -168,7 +167,7 @@ func TestBlockSize(t *testing.T) {

// Create a block and compare the reported size vs actual disk size.
{
blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
blockDirInit = CreateBlock(t, tmpdir, GenSeries(10, 1, 1, 100))
blockInit, err = OpenBlock(nil, blockDirInit, nil)
testutil.Ok(t, err)
defer func() {
Expand Down Expand Up @@ -204,76 +203,6 @@ func TestBlockSize(t *testing.T) {
}
}

// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []Series) string {
head := createHead(tb, series)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
testutil.Ok(tb, err)

testutil.Ok(tb, os.MkdirAll(dir, 0777))

// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
testutil.Ok(tb, err)
return filepath.Join(dir, ulid.String())
}

func createHead(tb testing.TB, series []Series) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err)
defer head.Close()

app := head.Appender()
for _, s := range series {
ref := uint64(0)
it := s.Iterator()
for it.Next() {
t, v := it.At()
if ref != 0 {
err := app.AddFast(ref, t, v)
if err == nil {
continue
}
}
ref, err = app.Add(s.Labels(), t, v)
testutil.Ok(tb, err)
}
testutil.Ok(tb, it.Err())
}
err = app.Commit()
testutil.Ok(tb, err)
return head
}

const (
defaultLabelName = "labelName"
defaultLabelValue = "labelValue"
)

// genSeries generates series with a given number of labels and values.
func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}

series := make([]Series, totalSeries)

for i := 0; i < totalSeries; i++ {
lbls := make(map[string]string, labelCount)
lbls[defaultLabelName] = strconv.Itoa(i)
for j := 1; len(lbls) < labelCount; j++ {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t < maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series[i] = newSeries(lbls, samples)
}
return series
}

// populateSeries generates series from given labels, mint and maxt.
func populateSeries(lbls []map[string]string, mint, maxt int64) []Series {
if len(lbls) == 0 {
Expand Down
87 changes: 51 additions & 36 deletions cmd/tsdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/labels"
"gopkg.in/alecthomas/kingpin.v2"
)

const (
printBlocksTableHeader = "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES"
defaultAnalyzeLimit = "20"
timeDelta = 30000
)

func main() {
Expand All @@ -62,7 +69,7 @@ func execute() (err error) {
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default(defaultAnalyzeLimit).Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
Expand Down Expand Up @@ -95,7 +102,7 @@ func execute() (err error) {
if err != nil {
return err
}
printBlocks(blocks, listCmdHumanReadable)
printBlocks(os.Stdout, blocks, listCmdHumanReadable)
case analyzeCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*analyzePath, nil)
if err != nil {
Expand All @@ -110,21 +117,12 @@ func execute() (err error) {
if err != nil {
return err
}
var block tsdb.BlockReader
if *analyzeBlockID != "" {
for _, b := range blocks {
if b.Meta().ULID.String() == *analyzeBlockID {
block = b
break
}
}
} else if len(blocks) > 0 {
block = blocks[len(blocks)-1]
}
if block == nil {
return fmt.Errorf("block not found")
block, err := extractBlock(blocks, analyzeBlockID)
if err != nil {
return err
}
return analyzeBlock(block, *analyzeLimit)

return analyzeBlock(os.Stdout, block, *analyzeLimit)
case dumpCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*dumpPath, nil)
if err != nil {
Expand All @@ -140,6 +138,25 @@ func execute() (err error) {
return nil
}

// extractBlock takes a slice of BlockReader and returns a specific block by ID.
func extractBlock(blocks []tsdb.BlockReader, analyzeBlockID *string) (tsdb.BlockReader, error) {
var block tsdb.BlockReader
if *analyzeBlockID != "" {
for _, b := range blocks {
if b.Meta().ULID.String() == *analyzeBlockID {
block = b
break
}
}
} else if len(blocks) > 0 {
block = blocks[len(blocks)-1]
}
if block == nil {
return nil, fmt.Errorf("block not found")
}
return block, nil
}

type writeBenchmark struct {
outPath string
samplesFile string
Expand Down Expand Up @@ -235,8 +252,6 @@ func (b *writeBenchmark) run() error {
return nil
}

const timeDelta = 30000

func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
var mu sync.Mutex
var total uint64
Expand Down Expand Up @@ -434,11 +449,11 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
return mets, nil
}

func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
func printBlocks(w io.Writer, blocks []tsdb.BlockReader, humanReadable *bool) {
tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
fmt.Fprintln(tw, printBlocksTableHeader)
for _, b := range blocks {
meta := b.Meta()

Expand All @@ -461,12 +476,12 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string {
return strconv.FormatInt(timestamp, 10)
}

func analyzeBlock(b tsdb.BlockReader, limit int) error {
func analyzeBlock(w io.Writer, b tsdb.BlockReader, limit int) error {
meta := b.Meta()
fmt.Printf("Block ID: %s\n", meta.ULID)
fmt.Fprintf(w, "Block ID: %s\n", meta.ULID)
// Presume 1ms resolution that Prometheus uses.
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
fmt.Fprintf(w, "Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
fmt.Fprintf(w, "Series: %d\n", meta.Stats.NumSeries)
ir, err := b.Index()
if err != nil {
return err
Expand All @@ -477,7 +492,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
if err != nil {
return err
}
fmt.Printf("Label names: %d\n", len(allLabelNames))
fmt.Fprintf(w, "Label names: %d\n", len(allLabelNames))

type postingInfo struct {
key string
Expand All @@ -489,7 +504,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
sort.Slice(postingInfos, func(i, j int) bool { return postingInfos[i].metric > postingInfos[j].metric })

for i, pc := range postingInfos {
fmt.Printf("%d %s\n", pc.metric, pc.key)
fmt.Fprintf(w, "%d %s\n", pc.metric, pc.key)
if i >= limit {
break
}
Expand Down Expand Up @@ -523,31 +538,31 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
if p.Err() != nil {
return p.Err()
}
fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered))
fmt.Printf("Postings entries (total label pairs): %d\n", entries)
fmt.Fprintf(w, "Postings (unique label pairs): %d\n", len(labelpairsUncovered))
fmt.Fprintf(w, "Postings entries (total label pairs): %d\n", entries)

postingInfos = postingInfos[:0]
for k, m := range labelpairsUncovered {
postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
}

fmt.Printf("\nLabel pairs most involved in churning:\n")
fmt.Fprintf(w, "\nLabel pairs most involved in churning:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
for k, m := range labelsUncovered {
postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
}

fmt.Printf("\nLabel names most involved in churning:\n")
fmt.Fprintf(w, "\nLabel names most involved in churning:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
for k, m := range labelpairsCount {
postingInfos = append(postingInfos, postingInfo{k, m})
}

fmt.Printf("\nMost common label pairs:\n")
fmt.Fprintf(w, "\nMost common label pairs:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
Expand All @@ -571,7 +586,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
postingInfos = append(postingInfos, postingInfo{n, cumulativeLength})
}

fmt.Printf("\nLabel names with highest cumulative label value length:\n")
fmt.Fprintf(w, "\nLabel names with highest cumulative label value length:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
Expand All @@ -582,7 +597,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
}
postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())})
}
fmt.Printf("\nHighest cardinality labels:\n")
fmt.Fprintf(w, "\nHighest cardinality labels:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
Expand Down Expand Up @@ -610,7 +625,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
postingInfos = append(postingInfos, postingInfo{n, uint64(count)})
}
}
fmt.Printf("\nHighest cardinality metric names:\n")
fmt.Fprintf(w, "\nHighest cardinality metric names:\n")
printInfo(postingInfos)
return nil
}
Expand Down
Loading