From 6dfd1d514917518cd28c59b0226c4c627a5880c6 Mon Sep 17 00:00:00 2001 From: Utku Ozdemir Date: Fri, 10 May 2024 12:08:16 +0200 Subject: [PATCH] feat: implement state storage On a best-effort basis, store the state on the disk periodically and on shutdown & restore it from the disk on startup. Additionally, bump Go version, deps & rekres. Closes siderolabs/discovery-service#54. Signed-off-by: Utku Ozdemir --- .dockerignore | 3 +- .github/workflows/ci.yaml | 48 +- .golangci.yml | 80 +- .kres.yaml | 9 + Dockerfile | 113 +- Makefile | 43 +- api/storage/storage.pb.go | 411 +++++++ api/storage/storage.proto | 34 + api/storage/storage_vtproto.pb.go | 1064 ++++++++++++++++++ cmd/discovery-service/main.go | 27 + cmd/snapshot-decoder/main.go | 54 + go.mod | 13 +- go.sum | 18 +- internal/landing/html/index.html | 6 - internal/state/affiliate_test.go | 2 +- internal/state/cluster_test.go | 2 +- internal/state/snapshot.go | 130 +++ internal/state/storage/protobuf.go | 83 ++ internal/state/storage/storage.go | 392 +++++++ internal/state/storage/storage_bench_test.go | 86 ++ internal/state/storage/storage_test.go | 290 +++++ pkg/server/client_test.go | 6 +- pkg/server/server_test.go | 6 +- pkg/server/version_test.go | 2 - 24 files changed, 2796 insertions(+), 126 deletions(-) create mode 100644 api/storage/storage.pb.go create mode 100644 api/storage/storage.proto create mode 100644 api/storage/storage_vtproto.pb.go create mode 100644 cmd/snapshot-decoder/main.go create mode 100644 internal/state/snapshot.go create mode 100644 internal/state/storage/protobuf.go create mode 100644 internal/state/storage/storage.go create mode 100644 internal/state/storage/storage_bench_test.go create mode 100644 internal/state/storage/storage_test.go diff --git a/.dockerignore b/.dockerignore index 8e27148..81972d2 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,8 +1,9 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2023-01-30T15:45:28Z by kres latest. +# Generated on 2024-05-09T09:59:51Z by kres 1e986af. * +!api !cmd !internal !pkg diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9782318..324c1e1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-10T10:40:41Z by kres 1e986af. name: default concurrency: @@ -31,7 +31,7 @@ jobs: if: (!startsWith(github.head_ref, 'renovate/') && !startsWith(github.head_ref, 'dependabot/')) services: buildkitd: - image: moby/buildkit:v0.12.5 + image: moby/buildkit:v0.13.2 options: --privileged ports: - 1234:1234 @@ -45,11 +45,12 @@ jobs: run: | git fetch --prune --unshallow - name: Set up Docker Buildx + id: setup-buildx uses: docker/setup-buildx-action@v3 with: driver: remote endpoint: tcp://127.0.0.1:1234 - timeout-minutes: 1 + timeout-minutes: 10 - name: base run: | make base @@ -60,8 +61,11 @@ jobs: run: | make unit-tests-race - name: coverage - run: | - make coverage + uses: codecov/codecov-action@v4 + with: + files: _out/coverage-unit-tests.txt + token: ${{ secrets.CODECOV_TOKEN }} + timeout-minutes: 3 - name: discovery-service run: | make discovery-service @@ -86,17 +90,42 @@ jobs: run: | make image-discovery-service - name: push-discovery-service-latest - if: github.event_name != 'pull_request' + if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main' env: PLATFORM: linux/amd64,linux/arm64 PUSH: "true" run: | - make image-discovery-service TAG=latest + make image-discovery-service IMAGE_TAG=latest + - name: snapshot-decoder + run: | + make snapshot-decoder + - name: Login to registry + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + password: ${{ secrets.GITHUB_TOKEN }} + registry: ghcr.io + username: ${{ github.repository_owner }} + - name: image-snapshot-decoder + run: | + make image-snapshot-decoder + - name: push-snapshot-decoder + if: github.event_name != 'pull_request' + env: + PUSH: "true" + run: | + make image-snapshot-decoder + - name: push-snapshot-decoder-latest + if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main' + env: + PUSH: "true" + run: | + make image-snapshot-decoder IMAGE_TAG=latest - name: Generate Checksums if: startsWith(github.ref, 'refs/tags/') run: | - sha256sum _out/discovery-service-* > _out/sha256sum.txt - sha512sum _out/discovery-service-* > _out/sha512sum.txt + sha256sum _out/discovery-service-* _out/snapshot-decoder-* > _out/sha256sum.txt + sha512sum _out/discovery-service-* _out/snapshot-decoder-* > _out/sha512sum.txt - name: release-notes if: startsWith(github.ref, 'refs/tags/') run: | @@ -109,4 +138,5 @@ jobs: draft: "true" files: |- _out/discovery-service-* + _out/snapshot-decoder-* _out/sha*.txt diff --git a/.golangci.yml b/.golangci.yml index f20e168..6cfe81e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,21 +1,20 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-09T09:47:30Z by kres 1e986af. # options for analysis running run: timeout: 10m issues-exit-code: 1 tests: true - build-tags: [] - skip-dirs: [] - skip-dirs-use-default: true - skip-files: [] + build-tags: [ ] modules-download-mode: readonly # output configuration options output: - format: colored-line-number + formats: + - format: colored-line-number + path: stdout print-issued-lines: true print-linter-name: true uniq-by-line: true @@ -32,54 +31,37 @@ linters-settings: check-blank: true exhaustive: default-signifies-exhaustive: false - funlen: - lines: 60 - statements: 40 gci: - local-prefixes: github.com/siderolabs/discovery-service/ + sections: + - standard # Standard section: captures all standard packages. + - default # Default section: contains all imports that could not be matched to another section type. + - prefix(github.com/siderolabs/discovery-service/) # Custom section: groups all imports with the specified Prefix. gocognit: min-complexity: 30 - ireturn: - allow: - - anon - - error - - empty - - stdlib - - github.com\/talos-systems\/kres\/internal\/dag.Node nestif: min-complexity: 5 goconst: min-len: 3 min-occurrences: 3 gocritic: - disabled-checks: [] + disabled-checks: [ ] gocyclo: min-complexity: 20 godot: - check-all: false - godox: - keywords: # default keywords are TODO, BUG, and FIXME, these can be overwritten by this setting - - NOTE - - OPTIMIZE # marks code that should be optimized before merging - - HACK # marks hack-arounds that should be removed before merging + scope: declarations gofmt: simplify: true goimports: local-prefixes: github.com/siderolabs/discovery-service/ - golint: - min-confidence: 0.8 - gomnd: - settings: {} - gomodguard: {} + gomodguard: { } govet: - check-shadowing: true enable-all: true lll: line-length: 200 tab-width: 4 misspell: locale: US - ignore-words: [] + ignore-words: [ ] nakedret: max-func-lines: 30 prealloc: @@ -88,16 +70,15 @@ linters-settings: for-loops: false # Report preallocation suggestions on for loops, false by default nolintlint: allow-unused: false - allow-leading-space: false - allow-no-explanation: [] + allow-no-explanation: [ ] require-explanation: false require-specific: true - rowserrcheck: {} - testpackage: {} + rowserrcheck: { } + testpackage: { } unparam: check-exported: false unused: - check-exported: false + local-variables-are-used: false whitespace: multi-if: false # Enforces newlines (or comments) after every multi-line if statement multi-func: false # Enforces newlines (or comments) after every multi-line function signature @@ -113,8 +94,8 @@ linters-settings: gofumpt: extra-rules: false cyclop: - # the maximal code complexity to report - max-complexity: 20 + # the maximal code complexity to report + max-complexity: 20 # depguard: # Main: # deny: @@ -125,48 +106,49 @@ linters: disable-all: false fast: false disable: - - exhaustruct - exhaustivestruct + - exhaustruct + - err113 - forbidigo - funlen - - gas - gochecknoglobals - gochecknoinits - godox - - goerr113 - gomnd - gomoddirectives + - gosec + - inamedparam - ireturn + - mnd - nestif - nonamedreturns - nosnakecase - paralleltest + - tagalign - tagliatelle - thelper - typecheck - varnamelen - wrapcheck - depguard # Disabled because starting with golangci-lint 1.53.0 it doesn't allow denylist alone anymore - - tagalign - - inamedparam - testifylint # complains about our assert recorder and has a number of false positives for assert.Greater(t, thing, 1) - protogetter # complains about us using Value field on typed spec, instead of GetValue which has a different signature - perfsprint # complains about us using fmt.Sprintf in non-performance critical code, updating just kres took too long # abandoned linters for which golangci shows the warning that the repo is archived by the owner + - deadcode + - golint + - ifshort - interfacer - maligned - - golint - scopelint - - varcheck - - deadcode - structcheck - - ifshort + - varcheck # disabled as it seems to be broken - goes into imported libraries and reports issues there - musttag issues: - exclude: [] - exclude-rules: [] + exclude: [ ] + exclude-rules: [ ] exclude-use-default: false exclude-case-sensitive: false max-issues-per-linter: 10 diff --git a/.kres.yaml b/.kres.yaml index 3900766..ab9f56d 100644 --- a/.kres.yaml +++ b/.kres.yaml @@ -15,6 +15,15 @@ spec: GOOS: linux GOARCH: arm64 --- +kind: golang.Generate +spec: + baseSpecPath: /api + vtProtobufEnabled: true + specs: + - source: api/storage/storage.proto + subdirectory: storage + genGateway: false +--- kind: service.CodeCov spec: targetThreshold: 30 diff --git a/Dockerfile b/Dockerfile index fbced5d..9d4fdce 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,20 +1,17 @@ -# syntax = docker/dockerfile-upstream:1.7.0-labs +# syntax = docker/dockerfile-upstream:1.7.1-labs # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-10T10:40:41Z by kres 1e986af. ARG TOOLCHAIN -# cleaned up specs and compiled versions -FROM scratch AS generate +FROM ghcr.io/siderolabs/ca-certificates:v1.7.0 AS image-ca-certificates -FROM ghcr.io/siderolabs/ca-certificates:v1.6.0 AS image-ca-certificates - -FROM ghcr.io/siderolabs/fhs:v1.6.0 AS image-fhs +FROM ghcr.io/siderolabs/fhs:v1.7.0 AS image-fhs # runs markdownlint -FROM docker.io/node:21.7.1-alpine3.19 AS lint-markdown +FROM docker.io/node:21.7.3-alpine3.19 AS lint-markdown WORKDIR /src RUN npm i -g markdownlint-cli@0.39.0 RUN npm i sentences-per-line@0.2.1 @@ -22,6 +19,10 @@ COPY .markdownlint.json . COPY ./README.md ./README.md RUN markdownlint --ignore "CHANGELOG.md" --ignore "**/node_modules/**" --ignore '**/hack/chglog/**' --rules node_modules/sentences-per-line/index.js . +# collects proto specs +FROM scratch AS proto-specs +ADD api/storage/storage.proto /api/storage/ + # base toolchain image FROM ${TOOLCHAIN} AS toolchain RUN apk --update --no-cache add bash curl build-base protoc protobuf-dev @@ -36,6 +37,18 @@ ENV GOTOOLCHAIN ${GOTOOLCHAIN} ARG GOEXPERIMENT ENV GOEXPERIMENT ${GOEXPERIMENT} ENV GOPATH /go +ARG PROTOBUF_GO_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/protobuf/cmd/protoc-gen-go@v${PROTOBUF_GO_VERSION} +RUN mv /go/bin/protoc-gen-go /bin +ARG GRPC_GO_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v${GRPC_GO_VERSION} +RUN mv /go/bin/protoc-gen-go-grpc /bin +ARG GRPC_GATEWAY_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v${GRPC_GATEWAY_VERSION} +RUN mv /go/bin/protoc-gen-grpc-gateway /bin +ARG VTPROTOBUF_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto@v${VTPROTOBUF_VERSION} +RUN mv /go/bin/protoc-gen-go-vtproto /bin ARG DEEPCOPY_VERSION RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/siderolabs/deep-copy@${DEEPCOPY_VERSION} \ && mv /go/bin/deep-copy /bin/deep-copy @@ -59,26 +72,19 @@ COPY go.sum go.sum RUN cd . RUN --mount=type=cache,target=/go/pkg go mod download RUN --mount=type=cache,target=/go/pkg go mod verify +COPY ./api ./api COPY ./cmd ./cmd COPY ./internal ./internal COPY ./pkg ./pkg RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null -# builds discovery-service-linux-amd64 -FROM base AS discovery-service-linux-amd64-build -COPY --from=generate / / -WORKDIR /src/cmd/discovery-service -ARG GO_BUILDFLAGS -ARG GO_LDFLAGS -RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-amd64 - -# builds discovery-service-linux-arm64 -FROM base AS discovery-service-linux-arm64-build -COPY --from=generate / / -WORKDIR /src/cmd/discovery-service -ARG GO_BUILDFLAGS -ARG GO_LDFLAGS -RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-arm64 +# runs protobuf compiler +FROM tools AS proto-compile +COPY --from=proto-specs / / +RUN protoc -I/api --go_out=paths=source_relative:/api --go-grpc_out=paths=source_relative:/api --go-vtproto_out=paths=source_relative:/api --go-vtproto_opt=features=marshal+unmarshal+size+equal+clone /api/storage/storage.proto +RUN rm /api/storage/storage.proto +RUN goimports -w -local github.com/siderolabs/discovery-service /api +RUN gofumpt -w /api # runs gofumpt FROM base AS lint-gofumpt @@ -93,6 +99,7 @@ FROM base AS lint-golangci-lint WORKDIR /src COPY .golangci.yml . ENV GOGC 50 +RUN golangci-lint config verify --config .golangci.yml RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/.cache/golangci-lint --mount=type=cache,target=/go/pkg golangci-lint run --config .golangci.yml # runs govulncheck @@ -112,14 +119,56 @@ WORKDIR /src ARG TESTPKGS RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg --mount=type=cache,target=/tmp go test -v -covermode=atomic -coverprofile=coverage.txt -coverpkg=${TESTPKGS} -count 1 ${TESTPKGS} +# cleaned up specs and compiled versions +FROM scratch AS generate +COPY --from=proto-compile /api/ /api/ + +FROM scratch AS unit-tests +COPY --from=unit-tests-run /src/coverage.txt /coverage-unit-tests.txt + +# builds discovery-service-linux-amd64 +FROM base AS discovery-service-linux-amd64-build +COPY --from=generate / / +WORKDIR /src/cmd/discovery-service +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-amd64 + +# builds discovery-service-linux-arm64 +FROM base AS discovery-service-linux-arm64-build +COPY --from=generate / / +WORKDIR /src/cmd/discovery-service +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-arm64 + +# builds snapshot-decoder-linux-amd64 +FROM base AS snapshot-decoder-linux-amd64-build +COPY --from=generate / / +WORKDIR /src/cmd/snapshot-decoder +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /snapshot-decoder-linux-amd64 + +# builds snapshot-decoder-linux-arm64 +FROM base AS snapshot-decoder-linux-arm64-build +COPY --from=generate / / +WORKDIR /src/cmd/snapshot-decoder +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /snapshot-decoder-linux-arm64 + FROM scratch AS discovery-service-linux-amd64 COPY --from=discovery-service-linux-amd64-build /discovery-service-linux-amd64 /discovery-service-linux-amd64 FROM scratch AS discovery-service-linux-arm64 COPY --from=discovery-service-linux-arm64-build /discovery-service-linux-arm64 /discovery-service-linux-arm64 -FROM scratch AS unit-tests -COPY --from=unit-tests-run /src/coverage.txt /coverage-unit-tests.txt +FROM scratch AS snapshot-decoder-linux-amd64 +COPY --from=snapshot-decoder-linux-amd64-build /snapshot-decoder-linux-amd64 /snapshot-decoder-linux-amd64 + +FROM scratch AS snapshot-decoder-linux-arm64 +COPY --from=snapshot-decoder-linux-arm64-build /snapshot-decoder-linux-arm64 /snapshot-decoder-linux-arm64 FROM discovery-service-linux-${TARGETARCH} AS discovery-service @@ -127,6 +176,12 @@ FROM scratch AS discovery-service-all COPY --from=discovery-service-linux-amd64 / / COPY --from=discovery-service-linux-arm64 / / +FROM snapshot-decoder-linux-${TARGETARCH} AS snapshot-decoder + +FROM scratch AS snapshot-decoder-all +COPY --from=snapshot-decoder-linux-amd64 / / +COPY --from=snapshot-decoder-linux-arm64 / / + FROM scratch AS image-discovery-service ARG TARGETARCH COPY --from=discovery-service discovery-service-linux-${TARGETARCH} /discovery-service @@ -135,3 +190,11 @@ COPY --from=image-ca-certificates / / LABEL org.opencontainers.image.source https://github.com/siderolabs/discovery-service ENTRYPOINT ["/discovery-service"] +FROM scratch AS image-snapshot-decoder +ARG TARGETARCH +COPY --from=snapshot-decoder snapshot-decoder-linux-${TARGETARCH} /snapshot-decoder +COPY --from=image-fhs / / +COPY --from=image-ca-certificates / / +LABEL org.opencontainers.image.source https://github.com/siderolabs/discovery-service +ENTRYPOINT ["/snapshot-decoder"] + diff --git a/Makefile b/Makefile index 5f44ed7..e615cb9 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-10T10:40:41Z by kres 1e986af. # common variables @@ -9,6 +9,7 @@ TAG := $(shell git describe --tag --always --dirty --match v[0-9]\*) ABBREV_TAG := $(shell git describe --tags >/dev/null 2>/dev/null && git describe --tag --always --match v[0-9]\* --abbrev=0 || echo 'undefined') BRANCH := $(shell git rev-parse --abbrev-ref HEAD) ARTIFACTS := _out +IMAGE_TAG ?= $(TAG) WITH_DEBUG ?= false WITH_RACE ?= false REGISTRY ?= ghcr.io @@ -19,10 +20,10 @@ GRPC_GO_VERSION ?= 1.3.0 GRPC_GATEWAY_VERSION ?= 2.19.1 VTPROTOBUF_VERSION ?= 0.6.0 DEEPCOPY_VERSION ?= v0.5.6 -GOLANGCILINT_VERSION ?= v1.56.2 +GOLANGCILINT_VERSION ?= v1.58.0 GOFUMPT_VERSION ?= v0.6.0 -GO_VERSION ?= 1.22.1 -GOIMPORTS_VERSION ?= v0.19.0 +GO_VERSION ?= 1.22.3 +GOIMPORTS_VERSION ?= v0.20.0 GO_BUILDFLAGS ?= GO_LDFLAGS ?= CGO_ENABLED ?= 0 @@ -110,7 +111,7 @@ If you already have a compatible builder instance, you may use that instead. ## Artifacts All artifacts will be output to ./$(ARTIFACTS). Images will be tagged with the -registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(TAG)). +registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(IMAGE_TAG)). The registry and username can be overridden by exporting REGISTRY, and USERNAME respectively. @@ -128,7 +129,7 @@ else GO_LDFLAGS += -s endif -all: unit-tests discovery-service image-discovery-service lint +all: unit-tests discovery-service image-discovery-service snapshot-decoder image-snapshot-decoder lint .PHONY: clean clean: ## Cleans up all artifacts. @@ -140,6 +141,9 @@ target-%: ## Builds the specified target defined in the Dockerfile. The build r local-%: ## Builds the specified target defined in the Dockerfile using the local output type. The build result will be output to the specified local destination. @$(MAKE) target-$* TARGET_ARGS="--output=type=local,dest=$(DEST) $(TARGET_ARGS)" +generate: ## Generate .proto definitions. + @$(MAKE) local-$@ DEST=./ + lint-golangci-lint: ## Runs golangci-lint linter. @$(MAKE) target-$@ @@ -172,10 +176,6 @@ unit-tests: ## Performs unit tests unit-tests-race: ## Performs unit tests with race detection enabled. @$(MAKE) target-$@ -.PHONY: coverage -coverage: ## Upload coverage data to codecov.io. - bash -c "bash <(curl -s https://codecov.io/bash) -f $(ARTIFACTS)/coverage-unit-tests.txt -X fix" - .PHONY: $(ARTIFACTS)/discovery-service-linux-amd64 $(ARTIFACTS)/discovery-service-linux-amd64: @$(MAKE) local-discovery-service-linux-amd64 DEST=$(ARTIFACTS) @@ -202,7 +202,28 @@ lint: lint-golangci-lint lint-gofumpt lint-govulncheck lint-goimports lint-markd .PHONY: image-discovery-service image-discovery-service: ## Builds image for discovery-service. - @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/discovery-service:$(TAG)" + @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/discovery-service:$(IMAGE_TAG)" + +.PHONY: $(ARTIFACTS)/snapshot-decoder-linux-amd64 +$(ARTIFACTS)/snapshot-decoder-linux-amd64: + @$(MAKE) local-snapshot-decoder-linux-amd64 DEST=$(ARTIFACTS) + +.PHONY: snapshot-decoder-linux-amd64 +snapshot-decoder-linux-amd64: $(ARTIFACTS)/snapshot-decoder-linux-amd64 ## Builds executable for snapshot-decoder-linux-amd64. + +.PHONY: $(ARTIFACTS)/snapshot-decoder-linux-arm64 +$(ARTIFACTS)/snapshot-decoder-linux-arm64: + @$(MAKE) local-snapshot-decoder-linux-arm64 DEST=$(ARTIFACTS) + +.PHONY: snapshot-decoder-linux-arm64 +snapshot-decoder-linux-arm64: $(ARTIFACTS)/snapshot-decoder-linux-arm64 ## Builds executable for snapshot-decoder-linux-arm64. + +.PHONY: snapshot-decoder +snapshot-decoder: snapshot-decoder-linux-amd64 snapshot-decoder-linux-arm64 ## Builds executables for snapshot-decoder. + +.PHONY: image-snapshot-decoder +image-snapshot-decoder: ## Builds image for snapshot-decoder. + @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/snapshot-decoder:$(IMAGE_TAG)" .PHONY: rekres rekres: diff --git a/api/storage/storage.pb.go b/api/storage/storage.pb.go new file mode 100644 index 0000000..8cfc834 --- /dev/null +++ b/api/storage/storage.pb.go @@ -0,0 +1,411 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v4.24.4 +// source: storage/storage.proto + +package storagepb + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// StateSnapshot is the snapshot of the discovery service state. +// +// We avoid marshalling/unmarshalling to this type directly, as it causes an allocation of all of the clusters at once. +// Instead, we marshal & unmarshal the list of clusters in a streaming fashion. +type StateSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Clusters []*ClusterSnapshot `protobuf:"bytes,1,rep,name=clusters,proto3" json:"clusters,omitempty"` +} + +func (x *StateSnapshot) Reset() { + *x = StateSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StateSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StateSnapshot) ProtoMessage() {} + +func (x *StateSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StateSnapshot.ProtoReflect.Descriptor instead. +func (*StateSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{0} +} + +func (x *StateSnapshot) GetClusters() []*ClusterSnapshot { + if x != nil { + return x.Clusters + } + return nil +} + +// ClusterSnapshot is the snapshot of a cluster with a set of affiliates. +type ClusterSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Affiliates []*AffiliateSnapshot `protobuf:"bytes,2,rep,name=affiliates,proto3" json:"affiliates,omitempty"` +} + +func (x *ClusterSnapshot) Reset() { + *x = ClusterSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClusterSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClusterSnapshot) ProtoMessage() {} + +func (x *ClusterSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClusterSnapshot.ProtoReflect.Descriptor instead. +func (*ClusterSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{1} +} + +func (x *ClusterSnapshot) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ClusterSnapshot) GetAffiliates() []*AffiliateSnapshot { + if x != nil { + return x.Affiliates + } + return nil +} + +// AffiliateSnapshot is the snapshot of an affiliate that is part of a cluster with a set of endpoints. +type AffiliateSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Expiration *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=expiration,proto3" json:"expiration,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + Endpoints []*EndpointSnapshot `protobuf:"bytes,4,rep,name=endpoints,proto3" json:"endpoints,omitempty"` +} + +func (x *AffiliateSnapshot) Reset() { + *x = AffiliateSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AffiliateSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AffiliateSnapshot) ProtoMessage() {} + +func (x *AffiliateSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AffiliateSnapshot.ProtoReflect.Descriptor instead. +func (*AffiliateSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{2} +} + +func (x *AffiliateSnapshot) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *AffiliateSnapshot) GetExpiration() *timestamppb.Timestamp { + if x != nil { + return x.Expiration + } + return nil +} + +func (x *AffiliateSnapshot) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *AffiliateSnapshot) GetEndpoints() []*EndpointSnapshot { + if x != nil { + return x.Endpoints + } + return nil +} + +// EndpointSnapshot is the snapshot of an endpoint of an affiliate. +type EndpointSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Expiration *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=expiration,proto3" json:"expiration,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *EndpointSnapshot) Reset() { + *x = EndpointSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EndpointSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EndpointSnapshot) ProtoMessage() {} + +func (x *EndpointSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EndpointSnapshot.ProtoReflect.Descriptor instead. +func (*EndpointSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{3} +} + +func (x *EndpointSnapshot) GetExpiration() *timestamppb.Timestamp { + if x != nil { + return x.Expiration + } + return nil +} + +func (x *EndpointSnapshot) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_storage_storage_proto protoreflect.FileDescriptor + +var file_storage_storage_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, + 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x12, 0x45, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, + 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, + 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x22, 0x6e, 0x0a, 0x0f, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x4b, 0x0a, + 0x0a, 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x66, 0x66, + 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x0a, + 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, 0x11, 0x41, + 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x3a, 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x12, 0x48, 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x18, 0x04, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, + 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x45, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, + 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x22, 0x62, 0x0a, 0x10, 0x45, 0x6e, + 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x3a, + 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, + 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x37, + 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x69, 0x64, + 0x65, 0x72, 0x6f, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, + 0x6f, 0x72, 0x61, 0x67, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_storage_storage_proto_rawDescOnce sync.Once + file_storage_storage_proto_rawDescData = file_storage_storage_proto_rawDesc +) + +func file_storage_storage_proto_rawDescGZIP() []byte { + file_storage_storage_proto_rawDescOnce.Do(func() { + file_storage_storage_proto_rawDescData = protoimpl.X.CompressGZIP(file_storage_storage_proto_rawDescData) + }) + return file_storage_storage_proto_rawDescData +} + +var file_storage_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_storage_storage_proto_goTypes = []interface{}{ + (*StateSnapshot)(nil), // 0: sidero.discovery.storage.StateSnapshot + (*ClusterSnapshot)(nil), // 1: sidero.discovery.storage.ClusterSnapshot + (*AffiliateSnapshot)(nil), // 2: sidero.discovery.storage.AffiliateSnapshot + (*EndpointSnapshot)(nil), // 3: sidero.discovery.storage.EndpointSnapshot + (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp +} +var file_storage_storage_proto_depIdxs = []int32{ + 1, // 0: sidero.discovery.storage.StateSnapshot.clusters:type_name -> sidero.discovery.storage.ClusterSnapshot + 2, // 1: sidero.discovery.storage.ClusterSnapshot.affiliates:type_name -> sidero.discovery.storage.AffiliateSnapshot + 4, // 2: sidero.discovery.storage.AffiliateSnapshot.expiration:type_name -> google.protobuf.Timestamp + 3, // 3: sidero.discovery.storage.AffiliateSnapshot.endpoints:type_name -> sidero.discovery.storage.EndpointSnapshot + 4, // 4: sidero.discovery.storage.EndpointSnapshot.expiration:type_name -> google.protobuf.Timestamp + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_storage_storage_proto_init() } +func file_storage_storage_proto_init() { + if File_storage_storage_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_storage_storage_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StateSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storage_storage_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClusterSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storage_storage_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AffiliateSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storage_storage_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EndpointSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_storage_storage_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_storage_storage_proto_goTypes, + DependencyIndexes: file_storage_storage_proto_depIdxs, + MessageInfos: file_storage_storage_proto_msgTypes, + }.Build() + File_storage_storage_proto = out.File + file_storage_storage_proto_rawDesc = nil + file_storage_storage_proto_goTypes = nil + file_storage_storage_proto_depIdxs = nil +} diff --git a/api/storage/storage.proto b/api/storage/storage.proto new file mode 100644 index 0000000..ff7843d --- /dev/null +++ b/api/storage/storage.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package sidero.discovery.storage; +option go_package = "github.com/siderolabs/discovery-service/api/storagepb"; + +import "google/protobuf/timestamp.proto"; + +// StateSnapshot is the snapshot of the discovery service state. +// +// We avoid marshalling/unmarshalling to this type directly, as it causes an allocation of all of the clusters at once. +// Instead, we marshal & unmarshal the list of clusters in a streaming fashion. +message StateSnapshot { + repeated ClusterSnapshot clusters = 1; +} + +// ClusterSnapshot is the snapshot of a cluster with a set of affiliates. +message ClusterSnapshot { + string id = 1; + repeated AffiliateSnapshot affiliates = 2; +} + +// AffiliateSnapshot is the snapshot of an affiliate that is part of a cluster with a set of endpoints. +message AffiliateSnapshot { + string id = 1; + google.protobuf.Timestamp expiration = 2; + bytes data = 3; + repeated EndpointSnapshot endpoints = 4; +} + +// EndpointSnapshot is the snapshot of an endpoint of an affiliate. +message EndpointSnapshot { + google.protobuf.Timestamp expiration = 1; + bytes data = 2; +} diff --git a/api/storage/storage_vtproto.pb.go b/api/storage/storage_vtproto.pb.go new file mode 100644 index 0000000..4ae0b8c --- /dev/null +++ b/api/storage/storage_vtproto.pb.go @@ -0,0 +1,1064 @@ +// Code generated by protoc-gen-go-vtproto. DO NOT EDIT. +// protoc-gen-go-vtproto version: v0.6.0 +// source: storage/storage.proto + +package storagepb + +import ( + fmt "fmt" + io "io" + + protohelpers "github.com/planetscale/vtprotobuf/protohelpers" + timestamppb1 "github.com/planetscale/vtprotobuf/types/known/timestamppb" + proto "google.golang.org/protobuf/proto" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +func (m *StateSnapshot) CloneVT() *StateSnapshot { + if m == nil { + return (*StateSnapshot)(nil) + } + r := new(StateSnapshot) + if rhs := m.Clusters; rhs != nil { + tmpContainer := make([]*ClusterSnapshot, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Clusters = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *StateSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ClusterSnapshot) CloneVT() *ClusterSnapshot { + if m == nil { + return (*ClusterSnapshot)(nil) + } + r := new(ClusterSnapshot) + r.Id = m.Id + if rhs := m.Affiliates; rhs != nil { + tmpContainer := make([]*AffiliateSnapshot, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Affiliates = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ClusterSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *AffiliateSnapshot) CloneVT() *AffiliateSnapshot { + if m == nil { + return (*AffiliateSnapshot)(nil) + } + r := new(AffiliateSnapshot) + r.Id = m.Id + r.Expiration = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.Expiration).CloneVT()) + if rhs := m.Data; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.Data = tmpBytes + } + if rhs := m.Endpoints; rhs != nil { + tmpContainer := make([]*EndpointSnapshot, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Endpoints = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *AffiliateSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *EndpointSnapshot) CloneVT() *EndpointSnapshot { + if m == nil { + return (*EndpointSnapshot)(nil) + } + r := new(EndpointSnapshot) + r.Expiration = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.Expiration).CloneVT()) + if rhs := m.Data; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.Data = tmpBytes + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *EndpointSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (this *StateSnapshot) EqualVT(that *StateSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if len(this.Clusters) != len(that.Clusters) { + return false + } + for i, vx := range this.Clusters { + vy := that.Clusters[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &ClusterSnapshot{} + } + if q == nil { + q = &ClusterSnapshot{} + } + if !p.EqualVT(q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *StateSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*StateSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *ClusterSnapshot) EqualVT(that *ClusterSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.Id != that.Id { + return false + } + if len(this.Affiliates) != len(that.Affiliates) { + return false + } + for i, vx := range this.Affiliates { + vy := that.Affiliates[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &AffiliateSnapshot{} + } + if q == nil { + q = &AffiliateSnapshot{} + } + if !p.EqualVT(q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *ClusterSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*ClusterSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *AffiliateSnapshot) EqualVT(that *AffiliateSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.Id != that.Id { + return false + } + if !(*timestamppb1.Timestamp)(this.Expiration).EqualVT((*timestamppb1.Timestamp)(that.Expiration)) { + return false + } + if string(this.Data) != string(that.Data) { + return false + } + if len(this.Endpoints) != len(that.Endpoints) { + return false + } + for i, vx := range this.Endpoints { + vy := that.Endpoints[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &EndpointSnapshot{} + } + if q == nil { + q = &EndpointSnapshot{} + } + if !p.EqualVT(q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *AffiliateSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*AffiliateSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *EndpointSnapshot) EqualVT(that *EndpointSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if !(*timestamppb1.Timestamp)(this.Expiration).EqualVT((*timestamppb1.Timestamp)(that.Expiration)) { + return false + } + if string(this.Data) != string(that.Data) { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *EndpointSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*EndpointSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (m *StateSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StateSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *StateSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Clusters) > 0 { + for iNdEx := len(m.Clusters) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.Clusters[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *ClusterSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ClusterSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *ClusterSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Affiliates) > 0 { + for iNdEx := len(m.Affiliates) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.Affiliates[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *AffiliateSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AffiliateSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *AffiliateSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Endpoints) > 0 { + for iNdEx := len(m.Endpoints) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.Endpoints[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x22 + } + } + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x1a + } + if m.Expiration != nil { + size, err := (*timestamppb1.Timestamp)(m.Expiration).MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *EndpointSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *EndpointSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *EndpointSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x12 + } + if m.Expiration != nil { + size, err := (*timestamppb1.Timestamp)(m.Expiration).MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *StateSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Clusters) > 0 { + for _, e := range m.Clusters { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *ClusterSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if len(m.Affiliates) > 0 { + for _, e := range m.Affiliates { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *AffiliateSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if m.Expiration != nil { + l = (*timestamppb1.Timestamp)(m.Expiration).SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if len(m.Endpoints) > 0 { + for _, e := range m.Endpoints { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *EndpointSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Expiration != nil { + l = (*timestamppb1.Timestamp)(m.Expiration).SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + n += len(m.unknownFields) + return n +} + +func (m *StateSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StateSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StateSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Clusters", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Clusters = append(m.Clusters, &ClusterSnapshot{}) + if err := m.Clusters[len(m.Clusters)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ClusterSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ClusterSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ClusterSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Affiliates", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Affiliates = append(m.Affiliates, &AffiliateSnapshot{}) + if err := m.Affiliates[len(m.Affiliates)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AffiliateSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AffiliateSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AffiliateSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Expiration == nil { + m.Expiration = ×tamppb.Timestamp{} + } + if err := (*timestamppb1.Timestamp)(m.Expiration).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Endpoints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Endpoints = append(m.Endpoints, &EndpointSnapshot{}) + if err := m.Endpoints[len(m.Endpoints)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *EndpointSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: EndpointSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: EndpointSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Expiration == nil { + m.Expiration = ×tamppb.Timestamp{} + } + if err := (*timestamppb1.Timestamp)(m.Expiration).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} diff --git a/cmd/discovery-service/main.go b/cmd/discovery-service/main.go index 23d6097..b9dce91 100644 --- a/cmd/discovery-service/main.go +++ b/cmd/discovery-service/main.go @@ -22,6 +22,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + "github.com/jonboulle/clockwork" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/siderolabs/discovery-api/api/v1alpha1/server/pb" @@ -37,6 +38,7 @@ import ( "github.com/siderolabs/discovery-service/internal/limiter" _ "github.com/siderolabs/discovery-service/internal/proto" "github.com/siderolabs/discovery-service/internal/state" + "github.com/siderolabs/discovery-service/internal/state/storage" "github.com/siderolabs/discovery-service/pkg/limits" "github.com/siderolabs/discovery-service/pkg/server" ) @@ -49,6 +51,9 @@ var ( devMode = false gcInterval = time.Minute redirectEndpoint = "" + snapshotsEnabled = true + snapshotPath = "/var/discovery-service/state.binpb" + snapshotInterval = 10 * time.Minute ) func init() { @@ -58,6 +63,9 @@ func init() { flag.BoolVar(&devMode, "debug", devMode, "enable debug mode") flag.DurationVar(&gcInterval, "gc-interval", gcInterval, "garbage collection interval") flag.StringVar(&redirectEndpoint, "redirect-endpoint", redirectEndpoint, "redirect all clients to a new endpoint (gRPC endpoint, e.g. 'example.com:443'") + flag.BoolVar(&snapshotsEnabled, "snapshots-enabled", snapshotsEnabled, "enable snapshots") + flag.StringVar(&snapshotPath, "snapshot-path", snapshotPath, "path to the snapshot file") + flag.DurationVar(&snapshotInterval, "snapshot-interval", snapshotInterval, "interval to save the snapshot") if debug.Enabled { flag.StringVar(&debugAddr, "debug-addr", debugAddr, "debug (pprof, trace, expvar) listen addr") @@ -189,6 +197,19 @@ func run(ctx context.Context, logger *zap.Logger) error { state := state.NewState(logger) prom.MustRegister(state) + var stateStorage *storage.Storage + + if snapshotsEnabled { + stateStorage = storage.New(snapshotPath, state, logger) + prom.MustRegister(stateStorage) + + if err := stateStorage.Load(); err != nil { + logger.Warn("failed to load state from storage", zap.Error(err)) + } + } else { + logger.Info("snapshots are disabled") + } + srv := server.NewClusterServer(state, ctx.Done(), redirectEndpoint) prom.MustRegister(srv) @@ -226,6 +247,12 @@ func run(ctx context.Context, logger *zap.Logger) error { eg, ctx := errgroup.WithContext(ctx) + if snapshotsEnabled { + eg.Go(func() error { + return stateStorage.Start(ctx, clockwork.NewRealClock(), snapshotInterval) + }) + } + eg.Go(func() error { logger.Info("gRPC server starting", zap.Stringer("address", lis.Addr())) diff --git a/cmd/snapshot-decoder/main.go b/cmd/snapshot-decoder/main.go new file mode 100644 index 0000000..2a4bfa4 --- /dev/null +++ b/cmd/snapshot-decoder/main.go @@ -0,0 +1,54 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +// Package main implements a simple tool to decode a snapshot file. +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "os" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +var snapshotPath = "/var/discovery-service/state.binpb" + +func init() { + flag.StringVar(&snapshotPath, "snapshot-path", snapshotPath, "path to the snapshot file") +} + +func main() { + flag.Parse() + + if err := run(); err != nil { + log.Fatalf("error: %v", err) + } +} + +func run() error { + data, err := os.ReadFile(snapshotPath) + if err != nil { + return fmt.Errorf("failed to read snapshot: %w", err) + } + + snapshot := &storagepb.StateSnapshot{} + + if err = snapshot.UnmarshalVT(data); err != nil { + return fmt.Errorf("failed to unmarshal snapshot: %w", err) + } + + encoder := json.NewEncoder(os.Stdout) + + encoder.SetIndent("", " ") + + if err = encoder.Encode(snapshot); err != nil { + return fmt.Errorf("failed to encode snapshot: %w", err) + } + + return nil +} diff --git a/go.mod b/go.mod index b3c64d2..db562ea 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,12 @@ module github.com/siderolabs/discovery-service -go 1.22.1 +go 1.22.3 require ( github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 + github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293 + github.com/planetscale/vtprotobuf v0.6.0 github.com/prometheus/client_golang v1.19.0 github.com/siderolabs/discovery-api v0.1.4 github.com/siderolabs/discovery-client v0.1.8 @@ -16,7 +18,7 @@ require ( golang.org/x/sync v0.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.62.1 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.1 ) require ( @@ -25,15 +27,14 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/planetscale/vtprotobuf v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.17.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fefb714..bf91970 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 h1:f4tg github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0/go.mod h1:hKAkSgNkL0FII46ZkJcpVEAai4KV+swlIWCKfekd1pA= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= +github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293 h1:l3TVsYI+QxIp0CW7YCizx9WG26Lj7DXnc1pdlBKk3gY= +github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -52,14 +54,14 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M= go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -69,8 +71,8 @@ google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/landing/html/index.html b/internal/landing/html/index.html index a55d952..16bce56 100644 --- a/internal/landing/html/index.html +++ b/internal/landing/html/index.html @@ -34,12 +34,6 @@

