diff --git a/README.md b/README.md index 306c5bd92..361e87b90 100644 --- a/README.md +++ b/README.md @@ -165,16 +165,6 @@ These benchmarks allow you to benchmark an integration end to end. For details on how to configure system benchmarks for a package, review the [HOWTO guide](./docs/howto/system_benchmarking.md). -### `elastic-package benchmark generate-corpus` - -_Context: package_ - - -*BEWARE*: this command is in beta and it's behaviour may change in the future. -Use this command to generate benchmarks corpus data for a package. -Currently, only data for what we have related assets on https://github.com/elastic/elastic-integration-corpus-generator-tool are supported. -For details on how to run this command, review the [HOWTO guide](./docs/howto/generate_corpus.md). - ### `elastic-package benchmark pipeline` _Context: package_ diff --git a/cmd/benchmark.go b/cmd/benchmark.go index 402ec29c7..80558383c 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -12,9 +12,6 @@ import ( "strings" "time" - "github.com/dustin/go-humanize" - - "github.com/elastic/elastic-package/internal/corpusgenerator" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/install" "github.com/elastic/elastic-package/internal/logger" @@ -36,12 +33,6 @@ import ( "github.com/elastic/elastic-package/internal/testrunner" ) -const generateLongDescription = ` -*BEWARE*: this command is in beta and it's behaviour may change in the future. -Use this command to generate benchmarks corpus data for a package. -Currently, only data for what we have related assets on https://github.com/elastic/elastic-integration-corpus-generator-tool are supported. -For details on how to run this command, review the [HOWTO guide](./docs/howto/generate_corpus.md).` - const benchLongDescription = `Use this command to run benchmarks on a package. Currently, the following types of benchmarks are available: #### Pipeline Benchmarks @@ -80,9 +71,6 @@ func setupBenchmarkCommand() *cobraext.Command { systemCmd := getSystemCommand() cmd.AddCommand(systemCmd) - generateCorpusCmd := getGenerateCorpusCommand() - cmd.AddCommand(generateCorpusCmd) - return cobraext.NewCommand(cmd, cobraext.ContextPackage) } @@ -257,16 +245,6 @@ func rallyCommandAction(cmd *cobra.Command, args []string) error { return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName) } - deferCleanup, err := cmd.Flags().GetDuration(cobraext.DeferCleanupFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.DeferCleanupFlagName) - } - - metricsInterval, err := cmd.Flags().GetDuration(cobraext.BenchMetricsIntervalFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.BenchMetricsIntervalFlagName) - } - dataReindex, err := cmd.Flags().GetBool(cobraext.BenchReindexToMetricstoreFlagName) if err != nil { return cobraext.FlagParsingError(err, cobraext.BenchReindexToMetricstoreFlagName) @@ -314,8 +292,6 @@ func rallyCommandAction(cmd *cobra.Command, args []string) error { withOpts := []rally.OptionFunc{ rally.WithVariant(variant), rally.WithBenchmarkName(benchName), - rally.WithDeferCleanup(deferCleanup), - rally.WithMetricsInterval(metricsInterval), rally.WithDataReindexing(dataReindex), rally.WithPackageRootPath(packageRootPath), rally.WithESAPI(esClient.API), @@ -496,75 +472,6 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { return nil } -func getGenerateCorpusCommand() *cobra.Command { - generateCorpusCmd := &cobra.Command{ - Use: "generate-corpus", - Short: "Generate benchmarks corpus data for the package", - Long: generateLongDescription, - Args: cobra.NoArgs, - RunE: generateDataStreamCorpusCommandAction, - } - - generateCorpusCmd.Flags().StringP(cobraext.PackageFlagName, cobraext.PackageFlagShorthand, "", cobraext.PackageFlagDescription) - generateCorpusCmd.Flags().StringP(cobraext.GenerateCorpusDataSetFlagName, cobraext.GenerateCorpusDataSetFlagShorthand, "", cobraext.GenerateCorpusDataSetFlagDescription) - generateCorpusCmd.Flags().StringP(cobraext.GenerateCorpusSizeFlagName, cobraext.GenerateCorpusSizeFlagShorthand, "", cobraext.GenerateCorpusSizeFlagDescription) - generateCorpusCmd.Flags().StringP(cobraext.GenerateCorpusCommitFlagName, cobraext.GenerateCorpusCommitFlagShorthand, "main", cobraext.GenerateCorpusCommitFlagDescription) - generateCorpusCmd.Flags().StringP(cobraext.GenerateCorpusRallyTrackOutputDirFlagName, cobraext.GenerateCorpusRallyTrackOutputDirFlagShorthand, "", cobraext.GenerateCorpusRallyTrackOutputDirFlagDescription) - - return generateCorpusCmd -} - -func generateDataStreamCorpusCommandAction(cmd *cobra.Command, _ []string) error { - packageName, err := cmd.Flags().GetString(cobraext.PackageFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.PackageFlagName) - } - - dataSetName, err := cmd.Flags().GetString(cobraext.GenerateCorpusDataSetFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.GenerateCorpusDataSetFlagName) - } - - totSize, err := cmd.Flags().GetString(cobraext.GenerateCorpusSizeFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.GenerateCorpusSizeFlagName) - } - - totSizeInBytes, err := humanize.ParseBytes(totSize) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.GenerateCorpusSizeFlagName) - } - - commit, err := cmd.Flags().GetString(cobraext.GenerateCorpusCommitFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.GenerateCorpusCommitFlagName) - } - - if len(commit) == 0 { - commit = "main" - } - - rallyTrackOutputDir, err := cmd.Flags().GetString(cobraext.GenerateCorpusRallyTrackOutputDirFlagName) - if err != nil { - return cobraext.FlagParsingError(err, cobraext.GenerateCorpusRallyTrackOutputDirFlagName) - } - - genLibClient := corpusgenerator.NewClient(commit) - generator, err := corpusgenerator.NewGenerator(genLibClient, packageName, dataSetName, totSizeInBytes) - if err != nil { - return fmt.Errorf("can't generate benchmarks data corpus for data stream: %w", err) - } - - // TODO: we need a way to extract the type from the package and dataset, currently hardcode to `metrics` - dataStream := fmt.Sprintf("metrics-%s.%s-default", packageName, dataSetName) - err = corpusgenerator.RunGenerator(generator, dataStream, rallyTrackOutputDir) - if err != nil { - return fmt.Errorf("can't generate benchmarks data corpus for data stream: %w", err) - } - - return nil -} - func initializeESMetricsClient(ctx context.Context) (*elasticsearch.Client, error) { address := os.Getenv(benchcommon.ESMetricstoreHostEnv) user := os.Getenv(benchcommon.ESMetricstoreUsernameEnv) diff --git a/docs/howto/generate_corpus.md b/docs/howto/generate_corpus.md deleted file mode 100644 index 1a9c9603d..000000000 --- a/docs/howto/generate_corpus.md +++ /dev/null @@ -1,41 +0,0 @@ -# HOWTO: Generate corpus for a package dataset - -## Introduction - -The `elastic-package` tool can be used to generate a rally corpus for a package dataset. -This feature is currently in beta and manual steps are required to create a valid rally track from the generated corpus. -Currently, only data for what we have related assets on https://github.com/elastic/elastic-integration-corpus-generator-tool are supported. - -### Generate a corpus for a package dataset - -#### Steps - -1. Run the elastic-package command for generating the corpus of the package dataset: - `elastic-package benchmark generate-corpus --dataset sqs --package aws --size 100M` - 1. replace the sample value for `--dataset` with the one of the dataset you want to generate a corpus for - 2. replace the sample value for `--package` with the one of the package you want to generate a corpus for - 3. replace the sample value for `--size` with the *approximate* size of the corpus you want to generate -2. Choose a file where to redirect the output of the command if you want to save it: - `elastic-package benchmark generate-corpus --dataset sqs --package aws --size 100M > aws.sqs.100M.ndjson` - 1. replace the sample value of the redirect file with the one you've chosen - -### Generate a rally track for a package dataset and run a rally benchmark - -*BEWARE*: this is only supported for `metrics` type data streams. - -#### Steps - -1. Run the elastic-package command for generating the corpus of the package dataset: - `elastic-package benchmark generate-corpus --dataset sqs --package aws --size 100M --rally-track-output-dir - ./track-output-dir` - 1. replace the sample value for `--dataset` with the one of the dataset you want to generate a corpus for - 2. replace the sample value for `--package` with the one of the package you want to generate a corpus for - 3. replace the sample value for `--size` with the *approximate* size of the corpus you want to generate - 4. replace the sample value for `--rally-track-output-dir` with the path to the folder where you want to save the rally track and the generated corpus (the folder will be created if it does not exist already) -2. Go to the Kibana instance of the cluster you want to run the rally on and install the integration package that you have generated the rally track for. You can use as well the `elastic-package install` command in order to install the package in Kibana: more details in this [HOWTO guide](./install_package.md). -3. Run the rally race with the generated track: - `esrally race --kill-running-processes --track-path=./track-output-dir --target-hosts=my-deployment.es.eastus2.azure.elastic-cloud.com:443 --pipeline=benchmark-only` - 1. replace the sample value for `--track-path` with the path to the folder provided as `--rally-track-output-dir` at step 1 - 2. replace the sample value for `--target-hosts` with the host and port of the Elasticsearch instance(s) you want rally to connect to. - 3. You might need to add the "client-options" parameter to rally in order to authenticate and use SSL: `--client-options="use_ssl:true,verify_certs:true,basic_auth_user:'elastic',basic_auth_password:'changeme'"` - 1. replace the sample value for `basic_auth_user` and `basic_auth_password` in `--client-options` to the credentials of the user in the cluster you want rally to use. diff --git a/docs/howto/rally_benchmarking.md b/docs/howto/rally_benchmarking.md index c60500440..e374c38e9 100644 --- a/docs/howto/rally_benchmarking.md +++ b/docs/howto/rally_benchmarking.md @@ -1,4 +1,4 @@ -# HOWTO: Writing system benchmarks for a package +# HOWTO: Writing rally benchmarks for a package ## Introduction Elastic Packages are comprised of data streams. A rally benchmark runs `esrally` track with a corpus of data into an Elasticsearch data stream, and reports rally stats as well as retrieving performance metrics from the Elasticsearch nodes. @@ -10,11 +10,11 @@ Conceptually, running a rally benchmark involves the following steps: 1. Deploy the Elastic Stack, including Elasticsearch, Kibana, and the Elastic Agent(s). This step takes time so it should typically be done once as a pre-requisite to running a system benchmark scenario. 1. Install a package that configures its assets for every data stream in the package. 1. Metrics collections from the cluster starts. (**TODO**: record metrics from all Elastic Agents involved using the `system` integration.) -1. Send the collected metrics to the ES Metricstore if set. 1. Generate data (it uses the [corpus-generator-tool](https://github.com/elastic/elastic-integration-corpus-generator-tool)) 1. Run an `esrally` track with the corpus of generated data. `esrally` must be installed on the system where the `elastic-package` is run and available in the `PATH`. 1. Wait for the `esrally` track to be executed. 1. Metrics collection ends and a summary report is created. +1. Send the collected metrics to the ES Metricstore if set. 1. Delete test artifacts. 1. Optionally reindex all ingested data into the ES Metricstore for further analysis. 1. **TODO**: Optionally compare results against another benchmark run. @@ -60,7 +60,6 @@ Example: description: Benchmark 20000 events ingested data_stream: name: testds -warmup_time_period: 10s corpora: generator: total_events: 900000 @@ -275,7 +274,7 @@ In the directory of the `rally-track-output-dir` flag two files are saved: Both files are required to replay the rally benchmark. The first file references the second in its content. The command to run for replaying the track is the following: ```shell -rally --target-hosts='{"default":["%es_cluster_host:es_cluster_port%"]}' --track-path=%path/to/saved-track-json% --client-options='{"default":{"basic_auth_user":"%es_user%","basic_auth_password":"%es_user%","use_ssl":true,"verify_certs":false}}' --pipeline=benchmark-only +esrally --target-hosts='{"defauelt":["%es_cluster_host:es_cluster_port%"]}' --track-path=%path/to/saved-track-json% --client-options='{"default":{"basic_auth_user":"%es_user%","basic_auth_password":"%es_user%","use_ssl":true,"verify_certs":false}}' --pipeline=benchmark-only ``` Please refer to [esrally CLI reference](https://esrally.readthedocs.io/en/stable/command_line_reference.html) for more details. diff --git a/internal/benchrunner/runners/rally/options.go b/internal/benchrunner/runners/rally/options.go index 02aa8e6d7..34b4adc9d 100644 --- a/internal/benchrunner/runners/rally/options.go +++ b/internal/benchrunner/runners/rally/options.go @@ -67,18 +67,6 @@ func WithBenchmarkName(name string) OptionFunc { } } -func WithDeferCleanup(d time.Duration) OptionFunc { - return func(opts *Options) { - opts.DeferCleanup = d - } -} - -func WithMetricsInterval(d time.Duration) OptionFunc { - return func(opts *Options) { - opts.MetricsInterval = d - } -} - func WithDataReindexing(b bool) OptionFunc { return func(opts *Options) { opts.ReindexData = b diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index 48e53e421..ca1f06097 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -17,13 +17,13 @@ import ( "os/exec" "path/filepath" "strings" + "text/template" "time" "github.com/elastic/elastic-package/internal/packages/installer" "github.com/magefile/mage/sh" - "github.com/elastic/elastic-package/internal/corpusgenerator" "github.com/elastic/elastic-package/internal/stack" "github.com/google/uuid" @@ -51,6 +51,42 @@ const ( // BenchType defining rally benchmark BenchType benchrunner.Type = "rally" + + rallyTrackTemplate = `{% import "rally.helpers" as rally with context %} +{ + "version": 2, + "description": "Track for [[.DataStream]]", + "datastream": [ + { + "name": "[[.DataStream]]", + "body": "[[.CorpusFilename]]" + } + ], + "corpora": [ + { + "name": "[[.CorpusFilename]]", + "documents": [ + { + "target-data-stream": "[[.DataStream]]", + "source-file": "[[.CorpusFilename]]", + "document-count": [[.CorpusDocsCount]], + "uncompressed-bytes": [[.CorpusSizeInBytes]] + } + ] + } + ], + "schedule": [ + { + "operation": { + "operation-type": "bulk", + "bulk-size": {{bulk_size | default(5000)}}, + "ingest-percentage": {{ingest_percentage | default(100)}} + }, + "clients": {{bulk_indexing_clients | default(8)}} + } + ] +} +` ) var ErrDryRun = errors.New("dry run: rally benchmark not executed") @@ -524,7 +560,7 @@ func (r *runner) runGenerator(destDir string) error { return fmt.Errorf("cannot not create rally track file: %w", err) } r.trackFile = trackFile.Name() - rallyTrackContent, err := corpusgenerator.GenerateRallyTrack(r.runtimeDataStream, corpusFile, corpusDocsCount) + rallyTrackContent, err := generateRallyTrack(r.runtimeDataStream, corpusFile, corpusDocsCount) if err != nil { return fmt.Errorf("cannot not generate rally track content: %w", err) } @@ -909,3 +945,34 @@ func createRunID() string { func getDataStreamPath(packageRoot, dataStream string) string { return filepath.Join(packageRoot, "data_stream", dataStream) } + +func generateRallyTrack(dataStream string, corpusFile *os.File, corpusDocsCount uint64) ([]byte, error) { + t := template.New("rallytrack") + + parsedTpl, err := t.Delims("[[", "]]").Parse(rallyTrackTemplate) + if err != nil { + return nil, fmt.Errorf("error while parsing rally track template: %w", err) + } + + fi, err := corpusFile.Stat() + if err != nil { + return nil, fmt.Errorf("error with stat on rally corpus file: %w", err) + } + + corpusSizeInBytes := fi.Size() + + buf := new(bytes.Buffer) + templateData := map[string]any{ + "DataStream": dataStream, + "CorpusFilename": filepath.Base(corpusFile.Name()), + "CorpusDocsCount": corpusDocsCount, + "CorpusSizeInBytes": corpusSizeInBytes, + } + + err = parsedTpl.Execute(buf, templateData) + if err != nil { + return nil, fmt.Errorf("error on parsin on rally track template: %w", err) + } + + return buf.Bytes(), nil +} diff --git a/internal/benchrunner/runners/rally/scenario.go b/internal/benchrunner/runners/rally/scenario.go index 89186575d..27982bf61 100644 --- a/internal/benchrunner/runners/rally/scenario.go +++ b/internal/benchrunner/runners/rally/scenario.go @@ -7,7 +7,6 @@ package rally import ( "errors" "fmt" - "os" "path/filepath" "github.com/elastic/go-ucfg/yaml" @@ -56,7 +55,7 @@ func readConfig(path, scenario, packageName, packageVersion string) (*scenario, configPath := filepath.Join(path, devPath, fmt.Sprintf("%s.yml", scenario)) c := defaultConfig() cfg, err := yaml.NewConfigWithFile(configPath) - if err != nil && !errors.Is(err, os.ErrNotExist) { + if err != nil { return nil, fmt.Errorf("can't load benchmark configuration: %s: %w", configPath, err) } @@ -69,5 +68,9 @@ func readConfig(path, scenario, packageName, packageVersion string) (*scenario, c.Package = packageName c.Version = packageVersion + if c.DataStream.Name == "" { + return nil, errors.New("can't read data stream name from benchmark configuration: empty") + } + return c, nil } diff --git a/internal/cobraext/flags.go b/internal/cobraext/flags.go index f02bf6e66..ad4191ab7 100644 --- a/internal/cobraext/flags.go +++ b/internal/cobraext/flags.go @@ -104,25 +104,8 @@ const ( FailOnMissingFlagName = "fail-on-missing" FailOnMissingFlagDescription = "fail if tests are missing" - FailFastFlagName = "fail-fast" - FailFastFlagDescription = "fail immediately if any file requires updates (do not overwrite)" - - GenerateCorpusDataSetFlagName = "dataset" - GenerateCorpusDataSetFlagShorthand = "D" - GenerateCorpusDataSetFlagDescription = "dataset to generate benchmarks data corpus for" - - GenerateCorpusCommitFlagName = "commit" - GenerateCorpusCommitFlagShorthand = "C" - GenerateCorpusCommitFlagDescription = "commit to fetch assets from the corpus generator tool repo from" - - GenerateCorpusRallyTrackOutputDirFlagName = "rally-track-output-dir" - GenerateCorpusRallyTrackOutputDirFlagShorthand = "R" - GenerateCorpusRallyTrackOutputDirFlagDescription = "output dir of the rally track: if present the command will generate a rally track instead of writing the generated data to stdout" - - GenerateCorpusSizeFlagName = "size" - GenerateCorpusSizeFlagShorthand = "S" - GenerateCorpusSizeFlagDescription = "size of benchmarks data corpus to generate" - + FailFastFlagName = "fail-fast" + FailFastFlagDescription = "fail immediately if any file requires updates (do not overwrite)" GenerateTestResultFlagName = "generate" GenerateTestResultFlagDescription = "generate test result file" diff --git a/internal/corpusgenerator/assets.go b/internal/corpusgenerator/assets.go deleted file mode 100644 index 390175dac..000000000 --- a/internal/corpusgenerator/assets.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package corpusgenerator - -import ( - "context" - "fmt" - "net/http" - - "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib" - "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/config" - "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/fields" -) - -const ( - confYamlAssetName = "schema-b/configs.yml" - fieldsYamlAssetName = "schema-b/fields.yml" - gotextTemplateAssetName = "schema-b/gotext.tpl" -) - -// GetGoTextTemplate returns the gotext template of a package's data stream -func (c *Client) GetGoTextTemplate(packageName, dataStreamName string) ([]byte, error) { - assetsSubFolder := fmt.Sprintf("%s.%s", packageName, dataStreamName) - statusCode, respBody, err := c.get(fmt.Sprintf("%s/%s", assetsSubFolder, gotextTemplateAssetName)) - if err != nil { - return nil, fmt.Errorf("could not get gotext template: %w", err) - } - - if statusCode != http.StatusOK { - return nil, fmt.Errorf("could not get gotext template; API status code = %d; response body = %s", statusCode, respBody) - } - - return respBody, nil -} - -// GetConf returns the genlib.Config of a package's data stream -func (c *Client) GetConf(packageName, dataStreamName string) (genlib.Config, error) { - assetsSubFolder := fmt.Sprintf("%s.%s", packageName, dataStreamName) - - statusCode, respBody, err := c.get(fmt.Sprintf("%s/%s", assetsSubFolder, confYamlAssetName)) - if err != nil { - - return genlib.Config{}, fmt.Errorf("could not get config yaml: %w", err) - } - - if statusCode != http.StatusOK { - return genlib.Config{}, fmt.Errorf("could not get config yaml; API status code = %d; response body = %s", statusCode, respBody) - } - - cfg, err := config.LoadConfigFromYaml(respBody) - if err != nil { - return genlib.Config{}, fmt.Errorf("could not load config yaml: %w", err) - } - - return cfg, nil -} - -// GetFields returns the genlib.Config of a package's data stream -func (c *Client) GetFields(packageName, dataStreamName string) (genlib.Fields, error) { - assetsSubFolder := fmt.Sprintf("%s.%s", packageName, dataStreamName) - - statusCode, respBody, err := c.get(fmt.Sprintf("%s/%s", assetsSubFolder, fieldsYamlAssetName)) - if err != nil { - return genlib.Fields{}, fmt.Errorf("could not get fields yaml: %w", err) - } - - if statusCode != http.StatusOK { - return genlib.Fields{}, fmt.Errorf("could not get fields yaml; API status code = %d; response body = %s", statusCode, respBody) - } - - ctx := context.Background() - fields, err := fields.LoadFieldsWithTemplateFromString(ctx, string(respBody)) - if err != nil { - return genlib.Fields{}, fmt.Errorf("could not load fields yaml: %w", err) - } - - return fields, nil -} diff --git a/internal/corpusgenerator/client.go b/internal/corpusgenerator/client.go deleted file mode 100644 index 05dce6771..000000000 --- a/internal/corpusgenerator/client.go +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package corpusgenerator - -import ( - "bytes" - "fmt" - "io" - "net/http" - "net/url" - "strings" - - "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib" - - "github.com/elastic/elastic-package/internal/logger" -) - -// TODO: fetching artifacts from the corpus generator repo is a temporary solution in place -// before we will have the relevant assets content in package-spec. -// We still give the option to fetch from a specific commit -const integrationCorpusGeneratorAssetsBaseURL = "https://raw.githubusercontent.com/elastic/elastic-integration-corpus-generator-tool/%COMMIT%/assets/templates/" -const commitPlaceholder = "%COMMIT%" - -// Client is responsible for exporting assets from elastic-integration-corpus-generator-tool repo. -type Client struct { - commit string -} - -// GenLibClient is an interface for the genlib client -type GenLibClient interface { - GetGoTextTemplate(packageName, dataStreamName string) ([]byte, error) - GetConf(packageName, dataStreamName string) (genlib.Config, error) - GetFields(packageName, dataStreamName string) (genlib.Fields, error) -} - -// NewClient creates a new instance of the client. -func NewClient(commit string) GenLibClient { - return &Client{commit: commit} -} - -func (c *Client) get(resourcePath string) (int, []byte, error) { - return c.sendRequest(http.MethodGet, resourcePath, nil) -} - -func (c *Client) sendRequest(method, resourcePath string, body []byte) (int, []byte, error) { - reqBody := bytes.NewReader(body) - commitAssetsBaseURL := strings.Replace(integrationCorpusGeneratorAssetsBaseURL, commitPlaceholder, c.commit, -1) - base, err := url.Parse(commitAssetsBaseURL) - if err != nil { - return 0, nil, fmt.Errorf("could not create base URL from commit: %v: %w", c.commit, err) - } - - rel, err := url.Parse(resourcePath) - if err != nil { - return 0, nil, fmt.Errorf("could not create relative URL from resource path: %v: %w", resourcePath, err) - } - - u := base.JoinPath(rel.EscapedPath()) - - logger.Debugf("%s %s", method, u) - - req, err := http.NewRequest(method, u.String(), reqBody) - if err != nil { - return 0, nil, fmt.Errorf("could not create %v request to elastic-integration-corpus-generator-tool repo: %s: %w", method, resourcePath, err) - } - - client := http.Client{} - - resp, err := client.Do(req) - if err != nil { - return 0, nil, fmt.Errorf("could not send request to elastic-integration-corpus-generator-tool repo: %w", err) - } - - if resp.Body == nil { - return 0, nil, fmt.Errorf("could not get response from elastic-integration-corpus-generator-tool repo: %w", err) - } - - defer func() { - _ = resp.Body.Close() - }() - - body, err = io.ReadAll(resp.Body) - if err != nil { - return resp.StatusCode, nil, fmt.Errorf("could not read response body: %w", err) - } - - return resp.StatusCode, body, nil -} diff --git a/internal/corpusgenerator/rally.go b/internal/corpusgenerator/rally.go deleted file mode 100644 index c50e55bf4..000000000 --- a/internal/corpusgenerator/rally.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package corpusgenerator - -import ( - "bytes" - "fmt" - "os" - "path/filepath" - "text/template" -) - -const ( - rallyTrackTemplate = `{% import "rally.helpers" as rally with context %} -{ - "version": 2, - "description": "Track for [[.DataStream]]", - "datastream": [ - { - "name": "[[.DataStream]]", - "body": "[[.CorpusFilename]]" - } - ], - "corpora": [ - { - "name": "[[.CorpusFilename]]", - "documents": [ - { - "target-data-stream": "[[.DataStream]]", - "source-file": "[[.CorpusFilename]]", - "document-count": [[.CorpusDocsCount]], - "uncompressed-bytes": [[.CorpusSizeInBytes]] - } - ] - } - ], - "schedule": [ - { - "operation": { - "operation-type": "bulk", - "bulk-size": {{bulk_size | default(5000)}}, - "ingest-percentage": {{ingest_percentage | default(100)}} - }, - "clients": {{bulk_indexing_clients | default(8)}} - } - ] -} -` -) - -func GenerateRallyTrack(dataStream string, corpusFile *os.File, corpusDocsCount uint64) ([]byte, error) { - t := template.New("rallytrack") - - parsedTpl, err := t.Delims("[[", "]]").Parse(rallyTrackTemplate) - if err != nil { - return nil, fmt.Errorf("error while parsing rally track template: %w", err) - } - - fi, err := corpusFile.Stat() - if err != nil { - return nil, fmt.Errorf("error with stat on rally corpus file: %w", err) - } - - corpusSizeInBytes := fi.Size() - - buf := new(bytes.Buffer) - templateData := map[string]any{ - "DataStream": dataStream, - "CorpusFilename": filepath.Base(corpusFile.Name()), - "CorpusDocsCount": corpusDocsCount, - "CorpusSizeInBytes": corpusSizeInBytes, - } - - err = parsedTpl.Execute(buf, templateData) - if err != nil { - return nil, fmt.Errorf("error on parsin on rally track template: %w", err) - } - - return buf.Bytes(), nil -} diff --git a/internal/corpusgenerator/utils.go b/internal/corpusgenerator/utils.go deleted file mode 100644 index f55b3c8da..000000000 --- a/internal/corpusgenerator/utils.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package corpusgenerator - -import ( - "bytes" - "io" - "os" - "path/filepath" - - "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib" -) - -func RunGenerator(generator genlib.Generator, dataStream, rallyTrackOutputDir string) error { - var f io.Writer - if len(rallyTrackOutputDir) == 0 { - f = os.Stdout - } else { - err := os.MkdirAll(rallyTrackOutputDir, os.ModePerm) - if err != nil { - return err - } - - f, err = os.CreateTemp(rallyTrackOutputDir, "corpus-*") - if err != nil { - return err - } - } - buf := bytes.NewBufferString("") - var corpusDocsCount uint64 - for { - err := generator.Emit(buf) - if err == io.EOF { - break - } - - if err != nil { - return err - } - - // TODO: this should be taken care of by the corpus generator tool, once it will be done let's remove this - event := bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("")) - if _, err = f.Write(event); err != nil { - return err - } - - if _, err = f.Write([]byte("\n")); err != nil { - return err - } - - buf.Reset() - corpusDocsCount += 1 - } - - if len(rallyTrackOutputDir) > 0 { - corpusFile := f.(*os.File) - rallyTrackContent, err := GenerateRallyTrack(dataStream, corpusFile, corpusDocsCount) - if err != nil { - return err - } - - err = os.WriteFile(filepath.Join(rallyTrackOutputDir, "track.json"), rallyTrackContent, os.ModePerm) - if err != nil { - return err - } - - } - - return generator.Close() -} - -func NewGenerator(genLibClient GenLibClient, packageName, dataStreamName string, totEvents uint64) (genlib.Generator, error) { - - config, err := genLibClient.GetConf(packageName, dataStreamName) - if err != nil { - return nil, err - } - fields, err := genLibClient.GetFields(packageName, dataStreamName) - - if err != nil { - return nil, err - } - tpl, err := genLibClient.GetGoTextTemplate(packageName, dataStreamName) - if err != nil { - return nil, err - } - - g, err := genlib.NewGeneratorWithTextTemplate(tpl, config, fields, totEvents) - if err != nil { - return nil, err - } - - return g, nil -} diff --git a/internal/corpusgenerator/utils_test.go b/internal/corpusgenerator/utils_test.go deleted file mode 100644 index 4801e1c05..000000000 --- a/internal/corpusgenerator/utils_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package corpusgenerator - -import ( - "bytes" - "io" - "testing" - - "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib" - "github.com/stretchr/testify/assert" -) - -type mockClient struct { -} - -func (c mockClient) GetGoTextTemplate(packageName, dataStreamName string) ([]byte, error) { - return []byte("7 bytes"), nil -} -func (c mockClient) GetConf(packageName, dataStreamName string) (genlib.Config, error) { - return genlib.Config{}, nil -} -func (c mockClient) GetFields(packageName, dataStreamName string) (genlib.Fields, error) { - return genlib.Fields{}, nil -} - -func TestGeneratorEmitTotEvents(t *testing.T) { - generator, err := NewGenerator(mockClient{}, "packageName", "dataSetName", 7) - assert.NoError(t, err) - - totEvents := 0 - buf := bytes.NewBufferString("") - for { - err := generator.Emit(buf) - if err == io.EOF { - break - } - - totEvents += 1 - } - - assert.Equal(t, 7, totEvents, "expected 7 totEvents, got %d", totEvents) -}