diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..a7db148 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,27 @@ +name: ci + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + "build-and-test": + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Build + run: go build . + + - name: Test + run: go test -v ./... + + - name: Check README up-to-date + run: go run ./scripts/update_readme.go -mode check diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2689c75 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..5b92f61
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,4 @@
+Datadog datadog-pgo
+Copyright 2024-2024 Datadog, Inc.
+
+This product includes software developed at Datadog ().
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f3a0d88
--- /dev/null
+++ b/README.md
@@ -0,0 +1,100 @@
+# datadog-pgo
+
+datadog-pgo is a tool for integrating continuous [profile-guided optimization](https://go.dev/doc/pgo) (PGO) into your Go build process. It fetches representative CPU profiles from Datadog and merges them into a `default.pgo` file that is used by the Go toolchain to optimize your application.
+
+## ⚠️ Public Beta
+
+* This tool is currently in public beta. Please open a GitHub issue if you have any feedback or questions.
+* Always use the latest version of datadog-pgo in CI as shown below. Old versions may become deprecated and stop working on short notice.
+* We're considering integrating this tool into [datadog-ci](https://github.com/DataDog/datadog-ci). Please let us know if you're interested in this.
+
+## Getting Started
+
+1. Create a dedicated API key and unscoped Application key for PGO as described in the [documentation](https://docs.datadoghq.com/account_management/api-app-keys/).
+2. Set the `DD_API_KEY` and `DD_APP_KEY` environment variables via the secret mechanism of your CI provider.
+3. Run `datadog-pgo` before your build step. For example, for a service `foo` that runs in `prod` and has its main package in `./cmd/foo`, add this step:
+
+```
+go run github.com/DataDog/datadog-pgo@latest 'service:foo env:prod' ./cmd/foo/default.pgo
+```
+
+## CLI
+
+<!-- scripts/update_readme.go -->
+```
+usage: datadog-pgo [OPTIONS]... QUERY... DEST
+
+datadog-pgo fetches CPU profiles from Datadog using the given QUERY arguments
+and merges the results into a single DEST file suitable for profile-guided
+optimization.
+
+In order to use this, you need to set the following environment variables.
+
+    DD_API_KEY: A Datadog API key
+    DD_APP_KEY: A Datadog Application key
+    DD_SITE: A Datadog site to use (defaults to datadoghq.com)
+
+After this, typical usage will look like this:
+
+    datadog-pgo 'service:my-service env:prod' ./cmd/my-service/default.pgo
+
+The Go toolchain will automatically pick up any default.pgo file found in the
+main package (go1.21+), so you can build your service as usual, for example:
+
+    go build ./cmd/my-service
+
+Unless the -fail flag is set, datadog-pgo will always return with a zero exit
+code in order to let your build succeed, even if a PGO download error occurred.
+
+OPTIONS
+  -fail
+    	return with a non-zero exit code on failure
+  -from duration
+    	how far back to search for profiles (default 72h0m0s)
+  -json
+    	print logs in json format
+  -profiles int
+    	the number of profiles to fetch per query (default 5)
+  -timeout duration
+    	timeout for fetching PGO profile (default 1m0s)
+  -v	verbose output
+```
+<!-- scripts/update_readme.go -->
+
+## FAQ
+
+### How are profiles selected?
+
+By default datadog-pgo selects the top 5 profiles by CPU utilization within the last 72 hours. You can change this behavior using the `-profiles` and `-from` flags.
+
+This opinionated approach is based on our internal testing, where it has yielded better results than taking a sample of average profiles.
+
+Please open a GitHub issue if you have feedback on this.
+
+### Can I use profiles from a different environment?
+
+The official [PGO documentation](https://go.dev/doc/pgo) recommends using profiles from your production environment. Profiles from other environments may not be representative of the production workload and will likely yield suboptimal results.
+
+### How do I know if PGO is working?
+
+dd-trace-go tags the profiles of PGO-enabled applications with a `pgo:true` tag. You can search for profiles with this tag in the profile explorer.
+
+### How can I measure the impact of PGO?
+
+The impact of PGO can be tricky to measure. When in doubt, try to measure CPU time per request by building a dashboard widget that divides the CPU usage of your application by the number of requests it serves. We hope to provide a better solution for this in the future.
+
+### What happens if there is a problem?
+
+datadog-pgo will always return with a zero exit code in order to let your build succeed, even if PGO downloading failed. If you want to fail the build on error, use the `-fail` flag.
+
+### How can I look at the profiles?
+
+1. Copy the `debug-query` output from the last log line of datadog-pgo.
+2. Go to the profile explorer in the Datadog UI and paste the query.
+3. Increase the time range to 1 week to make sure you see all profiles.
+
+Please note that the profile retention is 7 days. If you're interested in retaining PGO profiles for longer, please let us know by opening a GitHub issue on this repo.
+
+### How can I provide feedback?
+
+Just open a GitHub issue on this repository. We're happy to hear from you!
diff --git a/client.go b/client.go
new file mode 100644
index 0000000..c6a24de
--- /dev/null
+++ b/client.go
@@ -0,0 +1,243 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"time"
+)
+
+// maxConcurrency is the maximum number of concurrent requests to make to the
+// Datadog API.
+const maxConcurrency = 5
+
+// ClientFromEnv returns a new Client with its fields populated from the
+// environment. It returns an error if any of the required environment variables
+// are not set.
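+// DD_SITE is optional and defaults to datadoghq.com.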
+func ClientFromEnv() (*Client, error) { + c := &Client{concurrency: make(chan struct{}, maxConcurrency)} + if c.site = os.Getenv("DD_SITE"); c.site == "" { + c.site = "datadoghq.com" + } + if c.apiKey = os.Getenv("DD_API_KEY"); c.apiKey == "" { + return nil, errors.New("DD_API_KEY is not set") + } + if c.appKey = os.Getenv("DD_APP_KEY"); c.appKey == "" { + return nil, errors.New("DD_APP_KEY is not set") + } + return c, nil +} + +// Client is a client for the Datadog API. +type Client struct { + site string + apiKey string + appKey string + concurrency chan struct{} +} + +// SearchAndDownloadProfiles searches for profiles using the given queries and +// downloads them. +func (c *Client) SearchAndDownloadProfiles(ctx context.Context, queries []SearchQuery) (profiles *ProfilesDownload, err error) { + defer wrapErr(&err, "search and download profiles") + defer c.limitConcurrency()() + + var payload = struct { + Queries []SearchQuery `json:"queries"` + }{queries} + + data, err := c.post(ctx, "/api/unstable/profiles/gopgo", payload) + if err != nil { + return nil, err + } + return &ProfilesDownload{data: data}, nil +} + +// SearchProfiles searches for profiles using the given query. It returns a list +// of profiles and an error if any. +func (c *Client) SearchProfiles(ctx context.Context, query SearchQuery) (profiles []*SearchProfile, err error) { + defer wrapErr(&err, "search profiles") + defer c.limitConcurrency()() + var response struct { + Data []struct { + ID string `json:"id"` + Attributes struct { + ID string `json:"id"` + Service string `json:"service"` + DurationNanos float64 `json:"duration_nanos"` + Timestamp JSONTime `json:"timestamp"` + Custom struct { + Metrics struct { + CoreCPUCores float64 `json:"core_cpu_cores"` + } `json:"metrics"` + } `json:"custom"` + } `json:"attributes"` + } `json:"data"` + } + data, err := c.post(ctx, "/api/unstable/profiles/list", query) + if err != nil { + return nil, err + } else if err := json.Unmarshal(data, &response); err != nil { + return nil, err + } + + if len(response.Data) == 0 { + return nil, errors.New("no profiles found") + } + + for _, item := range response.Data { + p := &SearchProfile{ + EventID: item.ID, + ProfileID: item.Attributes.ID, + Service: item.Attributes.Service, + CPUCores: item.Attributes.Custom.Metrics.CoreCPUCores, + Timestamp: item.Attributes.Timestamp.Time, + Duration: time.Duration(item.Attributes.DurationNanos), + } + profiles = append(profiles, p) + } + return +} + +// DownloadProfile downloads the profile identified by the given SearchProfile. +func (c *Client) DownloadProfile(ctx context.Context, p *SearchProfile) (d ProfileDownload, err error) { + defer wrapErr(&err, "download profile") + defer c.limitConcurrency()() + req, err := c.request(ctx, "GET", fmt.Sprintf("/api/ui/profiling/profiles/%s/download?eventId=%s", p.ProfileID, p.EventID), nil) + if err != nil { + return ProfileDownload{}, err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return ProfileDownload{}, err + } + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + if err != nil { + return ProfileDownload{}, err + } + return ProfileDownload{data: data}, nil +} + +// request creates a new HTTP request with the given method and path and sets +// the required headers. 
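+// Requests authenticate via the DD-API-KEY and DD-APPLICATION-KEY headers, so
+// callers never need to attach credentials themselves.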
+func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) { + url := fmt.Sprintf("https://app.%s%s", c.site, path) + + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", name+"/"+version) + req.Header.Set("DD-APPLICATION-KEY", c.appKey) + req.Header.Set("DD-API-KEY", c.apiKey) + return req, nil +} + +// post sends a POST request to the given path with the given payload and decodes +// the response. +func (c *Client) post(ctx context.Context, path string, payload any) ([]byte, error) { + reqBody, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + req, err := c.request(ctx, "POST", path, reqBody) + if err != nil { + return nil, err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + resBody, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return nil, fmt.Errorf("POST %s: %s: please check that your DD_API_KEY, DD_APP_KEY and DD_SITE env vars are set correctly", path, res.Status) + } + return resBody, nil +} + +// limitConcurrency blocks until a slot is available in the concurrency channel. +// It returns a function that should be called to release the slot. +func (c *Client) limitConcurrency() func() { + c.concurrency <- struct{}{} + return func() { <-c.concurrency } +} + +// SearchQuery holds the query parameters for searching for profiles. +type SearchQuery struct { + Filter SearchFilter `json:"filter"` + Sort SearchSort `json:"sort"` + Limit int `json:"limit"` +} + +// SearchFilter holds the filter parameters for searching for profiles. +type SearchFilter struct { + From JSONTime `json:"from"` + To JSONTime `json:"to"` + Query string `json:"query"` +} + +// SearchSort holds the sort parameters for searching for profiles. +type SearchSort struct { + Order string `json:"order"` + Field string `json:"field"` +} + +// timeFormat is the time format used by the Datadog API. +const timeFormat = "2006-01-02T15:04:05.999999999Z" + +// JSONTime is a time.Time that marshals to and from JSON in the format used by +// the Datadog API. +type JSONTime struct { + time.Time +} + +// MarshalJSON marshals the time in the format used by the Datadog API. +func (t JSONTime) MarshalJSON() ([]byte, error) { + return json.Marshal(t.String()) +} + +// UnmarshalJSON unmarshals the time from the format used by the Datadog API. +func (t *JSONTime) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + parsed, err := time.Parse(timeFormat, s) + if err != nil { + return err + } + t.Time = parsed + return nil +} + +// String returns the time in the format used by the Datadog API. +func (t JSONTime) String() string { + return t.Time.UTC().Round(time.Second).Format(timeFormat) +} + +// SearchProfile holds information about a profile search result. ProfileID and +// EventID are used to identify the SearchProfile for downloading. The other +// fields are just logged for debugging. 
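+// CPUCores mirrors the @metrics.core_cpu_cores value that search results are
+// sorted by.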
+type SearchProfile struct { + Service string + CPUCores float64 + ProfileID string + EventID string + Timestamp time.Time + Duration time.Duration +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..819c0fb --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/DataDog/datadog-pgo + +go 1.21 + +require ( + github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 + github.com/lmittmann/tint v1.0.4 + github.com/mattn/go-isatty v0.0.20 + github.com/sourcegraph/conc v0.3.0 +) + +require ( + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/sys v0.6.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7f25a8f --- /dev/null +++ b/go.sum @@ -0,0 +1,25 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q= +github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= +github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= +github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..49c016f --- /dev/null +++ b/main.go @@ -0,0 +1,484 @@ +package main + +import ( + "archive/zip" + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "time" + + "log/slog" + + "github.com/google/pprof/profile" + "github.com/lmittmann/tint" + "github.com/mattn/go-isatty" + "github.com/sourcegraph/conc/pool" +) + +const ( + name = "datadog-pgo" + version = "0.0.1" +) + +// main runs the pgo tool. 
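+// It exits with code 1 unless the error was already downgraded to a warning
+// and wrapped as a handledError, in which case it exits with code 0.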
+func main() {
+	if err := run(); err != nil && !errors.As(err, &handledError{}) {
+		if !errors.As(err, &loggedError{}) {
+			fmt.Fprintf(os.Stderr, "pgo: error: %v\n", err)
+		}
+		os.Exit(1)
+	}
+}
+
+// run runs the pgo tool and returns an error if any.
+func run() (err error) {
+	start := time.Now()
+
+	// Define usage
+	flag.Usage = func() {
+		usage := `usage: ` + name + ` [OPTIONS]... QUERY... DEST
+
+` + name + ` fetches CPU profiles from Datadog using the given QUERY arguments
+and merges the results into a single DEST file suitable for profile-guided
+optimization.
+
+In order to use this, you need to set the following environment variables.
+
+    DD_API_KEY: A Datadog API key
+    DD_APP_KEY: A Datadog Application key
+    DD_SITE: A Datadog site to use (defaults to datadoghq.com)
+
+After this, typical usage will look like this:
+
+    ` + name + ` 'service:my-service env:prod' ./cmd/my-service/default.pgo
+
+The Go toolchain will automatically pick up any default.pgo file found in the
+main package (go1.21+), so you can build your service as usual, for example:
+
+    go build ./cmd/my-service
+
+Unless the -fail flag is set, ` + name + ` will always return with a zero exit
+code in order to let your build succeed, even if a PGO download error occurred.
+
+OPTIONS`
+		fmt.Fprintln(flag.CommandLine.Output(), usage)
+		flag.PrintDefaults()
+	}
+
+	// Parse flags
+	var (
+		failF     = flag.Bool("fail", false, "return with a non-zero exit code on failure")
+		jsonF     = flag.Bool("json", false, "print logs in json format")
+		profilesF = flag.Int("profiles", 5, "the number of profiles to fetch per query")
+		timeoutF  = flag.Duration("timeout", 60*time.Second, "timeout for fetching PGO profile")
+		verboseF  = flag.Bool("v", false, "verbose output")
+		fromF     = flag.Duration("from", 3*24*time.Hour, "how far back to search for profiles")
+	)
+	flag.Parse()
+
+	// Validate args
+	if flag.NArg() < 2 {
+		flag.Usage()
+		return errors.New("at least 2 arguments are required")
+	}
+
+	// Split args into queries and dst
+	queries := buildQueries(*fromF, *profilesF, flag.Args()[:flag.NArg()-1])
+	dst := flag.Arg(flag.NArg() - 1)
+
+	// Setup logger
+	logOpt := &slog.HandlerOptions{AddSource: *verboseF}
+	if *verboseF {
+		logOpt.Level = slog.LevelDebug
+	}
+	log := slog.New(tint.NewHandler(os.Stdout, &tint.Options{
+		AddSource:  logOpt.AddSource,
+		Level:      logOpt.Level,
+		TimeFormat: "",
+		NoColor:    !isatty.IsTerminal(os.Stdout.Fd()),
+	}))
+	if *jsonF {
+		log = slog.New(slog.NewJSONHandler(os.Stdout, logOpt))
+	}
+	log.Info(name, "version", version, "go-version", runtime.Version())
+
+	// Log errors and turn them into warnings unless -fail is set
+	defer func() {
+		if err == nil {
+			return
+		}
+		log.Error(err.Error())
+		err = loggedError{err}
+		if !*failF {
+			err = handledError{err}
+			log.Warn(name + " failed, but -fail is not set, returning exit code 0 to continue without PGO")
+		}
+	}()
+
+	// Setup API client
+	client, err := ClientFromEnv()
+	if err != nil {
+		return fmt.Errorf("clientFromEnv: %w", err)
+	}
+
+	// Create context
+	ctx, cancel := context.WithTimeout(context.Background(), *timeoutF)
+	defer cancel()
+
+	// Search, download and merge profiles
+	mergedProfile, err := SearchDownloadMerge(ctx, log, client, queries)
+	if err != nil {
+		return err
+	}
+
+	// Write the pgo file to dst
+	n, err := mergedProfile.Write(dst)
+	if err != nil {
+		return err
+	}
+	log.Info(
+		"wrote PGO file",
+		"path", dst,
+		"samples", mergedProfile.Samples(),
+		"bytes", n,
+		"total-duration", timeSinceRoundMS(start),
+		"debug-query",
mergedProfile.DebugQuery(), + ) + return nil +} + +// buildQueries returns a list of SearchQuery for the given time window and queries. +func buildQueries(window time.Duration, limit int, queries []string) (searchQueries []SearchQuery) { + searchQueries = make([]SearchQuery, 0, len(queries)) + for _, q := range queries { + // PGO is only supported for Go right now, avoid fetching non-go + // profiles (e.g. from native) that might exist for the same query. + if !strings.Contains(q, "language:go") && !strings.Contains(q, "runtime:go") { + q = strings.TrimSpace(q) + " runtime:go" + } + + searchQueries = append(searchQueries, SearchQuery{ + Filter: SearchFilter{ + From: JSONTime{time.Now().Add(-window)}, + To: JSONTime{time.Now()}, + Query: q, + }, + Sort: SearchSort{ + Order: "desc", + // TODO(fg) or use @metrics.core_cpu_time_total? + Field: "@metrics.core_cpu_cores", + }, + Limit: limit, + }) + } + return +} + +// usePGOEndpoint is a flag to use the pgo endpoint instead of the search and +// download endpoints. If this new endpoint proves to work well, we can remove +// this flag and the old code. +const usePGOEndpoint = true + +// SearchDownloadMerge queries the profiles, downloads them and merges them into a single profile. +func SearchDownloadMerge(ctx context.Context, log *slog.Logger, client *Client, queries []SearchQuery) (*MergedProfile, error) { + if usePGOEndpoint { + return searchDownloadMergePGOEndpoint(ctx, log, client, queries) + } + return searchDownloadMerge(ctx, log, client, queries) +} + +// searchDownloadMerge queries the profiles, downloads them and merges them into a single profile. +func searchDownloadMerge(ctx context.Context, log *slog.Logger, client *Client, queries []SearchQuery) (*MergedProfile, error) { + newPool := func() *pool.ContextPool { + return pool.New().WithErrors().WithContext(ctx).WithCancelOnError().WithFirstError() + } + + var pgoProfile = &MergedProfile{} + queryPool := newPool() + downloadPool := newPool() + for _, q := range queries { + q := q + queryPool.Go(func(ctx context.Context) error { + log.Info( + "searching profiles", + "query", q.Filter.Query, + "by", q.Sort.Field, + "order", q.Sort.Order, + "from", q.Filter.From.String(), + "to", q.Filter.To.String(), + ) + startQuery := time.Now() + profiles, err := client.SearchProfiles(ctx, q) + if err != nil { + return err + } + log.Debug( + "found profiles", + "count", len(profiles), + "duration", timeSinceRoundMS(startQuery), + "query", q.Filter.Query, + ) + + if len(profiles) > q.Limit { + profiles = profiles[:q.Limit] + } + + for _, p := range profiles { + p := p + downloadPool.Go(func(ctx context.Context) error { + log.Info( + "downloading profile", + "service", p.Service, + "cpu-cores", float64(int(p.CPUCores*10))/10, + "duration", p.Duration, + "age", time.Since(p.Timestamp).Round(time.Second), + "profile-id", p.ProfileID, + ) + startDownload := time.Now() + download, err := client.DownloadProfile(ctx, p) + if err != nil { + return err + } + log.Debug( + "downloaded profile", + "duration", timeSinceRoundMS(startDownload), + "bytes", len(download.data), + "profile-id", p.ProfileID, + "event-id", p.EventID, + ) + + cpu, err := download.ExtractCPUProfile() + if err != nil { + return err + } + + prof, err := profile.ParseData(cpu) + if err != nil { + return err + } + return pgoProfile.Merge(p.ProfileID, prof) + }) + } + return nil + }) + } + if err := queryPool.Wait(); err != nil { + return nil, err + } else if err := downloadPool.Wait(); err != nil { + return nil, err + } + return pgoProfile, nil 
+} + +// searchDownloadMergePGOEndpoint queries the profiles and downloads them using +// the new pgo endpoint. Then it merges hte profiles into a single profile using +// the pgo endpoint. +func searchDownloadMergePGOEndpoint(ctx context.Context, log *slog.Logger, client *Client, queries []SearchQuery) (*MergedProfile, error) { + download, err := client.SearchAndDownloadProfiles(ctx, queries) + if err != nil { + return nil, err + } + return download.MergedProfile(log) +} + +// MergedProfile is the result of merging multiple profiles. +type MergedProfile struct { + mu sync.Mutex + profile *profile.Profile + profileIDs []string +} + +// Merge merges prof into the current profile. Callers must not use prof after +// calling Merge. +func (p *MergedProfile) Merge(id string, prof *profile.Profile) (err error) { + // Drop labels to reduce profile size + for _, s := range prof.Sample { + s.Label = nil + } + + // Acquire lock to access p fields + p.mu.Lock() + defer p.mu.Unlock() + + // Append profile ID + p.profileIDs = append(p.profileIDs, id) + + // First profile? No need to merge. + if p.profile == nil { + p.profile = prof + return nil + } + + // Merge profiles after the first one. + p.profile, err = profile.Merge([]*profile.Profile{p.profile, prof}) + return +} + +// Write writes the merged profile to dst and returns the number of bytes +// written. +func (p *MergedProfile) Write(dst string) (int64, error) { + file, err := os.Create(dst) + if err != nil { + return 0, err + } + defer file.Close() + + cw := &countingWriter{W: file} + if err := p.profile.Write(cw); err != nil { + return cw.N, err + } + return cw.N, file.Close() +} + +// Samples returns the number of samples in the merged profile. +func (p *MergedProfile) Samples() int { + return len(p.profile.Sample) +} + +// DebugQuery returns a query string that can be used to view the profiles that +// went into the merged profile. +func (p *MergedProfile) DebugQuery() string { + return "profile-id:(" + strings.Join(p.profileIDs, " OR ") + ")" +} + +// ProfileDownload is the result of downloading a profile. +type ProfileDownload struct { + data []byte +} + +// ExtractCPUProfile extracts the CPU profile from the download. +func (d ProfileDownload) ExtractCPUProfile() ([]byte, error) { + zr, err := zip.NewReader(bytes.NewReader(d.data), int64(len(d.data))) + if err != nil { + return nil, err + } + for _, f := range zr.File { + if filepath.Base(f.Name) == "cpu.pprof" { + rc, err := f.Open() + if err != nil { + return nil, err + } + defer rc.Close() + return io.ReadAll(rc) + } + } + + return nil, errors.New("no cpu.pprof found in download") +} + +// ProfilesDownload is the result of downloading several profiles from the pgo +// endpoint. +type ProfilesDownload struct { + data []byte +} + +// MergeProfile merges the profiles in the download into a single profile. 
+func (d *ProfilesDownload) MergedProfile(log *slog.Logger) (*MergedProfile, error) { + zr, err := zip.NewReader(bytes.NewReader(d.data), int64(len(d.data))) + if err != nil { + return nil, err + } + + var pgoProfile = &MergedProfile{} + for _, f := range zr.File { + rc, err := f.Open() + if err != nil { + return nil, err + } + prof, err := profile.Parse(rc) + if err != nil { + return nil, err + } + if err := pgoProfile.Merge(f.Name, prof); err != nil { + return nil, err + } + + seconds := prof.TimeNanos / int64(time.Second) + nanoseconds := prof.TimeNanos % int64(time.Second) + t := time.Unix(seconds, nanoseconds) + + cores, err := cpuCores(prof) + if err != nil { + log.Warn("failed to extract cpu cores", "error", err) + } + + log.Info( + "extracted profile", + // "service", p.Service, TODO: can we get this? + "cpu-cores", float64(int(cores*10))/10, + "duration", time.Duration(prof.DurationNanos), + "age", time.Since(t).Round(time.Second), + "profile-id", f.Name, + ) + if err := rc.Close(); err != nil { + return nil, err + } + } + + return pgoProfile, nil +} + +// cpuCores returns the number of CPU cores used in the profile. +func cpuCores(prof *profile.Profile) (float64, error) { + cpuIdx := -1 + for idx, st := range prof.SampleType { + if st.Type == "cpu" && st.Unit == "nanoseconds" { + cpuIdx = idx + break + } + } + if cpuIdx == -1 { + return 0, errors.New("no cpu sample type found") + } + var cpuNanos int64 + for _, s := range prof.Sample { + if len(s.Value) <= int(cpuIdx) { + return 0, errors.New("invalid sample value") + } + cpuNanos += s.Value[cpuIdx] + } + return float64(cpuNanos) / float64(prof.DurationNanos), nil +} + +// wrapErr wraps the error with name if it is not nil. +func wrapErr(err *error, name string) { + if *err != nil { + *err = fmt.Errorf("%s: %w", name, *err) + } +} + +// timeSinceRoundMS returns the time since t rounded to the nearest millisecond. +func timeSinceRoundMS(t time.Time) time.Duration { + return time.Since(t) / time.Millisecond * time.Millisecond +} + +// countingWriter counts the number of bytes written to W. +type countingWriter struct { + W io.Writer + N int64 +} + +// Write writes p to W and updates N. +func (c *countingWriter) Write(p []byte) (n int, err error) { + n, err = c.W.Write(p) + c.N += int64(n) + return +} + +// loggedError is an error that has been logged. +type loggedError struct { + error +} + +// handledError is an error that has been handled. 
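+// run wraps errors as handledError when -fail is not set, which tells main to
+// exit with code 0 even though the run failed.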
+type handledError struct {
+	error
+}
diff --git a/scripts/update_readme.go b/scripts/update_readme.go
new file mode 100644
index 0000000..bb7e189
--- /dev/null
+++ b/scripts/update_readme.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+	"bytes"
+	"flag"
+	"fmt"
+	"os"
+	"os/exec"
+	"regexp"
+	"strings"
+)
+
+func main() {
+	if err := run(); err != nil {
+		fmt.Fprintf(os.Stderr, "error: %v\n", err)
+		os.Exit(1)
+	}
+}
+
+func run() error {
+	modeF := flag.String("mode", "write", "write, check, or print")
+	flag.Parse()
+
+	help, err := helpText()
+	if err != nil {
+		return fmt.Errorf("failed to get help text: %w", err)
+	}
+
+	readme, err := readmeMarkdown()
+	if err != nil {
+		return fmt.Errorf("failed to get README.md: %w", err)
+	}
+
+	newReadme := updateReadme(readme, help)
+	switch *modeF {
+	case "write":
+		return os.WriteFile("README.md", []byte(newReadme), 0644)
+	case "check":
+		if newReadme != readme {
+			return fmt.Errorf("README.md is out of date")
+		}
+	case "print":
+		fmt.Fprintf(os.Stdout, "%s", newReadme)
+	}
+	return nil
+}
+
+func updateReadme(readme string, help string) string {
+	newReadme := replaceSection(readme, "scripts/update_readme.go", "```\n"+help+"```\n")
+	newReadme = strings.TrimSpace(newReadme) + "\n"
+	return newReadme
+}
+
+func replaceSection(input, section, value string) string {
+	marker := fmt.Sprintf("<!-- %s -->", section)
+	return regexp.
+		MustCompile("(?s)" + marker + ".*?" + marker).
+		ReplaceAllString(input, marker+"\n"+value+marker)
+}
+
+func readmeMarkdown() (string, error) {
+	data, err := os.ReadFile("README.md")
+	return string(data), err
+}
+
+func helpText() (string, error) {
+	buf := &bytes.Buffer{}
+	c := exec.Command("go", "run", ".", "-h")
+	c.Stdout = buf
+	c.Stderr = buf
+	if err := c.Run(); err != nil {
+		return "", err
+	}
+	return buf.String(), nil
+}
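+
+// Example of the marker convention (sketch): README.md embeds the CLI help
+// between two identical markers, and replaceSection swaps everything between
+// them for the current `go run . -h` output while keeping the markers in
+// place:
+//
+//	<!-- scripts/update_readme.go -->
+//	```
+//	usage: datadog-pgo [OPTIONS]... QUERY... DEST
+//	...
+//	```
+//	<!-- scripts/update_readme.go -->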