Skip to content

Commit

Permalink
feat: Support status sync for Gateway API resources (#1315)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiantao authored Oct 9, 2024
1 parent ecf52ae commit 93317ad
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 243 deletions.
11 changes: 9 additions & 2 deletions pkg/ingress/kube/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
kubecredentials "istio.io/istio/pilot/pkg/credentials/kube"
"istio.io/istio/pilot/pkg/model"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pilot/pkg/status"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/collection"
Expand All @@ -48,6 +49,7 @@ type gatewayController struct {
store model.ConfigStoreController
credsController credentials.MulticlusterController
istioController *istiogateway.Controller
statusManager *status.Manager

resourceUpToDate atomic.Bool
}
Expand Down Expand Up @@ -76,9 +78,10 @@ func NewController(client kube.Client, options common.Options) common.GatewayCon
istioController.DefaultGatewaySelector = map[string]string{options.GatewaySelectorKey: options.GatewaySelectorValue}
}

var statusManager *status.Manager = nil
if options.EnableStatus {
// TODO: Add status sync support
//istioController.SetStatusWrite(true,)
statusManager = status.NewManager(store)
istioController.SetStatusWrite(true, statusManager)
} else {
IngressLog.Infof("Disable status update for cluster %s", clusterId)
}
Expand All @@ -87,6 +90,7 @@ func NewController(client kube.Client, options common.Options) common.GatewayCon
store: store,
credsController: credsController,
istioController: istioController,
statusManager: statusManager,
}
}

Expand Down Expand Up @@ -148,6 +152,9 @@ func (g *gatewayController) Run(stop <-chan struct{}) {
})
go g.store.Run(stop)
go g.istioController.Run(stop)
if g.statusManager != nil {
g.statusManager.Start(stop)
}
}

func (g *gatewayController) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
Expand Down
132 changes: 95 additions & 37 deletions pkg/ingress/kube/gateway/istio/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,37 @@
package istio