Details

Each peer receives information back about other affiliates from the discovery service, decrypts it and uses it to drive KubeSpan and cluster discovery.

-

- Moreover, the discovery service has no peristence. - Data is stored in memory only with a TTL set by the clients (i.e. Talos). - The cluster ID is used as a key to select the affiliates (so that different clusters see different affiliates). -

-

To summarize, the discovery service knows the client version, cluster ID, the number of affiliates, some encrypted data for each affiliate, and a list of encrypted endpoints.

diff --git a/internal/state/affiliate_test.go b/internal/state/affiliate_test.go index ec0ff56..c8f3985 100644 --- a/internal/state/affiliate_test.go +++ b/internal/state/affiliate_test.go @@ -106,7 +106,7 @@ func TestAffiliateTooManyEndpoints(t *testing.T) { affiliate := state.NewAffiliate("id1") - for i := 0; i < limits.AffiliateEndpointsMax; i++ { + for i := range limits.AffiliateEndpointsMax { assert.NoError(t, affiliate.MergeEndpoints([][]byte{[]byte(fmt.Sprintf("endpoint%d", i))}, now)) } diff --git a/internal/state/cluster_test.go b/internal/state/cluster_test.go index a21f7de..742d945 100644 --- a/internal/state/cluster_test.go +++ b/internal/state/cluster_test.go @@ -272,7 +272,7 @@ func TestClusterTooManyAffiliates(t *testing.T) { cluster := state.NewCluster("cluster3") - for i := 0; i < limits.ClusterAffiliatesMax; i++ { + for i := range limits.ClusterAffiliatesMax { assert.NoError(t, cluster.WithAffiliate(fmt.Sprintf("af%d", i), func(*state.Affiliate) error { return nil })) diff --git a/internal/state/snapshot.go b/internal/state/snapshot.go new file mode 100644 index 0000000..a2d938d --- /dev/null +++ b/internal/state/snapshot.go @@ -0,0 +1,130 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package state + +import ( + "slices" + + "github.com/siderolabs/gen/xslices" + "google.golang.org/protobuf/types/known/timestamppb" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +// ExportClusterSnapshots exports all cluster snapshots and calls the provided function for each one. +// +// Implements storage.Snapshotter interface. +func (state *State) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error { + var err error + + // reuse the same snapshotin each iteration + clusterSnapshot := &storagepb.ClusterSnapshot{} + + state.clusters.Range(func(_ string, cluster *Cluster) bool { + snapshotCluster(cluster, clusterSnapshot) + + err = f(clusterSnapshot) + + return err == nil + }) + + return err +} + +// ImportClusterSnapshots imports cluster snapshots by calling the provided function until it returns false. +// +// Implements storage.Snapshotter interface. +func (state *State) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error { + for { + clusterSnapshot, ok, err := f() + if err != nil { + return err + } + + if !ok { + break + } + + cluster := clusterFromSnapshot(clusterSnapshot) + + state.clusters.Store(cluster.id, cluster) + } + + return nil +} + +func snapshotCluster(cluster *Cluster, snapshot *storagepb.ClusterSnapshot) { + cluster.affiliatesMu.Lock() + defer cluster.affiliatesMu.Unlock() + + snapshot.Id = cluster.id + + // reuse the same slice, resize it as needed + snapshot.Affiliates = slices.Grow(snapshot.Affiliates, len(cluster.affiliates)) + snapshot.Affiliates = snapshot.Affiliates[:len(cluster.affiliates)] + + i := 0 + for _, affiliate := range cluster.affiliates { + if snapshot.Affiliates[i] == nil { + snapshot.Affiliates[i] = &storagepb.AffiliateSnapshot{} + } + + snapshot.Affiliates[i].Id = affiliate.id + + if snapshot.Affiliates[i].Expiration == nil { + snapshot.Affiliates[i].Expiration = ×tamppb.Timestamp{} + } + + snapshot.Affiliates[i].Expiration.Seconds = affiliate.expiration.Unix() + snapshot.Affiliates[i].Expiration.Nanos = int32(affiliate.expiration.Nanosecond()) + + snapshot.Affiliates[i].Data = affiliate.data + + // reuse the same slice, resize it as needed + snapshot.Affiliates[i].Endpoints = slices.Grow(snapshot.Affiliates[i].Endpoints, len(affiliate.endpoints)) + snapshot.Affiliates[i].Endpoints = snapshot.Affiliates[i].Endpoints[:len(affiliate.endpoints)] + + for j, endpoint := range affiliate.endpoints { + if snapshot.Affiliates[i].Endpoints[j] == nil { + snapshot.Affiliates[i].Endpoints[j] = &storagepb.EndpointSnapshot{} + } + + snapshot.Affiliates[i].Endpoints[j].Data = endpoint.data + + if snapshot.Affiliates[i].Endpoints[j].Expiration == nil { + snapshot.Affiliates[i].Endpoints[j].Expiration = ×tamppb.Timestamp{} + } + + snapshot.Affiliates[i].Endpoints[j].Expiration.Seconds = endpoint.expiration.Unix() + snapshot.Affiliates[i].Endpoints[j].Expiration.Nanos = int32(endpoint.expiration.Nanosecond()) + } + + i++ + } +} + +func clusterFromSnapshot(snapshot *storagepb.ClusterSnapshot) *Cluster { + return &Cluster{ + id: snapshot.Id, + affiliates: xslices.ToMap(snapshot.Affiliates, affiliateFromSnapshot), + } +} + +func affiliateFromSnapshot(snapshot *storagepb.AffiliateSnapshot) (string, *Affiliate) { + return snapshot.Id, &Affiliate{ + id: snapshot.Id, + expiration: snapshot.Expiration.AsTime(), + data: snapshot.Data, + endpoints: xslices.Map(snapshot.Endpoints, endpointFromSnapshot), + } +} + +func endpointFromSnapshot(snapshot *storagepb.EndpointSnapshot) Endpoint { + return Endpoint{ + data: snapshot.Data, + expiration: snapshot.Expiration.AsTime(), + } +} diff --git a/internal/state/storage/protobuf.go b/internal/state/storage/protobuf.go new file mode 100644 index 0000000..5f63fee --- /dev/null +++ b/internal/state/storage/protobuf.go @@ -0,0 +1,83 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package storage + +import ( + "fmt" + "slices" + + "google.golang.org/protobuf/encoding/protowire" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +const ( + clustersFieldNum = 1 + clustersFieldType = protowire.BytesType +) + +// encodeClusterSnapshot encodes a ClusterSnapshot into the given buffer, resizing it as needed. +// +// It returns the buffer with the encoded ClusterSnapshot and an error if the encoding fails. +func encodeClusterSnapshot(buffer []byte, snapshot *storagepb.ClusterSnapshot) ([]byte, error) { + buffer = protowire.AppendTag(buffer, clustersFieldNum, clustersFieldType) + buffer = protowire.AppendVarint(buffer, uint64(snapshot.SizeVT())) + + startIdx := len(buffer) + clusterSize := snapshot.SizeVT() + + buffer = slices.Grow(buffer, clusterSize) + buffer = buffer[:startIdx+clusterSize] + + if _, err := snapshot.MarshalToSizedBufferVT(buffer[startIdx:]); err != nil { + return nil, fmt.Errorf("failed to marshal cluster: %w", err) + } + + return buffer, nil +} + +// decodeClusterSnapshot decodes a ClusterSnapshot from the given buffer. +// +// If the given buffer size is not enough to decode the ClusterSnapshot, it returns the offset N, which can be positive or negative. +// +// When it is positive, ClusterSnapshot will be nil, and the caller should read N more bytes from the stream and call this function again. +// +// When negative, it means that the buffer had N more data than needed. +func decodeClusterSnapshot(buffer []byte) (*storagepb.ClusterSnapshot, int, error) { + tagNum, tagType, tagEncodedLen := protowire.ConsumeTag(buffer) + if tagEncodedLen < 0 { + return nil, 0, fmt.Errorf("failed to read tag: %w", protowire.ParseError(tagEncodedLen)) + } + + if tagNum != clustersFieldNum { + return nil, 0, fmt.Errorf("unexpected number: %v", tagNum) + } + + if tagType != clustersFieldType { + return nil, 0, fmt.Errorf("unexpected type: %v", tagType) + } + + clusterSize, clusterSizeEncodedLen := protowire.ConsumeVarint(buffer[tagEncodedLen:]) + if clusterSizeEncodedLen < 0 { + return nil, 0, fmt.Errorf("failed to read varint: %w", protowire.ParseError(clusterSizeEncodedLen)) + } + + clusterStartIndex := tagEncodedLen + clusterSizeEncodedLen + clusterEndIndex := clusterStartIndex + int(clusterSize) + offset := clusterEndIndex - len(buffer) + + if offset > 0 { // ask for more data + return nil, offset, nil + } + + var clusterSnapshot storagepb.ClusterSnapshot + + if err := clusterSnapshot.UnmarshalVT(buffer[clusterStartIndex:clusterEndIndex]); err != nil { + return nil, 0, fmt.Errorf("failed to unmarshal cluster snapshot: %w", err) + } + + return &clusterSnapshot, offset, nil +} diff --git a/internal/state/storage/storage.go b/internal/state/storage/storage.go new file mode 100644 index 0000000..ac9412c --- /dev/null +++ b/internal/state/storage/storage.go @@ -0,0 +1,392 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +// Package storage implements persistent storage for the state of the discovery service. +package storage + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "slices" + "sync" + "time" + + "github.com/jonboulle/clockwork" + prom "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +const ( + labelOperation = "operation" + labelStatus = "status" + + operationSave = "save" + operationLoad = "load" + + statusSuccess = "success" + statusError = "error" +) + +// Storage is a persistent storage for the state of the discovery service. +type Storage struct { + state Snapshotter + logger *zap.Logger + + operationsMetric *prom.CounterVec + lastSnapshotSizeMetric *prom.GaugeVec + lastOperationClustersMetric *prom.GaugeVec + lastOperationAffiliatesMetric *prom.GaugeVec + lastOperationEndpointsMetric *prom.GaugeVec + lastOperationDurationMetric *prom.GaugeVec + + path string +} + +// Describe implements prometheus.Collector interface. +func (storage *Storage) Describe(descs chan<- *prom.Desc) { + prom.DescribeByCollect(storage, descs) +} + +// Collect implements prometheus.Collector interface. +func (storage *Storage) Collect(metrics chan<- prom.Metric) { + storage.operationsMetric.Collect(metrics) + storage.lastSnapshotSizeMetric.Collect(metrics) + storage.lastOperationClustersMetric.Collect(metrics) + storage.lastOperationAffiliatesMetric.Collect(metrics) + storage.lastOperationEndpointsMetric.Collect(metrics) + storage.lastOperationDurationMetric.Collect(metrics) +} + +// Snapshotter is an interface for exporting and importing cluster state. +type Snapshotter interface { + // ExportClusterSnapshots exports cluster snapshots to the given function. + ExportClusterSnapshots(f func(*storagepb.ClusterSnapshot) error) error + + // ImportClusterSnapshots imports cluster snapshots from the given function. + ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error +} + +// New creates a new instance of Storage. +func New(path string, state Snapshotter, logger *zap.Logger) *Storage { + return &Storage{ + state: state, + logger: logger.With(zap.String("component", "storage"), zap.String("path", path)), + path: path, + + operationsMetric: prom.NewCounterVec(prom.CounterOpts{ + Name: "discovery_storage_operations_total", + Help: "The total number of storage operations.", + }, []string{labelOperation, labelStatus}), + lastSnapshotSizeMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_snapshot_size_bytes", + Help: "The size of the last processed snapshot in bytes.", + }, []string{labelOperation}), + lastOperationClustersMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_clusters", + Help: "The number of clusters in the snapshot of the last operation.", + }, []string{labelOperation}), + lastOperationAffiliatesMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_affiliates", + Help: "The number of affiliates in the snapshot of the last operation.", + }, []string{labelOperation}), + lastOperationEndpointsMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_endpoints", + Help: "The number of endpoints in the snapshot of the last operation.", + }, []string{labelOperation}), + lastOperationDurationMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_duration_seconds", + Help: "The duration of the last operation in seconds.", + }, []string{labelOperation}), + } +} + +// Start starts the storage loop that periodically saves the state. +func (storage *Storage) Start(ctx context.Context, clock clockwork.Clock, interval time.Duration) error { + storage.logger.Info("start storage loop", zap.Duration("interval", interval)) + + ticker := clock.NewTicker(interval) + + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + storage.logger.Info("received shutdown signal") + + if err := storage.Save(); err != nil { + return fmt.Errorf("failed to save state on shutdown: %w", err) + } + + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + + return ctx.Err() + case <-ticker.Chan(): + if err := storage.Save(); err != nil { + storage.logger.Error("failed to save state", zap.Error(err)) + } + } + } +} + +// Save saves all clusters' states into the persistent storage. +func (storage *Storage) Save() (err error) { + start := time.Now() + + defer func() { + if err != nil { + storage.operationsMetric.WithLabelValues(operationSave, statusError).Inc() + } + }() + + if err = os.MkdirAll(filepath.Dir(storage.path), 0o755); err != nil { + return fmt.Errorf("failed to create directory path: %w", err) + } + + tmpFile, err := getTempFile(storage.path) + if err != nil { + return fmt.Errorf("failed to create temporary file: %w", err) + } + + defer func() { + tmpFile.Close() //nolint:errcheck + os.Remove(tmpFile.Name()) //nolint:errcheck + }() + + stats, err := storage.Export(tmpFile) + if err != nil { + return fmt.Errorf("failed to write snapshot: %w", err) + } + + if err = commitTempFile(tmpFile, storage.path); err != nil { + return fmt.Errorf("failed to commit temporary file: %w", err) + } + + duration := time.Since(start) + + storage.logger.Info("state saved", zap.Int("clusters", stats.NumClusters), zap.Int("affiliates", stats.NumAffiliates), + zap.Int("endpoints", stats.NumEndpoints), zap.Duration("duration", duration)) + + storage.operationsMetric.WithLabelValues(operationSave, statusSuccess).Inc() + storage.lastSnapshotSizeMetric.WithLabelValues(operationSave).Set(float64(stats.Size)) + storage.lastOperationClustersMetric.WithLabelValues(operationSave).Set(float64(stats.NumClusters)) + storage.lastOperationAffiliatesMetric.WithLabelValues(operationSave).Set(float64(stats.NumAffiliates)) + storage.lastOperationEndpointsMetric.WithLabelValues(operationSave).Set(float64(stats.NumEndpoints)) + storage.lastOperationDurationMetric.WithLabelValues(operationSave).Set(duration.Seconds()) + + return nil +} + +// Load loads all clusters' states from the persistent storage. +func (storage *Storage) Load() (err error) { + defer func() { + if err != nil { + storage.operationsMetric.WithLabelValues(operationLoad, statusError).Inc() + } + }() + + start := time.Now() + + // open file for reading + file, err := os.Open(storage.path) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + + defer file.Close() //nolint:errcheck + + stats, err := storage.Import(file) + if err != nil { + return fmt.Errorf("failed to read snapshot: %w", err) + } + + if err = file.Close(); err != nil { + return fmt.Errorf("failed to close file: %w", err) + } + + duration := time.Since(start) + + storage.logger.Info("state loaded", zap.Int("clusters", stats.NumClusters), zap.Int("affiliates", stats.NumAffiliates), + zap.Int("endpoints", stats.NumEndpoints), zap.Duration("duration", duration)) + + storage.operationsMetric.WithLabelValues(operationLoad, statusSuccess).Inc() + storage.lastSnapshotSizeMetric.WithLabelValues(operationLoad).Set(float64(stats.Size)) + storage.lastOperationClustersMetric.WithLabelValues(operationLoad).Set(float64(stats.NumClusters)) + storage.lastOperationAffiliatesMetric.WithLabelValues(operationLoad).Set(float64(stats.NumAffiliates)) + storage.lastOperationEndpointsMetric.WithLabelValues(operationLoad).Set(float64(stats.NumEndpoints)) + storage.lastOperationDurationMetric.WithLabelValues(operationLoad).Set(duration.Seconds()) + + return nil +} + +// Import imports all clusters' states from the given reader. +func (storage *Storage) Import(reader io.Reader) (SnapshotStats, error) { + const bufferSize = 32 + + size := 0 + numClusters := 0 + numAffiliates := 0 + numEndpoints := 0 + + index := 0 + buffer := make([]byte, bufferSize) + + // unmarshal the clusters in a streaming manner and import them into the state + if err := storage.state.ImportClusterSnapshots(func() (*storagepb.ClusterSnapshot, bool, error) { + for { + numRead, err := reader.Read(buffer[index:]) + size += numRead + + if err != nil { + if err == io.EOF { //nolint:errorlint + return nil, false, nil + } + + return nil, false, fmt.Errorf("failed to read bytes: %w", err) + } + + clusterSnapshot, offset, err := decodeClusterSnapshot(buffer) + if err != nil { + return nil, false, fmt.Errorf("failed to decode cluster: %w", err) + } + + clusterEndIndex := len(buffer) + offset + + if clusterSnapshot == nil { // we need to read more data + buffer = slices.Grow(buffer, offset) + buffer = buffer[:clusterEndIndex] + index += numRead + + continue + } + + // prepare the buffer for the next iteration + buffer = slices.Delete(buffer, 0, clusterEndIndex) // delete the bytes which belong to the processed cluster + index = len(buffer) // set the start index for the next iteration - here, the remaining bytes in the buffer belong to the next cluster + buffer = buffer[:bufferSize] // reset the buffer to the initial size + + // update stats + numClusters++ + numAffiliates += len(clusterSnapshot.Affiliates) + + for _, affiliate := range clusterSnapshot.Affiliates { + numEndpoints += len(affiliate.Endpoints) + } + + return clusterSnapshot, true, nil + } + }); err != nil { + return SnapshotStats{}, fmt.Errorf("failed to import clusters: %w", err) + } + + return SnapshotStats{ + Size: size, + NumClusters: numClusters, + NumAffiliates: numAffiliates, + NumEndpoints: numEndpoints, + }, nil +} + +// Export exports all clusters' states into the given writer. +func (storage *Storage) Export(writer io.Writer) (SnapshotStats, error) { + numClusters := 0 + numAffiliates := 0 + numEndpoints := 0 + size := 0 + + var buffer []byte + + // marshal the clusters in a streaming manner and export them into the writer + if err := storage.state.ExportClusterSnapshots(func(snapshot *storagepb.ClusterSnapshot) error { + var err error + + buffer, err = encodeClusterSnapshot(buffer, snapshot) + if err != nil { + return fmt.Errorf("failed to encode cluster: %w", err) + } + + written, err := writer.Write(buffer) + if err != nil { + return fmt.Errorf("failed to write cluster: %w", err) + } + + // prepare the buffer for the next iteration - reset it + buffer = buffer[:0] + + // update stats + size += written + numClusters++ + numAffiliates += len(snapshot.Affiliates) + + for _, affiliate := range snapshot.Affiliates { + numEndpoints += len(affiliate.Endpoints) + } + + return nil + }); err != nil { + return SnapshotStats{}, fmt.Errorf("failed to snapshot clusters: %w", err) + } + + return SnapshotStats{ + Size: size, + NumClusters: numClusters, + NumAffiliates: numAffiliates, + NumEndpoints: numEndpoints, + }, nil +} + +func getTempFile(dst string) (*os.File, error) { + tmpFile, err := os.OpenFile(dst+".tmp", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o666) + if err != nil { + return nil, fmt.Errorf("failed to create file: %w", err) + } + + return tmpFile, nil +} + +// commitTempFile commits the temporary file to the destination and removes it. +func commitTempFile(tmpFile *os.File, dst string) error { + renamed := false + closer := sync.OnceValue(tmpFile.Close) + + defer func() { + closer() //nolint:errcheck + + if !renamed { + os.Remove(tmpFile.Name()) //nolint:errcheck + } + }() + + if err := tmpFile.Sync(); err != nil { + return fmt.Errorf("failed to sync data: %w", err) + } + + if err := closer(); err != nil { + return fmt.Errorf("failed to close file: %w", err) + } + + if err := os.Rename(tmpFile.Name(), dst); err != nil { + return fmt.Errorf("failed to rename file: %w", err) + } + + renamed = true + + return nil +} + +// SnapshotStats contains statistics about a snapshot. +type SnapshotStats struct { + Size int + NumClusters int + NumAffiliates int + NumEndpoints int +} diff --git a/internal/state/storage/storage_bench_test.go b/internal/state/storage/storage_bench_test.go new file mode 100644 index 0000000..56b9972 --- /dev/null +++ b/internal/state/storage/storage_bench_test.go @@ -0,0 +1,86 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package storage_test + +import ( + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + + storagepb "github.com/siderolabs/discovery-service/api/storage" + "github.com/siderolabs/discovery-service/internal/state" + "github.com/siderolabs/discovery-service/internal/state/storage" +) + +func BenchmarkExport(b *testing.B) { + logger := zap.NewNop() + state := buildState(b, b.N, logger) + storage := storage.New("", state, logger) + + b.ReportAllocs() + b.ResetTimer() + + _, err := storage.Export(io.Discard) + require.NoError(b, err) +} + +func testBenchmarkAllocs(t *testing.T, f func(b *testing.B), threshold int64) { + res := testing.Benchmark(f) + + allocs := res.AllocsPerOp() + if allocs > threshold { + t.Fatalf("Expected AllocsPerOp <= %d, got %d", threshold, allocs) + } +} + +func TestBenchmarkExportAllocs(t *testing.T) { + testBenchmarkAllocs(t, BenchmarkExport, 0) +} + +func buildState(tb testing.TB, numClusters int, logger *zap.Logger) *state.State { + i := 0 + state := state.NewState(logger) + + err := state.ImportClusterSnapshots(func() (*storagepb.ClusterSnapshot, bool, error) { + if i >= numClusters { + return nil, false, nil + } + + affiliates := make([]*storagepb.AffiliateSnapshot, 0, 5) + + for j := range 5 { + affiliates = append(affiliates, &storagepb.AffiliateSnapshot{ + Id: fmt.Sprintf("aff%d", j), + Expiration: timestamppb.New(time.Now().Add(time.Hour)), + Data: []byte(fmt.Sprintf("aff%d data", j)), + }) + } + + if i%2 == 0 { + affiliates[0].Endpoints = []*storagepb.EndpointSnapshot{ + { + Expiration: timestamppb.New(time.Now().Add(time.Hour)), + Data: []byte(fmt.Sprintf("endpoint%d data", i)), + }, + } + } + + i++ + + return &storagepb.ClusterSnapshot{ + Id: fmt.Sprintf("cluster%d", i), + Affiliates: affiliates, + }, true, nil + }) + require.NoError(tb, err) + + return state +} diff --git a/internal/state/storage/storage_test.go b/internal/state/storage/storage_test.go new file mode 100644 index 0000000..4ba865e --- /dev/null +++ b/internal/state/storage/storage_test.go @@ -0,0 +1,290 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package storage_test + +import ( + "bytes" + "context" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "google.golang.org/protobuf/types/known/timestamppb" + + storagepb "github.com/siderolabs/discovery-service/api/storage" + "github.com/siderolabs/discovery-service/internal/state/storage" +) + +func TestExportData(t *testing.T) { + t.Parallel() + + snapshot, snapshotStats := getTestSnapshot() + tempDir := t.TempDir() + path := filepath.Join(tempDir, "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + var buffer bytes.Buffer + + exportStats, err := stateStorage.Export(&buffer) + require.NoError(t, err) + + assert.Equal(t, snapshotStats, exportStats) + + expected, err := snapshot.MarshalVT() + require.NoError(t, err) + + require.Equal(t, expected, buffer.Bytes()) +} + +func TestImportData(t *testing.T) { + t.Parallel() + + snapshot, snapshotStats := getTestSnapshot() + path := filepath.Join(t.TempDir(), "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + data, err := snapshot.MarshalVT() + require.NoError(t, err) + + importStats, err := stateStorage.Import(bytes.NewReader(data)) + require.NoError(t, err) + + require.Equal(t, snapshotStats, importStats) + + loads := state.getLoads() + + require.Len(t, loads, 1) + require.True(t, loads[0].EqualVT(snapshot)) +} + +func TestStorage(t *testing.T) { + t.Parallel() + + snapshot, _ := getTestSnapshot() + tempDir := t.TempDir() + path := filepath.Join(tempDir, "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + // test save + + require.NoError(t, stateStorage.Save()) + + expectedData, err := snapshot.MarshalVT() + require.NoError(t, err) + + actualData, err := os.ReadFile(path) + require.NoError(t, err) + + require.Equal(t, expectedData, actualData) + + // test load + + require.NoError(t, stateStorage.Load()) + require.Len(t, state.getLoads(), 1) + require.True(t, snapshot.EqualVT(state.getLoads()[0])) + + // modify, save & load again to assert that the file content gets overwritten + + snapshot.Clusters[1].Affiliates[0].Data = []byte("new aff1 data") + + require.NoError(t, stateStorage.Save()) + require.NoError(t, stateStorage.Load()) + require.Len(t, state.getLoads(), 2) + require.True(t, snapshot.EqualVT(state.getLoads()[1])) +} + +func TestSchedule(t *testing.T) { + t.Parallel() + + clock := clockwork.NewFakeClock() + snapshot, _ := getTestSnapshot() + tempDir := t.TempDir() + path := filepath.Join(tempDir, "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + // start the periodic storage and wait for it to block on the timer + + errCh := make(chan error) + + go func() { + errCh <- stateStorage.Start(ctx, clock, 10*time.Minute) + }() + + require.NoError(t, clock.BlockUntilContext(ctx, 1)) + + // advance time to trigger the first snapshot and assert it + + clock.Advance(13 * time.Minute) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, 1, state.getSnapshots()) + }, 2*time.Second, 100*time.Millisecond) + + // advance time to trigger the second snapshot and assert it + + clock.Advance(10 * time.Minute) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, 2, state.getSnapshots()) + }, 2*time.Second, 100*time.Millisecond) + + // cancel the context to stop the storage loop and wait for it to exit + cancel() + + require.NoError(t, <-errCh) + + // assert that the state was saved on shutdown + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, 3, state.getSnapshots()) + }, 2*time.Second, 100*time.Millisecond) +} + +type mockSnapshotter struct { + data *storagepb.StateSnapshot + loads []*storagepb.StateSnapshot + + snapshots int + + lock sync.Mutex +} + +func (m *mockSnapshotter) getSnapshots() int { + m.lock.Lock() + defer m.lock.Unlock() + + return m.snapshots +} + +func (m *mockSnapshotter) getLoads() []*storagepb.StateSnapshot { + m.lock.Lock() + defer m.lock.Unlock() + + return append([]*storagepb.StateSnapshot(nil), m.loads...) +} + +// ExportClusterSnapshots implements storage.Snapshotter interface. +func (m *mockSnapshotter) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.snapshots++ + + for _, cluster := range m.data.Clusters { + if err := f(cluster); err != nil { + return err + } + } + + return nil +} + +// ImportClusterSnapshots implements storage.Snapshotter interface. +func (m *mockSnapshotter) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error { + m.lock.Lock() + defer m.lock.Unlock() + + var clusters []*storagepb.ClusterSnapshot + + for { + cluster, ok, err := f() + if err != nil { + return err + } + + if !ok { + break + } + + clusters = append(clusters, cluster) + } + + m.loads = append(m.loads, &storagepb.StateSnapshot{Clusters: clusters}) + + return nil +} + +func getTestSnapshot() (*storagepb.StateSnapshot, storage.SnapshotStats) { + snapshot := storagepb.StateSnapshot{ + Clusters: []*storagepb.ClusterSnapshot{ + { + Id: "cluster0", + }, + { + Id: "cluster1", + Affiliates: []*storagepb.AffiliateSnapshot{ + { + Id: "aff1", + Expiration: timestamppb.New(time.Now().Add(time.Hour)), + Data: []byte("aff1 data"), + Endpoints: []*storagepb.EndpointSnapshot{ + { + Expiration: timestamppb.New(time.Now().Add(2 * time.Hour)), + Data: []byte("aff1 endpoint1 data"), + }, + { + Expiration: timestamppb.New(time.Now().Add(3 * time.Hour)), + Data: []byte("aff1 endpoint2 data"), + }, + }, + }, + { + Id: "aff2", + Expiration: timestamppb.New(time.Now().Add(2 * time.Hour)), + Data: []byte("aff2 data"), + Endpoints: []*storagepb.EndpointSnapshot{ + { + Expiration: timestamppb.New(time.Now().Add(3 * time.Hour)), + Data: []byte("aff2 endpoint1 data"), + }, + }, + }, + }, + }, + { + Id: "cluster2", + }, + }, + } + + numAffiliates := 0 + numEndpoints := 0 + + for _, cluster := range snapshot.Clusters { + numAffiliates += len(cluster.Affiliates) + + for _, affiliate := range cluster.Affiliates { + numEndpoints += len(affiliate.Endpoints) + } + } + + return &snapshot, storage.SnapshotStats{ + NumClusters: len(snapshot.Clusters), + NumAffiliates: numAffiliates, + NumEndpoints: numEndpoints, + Size: snapshot.SizeVT(), + } +} diff --git a/pkg/server/client_test.go b/pkg/server/client_test.go index 6b6892a..697efd0 100644 --- a/pkg/server/client_test.go +++ b/pkg/server/client_test.go @@ -406,8 +406,6 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi eg, ctx := errgroup.WithContext(ctx) for i := range affiliates { - i := i - eg.Go(func() error { return affiliates[i].Run(ctx, logger, notifyCh[i]) }) @@ -446,7 +444,7 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi expected := make(map[int]struct{}) - for i := 0; i < numAffiliates; i++ { + for i := range numAffiliates { if i != affiliateID { expected[i] = struct{}{} } @@ -493,7 +491,7 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi // eventually all affiliates should see discovered state const NumAttempts = 50 // 50 * 100ms = 5s - for j := 0; j < NumAttempts; j++ { + for j := range NumAttempts { matches := true for i := range affiliates { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 90dc673..89c24f4 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -407,7 +407,7 @@ func TestValidation(t *testing.T) { t.Run("AffiliateUpdateTooMany", func(t *testing.T) { t.Parallel() - for i := 0; i < limits.ClusterAffiliatesMax; i++ { + for i := range limits.ClusterAffiliatesMax { _, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{ ClusterId: "fatcluster", AffiliateId: fmt.Sprintf("af%d", i), @@ -428,7 +428,7 @@ func TestValidation(t *testing.T) { t.Run("AffiliateUpdateTooManyEndpoints", func(t *testing.T) { t.Parallel() - for i := 0; i < limits.AffiliateEndpointsMax; i++ { + for i := range limits.AffiliateEndpointsMax { _, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{ ClusterId: "smallcluster", AffiliateId: "af", @@ -514,7 +514,7 @@ func testHitRateLimit(client pb.ClusterClient, ip string) func(t *testing.T) { ctx = metadata.AppendToOutgoingContext(ctx, "X-Real-IP", ip) - for i := 0; i < limits.IPRateBurstSizeMax; i++ { + for range limits.IPRateBurstSizeMax { _, err := client.Hello(ctx, &pb.HelloRequest{ ClusterId: "fake", ClientVersion: "v0.12.0", diff --git a/pkg/server/version_test.go b/pkg/server/version_test.go index 003d522..3f1554c 100644 --- a/pkg/server/version_test.go +++ b/pkg/server/version_test.go @@ -22,8 +22,6 @@ func TestParseVersion(t *testing.T) { "v0.14.0-alpha.0-7-gf7d9f211": "v0.14.0-alpha.0-dev", "v0.14.0-alpha.0-7-gf7d9f211-dirty": "v0.14.0-alpha.0-dev", } { - v, expected := v, expected - t.Run(v, func(t *testing.T) { t.Parallel()