Skip to content

Commit

Permalink
Don't create special config cluster. (#129)
Browse files Browse the repository at this point in the history
In the current implementation, the GKE deployer creates a cluster for
each region where the application is being deployed. Also, it creates a
config cluster in "us-central1" where the controller is being deployed.

This is not great, because for each user project, we create an
additional cluster which translates into cloud costs for the user.

This PR deploys the controller in one of the clusters where applications
that are part of the same user project are being deployer.

If this is the first time the user deploys an app in their project, the
controller is deployed in the first region where the app is being
deployed. However, if the controller is already running in some cluster,
it will run there forever.

The caveat with this approach is that if the user decides some day to
not deploy anything anymore in the region where the controller has been
running, we'll still run the controller in that region.
  • Loading branch information
rgrandl authored Mar 7, 2024
1 parent a3952f4 commit 5e4b69f
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 72 deletions.
20 changes: 11 additions & 9 deletions cmd/weaver-gke/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ import (
)

var (
storeFlags = newCloudFlagSet("store", flag.ContinueOnError)
storeRegion = storeFlags.String("region", gke.ConfigClusterRegion,
`Cloud region where the store resides. Default value is the region of
the Service Weaver configuration cluster.`)
storeCluster = storeFlags.String("cluster", gke.ConfigClusterName,
`GKE cluster where the store resides. Default value is the name of the
Service Weaver configuration cluster.`)

storeSpec = tool.StoreSpec{
storeFlags = newCloudFlagSet("store", flag.ContinueOnError)
storeRegion = storeFlags.String("region", "", `Cloud region where the store resides.`)
storeCluster = storeFlags.String("cluster", "", `GKE cluster where the store resides.`)
storeSpec = tool.StoreSpec{
Tool: "weaver gke",
Flags: storeFlags.FlagSet,
Store: func(ctx context.Context) (store.Store, error) {
if *storeRegion == "" {
return nil, fmt.Errorf("must specify --region flag")
}
if *storeCluster == "" {
return nil, fmt.Errorf("must specify --cluster flag. Note that" +
"the cluster should be either serviceweaver or serviceweaver-config")
}
config, err := storeFlags.CloudConfig()
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/gke/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type ClusterInfo struct {
func GetClusterInfo(ctx context.Context, config CloudConfig, cluster, region string) (*ClusterInfo, error) {
// Fetch cluster credentials to the local machine.
kubeFileName := filepath.Join(
os.TempDir(), fmt.Sprintf("serviceweaver_%s_%s", cluster, uuid.New().String()))
os.TempDir(), fmt.Sprintf("serviceweaver_%s", uuid.New().String()))
if _, err := runGcloud(config, "", cmdOptions{
EnvOverrides: []string{
fmt.Sprintf("KUBECONFIG=%s", kubeFileName),
Expand Down Expand Up @@ -137,7 +137,7 @@ func isZone(location string) bool {
return strings.Count(location, "-") > 1
}

func fillClusterInfo(ctx context.Context, cluster, region string, cc CloudConfig, kc *rest.Config) (*ClusterInfo, error) {
func fillClusterInfo(_ context.Context, cluster, region string, cc CloudConfig, kc *rest.Config) (*ClusterInfo, error) {
// Avoid Kubernetes' low default QPS limit.
kc.QPS = math.MaxInt
kc.Burst = math.MaxInt
Expand Down
146 changes: 118 additions & 28 deletions internal/gke/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ const (
caName = "serviceweaver-ca"
caPoolName = "serviceweaver-ca"
caOrganization = "serviceweaver"
caLocation = ConfigClusterRegion
caLocation = "us-central1"

// Serving port for the nanny.
nannyServingPort = 80
Expand Down Expand Up @@ -255,7 +255,7 @@ downloaded and installed in the container. Do you want to proceed? [Y/n] `)
stop(err)
}
}()
configCluster, externalGatewayIP, err := prepareProject(childCtx, config, cfg)
_, externalGatewayIP, err := prepareProject(childCtx, config, cfg)
if err != nil {
stop(err)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ to use CloudDNS for name resolution [1].
fmt.Fprintln(os.Stderr, b.String())

// Build the rollout request.
req := buildRolloutRequest(configCluster, cfg)
req := buildRolloutRequest(cfg)
return req, nil
}

Expand Down Expand Up @@ -455,15 +455,15 @@ func prepareProject(ctx context.Context, config CloudConfig, cfg *config.GKEConf
}

// Ensure the Service Weaver configuration cluster is setup.
configCluster, globalGatewayIP, err := ensureConfigCluster(ctx, config, cfg, ConfigClusterName, ConfigClusterRegion, bindings)
configCluster, globalGatewayIP, err := ensureConfigCluster(ctx, config, cfg, bindings)
if err != nil {
return nil, "", err
}

// Ensure that a cluster is started in each deployment region and that
// a distributor and a manager are running in each cluster.
for _, region := range cfg.Regions {
cluster, gatewayIP, err := ensureApplicationCluster(ctx, config, cfg, applicationClusterName, region)
cluster, gatewayIP, err := ensureApplicationCluster(ctx, config, cfg, region)
if err != nil {
return nil, "", err
}
Expand All @@ -474,12 +474,10 @@ func prepareProject(ctx context.Context, config CloudConfig, cfg *config.GKEConf
return configCluster, globalGatewayIP, nil
}

func buildRolloutRequest(configCluster *ClusterInfo, cfg *config.GKEConfig) *controller.RolloutRequest {
req := &controller.RolloutRequest{
Config: cfg,
}
func buildRolloutRequest(cfg *config.GKEConfig) *controller.RolloutRequest {
req := &controller.RolloutRequest{Config: cfg}
for _, region := range cfg.Regions {
// NOTE: distributor address must be resolveable from anywhere inside
// NOTE: distributor address must be resolvable from anywhere inside
// the project's VPC.
distributorAddr :=
fmt.Sprintf("https://distributor.%s.svc.%s-%s:80", namespaceName, applicationClusterName, region)
Expand Down Expand Up @@ -956,11 +954,92 @@ func createServiceAccountsIAMPolicyRole(ctx context.Context, config CloudConfig)
})
}

// getConfigCluster returns the name of the cluster and the name of the region
// that manages all the applications within a Service Weaver project.
func getConfigCluster(ctx context.Context, config CloudConfig, cfg *config.GKEConfig) (string, string, error) {
name, region, err := getRunningConfigCluster(config)
if err != nil {
return "", "", err
}
if name == "" && region == "" {
// No config cluster. Set up a config cluster in the first region from the
// list of regions where to rollout the app, as specified by the user.
return applicationClusterName, cfg.Regions[0], nil
}

// Check if the config cluster actually exists.
exists, err := hasCluster(ctx, config, name, region)
if err != nil {
return "", "", err
}
if exists {
return name, region, nil
}
// The config cluster doesn't exist; disable ingress.
// TODO(rgrandl): we have to force the deletion of the ingress, because some
// other resources that depend on the ingress might be lingering around. We
// should delete all these resources as well.
_, err = runGcloud(config,
fmt.Sprintf("Disable multi-cluster ingress for cluster %q in %q",
name, region), cmdOptions{},
"container", "fleet", "ingress", "disable", "--force")

// The name of new config clusters will always be applicationClusterName.
name = applicationClusterName
if err != nil {
return "", "", err
}

// Set up the config cluster in the first region from the list of regions
// where to rollout the app, as specified by the user.
return name, cfg.Regions[0], nil
}

// getRunningConfigCluster returns information about the config cluster that
// manages all the applications within a Service Weaver project.
//
// Returns empty cluster name, and empty region if no config cluster is running.
func getRunningConfigCluster(config CloudConfig) (string, string, error) {
// Check if there is a config cluster already running.
out, err := runGcloud(config, "", cmdOptions{},
"container", "fleet", "ingress", "describe",
"--format=value(spec.multiclusteringress.configMembership)")
if err != nil {
// There is no config cluster.
return "", "", nil
}

// There is a config cluster entry. Extract the region where the config
// cluster should be running.
out = strings.TrimSuffix(out, "\n") // remove trailing newline
var clusterName, region string
if strings.Contains(out, "serviceweaver-config") {
// Legacy config cluster name.
clusterName = "serviceweaver-config"
} else if strings.Contains(out, applicationClusterName) {
// New config clusters should always be named applicationClusterName.
clusterName = applicationClusterName
} else {
// Unable to find ingress cluster.
return "", "", fmt.Errorf("unable to find ingress cluster for project %v", config.Project)
}

regionMatcher := fmt.Sprintf("projects/%s/locations/global/memberships/%s", config.Project, clusterName)
if _, err := fmt.Sscanf(out, regionMatcher+"-%s", &region); err != nil {
return "", "", fmt.Errorf("unable to parse config region name: %v", err)
}
return clusterName, region, nil
}

// ensureConfigCluster sets up a Service Weaver configuration cluster, returning the
// cluster information and the IP address of the gateway that routes ingress
// traffic to all Service Weaver applications.
func ensureConfigCluster(ctx context.Context, config CloudConfig, cfg *config.GKEConfig, name, region string, bindings iamBindings) (*ClusterInfo, string, error) {
cluster, err := ensureManagedCluster(ctx, config, cfg, name, region)
func ensureConfigCluster(ctx context.Context, config CloudConfig, cfg *config.GKEConfig, bindings iamBindings) (*ClusterInfo, string, error) {
name, region, err := getConfigCluster(ctx, config, cfg)
if err != nil {
return nil, "", err
}
cluster, err := ensureManagedCluster(ctx, config, name, region)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -1022,8 +1101,8 @@ func ensureConfigCluster(ctx context.Context, config CloudConfig, cfg *config.GK
// and running in the given region and is set up to host Service Weaver applications.
// It returns the cluster information and the IP address of the gateway that
// routes internal traffic to Service Weaver applications in the cluster.
func ensureApplicationCluster(ctx context.Context, config CloudConfig, cfg *config.GKEConfig, clusterName, region string) (*ClusterInfo, string, error) {
cluster, err := ensureManagedCluster(ctx, config, cfg, clusterName, region)
func ensureApplicationCluster(ctx context.Context, config CloudConfig, cfg *config.GKEConfig, region string) (*ClusterInfo, string, error) {
cluster, err := ensureManagedCluster(ctx, config, applicationClusterName, region)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -1162,7 +1241,7 @@ func ensureApplicationCluster(ctx context.Context, config CloudConfig, cfg *conf

// ensureManagedCluster ensures that a Service Weaver managed cluster is available
// and running in the given region.
func ensureManagedCluster(ctx context.Context, config CloudConfig, cfg *config.GKEConfig, name, region string) (*ClusterInfo, error) {
func ensureManagedCluster(ctx context.Context, config CloudConfig, name, region string) (*ClusterInfo, error) {
exists, err := hasCluster(ctx, config, name, region)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1351,8 +1430,7 @@ func hasCluster(ctx context.Context, config CloudConfig, name, region string) (b
}
defer client.Close()
if _, err = client.GetCluster(ctx, &containerpb.GetClusterRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/clusters/%s",
config.Project, region, name),
Name: fmt.Sprintf("projects/%s/locations/%s/clusters/%s", config.Project, region, name),
}); err != nil {
if isNotFound(err) {
return false, nil
Expand Down Expand Up @@ -1553,7 +1631,7 @@ func registerWithFleet(ctx context.Context, config CloudConfig, cluster *Cluster

// unregisterFromFleet removes the given cluster's registration with the project
// fleet, if one exists.
func unregisterFromFleet(ctx context.Context, config CloudConfig, name, region string) error {
func unregisterFromFleet(_ context.Context, config CloudConfig, name, region string) error {
mName := fmt.Sprintf("%s-%s", name, region)

// TODO(spetrovic): Implement these un-registrations using the Go API.
Expand Down Expand Up @@ -1604,7 +1682,7 @@ func waitForServiceExportsResource(ctx context.Context, cluster *ClusterInfo) er
}

// ensureMultiClusterIngress ensures multi-cluster ingress is enabled
// fo the given (config) cluster.
// for the given (config) cluster.
func ensureMultiClusterIngress(cluster *ClusterInfo) error {
fName := fmt.Sprintf("projects/%s/locations/global/memberships/%s-%s",
cluster.CloudConfig.Project, cluster.Name, cluster.Region)
Expand All @@ -1629,17 +1707,25 @@ func ensureMultiClusterIngress(cluster *ClusterInfo) error {
return err
}
}

// Ingress feature enabled: see if it's for our cluster.
out = strings.TrimSuffix(out, "\n") // remove trailing newline
if out == fName {
return nil
}
// Update ingress feature to point to our cluster.
_, err = runGcloud(cluster.CloudConfig,
fmt.Sprintf("Updating multi-cluster ingress for cluster %q in %q",
cluster.Name, cluster.Region), cmdOptions{},
"container", "fleet", "ingress", "update", "--config-membership",
fName, "--quiet")

// Update ingress feature to point to our cluster. Retry twice since it sometimes
// takes more than two minutes for the ingress controller to be updated.
for i := 0; i < 2; i++ {
_, err = runGcloud(cluster.CloudConfig,
fmt.Sprintf("Updating multi-cluster ingress for cluster %q in %q",
cluster.Name, cluster.Region), cmdOptions{},
"container", "fleet", "ingress", "update", "--config-membership",
fName, "--quiet")
if err == nil {
break
}
}
return err
}

Expand Down Expand Up @@ -1760,7 +1846,11 @@ func ensureRegionalInternalGateway(ctx context.Context, cluster *ClusterInfo) (s
// ensureWeaverServices ensures that Service Weaver services (i.e., controller and
// all needed distributors and managers) are running.
func ensureWeaverServices(ctx context.Context, config CloudConfig, cfg *config.GKEConfig, toolImageURL string) error {
if err := ensureController(ctx, config, toolImageURL); err != nil {
name, region, err := getConfigCluster(ctx, config, cfg)
if err != nil {
return err
}
if err := ensureController(ctx, config, name, region, toolImageURL); err != nil {
return err
}
for _, region := range cfg.Regions {
Expand All @@ -1779,8 +1869,8 @@ func ensureWeaverServices(ctx context.Context, config CloudConfig, cfg *config.G
}

// ensureController ensures that a controller is running in the config cluster.
func ensureController(ctx context.Context, config CloudConfig, toolImageURL string) error {
cluster, err := GetClusterInfo(ctx, config, ConfigClusterName, ConfigClusterRegion)
func ensureController(ctx context.Context, config CloudConfig, clusterName, region string, toolImageURL string) error {
cluster, err := GetClusterInfo(ctx, config, clusterName, region)
if err != nil {
return err
}
Expand Down
9 changes: 1 addition & 8 deletions internal/gke/gke.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"log/slog"
"math"
"net"
"os"
Expand All @@ -28,8 +29,6 @@ import (
"text/template"
"time"

"log/slog"

"cloud.google.com/go/compute/apiv1/computepb"
"github.com/ServiceWeaver/weaver-gke/internal/config"
"github.com/ServiceWeaver/weaver-gke/internal/nanny"
Expand Down Expand Up @@ -70,12 +69,6 @@ const (
// Name of Service Weaver application clusters.
applicationClusterName = "serviceweaver"

// Name of a Service Weaver configuration cluster.
ConfigClusterName = "serviceweaver-config"

// Region for the Service Weaver configuration cluster.
ConfigClusterRegion = "us-central1"

// Name of the backend config used for configuring application listener
// backends.
backendConfigName = "serviceweaver"
Expand Down
4 changes: 2 additions & 2 deletions internal/gke/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func newGCPCatter(ctx context.Context, config CloudConfig, q wlogging.Query) (*g
if err != nil {
return nil, fmt.Errorf("error translating query %s: %v", q, err)
}

client, err := logadmin.NewClient(ctx, config.Project, config.ClientOptions()...)
if err != nil {
return nil, fmt.Errorf("error creating logadmin client: %w", err)
Expand Down Expand Up @@ -412,7 +411,8 @@ func Translate(project string, query wlogging.Query) (string, error) {
// TODO(mwhittaker): Restrict based on location.
fmt.Fprintf(&b, ` AND resource.type="k8s_container"`)
fmt.Fprintf(&b, ` AND resource.labels.project_id=%q`, project)
fmt.Fprintf(&b, ` AND (resource.labels.cluster_name=%q OR resource.labels.cluster_name=%q)`, applicationClusterName, ConfigClusterName)
fmt.Fprintf(&b, ` AND (resource.labels.cluster_name=%q OR resource.labels.cluster_name=%q)`,
applicationClusterName, fmt.Sprintf("%s-config", applicationClusterName))
fmt.Fprintf(&b, ` AND resource.labels.namespace_name=%q`, namespaceName)
fmt.Fprintf(&b, ` AND (resource.labels.container_name=%q OR resource.labels.container_name=%q)`, appContainerName, nannyContainerName)
fmt.Fprintf(&b, ` AND logName="projects/%s/logs/serviceweaver"`, project)
Expand Down
7 changes: 6 additions & 1 deletion internal/gke/nanny.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ func runNannyServer(ctx context.Context, server *http.Server, lis net.Listener)
// Controller returns the HTTP address of the controller and an HTTP client
// that can be used to contact the controller.
func Controller(ctx context.Context, config CloudConfig) (string, *http.Client, error) {
configCluster, err := GetClusterInfo(ctx, config, ConfigClusterName, ConfigClusterRegion)
name, region, err := getRunningConfigCluster(config)
if err != nil || region == "" {
return "", nil, err
}

configCluster, err := GetClusterInfo(ctx, config, name, region)
if err != nil {
return "", nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions internal/gke/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,18 @@ func deleteRepository(ctx context.Context, config CloudConfig) (func() error, er

// deleteConfigCluster deletes the Service Weaver configuration cluster.
func deleteConfigCluster(ctx context.Context, config CloudConfig) (func() error, error) {
exists, err := hasCluster(ctx, config, ConfigClusterName, ConfigClusterRegion)
name, region, err := getRunningConfigCluster(config)
if err != nil || region == "" { // already deleted
return nil, nil
}
exists, err := hasCluster(ctx, config, name, region)
if err != nil {
return nil, err
}
if !exists { // already deleted
return nil, nil
}
return deleteCluster(ctx, config, ConfigClusterName, ConfigClusterRegion)
return deleteCluster(ctx, config, name, region)
}

// getApplicationClusters returns the list of Service Weaver application clusters.
Expand Down
Loading

0 comments on commit 5e4b69f

Please sign in to comment.