import (
"context"
"fmt"
"sort"
"strconv"
"strings"

networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/model"
serviceRegistryKube "istio.io/istio/pilot/pkg/serviceregistry/kube"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config/host"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/util/sets"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GatewayContext contains a minimal subset of push context functionality to be exposed to GatewayAPIControllers
type GatewayContext struct {
ps *model.PushContext
// Start - Updated by Higress
client kube.Client
domainSuffix string
clusterID cluster.ID
// End - Updated by Higress
}

func NewGatewayContext(ps *model.PushContext) GatewayContext {
return GatewayContext{ps}
// Start - Updated by Higress

func NewGatewayContext(ps *model.PushContext, client kube.Client, domainSuffix string, clusterID cluster.ID) GatewayContext {
return GatewayContext{ps, client, domainSuffix, clusterID}
}

// ResolveGatewayInstances attempts to resolve all instances that a gateway will be exposed on.
Expand All @@ -59,26 +70,20 @@ func (gc GatewayContext) ResolveGatewayInstances(
foundExternal := sets.New[string]()
foundPending := sets.New[string]()
warnings := []string{}

// Cache endpoints to reduce redundant queries
endpointsCache := make(map[string]*corev1.Endpoints)

for _, g := range gwsvcs {
svc, f := gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)][namespace]
if !f {
otherNamespaces := []string{}
for ns := range gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)] {
otherNamespaces = append(otherNamespaces, `"`+ns+`"`) // Wrap in quotes for output
}
if len(otherNamespaces) > 0 {
sort.Strings(otherNamespaces)
warnings = append(warnings, fmt.Sprintf("hostname %q not found in namespace %q, but it was found in namespace(s) %v",
g, namespace, strings.Join(otherNamespaces, ", ")))
} else {
warnings = append(warnings, fmt.Sprintf("hostname %q not found", g))
}
svc := gc.GetService(g, namespace, gvk.Service.Kind)
if svc == nil {
warnings = append(warnings, fmt.Sprintf("hostname %q not found", g))
continue
}
svcKey := svc.Key()

for port := range ports {
instances := gc.ps.ServiceInstancesByPort(svc, port, nil)
if len(instances) > 0 {
exists := checkServicePortExists(svc, port)
if exists {
foundInternal.Insert(fmt.Sprintf("%s:%d", g, port))
if svc.Attributes.ClusterExternalAddresses.Len() > 0 {
// Fetch external IPs from all clusters
Expand All @@ -92,22 +97,30 @@ func (gc GatewayContext) ResolveGatewayInstances(
}
}
} else {
instancesByPort := gc.ps.ServiceInstances(svcKey)
if instancesEmpty(instancesByPort) {
endpoints, ok := endpointsCache[g]
if !ok {
endpoints = gc.GetEndpoints(g, namespace)
endpointsCache[g] = endpoints
}

if endpoints == nil {
warnings = append(warnings, fmt.Sprintf("no instances found for hostname %q", g))
} else {
hintPort := sets.New[string]()
for _, instances := range instancesByPort {
for _, i := range instances {
if i.Endpoint.EndpointPort == uint32(port) {
hintPort.Insert(strconv.Itoa(i.ServicePort.Port))
hintWorkloadPort := false
for _, subset := range endpoints.Subsets {
for _, subSetPort := range subset.Ports {
if subSetPort.Port == int32(port) {
hintWorkloadPort = true
break
}
}
if hintWorkloadPort {
break
}
}
if hintPort.Len() > 0 {
if hintWorkloadPort {
warnings = append(warnings, fmt.Sprintf(
"port %d not found for hostname %q (hint: the service port should be specified, not the workload port. Did you mean one of these ports: %v?)",
port, g, sets.SortedList(hintPort)))
"port %d not found for hostname %q (hint: the service port should be specified, not the workload port", port, g))
} else {
warnings = append(warnings, fmt.Sprintf("port %d not found for hostname %q", port, g))
}
Expand All @@ -119,15 +132,60 @@ func (gc GatewayContext) ResolveGatewayInstances(
return sets.SortedList(foundInternal), sets.SortedList(foundExternal), sets.SortedList(foundPending), warnings
}

func (gc GatewayContext) GetService(hostname, namespace string) *model.Service {
return gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(hostname)][namespace]
func (gc GatewayContext) GetService(hostname, namespace, kind string) *model.Service {
// Currently only supports type Kubernetes Service
if kind != gvk.Service.Kind {
log.Warnf("Unsupported kind: expected 'Service', but got '%s'", kind)
return nil
}
serviceName := extractServiceName(hostname)

svc, err := gc.client.Kube().CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return nil
}
log.Errorf("failed to get service (serviceName: %s, namespace: %s): %v", serviceName, namespace, err)
return nil
}

return serviceRegistryKube.ConvertService(*svc, gc.domainSuffix, gc.clusterID)
}

func instancesEmpty(m map[int][]*model.ServiceInstance) bool {
for _, instances := range m {
if len(instances) > 0 {
return false
func (gc GatewayContext) GetEndpoints(hostname, namespace string) *corev1.Endpoints {
serviceName := extractServiceName(hostname)

endpoints, err := gc.client.Kube().CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})

if err != nil {
if kerrors.IsNotFound(err) {
return nil
}
log.Errorf("failed to get endpoints (serviceName: %s, namespace: %s): %v", serviceName, namespace, err)
return nil
}
return true

return endpoints
}

func checkServicePortExists(svc *model.Service, port int) bool {
if svc == nil {
return false
}
for _, svcPort := range svc.Ports {
if port == svcPort.Port {
return true
}
}
return false
}

func extractServiceName(hostName string) string {
parts := strings.Split(hostName, ".")
if len(parts) >= 4 {
return parts[0]
}
return ""
}

// End - Updated by Higress
4 changes: 3 additions & 1 deletion pkg/ingress/kube/gateway/istio/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func (c *Controller) Reconcile(ps *model.PushContext) error {
ReferenceGrant: referenceGrant,
DefaultGatewaySelector: c.DefaultGatewaySelector,
Domain: c.domain,
Context: NewGatewayContext(ps),
// Start - Updated by Higress
Context: NewGatewayContext(ps, c.client, c.domain, c.cluster),
// End - Updated by Higress
}

if !input.hasResources() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/kube/gateway/istio/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func buildDestination(ctx configContext, to k8s.BackendRef, ns string, enforceRe
return nil, &ConfigError{Reason: InvalidDestination, Message: "serviceName invalid; the name of the Service must be used, not the hostname."}
}
hostname := fmt.Sprintf("%s.%s.svc.%s", to.Name, namespace, ctx.Domain)
if ctx.Context.GetService(hostname, namespace) == nil {
if ctx.Context.GetService(hostname, namespace, gvk.Service.Kind) == nil {
invalidBackendErr = &ConfigError{Reason: InvalidDestinationNotFound, Message: fmt.Sprintf("backend(%s) not found", hostname)}
}
return &istio.Destination{
Expand All @@ -1192,7 +1192,7 @@ func buildDestination(ctx configContext, to k8s.BackendRef, ns string, enforceRe
if strings.Contains(string(to.Name), ".") {
return nil, &ConfigError{Reason: InvalidDestination, Message: "serviceName invalid; the name of the Service must be used, not the hostname."}
}
if ctx.Context.GetService(hostname, namespace) == nil {
if ctx.Context.GetService(hostname, namespace, "ServiceImport") == nil {
invalidBackendErr = &ConfigError{Reason: InvalidDestinationNotFound, Message: fmt.Sprintf("backend(%s) not found", hostname)}
}
return &istio.Destination{
Expand All @@ -1210,7 +1210,7 @@ func buildDestination(ctx configContext, to k8s.BackendRef, ns string, enforceRe
return nil, &ConfigError{Reason: InvalidDestination, Message: "namespace may not be set with Hostname type"}
}
hostname := string(to.Name)
if ctx.Context.GetService(hostname, namespace) == nil {
if ctx.Context.GetService(hostname, namespace, "Hostname") == nil {
invalidBackendErr = &ConfigError{Reason: InvalidDestinationNotFound, Message: fmt.Sprintf("backend(%s) not found", hostname)}
}
return &istio.Destination{
Expand Down
Loading

0 comments on commit 93317ad

Please sign in to comment.