diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index 9df95b5405..3acc132052 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -10,7 +10,6 @@ the Change License after the Change Date as each is defined in accordance with t package main import ( - "context" "flag" "math/rand" "os" @@ -18,8 +17,6 @@ import ( "syscall" "time" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - envoyServerControlPlaneV3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" log "github.com/sirupsen/logrus" kafka_config "github.com/seldonio/seldon-core/components/kafka/v2/pkg/config" @@ -166,9 +163,6 @@ func main() { defer eventHub.Close() go makeSignalHandler(logger, done) - // Create a cache - xdsCache := cache.NewSnapshotCache(true, cache.IDHash{}, logger) - tracer, err := tracing.NewTraceProvider("seldon-scheduler", &tracingConfigPath, logger) if err != nil { logger.WithError(err).Fatalf("Failed to configure otel tracer") @@ -189,7 +183,7 @@ func main() { } // Create envoy incremental processor - _, err = processor.NewIncrementalProcessor(xdsCache, nodeID, logger, ss, es, ps, eventHub, &pipelineGatewayDetails, cleaner) + incrementalProcessor, err := processor.NewIncrementalProcessor(nodeID, logger, ss, es, ps, eventHub, &pipelineGatewayDetails, cleaner) if err != nil { log.WithError(err).Fatalf("Failed to create incremental processor") } @@ -266,11 +260,8 @@ func main() { // extra wait to allow routes state to get created time.Sleep(xDSWaitTimeout) - // Start envoy xDS server, this is done after the scheduler is ready - // so that the xDS server can start sending valid updates to envoy. - ctx := context.Background() - srv := envoyServerControlPlaneV3.NewServer(ctx, xdsCache, nil) - xdsServer := processor.NewXDSServer(srv, logger) + // create the processor separately, so it receives all updates + xdsServer := processor.NewXDSServer(incrementalProcessor, logger) err = xdsServer.StartXDSServer(envoyPort) if err != nil { log.WithError(err).Fatalf("Failed to start envoy xDS server") diff --git a/scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6145.txt b/scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6145.txt new file mode 100644 index 0000000000..fc8d7825a7 --- /dev/null +++ b/scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6145.txt @@ -0,0 +1,22 @@ +goos: linux +goarch: amd64 +pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor +cpu: 12th Gen Intel(R) Core(TM) i7-12800H +BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 112 10239625 ns/op 5885041 B/op 82877 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 98 12020676 ns/op 11498158 B/op 148125 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 28 46546621 ns/op 129625468 B/op 1804667 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 3 412763452 ns/op 1168723194 B/op 17179528 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 99 12240680 ns/op 53776819 B/op 819509 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 37 33453806 ns/op 170619219 B/op 2398968 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 4 317292666 ns/op 1696447430 B/op 24553734 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 2133085451 ns/op 6621219192 B/op 99532927 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 10000 133839 ns/op 437098 B/op 3280 allocs/op 
+BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 10 102003642 ns/op 164723263 B/op 2314898 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 115990545 ns/op 239089298 B/op 3302609 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 2 532899776 ns/op 1594935896 B/op 22465678 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 10 102020346 ns/op 203750677 B/op 3057694 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 114302062 ns/op 239748963 B/op 3458251 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 301749396 ns/op 926810037 B/op 11268314 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 2109146258 ns/op 4159537240 B/op 58957736 allocs/op +PASS +ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 32.254s \ No newline at end of file diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index f45ced38ad..ee7818e6f7 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -10,17 +10,11 @@ the Change License after the Change Date as each is defined in accordance with t package processor import ( - "context" "fmt" - "math" - "math/rand" "strconv" "sync" "time" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/sirupsen/logrus" "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" @@ -40,10 +34,7 @@ const ( ) type IncrementalProcessor struct { - cache cache.SnapshotCache - nodeID string - // snapshotVersion holds the current version of the snapshot. 
- snapshotVersion int64 + nodeID string logger logrus.FieldLogger xdsCache *xdscache.SeldonXDSCache mu sync.RWMutex @@ -64,9 +55,8 @@ type pendingModelVersion struct { } func NewIncrementalProcessor( - cache cache.SnapshotCache, nodeID string, - log logrus.FieldLogger, + logger logrus.FieldLogger, modelStore store.ModelStore, experimentServer experiment.ExperimentServer, pipelineHandler pipeline.PipelineHandler, @@ -74,12 +64,15 @@ func NewIncrementalProcessor( pipelineGatewayDetails *xdscache.PipelineGatewayDetails, versionCleaner cleaner.ModelVersionCleaner, ) (*IncrementalProcessor, error) { + xdsCache, err := xdscache.NewSeldonXDSCache(logger, pipelineGatewayDetails) + if err != nil { + return nil, err + } + ip := &IncrementalProcessor{ - cache: cache, nodeID: nodeID, - snapshotVersion: rand.Int63n(1000), - logger: log.WithField("source", "IncrementalProcessor"), - xdsCache: xdscache.NewSeldonXDSCache(log, pipelineGatewayDetails), + logger: logger.WithField("source", "IncrementalProcessor"), + xdsCache: xdsCache, modelStore: modelStore, experimentServer: experimentServer, pipelineHandler: pipelineHandler, @@ -90,11 +83,6 @@ func NewIncrementalProcessor( batchTriggerManual: nil, } - err := ip.init() - if err != nil { - return nil, err - } - hub.RegisterModelEventHandler( modelEventHandlerName, pendingSyncsQueueSize, @@ -185,57 +173,17 @@ func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg }() } -func (p *IncrementalProcessor) init() error { - p.mu.Lock() - defer p.mu.Unlock() - err := p.xdsCache.SetupTLS() - if err != nil { - return err - } - p.xdsCache.AddPermanentListeners() - p.xdsCache.AddPermanentClusters() - return nil -} - -// newSnapshotVersion increments the current snapshotVersion -// and returns as a string. -func (p *IncrementalProcessor) newSnapshotVersion() string { - // Reset the snapshotVersion if it ever hits max size. - if p.snapshotVersion == math.MaxInt64 { - p.snapshotVersion = 0 - } - - // Increment the snapshot version & return as string. 
- p.snapshotVersion++ - return strconv.FormatInt(p.snapshotVersion, 10) -} - func (p *IncrementalProcessor) updateEnvoy() error { - logger := p.logger.WithField("func", "updateEnvoy") - // Create the snapshot that we'll serve to Envoy - snapshot, err := cache.NewSnapshot( - p.newSnapshotVersion(), // version - map[rsrc.Type][]types.Resource{ - rsrc.ClusterType: p.xdsCache.ClusterContents(), // clusters - rsrc.RouteType: p.xdsCache.RouteContents(), // routes - rsrc.ListenerType: p.xdsCache.ListenerContents(), // listeners - rsrc.SecretType: p.xdsCache.SecretContents(), // Secrets - }) + // sequencing is important here - send clusters before updating routes and removing old clusters + err := p.xdsCache.AddClusters() if err != nil { return err } - - if err := snapshot.Consistent(); err != nil { - return err - } - logger.Debugf("will serve snapshot %+v", snapshot) - - // Add the snapshot to the cache - if err := p.cache.SetSnapshot(context.Background(), p.nodeID, snapshot); err != nil { + err = p.xdsCache.UpdateRoutes(p.nodeID) + if err != nil { return err } - - return nil + return p.xdsCache.RemoveClusters() } // This function does not call `updateEnvoy` directly and therefore callers should make sure diff --git a/scheduler/pkg/envoy/processor/incremental_benchmark_test.go b/scheduler/pkg/envoy/processor/incremental_benchmark_test.go index f9b395eab8..39b60d2aef 100644 --- a/scheduler/pkg/envoy/processor/incremental_benchmark_test.go +++ b/scheduler/pkg/envoy/processor/incremental_benchmark_test.go @@ -15,7 +15,6 @@ import ( "testing" "time" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -125,7 +124,6 @@ func benchmarkModelUpdate( memoryStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub) pipelineStore := pipeline.NewPipelineStore(logger, eventHub, memoryStore) ip, err := NewIncrementalProcessor( - cache.NewSnapshotCache(false, cache.IDHash{}, logger), "some node", logger, memoryStore, @@ -163,12 +161,15 @@ func benchmarkModelUpdate( func BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 10, 1, 1, 10) } + func BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 100, 1, 1, 10) } + func BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 1_000, 1, 1, 10) } + func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 10_000, 1, 1, 10) } @@ -177,12 +178,15 @@ func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms(b *testing.B) { func BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 10, 1, 10, 10) } + func BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 100, 1, 10, 10) } + func BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 1_000, 1, 10, 10) } + func BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms(b *testing.B) { benchmarkModelUpdate(b, 10_000, 1, 10, 10) } @@ -191,12 +195,15 @@ func BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms(b *testing.B) { func BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 1, 1, 1, 100) } + func BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 100, 1, 1, 100) } + func BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms(b *testing.B) { 
benchmarkModelUpdate(b, 1_000, 1, 1, 100) } + func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 10_000, 1, 1, 100) } @@ -205,12 +212,15 @@ func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms(b *testing.B) { func BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 10, 1, 10, 100) } + func BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 100, 1, 10, 100) } + func BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 1_000, 1, 10, 100) } + func BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms(b *testing.B) { benchmarkModelUpdate(b, 10_000, 1, 10, 100) } diff --git a/scheduler/pkg/envoy/processor/incremental_test.go b/scheduler/pkg/envoy/processor/incremental_test.go index 4ef6317f89..0228173e12 100644 --- a/scheduler/pkg/envoy/processor/incremental_test.go +++ b/scheduler/pkg/envoy/processor/incremental_test.go @@ -21,7 +21,6 @@ import ( endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" . "github.com/onsi/gomega" "github.com/onsi/gomega/format" log "github.com/sirupsen/logrus" @@ -291,9 +290,11 @@ func TestUpdateEnvoyForModelVersion(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}) + g.Expect(err).To(BeNil()) inc := IncrementalProcessor{ logger: log.New(), - xdsCache: xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{}), + xdsCache: xdsCache, } for _, mv := range test.modelVersions { inc.updateEnvoyForModelVersion(mv.GetMeta().GetName(), mv, test.server, test.traffic, false) @@ -342,15 +343,15 @@ func TestRollingUpdate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { modelStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil) + xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}) + g.Expect(err).To(BeNil()) inc := &IncrementalProcessor{ - cache: cache.NewSnapshotCache(false, cache.IDHash{}, log.New()), logger: log.New(), - xdsCache: xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}), + xdsCache: xdsCache, modelStore: modelStore, experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil), pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore), } - inc.xdsCache.AddPermanentListeners() for _, op := range test.ops { op(inc, g) } @@ -412,15 +413,15 @@ func TestDraining(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { modelStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil) + xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}) + g.Expect(err).To(BeNil()) inc := &IncrementalProcessor{ - cache: cache.NewSnapshotCache(false, cache.IDHash{}, log.New()), logger: log.New(), - xdsCache: xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}), + xdsCache: xdsCache, modelStore: 
modelStore, experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil), pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore), } - inc.xdsCache.AddPermanentListeners() for _, op := range test.ops { op(inc, g) } @@ -556,16 +557,16 @@ func TestModelSync(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { modelStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil) + xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}) + g.Expect(err).To(BeNil()) inc := &IncrementalProcessor{ - cache: cache.NewSnapshotCache(false, cache.IDHash{}, log.New()), logger: log.New(), - xdsCache: xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}), + xdsCache: xdsCache, modelStore: modelStore, experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil), pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore), pendingModelVersions: test.pendingModelVersions, } - inc.xdsCache.AddPermanentListeners() for _, op := range test.ops { op(inc, g) } @@ -605,6 +606,21 @@ func TestEnvoySettings(t *testing.T) { getStrPtr := func(t string) *string { return &t } tests := []test{ + { + name: "experiment with deleted model", + ops: []func(inc *IncrementalProcessor, g *WithT){ + createTestServer("server", 2), + createTestModel("model1", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}), + createTestModel("model2", "server", 1, []int{1}, 1, []store.ModelReplicaState{store.Available}), + createTestExperiment("exp", []string{"model1", "model2"}, getStrPtr("model1"), nil), + removeTestModel("model2", 1, "server", 1), + }, + numExpectedClusters: 2, // model2 should be removed from the clusters (server 1) + numExpectedRoutes: 2, // model2 should be removed from the routes + experimentActive: false, + experimentExists: true, + snapshotFilename: "experiment-deleted-model", + }, { name: "One model", ops: []func(inc *IncrementalProcessor, g *WithT){ @@ -690,21 +706,7 @@ func TestEnvoySettings(t *testing.T) { }, snapshotFilename: "experiment-new-model-version", }, - { - name: "experiment with deleted model", - ops: []func(inc *IncrementalProcessor, g *WithT){ - createTestServer("server", 2), - createTestModel("model1", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}), - createTestModel("model2", "server", 1, []int{1}, 1, []store.ModelReplicaState{store.Available}), - createTestExperiment("exp", []string{"model1", "model2"}, getStrPtr("model1"), nil), - removeTestModel("model2", 1, "server", 1), - }, - numExpectedClusters: 2, // model2 should be removed from the clusters (server 1) - numExpectedRoutes: 2, // model2 should be removed from the routes - experimentActive: false, - experimentExists: true, - snapshotFilename: "experiment-deleted-model", - }, + { name: "delete experiment", ops: []func(inc *IncrementalProcessor, g *WithT){ @@ -799,10 +801,11 @@ func TestEnvoySettings(t *testing.T) { logger := log.New() eventHub, _ := coordinator.NewEventHub(logger) memoryStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), eventHub) + xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}) + g.Expect(err).To(BeNil()) inc := &IncrementalProcessor{ - cache: cache.NewSnapshotCache(false, cache.IDHash{}, log.New()), - logger: log.New(), - xdsCache: 
xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}), + logger: logger.WithField("source", "IncrementalProcessor"), + xdsCache: xdsCache, modelStore: memoryStore, experimentServer: experiment.NewExperimentServer(log.New(), eventHub, memoryStore, nil), pipelineHandler: pipeline.NewPipelineStore(log.New(), eventHub, memoryStore), @@ -826,13 +829,15 @@ func TestEnvoySettings(t *testing.T) { inc.handlePipelinesEvents, ) - inc.xdsCache.AddPermanentListeners() - inc.xdsCache.AddPermanentClusters() for _, op := range test.ops { op(inc, g) time.Sleep(50 * time.Millisecond) // to allow event handlers to process } + // clusters won't be removed until the next time updateEnvoy is called + err = inc.updateEnvoy() + g.Expect(err).To(BeNil()) + g.Expect(len(inc.xdsCache.Clusters)).To(Equal(test.numExpectedClusters)) g.Expect(len(inc.xdsCache.Routes)).To(Equal(test.numExpectedRoutes)) g.Expect(len(inc.xdsCache.Pipelines)).To(Equal(test.numExpectedPipelines)) @@ -861,10 +866,10 @@ func TestEnvoySettings(t *testing.T) { routeFilename := test.snapshotFilename + "-routes.json" if generateSnapshots { - createSnapshot(g, inc.xdsCache.RouteContents(), routeFilename) + createSnapshot(g, inc.xdsCache.RouteResources(), routeFilename) } - resultingRoutes := getResultingRoutes(inc.xdsCache.RouteContents()) + resultingRoutes := getResultingRoutes(inc.xdsCache.RouteResources()) data, err := os.ReadFile(snapshots_directory_name + "/" + routeFilename) g.Expect(err).To(BeNil()) @@ -893,10 +898,10 @@ func TestEnvoySettings(t *testing.T) { clusterFilename := test.snapshotFilename + "-clusters.json" if generateSnapshots { - createSnapshot(g, inc.xdsCache.ClusterContents(), clusterFilename) + createSnapshot(g, inc.xdsCache.ClusterResources(), clusterFilename) } - resultingClusters := getResultingClusters(inc.xdsCache.ClusterContents()) + resultingClusters := getResultingClusters(inc.xdsCache.ClusterResources()) data, err = os.ReadFile(snapshots_directory_name + "/" + clusterFilename) g.Expect(err).To(BeNil()) diff --git a/scheduler/pkg/envoy/processor/server.go b/scheduler/pkg/envoy/processor/server.go index 1e1d746e97..49d9e889d7 100644 --- a/scheduler/pkg/envoy/processor/server.go +++ b/scheduler/pkg/envoy/processor/server.go @@ -10,6 +10,7 @@ the Change License after the Change Date as each is defined in accordance with t package processor import ( + "context" "fmt" "net" @@ -33,7 +34,11 @@ type XDSServer struct { logger log.FieldLogger } -func NewXDSServer(server serverv3.Server, logger log.FieldLogger) *XDSServer { +func NewXDSServer(incrementalProcessor *IncrementalProcessor, logger log.FieldLogger) *XDSServer { + // Start envoy xDS server, this is done after the scheduler is ready + // so that the xDS server can start sending valid updates to envoy. 
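+	// Descriptive note (grounded in this change): SeldonXDSCache satisfies the
+	// go-control-plane cache interface -- its CreateWatch, CreateDeltaWatch and Fetch
+	// methods delegate to the internal MuxCache -- so the processor's xdsCache can be
+	// handed straight to serverv3.NewServer below.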
+ ctx := context.Background() + server := serverv3.NewServer(ctx, incrementalProcessor.xdsCache, nil) return &XDSServer{ srv3: server, logger: logger, diff --git a/scheduler/pkg/envoy/processor/server_test.go b/scheduler/pkg/envoy/processor/server_test.go index 871cfb059c..9b3a199814 100644 --- a/scheduler/pkg/envoy/processor/server_test.go +++ b/scheduler/pkg/envoy/processor/server_test.go @@ -5,16 +5,13 @@ import ( "reflect" "slices" "strconv" - "sync" "testing" "time" clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" client "github.com/envoyproxy/go-control-plane/pkg/client/sotw/v3" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - envoyServerControlPlaneV3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" . "github.com/onsi/gomega" log "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -29,6 +26,8 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline" ) +var permanentClusterNames = []string{"pipelinegateway_http", "pipelinegateway_grpc", "mirror_http", "mirror_grpc"} + func TestFetch(t *testing.T) { g := NewGomegaWithT(t) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -36,31 +35,29 @@ func TestFetch(t *testing.T) { logger := log.New() - snapCache := cache.NewSnapshotCache(true, cache.IDHash{}, logger) - - port, err := testing_utils.GetFreePortForTest() - if err != nil { - t.Fatal(err) - } - go func() { - err := startAdsServer(ctx, snapCache, uint(port)) - g.Expect(err).To(BeNil()) - }() - memoryStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), nil) pipelineHandler := pipeline.NewPipelineStore(logger, nil, memoryStore) + xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{}) + g.Expect(err).To(BeNil()) inc := &IncrementalProcessor{ - cache: snapCache, logger: logger, - xdsCache: xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{}), + xdsCache: xdsCache, pipelineHandler: pipelineHandler, modelStore: memoryStore, experimentServer: experiment.NewExperimentServer(logger, nil, memoryStore, pipelineHandler), nodeID: "node_1", } - err = inc.init() + port, err := testing_utils.GetFreePortForTest() + if err != nil { + t.Fatal(err) + } + go func() { + err := startAdsServer(inc, uint(port)) + g.Expect(err).To(BeNil()) + }() + g.Expect(err).To(BeNil()) conn, err := grpc.NewClient(":"+strconv.Itoa(port), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -76,11 +73,14 @@ func TestFetch(t *testing.T) { } func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) func(t *testing.T) { - expectedClusterNames := []string{"pipelinegateway_http", "pipelinegateway_grpc", "mirror_http", "mirror_grpc", "model_1_grpc", "model_1_http"} + secondFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http") + + expectedClusters := make([][]string, 2) + + expectedClusters[0] = permanentClusterNames + expectedClusters[1] = secondFetch return func(t *testing.T) { - wg := sync.WaitGroup{} - wg.Add(1) ops := []func(inc *IncrementalProcessor, g *WithT){ createTestServer("server", 1), @@ -91,8 +91,7 @@ func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) f op(inc, g) } - go func() { - // watch for configs + for _, expectedClusterNames := range expectedClusters { resp, err := c.Fetch() g.Expect(err).To(BeNil()) actualClusterNames := make([]string, 0) @@ 
-108,19 +107,20 @@ func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) f err = c.Ack() g.Expect(err).To(BeNil()) - wg.Done() - }() - - wg.Wait() + } } } func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSClient) func(t *testing.T) { - expectedClusterNames := []string{"pipelinegateway_http", "pipelinegateway_grpc", "mirror_http", "mirror_grpc", "model_2_grpc", "model_2_http"} + firstFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http", "model_2_grpc", "model_2_http") + secondFetch := append(permanentClusterNames, "model_2_grpc", "model_2_http") + + expectedClusters := make([][]string, 2) + + expectedClusters[0] = firstFetch + expectedClusters[1] = secondFetch return func(t *testing.T) { - wg := sync.WaitGroup{} - wg.Add(1) ops := []func(inc *IncrementalProcessor, g *WithT){ createTestModel("model", "server", 1, []int{0}, 2, []store.ModelReplicaState{store.Available}), @@ -130,8 +130,8 @@ func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSCli op(inc, g) } - go func() { - // watch for configs + for _, expectedClusterNames := range expectedClusters { + resp, err := c.Fetch() g.Expect(err).To(BeNil()) actualClusterNames := make([]string, 0) @@ -147,20 +147,14 @@ func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSCli err = c.Ack() g.Expect(err).To(BeNil()) + } - err = c.Ack() - g.Expect(err).To(BeNil()) - wg.Done() - }() - - wg.Wait() } } -func startAdsServer(ctx context.Context, snapCache cache.SnapshotCache, port uint) error { +func startAdsServer(inc *IncrementalProcessor, port uint) error { logger := log.New() - srv := envoyServerControlPlaneV3.NewServer(ctx, snapCache, nil) - xdsServer := NewXDSServer(srv, logger) + xdsServer := NewXDSServer(inc, logger) err := xdsServer.StartXDSServer(port) if err != nil { log.WithError(err).Fatalf("Failed to start envoy xDS server") diff --git a/scheduler/pkg/envoy/xdscache/cache.go b/scheduler/pkg/envoy/xdscache/model.go similarity index 100% rename from scheduler/pkg/envoy/xdscache/cache.go rename to scheduler/pkg/envoy/xdscache/model.go diff --git a/scheduler/pkg/envoy/xdscache/resource.go b/scheduler/pkg/envoy/xdscache/resource.go index 035c1b7432..3de06ee2d0 100644 --- a/scheduler/pkg/envoy/xdscache/resource.go +++ b/scheduler/pkg/envoy/xdscache/resource.go @@ -60,7 +60,7 @@ var ( pipelineRoutePathGrpc = &route.RouteMatch_Prefix{Prefix: "/inference.GRPCInferenceService"} ) -func MakeHTTPListener(listenerName, address string, +func makeHTTPListener(listenerName, address string, port uint32, routeConfigurationName string, serverSecret *Secret, @@ -139,7 +139,7 @@ func MakeHTTPListener(listenerName, address string, } } -func MakeCluster(clusterName string, eps []Endpoint, isGrpc bool, clientSecret *Secret) *cluster.Cluster { +func makeCluster(clusterName string, eps map[string]Endpoint, isGrpc bool, clientSecret *Secret) *cluster.Cluster { if isGrpc { // Need to ensure http 2 is used // https://github.com/envoyproxy/go-control-plane/blob/d1a10d9a9366e8ab48f3f76b44a35930bac46fec/envoy/extensions/upstreams/http/v3/http_protocol_options.pb.go#L165-L166 @@ -161,7 +161,7 @@ func MakeCluster(clusterName string, eps []Endpoint, isGrpc bool, clientSecret * ConnectTimeout: durationpb.New(5 * time.Second), ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS}, LbPolicy: cluster.Cluster_LEAST_REQUEST, - LoadAssignment: MakeEndpoint(clusterName, eps), + LoadAssignment: makeEndpoint(clusterName, eps), DnsLookupFamily: 
cluster.Cluster_V4_ONLY, TypedExtensionProtocolOptions: map[string]*anypb.Any{"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": hpoMarshalled}, TransportSocket: createUpstreamTransportSocket(clientSecret), @@ -172,14 +172,14 @@ func MakeCluster(clusterName string, eps []Endpoint, isGrpc bool, clientSecret * ConnectTimeout: durationpb.New(5 * time.Second), ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS}, LbPolicy: cluster.Cluster_LEAST_REQUEST, - LoadAssignment: MakeEndpoint(clusterName, eps), + LoadAssignment: makeEndpoint(clusterName, eps), DnsLookupFamily: cluster.Cluster_V4_ONLY, TransportSocket: createUpstreamTransportSocket(clientSecret), } } } -func MakeEndpoint(clusterName string, eps []Endpoint) *endpoint.ClusterLoadAssignment { +func makeEndpoint(clusterName string, eps map[string]Endpoint) *endpoint.ClusterLoadAssignment { var endpoints []*endpoint.LbEndpoint for _, e := range eps { @@ -210,7 +210,7 @@ func MakeEndpoint(clusterName string, eps []Endpoint) *endpoint.ClusterLoadAssig } } -func MakeRoutes(routes map[string]Route, pipelines map[string]PipelineRoute) (*route.RouteConfiguration, *route.RouteConfiguration) { +func makeRoutes(routes map[string]Route, pipelines map[string]PipelineRoute) (*route.RouteConfiguration, *route.RouteConfiguration) { rts := make([]*route.Route, 2*(len(routes)+len(pipelines))+ countModelStickySessions(routes)+ countPipelineStickySessions(pipelines)) diff --git a/scheduler/pkg/envoy/xdscache/resource_test.go b/scheduler/pkg/envoy/xdscache/resource_test.go index 1a21aba696..9d0f28c75e 100644 --- a/scheduler/pkg/envoy/xdscache/resource_test.go +++ b/scheduler/pkg/envoy/xdscache/resource_test.go @@ -168,7 +168,7 @@ func TestMakeRoute(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - rcDef, rcMirror := MakeRoutes(test.modelRoutes, test.pipelineRoutes) + rcDef, rcMirror := makeRoutes(test.modelRoutes, test.pipelineRoutes) g.Expect(len(rcDef.VirtualHosts[0].Routes)).To(Equal(test.expectedDefaultRoutes)) g.Expect(len(rcMirror.VirtualHosts[0].Routes)).To(Equal(test.expectedMirrorRoutes)) }) diff --git a/scheduler/pkg/envoy/xdscache/seldoncache.go b/scheduler/pkg/envoy/xdscache/seldoncache.go index 374860e6f9..1355fff34f 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache.go @@ -10,9 +10,16 @@ the Change License after the Change Date as each is defined in accordance with t package xdscache import ( + "context" "fmt" + "math" + "math/rand" + "strconv" "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/sirupsen/logrus" seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" @@ -29,25 +36,30 @@ const ( mirrorListenerAddress = "0.0.0.0" mirrorListenerPort uint32 = 9001 - permanentListenerCount int = 2 // seldon_service and seldon_mirrors - permanentClusterCount int = 4 // pipeline gateway * 2 + model gateway * 2 - - EnvoyDownstreamServerCertName = "downstream_server" - EnvoyDownstreamClientCertName = "downstream_client" - EnvoyUpstreamServerCertName = "upstream_server" - EnvoyUpstreamClientCertName = "upstream_client" + envoyDownstreamServerCertName = "downstream_server" + envoyDownstreamClientCertName = "downstream_client" + envoyUpstreamServerCertName = "upstream_server" + envoyUpstreamClientCertName = "upstream_client" + 
snapshotType = "snapshot" ) type SeldonXDSCache struct { - permanentListeners []types.Resource - permanentClusters []types.Resource - Routes map[string]Route + muxCache *cache.MuxCache + snapshotCache cache.SnapshotCache + cds *cache.LinearCache + lds *cache.LinearCache + sds *cache.LinearCache + Clusters map[string]Cluster Pipelines map[string]PipelineRoute - Secrets map[string]Secret - PipelineGatewayDetails *PipelineGatewayDetails + Routes map[string]Route + clustersToAdd map[string]bool + clustersToRemove map[string]bool + pipelineGatewayDetails *PipelineGatewayDetails + secrets map[string]Secret logger logrus.FieldLogger - TLSActive bool + tlsActive bool + snapshotVersion int64 } type PipelineGatewayDetails struct { @@ -56,24 +68,115 @@ type PipelineGatewayDetails struct { GrpcPort int } -func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *PipelineGatewayDetails) *SeldonXDSCache { - return &SeldonXDSCache{ - permanentListeners: make([]types.Resource, permanentListenerCount), - permanentClusters: make([]types.Resource, permanentClusterCount), +func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *PipelineGatewayDetails) (*SeldonXDSCache, error) { + xdsCache := &SeldonXDSCache{ Clusters: make(map[string]Cluster), - Routes: make(map[string]Route), Pipelines: make(map[string]PipelineRoute), - Secrets: make(map[string]Secret), - PipelineGatewayDetails: pipelineGatewayDetails, + Routes: make(map[string]Route), + clustersToAdd: make(map[string]bool), + clustersToRemove: make(map[string]bool), + pipelineGatewayDetails: pipelineGatewayDetails, + secrets: make(map[string]Secret), logger: logger.WithField("source", "SeldonXDSCache"), + snapshotVersion: rand.Int63n(1000), + } + err := xdsCache.init() + if err != nil { + return nil, err + } + return xdsCache, nil +} + +func (xds *SeldonXDSCache) CreateWatch(req *cache.Request, stream stream.StreamState, responseChan chan cache.Response) (cancel func()) { + return xds.muxCache.CreateWatch(req, stream, responseChan) +} + +func (xds *SeldonXDSCache) CreateDeltaWatch(req *cache.DeltaRequest, stream stream.StreamState, responseChan chan cache.DeltaResponse) (cancel func()) { + return xds.muxCache.CreateDeltaWatch(req, stream, responseChan) +} + +func (xds *SeldonXDSCache) Fetch(ctx context.Context, req *cache.Request) (cache.Response, error) { + return xds.muxCache.Fetch(ctx, req) +} + +func (xds *SeldonXDSCache) newSnapshotVersion() string { + // Reset the snapshotVersion if it ever hits max size. + if xds.snapshotVersion == math.MaxInt64 { + xds.snapshotVersion = 0 } + + // Increment the snapshot version & return as string. 
+ xds.snapshotVersion++ + return strconv.FormatInt(xds.snapshotVersion, 10) } -func (xds *SeldonXDSCache) SetupTLS() error { +func (xds *SeldonXDSCache) init() error { + linearLogger := xds.logger.WithField("source", "LinearCache") + snapshotLogger := xds.logger.WithField("source", "SnapshotCache") + + secretTypeURL := resource.SecretType + sds := cache.NewLinearCache(secretTypeURL, cache.WithLogger(linearLogger)) + xds.sds = sds + + listenerTypeURL := resource.ListenerType + lds := cache.NewLinearCache(listenerTypeURL, cache.WithLogger(linearLogger)) + xds.lds = lds + + clusterTypeURL := resource.ClusterType + cds := cache.NewLinearCache(clusterTypeURL, cache.WithLogger(linearLogger)) + xds.cds = cds + + snapshotCache := cache.NewSnapshotCache(true, cache.IDHash{}, snapshotLogger) + xds.snapshotCache = snapshotCache + + classify := func(typeUrl string) string { + switch typeUrl { + case secretTypeURL: + return secretTypeURL + case listenerTypeURL: + return listenerTypeURL + case clusterTypeURL: + return clusterTypeURL + default: + return snapshotType + } + } + + xds.muxCache = &cache.MuxCache{ + Classify: func(req *cache.Request) string { + return classify(req.GetTypeUrl()) + }, + ClassifyDelta: func(req *cache.DeltaRequest) string { + return classify(req.GetTypeUrl()) + }, + Caches: map[string]cache.Cache{ + secretTypeURL: sds, + listenerTypeURL: lds, + clusterTypeURL: cds, + snapshotType: snapshotCache, + }, + } + + err := xds.setupTLS() + if err != nil { + return err + } + err = xds.addPermanentListeners() + if err != nil { + return err + } + err = xds.addPermanentClusters() + if err != nil { + return err + } + return nil +} + +func (xds *SeldonXDSCache) setupTLS() error { logger := xds.logger.WithField("func", "SetupTLS") protocol := seldontls.GetSecurityProtocolFromEnv(seldontls.EnvSecurityPrefixEnvoy) if protocol == seldontls.SecurityProtocolSSL { - xds.TLSActive = true + xds.tlsActive = true // Envoy client to talk to agent or Pipelinegateway logger.Info("Upstream TLS active") @@ -82,7 +185,10 @@ func (xds *SeldonXDSCache) SetupTLS() error { if err != nil { return err } - xds.AddSecret(EnvoyUpstreamClientCertName, EnvoyUpstreamServerCertName, tlsUpstreamClient) + err = xds.createSecret(envoyUpstreamClientCertName, envoyUpstreamServerCertName, tlsUpstreamClient) + if err != nil { + return err + } // Envoy listener - external calls to Seldon logger.Info("Downstream TLS active") @@ -91,109 +197,168 @@ func (xds *SeldonXDSCache) SetupTLS() error { if err != nil { return err } - xds.AddSecret(EnvoyDownstreamServerCertName, EnvoyDownstreamClientCertName, tlsDownstreamServer) + err = xds.createSecret(envoyDownstreamServerCertName, envoyDownstreamClientCertName, tlsDownstreamServer) + if err != nil { + return err + } } return nil } -func (xds *SeldonXDSCache) AddPermanentListeners() { +func (xds *SeldonXDSCache) createSecret(name string, validationSecretName string, certificate *seldontls.CertificateStore) error { + seldonSecret := Secret{ + Name: name, + ValidationSecretName: validationSecretName, + Certificate: certificate, + } + + xds.secrets[name] = seldonSecret + secrets := MakeSecretResource(seldonSecret.Name, seldonSecret.ValidationSecretName, seldonSecret.Certificate) + for _, secret := range secrets { + err := xds.sds.UpdateResource(secret.Name, secret) + if err != nil { + return err + } + } + return nil +} + +func (xds *SeldonXDSCache) addPermanentListeners() error { var serverSecret *Secret - if xds.TLSActive { - if secret, ok := xds.Secrets[EnvoyDownstreamServerCertName]; ok { 
+ if xds.tlsActive { + if secret, ok := xds.secrets[envoyDownstreamServerCertName]; ok { serverSecret = &secret } } - xds.permanentListeners[0] = MakeHTTPListener(defaultListenerName, defaultListenerAddress, defaultListenerPort, DefaultRouteConfigurationName, serverSecret) - xds.permanentListeners[1] = MakeHTTPListener(mirrorListenerName, mirrorListenerAddress, mirrorListenerPort, MirrorRouteConfigurationName, serverSecret) + resources := make(map[string]types.Resource) + resources[defaultListenerName] = makeHTTPListener(defaultListenerName, defaultListenerAddress, defaultListenerPort, DefaultRouteConfigurationName, serverSecret) + resources[mirrorListenerName] = makeHTTPListener(mirrorListenerName, mirrorListenerAddress, mirrorListenerPort, MirrorRouteConfigurationName, serverSecret) + return xds.lds.UpdateResources(resources, nil) } -func (xds *SeldonXDSCache) AddPermanentClusters() { +func (xds *SeldonXDSCache) addPermanentClusters() error { var clientSecret *Secret - if xds.TLSActive { - if secret, ok := xds.Secrets[EnvoyUpstreamClientCertName]; ok { + if xds.tlsActive { + if secret, ok := xds.secrets[envoyUpstreamClientCertName]; ok { clientSecret = &secret } } + resources := make(map[string]types.Resource) + // Add pipeline gateway clusters - xds.logger.Infof("Add http pipeline cluster %s host:%s port:%d", PipelineGatewayHttpClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.HttpPort) - xds.permanentClusters[0] = MakeCluster(PipelineGatewayHttpClusterName, []Endpoint{ - { - UpstreamHost: xds.PipelineGatewayDetails.Host, - UpstreamPort: uint32(xds.PipelineGatewayDetails.HttpPort), + pipelineGatewayHttpEndpointName := fmt.Sprintf("%s:%d", xds.pipelineGatewayDetails.Host, xds.pipelineGatewayDetails.HttpPort) + pipelineGatewayHttpCluster := makeCluster(PipelineGatewayHttpClusterName, map[string]Endpoint{ + pipelineGatewayHttpEndpointName: { + UpstreamHost: xds.pipelineGatewayDetails.Host, + UpstreamPort: uint32(xds.pipelineGatewayDetails.HttpPort), }, }, false, clientSecret) + resources[pipelineGatewayHttpCluster.Name] = pipelineGatewayHttpCluster - xds.logger.Infof("Add grpc pipeline cluster %s host:%s port:%d", PipelineGatewayGrpcClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.GrpcPort) - xds.permanentClusters[1] = MakeCluster(PipelineGatewayGrpcClusterName, []Endpoint{ - { - UpstreamHost: xds.PipelineGatewayDetails.Host, - UpstreamPort: uint32(xds.PipelineGatewayDetails.GrpcPort), + pipelineGatewayGrpcEndpointName := fmt.Sprintf("%s:%d", xds.pipelineGatewayDetails.Host, xds.pipelineGatewayDetails.GrpcPort) + pipelineGatewayGrpcCluster := makeCluster(PipelineGatewayGrpcClusterName, map[string]Endpoint{ + pipelineGatewayGrpcEndpointName: { + UpstreamHost: xds.pipelineGatewayDetails.Host, + UpstreamPort: uint32(xds.pipelineGatewayDetails.GrpcPort), }, }, true, clientSecret) + resources[pipelineGatewayGrpcCluster.Name] = pipelineGatewayGrpcCluster // Add Mirror clusters - xds.logger.Infof("Add http mirror cluster %s host:%s port:%d", MirrorHttpClusterName, mirrorListenerAddress, mirrorListenerPort) - xds.permanentClusters[2] = MakeCluster(MirrorHttpClusterName, []Endpoint{ - { + mirrorHttpEndpointName := fmt.Sprintf("%s:%d", mirrorListenerAddress, mirrorListenerPort) + mirrorHttpCluster := makeCluster(MirrorHttpClusterName, map[string]Endpoint{ + mirrorHttpEndpointName: { UpstreamHost: mirrorListenerAddress, UpstreamPort: mirrorListenerPort, }, }, false, nil) - xds.logger.Infof("Add grpc mirror cluster %s host:%s port:%d", 
MirrorGrpcClusterName, mirrorListenerAddress, mirrorListenerPort)
-	xds.permanentClusters[3] = MakeCluster(MirrorGrpcClusterName, []Endpoint{
-		{
+	resources[mirrorHttpCluster.Name] = mirrorHttpCluster
+
+	mirrorGrpcEndpointName := fmt.Sprintf("%s:%d", mirrorListenerAddress, mirrorListenerPort)
+	mirrorGrpcCluster := makeCluster(MirrorGrpcClusterName, map[string]Endpoint{
+		mirrorGrpcEndpointName: {
 			UpstreamHost: mirrorListenerAddress,
 			UpstreamPort: mirrorListenerPort,
 		},
 	}, true, nil)
+	resources[mirrorGrpcCluster.Name] = mirrorGrpcCluster
+
+	return xds.cds.UpdateResources(resources, nil)
 }

-func (xds *SeldonXDSCache) ClusterContents() []types.Resource {
+func (xds *SeldonXDSCache) ClusterResources() []types.Resource {
 	var r []types.Resource
+	for _, cluster := range xds.cds.GetResources() {
+		r = append(r, cluster)
+	}
+	return r
+}

+func (xds *SeldonXDSCache) RouteResources() []types.Resource {
+	defaultRoutes, mirrorRoutes := makeRoutes(xds.Routes, xds.Pipelines)
+	return []types.Resource{defaultRoutes, mirrorRoutes}
+}
+
+func (xds *SeldonXDSCache) UpdateRoutes(nodeId string) error {
+	logger := xds.logger.WithField("func", "UpdateRoutes")
+	// Create the snapshot that we'll serve to Envoy
+	snapshot, err := cache.NewSnapshot(
+		xds.newSnapshotVersion(), // version
+		map[resource.Type][]types.Resource{
+			resource.RouteType: xds.RouteResources(), // routes
+		})
+	if err != nil {
+		logger.Errorf("could not create snapshot %+v", snapshot)
+		return err
+	}
+
+	logger.Debugf("will serve snapshot %+v", snapshot)
+
+	// Add the snapshot to the cache
+	if err := xds.snapshotCache.SetSnapshot(context.Background(), nodeId, snapshot); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (xds *SeldonXDSCache) AddClusters() error {
 	var clientSecret *Secret
-	if xds.TLSActive {
-		if secret, ok := xds.Secrets[EnvoyUpstreamClientCertName]; ok {
+	if xds.tlsActive {
+		if secret, ok := xds.secrets[envoyUpstreamClientCertName]; ok {
 			clientSecret = &secret
 		}
 	}
-	r = append(r, xds.permanentClusters...)
+	resources := make(map[string]types.Resource)
+	for clusterName := range xds.clustersToAdd {
+		cluster, ok := xds.Clusters[clusterName]
+		if ok {
+			resource := makeCluster(cluster.Name, cluster.Endpoints, cluster.Grpc, clientSecret)
+			resources[cluster.Name] = resource

-	for _, c := range xds.Clusters {
-		endpoints := make([]Endpoint, 0, len(c.Endpoints))
-		for _, value := range c.Endpoints { // Likely to be small (<100?)
as is number of model replicas - endpoints = append(endpoints, value) } - r = append(r, MakeCluster(c.Name, endpoints, c.Grpc, clientSecret)) + delete(xds.clustersToAdd, clusterName) } - return r -} - -func (xds *SeldonXDSCache) RouteContents() []types.Resource { - defaultRoutes, mirrorRoutes := MakeRoutes(xds.Routes, xds.Pipelines) - return []types.Resource{defaultRoutes, mirrorRoutes} + return xds.cds.UpdateResources(resources, nil) } -func (xds *SeldonXDSCache) ListenerContents() []types.Resource { - return xds.permanentListeners -} - -func (xds *SeldonXDSCache) SecretContents() []types.Resource { - logger := xds.logger.WithField("func", "SecretContents") - var r []types.Resource - - for _, s := range xds.Secrets { - secrets := MakeSecretResource(s.Name, s.ValidationSecretName, s.Certificate) - logger.Infof("Adding secrets for %s(%s) of length %d", s.Name, s.ValidationSecretName, len(secrets)) - for _, secret := range secrets { - r = append(r, secret) +func (xds *SeldonXDSCache) RemoveClusters() error { + clustersToRemove := make([]string, 0) + for clusterName := range xds.clustersToRemove { + if xds.shouldRemoveCluster(clusterName) { + clustersToRemove = append(clustersToRemove, clusterName) } + delete(xds.clustersToRemove, clusterName) } - return r + return xds.cds.UpdateResources(nil, clustersToRemove) +} + +func (xds *SeldonXDSCache) shouldRemoveCluster(name string) bool { + cluster, ok := xds.Clusters[name] + return !ok || len(cluster.Routes) < 1 } func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, trafficSplits []PipelineTrafficSplit, mirror *PipelineTrafficSplit) { @@ -216,14 +381,6 @@ func (xds *SeldonXDSCache) RemovePipelineRoute(pipelineName string) { delete(xds.Pipelines, pipelineName) } -func (xds *SeldonXDSCache) AddSecret(name string, validationSecretName string, certificate *seldontls.CertificateStore) { - xds.Secrets[name] = Secret{ - Name: name, - ValidationSecretName: validationSecretName, - Certificate: certificate, - } -} - func (xds *SeldonXDSCache) AddRouteClusterTraffic( routeName, modelName, httpClusterName, grpcClusterName string, modelVersion uint32, @@ -280,8 +437,6 @@ func (xds *SeldonXDSCache) AddClustersForRoute( Grpc: false, } } - xds.Clusters[httpClusterName] = httpCluster - httpCluster.Routes[routeVersionKey] = true grpcCluster, ok := xds.Clusters[grpcClusterName] if !ok { @@ -303,15 +458,23 @@ func (xds *SeldonXDSCache) AddClustersForRoute( UpstreamHost: replica.GetInferenceSvc(), UpstreamPort: uint32(replica.GetInferenceHttpPort()), } + grpcEndpointName := fmt.Sprintf("%s:%d", replica.GetInferenceSvc(), replica.GetInferenceGrpcPort()) grpcCluster.Endpoints[grpcEndpointName] = Endpoint{ UpstreamHost: replica.GetInferenceSvc(), UpstreamPort: uint32(replica.GetInferenceGrpcPort()), } + } } + + xds.Clusters[httpClusterName] = httpCluster + httpCluster.Routes[routeVersionKey] = true + xds.clustersToAdd[httpClusterName] = true + xds.Clusters[grpcClusterName] = grpcCluster grpcCluster.Routes[routeVersionKey] = true + xds.clustersToAdd[grpcClusterName] = true } func (xds *SeldonXDSCache) RemoveRoute(routeName string) error { @@ -347,6 +510,7 @@ func (xds *SeldonXDSCache) removeRouteFromCluster(route Route, cluster TrafficSp delete(cluster.Routes, RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion}) if len(cluster.Routes) == 0 { delete(xds.Clusters, clusterName) + xds.clustersToRemove[clusterName] = true } return nil } diff --git a/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go 
b/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go index 92d411b9a4..0ffb5e2610 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go @@ -22,7 +22,7 @@ import ( var results []types.Resource func benchmarkRouteContents(b *testing.B, numResources uint) { - x := NewSeldonXDSCache(logrus.New(), nil) + x, _ := NewSeldonXDSCache(logrus.New(), nil) for n := 0; n < int(numResources); n++ { x.AddPipelineRoute(strconv.Itoa(n), []PipelineTrafficSplit{{PipelineName: strconv.Itoa(n), TrafficWeight: 100}}, nil) @@ -42,7 +42,7 @@ func benchmarkRouteContents(b *testing.B, numResources uint) { // Prevent compiler optimising away function calls var r []types.Resource for i := 0; i < b.N; i++ { - r = x.RouteContents() + r = x.RouteResources() } results = r } diff --git a/scheduler/pkg/envoy/xdscache/seldoncache_test.go b/scheduler/pkg/envoy/xdscache/seldoncache_test.go index 3bc205c58c..2b25361791 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache_test.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache_test.go @@ -27,8 +27,9 @@ import ( func TestAddRemoveHttpAndGrpcRoute(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + g.Expect(err).To(BeNil()) httpCluster := "m1_1_http" grpcCluster := "m1_1_grpc" model1 := "m1" @@ -38,9 +39,14 @@ func TestAddRemoveHttpAndGrpcRoute(t *testing.T) { addVersionedRoute(c, route1, model1, httpCluster, grpcCluster, 100, 1) addVersionedRoute(c, route2, model1, httpCluster, grpcCluster, 100, 1) - err := c.RemoveRoute(route1) + _, ok := c.clustersToAdd[httpCluster] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcCluster] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + + err = c.RemoveRoute(route1) g.Expect(err).To(BeNil()) - _, ok := c.Clusters[httpCluster] + _, ok = c.Clusters[httpCluster] g.Expect(ok).To(BeTrue()) // http Cluster remains as r2 still connected _, ok = c.Clusters[grpcCluster] g.Expect(ok).To(BeTrue()) // grpc Cluster remains as r2 still connected @@ -57,7 +63,8 @@ func TestAddRemoveHttpAndGrpcRouteVersions(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + g.Expect(err).To(BeNil()) httpCluster1 := "m1_1_http" grpcCluster1 := "m1_1_grpc" @@ -70,6 +77,15 @@ func TestAddRemoveHttpAndGrpcRouteVersions(t *testing.T) { addVersionedRoute(c, route1, model1, httpCluster1, grpcCluster1, 40, 1) addVersionedRoute(c, route1, model1, httpCluster2, grpcCluster2, 60, 2) + _, ok := c.clustersToAdd[httpCluster1] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcCluster1] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + _, ok = c.clustersToAdd[httpCluster2] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcCluster2] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + // check what we have added g.Expect(len(c.Routes[route1].Clusters)).To(Equal(2)) clusters := c.Routes[route1].Clusters @@ -95,18 +111,36 @@ func TestAddRemoveHttpAndGrpcRouteVersions(t *testing.T) { g.Expect(len(c.Clusters[httpCluster1].Endpoints)).To(Equal(1)) g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) - err := c.RemoveRoute(route1) + err = c.RemoveRoute(route1) + 
g.Expect(err).To(BeNil()) + err = c.RemoveClusters() // remove clusters to clear clustersToRemove g.Expect(err).To(BeNil()) - _, ok := c.Clusters[httpCluster1] + _, ok = c.Clusters[httpCluster1] g.Expect(ok).To(BeTrue()) // http Cluster remains as r2 still connected _, ok = c.Clusters[grpcCluster1] g.Expect(ok).To(BeTrue()) // grpc Cluster remains as r2 still connected + _, ok = c.clustersToRemove[httpCluster1] + g.Expect(ok).To(BeFalse()) // http Cluster not to be removed from cds + _, ok = c.clustersToRemove[grpcCluster1] + g.Expect(ok).To(BeFalse()) // grpc Cluster not to be removed from cds + ok = c.shouldRemoveCluster(httpCluster1) + g.Expect(ok).To(BeFalse()) // http Cluster not to be removed from cds + ok = c.shouldRemoveCluster(grpcCluster1) + g.Expect(ok).To(BeFalse()) // grpc Cluster not to be removed from cds err = c.RemoveRoute(route2) g.Expect(err).To(BeNil()) _, ok = c.Clusters[httpCluster1] g.Expect(ok).To(BeFalse()) // http Cluster removed _, ok = c.Clusters[grpcCluster1] g.Expect(ok).To(BeFalse()) // grpc Cluster removed + _, ok = c.clustersToRemove[httpCluster1] + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + _, ok = c.clustersToRemove[grpcCluster1] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds + ok = c.shouldRemoveCluster(httpCluster1) + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + ok = c.shouldRemoveCluster(grpcCluster1) + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds } // Checks a cluster with multiple versions is created correctly @@ -114,7 +148,8 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForSameModel(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + g.Expect(err).To(BeNil()) routeName := "r1" httpCluster1 := "m1_1_http" @@ -126,6 +161,15 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForSameModel(t *testing.T) { addVersionedRoute(c, routeName, model1, httpCluster1, grpcCluster1, 40, 1) addVersionedRoute(c, routeName, model1, httpCluster2, grpcCluster2, 60, 2) + _, ok := c.clustersToAdd[httpCluster1] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcCluster1] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + _, ok = c.clustersToAdd[httpCluster2] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcCluster2] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + // check what we have added g.Expect(len(c.Routes[routeName].Clusters)).To(Equal(2)) clusters := c.Routes[routeName].Clusters @@ -140,12 +184,20 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForSameModel(t *testing.T) { g.Expect(c.Clusters[httpCluster2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) g.Expect(c.Clusters[grpcCluster2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) - err := c.RemoveRoute(routeName) + err = c.RemoveRoute(routeName) g.Expect(err).To(BeNil()) - _, ok := c.Clusters[httpCluster1] + _, ok = c.Clusters[httpCluster1] g.Expect(ok).To(BeFalse()) // http Cluster removed _, ok = c.Clusters[grpcCluster1] g.Expect(ok).To(BeFalse()) // grpc Cluster removed + _, ok = c.clustersToRemove[httpCluster1] + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + _, ok = c.clustersToRemove[grpcCluster1] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds + ok = 
c.shouldRemoveCluster(httpCluster1) + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + ok = c.shouldRemoveCluster(grpcCluster1) + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds } // Checks a cluster with multiple versions is created correctly @@ -153,7 +205,8 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentModels(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + g.Expect(err).To(BeNil()) httpClusterModel1 := "m1_1_http" grpcClusterModel1 := "m1_1_grpc" @@ -166,6 +219,15 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentModels(t *testing.T) { addVersionedRoute(c, routeName, model1, httpClusterModel1, grpcClusterModel1, 40, 1) addVersionedRoute(c, routeName, model2, httpClusterModel2, grpcClusterModel2, 60, 1) + _, ok := c.clustersToAdd[httpClusterModel1] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcClusterModel1] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + _, ok = c.clustersToAdd[httpClusterModel2] + g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds + _, ok = c.clustersToAdd[grpcClusterModel2] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds + // check what we have added g.Expect(len(c.Routes[routeName].Clusters)).To(Equal(2)) clusters := c.Routes[routeName].Clusters @@ -180,9 +242,9 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentModels(t *testing.T) { g.Expect(c.Clusters[httpClusterModel2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) g.Expect(c.Clusters[grpcClusterModel2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) - err := c.RemoveRoute(routeName) + err = c.RemoveRoute(routeName) g.Expect(err).To(BeNil()) - _, ok := c.Clusters[httpClusterModel1] + _, ok = c.Clusters[httpClusterModel1] g.Expect(ok).To(BeFalse()) // http Cluster removed _, ok = c.Clusters[grpcClusterModel1] g.Expect(ok).To(BeFalse()) // grpc Cluster removed @@ -190,13 +252,30 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentModels(t *testing.T) { g.Expect(ok).To(BeFalse()) // http Cluster removed _, ok = c.Clusters[grpcClusterModel2] g.Expect(ok).To(BeFalse()) // grpc Cluster removed + _, ok = c.clustersToRemove[httpClusterModel1] + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + _, ok = c.clustersToRemove[grpcClusterModel1] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds + ok = c.shouldRemoveCluster(httpClusterModel1) + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + ok = c.shouldRemoveCluster(grpcClusterModel1) + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds + _, ok = c.clustersToRemove[httpClusterModel2] + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + _, ok = c.clustersToRemove[grpcClusterModel2] + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds + ok = c.shouldRemoveCluster(httpClusterModel2) + g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds + ok = c.shouldRemoveCluster(grpcClusterModel2) + g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds } func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentRoutesSameModel(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) + 
+	g.Expect(err).To(BeNil())
 
 	route1 := "r1"
 	route2 := "r2"
@@ -207,6 +286,11 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentRoutesSameModel(t *testing
 	addVersionedRoute(c, route1, model1, httpClusterModel1, grpcClusterModel1, 100, 1)
 	addVersionedRoute(c, route2, model1, httpClusterModel1, grpcClusterModel1, 100, 1)
 
+	_, ok := c.clustersToAdd[httpClusterModel1]
+	g.Expect(ok).To(BeTrue()) // http Cluster to be added to cds
+	_, ok = c.clustersToAdd[grpcClusterModel1]
+	g.Expect(ok).To(BeTrue()) // grpc Cluster to be added to cds
+
 	// check what we have added
 	g.Expect(len(c.Routes[route1].Clusters)).To(Equal(1))
 	g.Expect(len(c.Routes[route2].Clusters)).To(Equal(1))
@@ -221,18 +305,30 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentRoutesSameModel(t *testing
 	g.Expect(c.Clusters[httpClusterModel1].Routes[RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue())
 	g.Expect(c.Clusters[grpcClusterModel1].Routes[RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue())
 
-	err := c.RemoveRoute(route1)
+	err = c.RemoveRoute(route1)
 	g.Expect(err).To(BeNil())
-	_, ok := c.Clusters[httpClusterModel1]
+	_, ok = c.Clusters[httpClusterModel1]
 	g.Expect(ok).To(BeTrue()) // http Cluster not removed
 	_, ok = c.Clusters[grpcClusterModel1]
 	g.Expect(ok).To(BeTrue()) // grpc Cluster not removed
+	ok = c.shouldRemoveCluster(httpClusterModel1)
+	g.Expect(ok).To(BeFalse()) // http Cluster not to be removed from cds
+	ok = c.shouldRemoveCluster(grpcClusterModel1)
+	g.Expect(ok).To(BeFalse()) // grpc Cluster not to be removed from cds
 
 	err = c.RemoveRoute(route2)
 	g.Expect(err).To(BeNil())
 	_, ok = c.Clusters[httpClusterModel1]
 	g.Expect(ok).To(BeFalse()) // http Cluster removed
 	_, ok = c.Clusters[grpcClusterModel1]
 	g.Expect(ok).To(BeFalse()) // grpc Cluster removed
+	_, ok = c.clustersToRemove[httpClusterModel1]
+	g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds
+	_, ok = c.clustersToRemove[grpcClusterModel1]
+	g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds
+	ok = c.shouldRemoveCluster(httpClusterModel1)
+	g.Expect(ok).To(BeTrue()) // http Cluster to be removed from cds
+	ok = c.shouldRemoveCluster(grpcClusterModel1)
+	g.Expect(ok).To(BeTrue()) // grpc Cluster to be removed from cds
 }
 
 func TestSetupTLS(t *testing.T) {
@@ -275,7 +371,8 @@
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{})
+			c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{})
+			g.Expect(err).To(BeNil())
 			if test.setTLS {
 				t.Setenv(fmt.Sprintf("%s%s", seldontls.EnvSecurityPrefixEnvoy, seldontls.EnvSecurityProtocolSuffix), seldontls.SecurityProtocolSSL)
 			}
@@ -288,7 +385,7 @@
 				t.Setenv(fmt.Sprintf("%s%s", envPrefix, seldontls.EnvKeyLocationSuffix), fmt.Sprintf("%s/tls.key", tmpFolder))
 				t.Setenv(fmt.Sprintf("%s%s", envPrefix, seldontls.EnvCaLocationSuffix), fmt.Sprintf("%s/ca.crt", tmpFolder))
 			}
-			err := c.SetupTLS()
+			err = c.setupTLS()
 			if test.err {
 				g.Expect(err).ToNot(BeNil())
 			} else {
@@ -313,41 +410,12 @@ func addVersionedRoute(c *SeldonXDSCache, modelRouteName string, modelName strin
 		store.ModelAvailable,
 	)
 
-	addCluster(c, httpCluster, modelRouteName, modelName, version, false)
-	addCluster(c, grpcCluster, modelRouteName, modelName, version, true)
-	c.AddRouteClusterTraffic(modelRouteName, modelName, httpCluster, grpcCluster, modelVersion.GetVersion(), traffic, false, false)
-	addEndpoint(c, httpCluster, "0.0.0.0", 9000)
-	addEndpoint(c, grpcCluster, "0.0.0.0", 9001)
-}
-
-func addCluster(
-	xds *SeldonXDSCache,
-	name string,
-	routeName string,
-	modelName string,
-	modelVersion uint32,
-	isGrpc bool,
-) {
-	cluster, ok := xds.Clusters[name]
-	if !ok {
-		cluster = Cluster{
-			Name:      name,
-			Endpoints: make(map[string]Endpoint),
-			Routes:    make(map[RouteVersionKey]bool),
-			Grpc:      isGrpc,
-		}
-	}
-	cluster.Routes[RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion}] = true
-	xds.Clusters[name] = cluster
-}
-
-func addEndpoint(xds *SeldonXDSCache, clusterName, upstreamHost string, upstreamPort uint32) {
-	cluster := xds.Clusters[clusterName]
-	k := fmt.Sprintf("%s:%d", upstreamHost, upstreamPort)
-	cluster.Endpoints[k] = Endpoint{
-		UpstreamHost: upstreamHost,
-		UpstreamPort: upstreamPort,
+	server := &store.ServerSnapshot{
+		Name: "server",
+		Replicas: map[int]*store.ServerReplica{
+			1: store.NewServerReplica("0.0.0.0", 9000, 9001, 1, store.NewServer("server", false), nil, 100, 100, 0, nil, 100),
+		},
 	}
-
-	xds.Clusters[clusterName] = cluster
+	c.AddClustersForRoute(modelRouteName, modelName, httpCluster, grpcCluster, modelVersion.GetVersion(), []int{1}, server)
+	c.AddRouteClusterTraffic(modelRouteName, modelName, httpCluster, grpcCluster, modelVersion.GetVersion(), traffic, false, false)
 }
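The tests above exercise the cache's new pending-change bookkeeping: clusters created for a route are staged in clustersToAdd, and shouldRemoveCluster only reports a cluster once no route references it any more. Below is a condensed sketch of that flow, not part of this diff: it lives in the same test package (clustersToAdd, clustersToRemove and shouldRemoveCluster are unexported), reuses the single-replica server snapshot from the rewritten addVersionedRoute helper, and uses an illustrative test name and cluster names.

// Sketch only: condensed version of the staging flow asserted in the tests above.
func TestClusterStagingSketch(t *testing.T) {
	g := NewGomegaWithT(t)
	logger := log.New()

	c, err := NewSeldonXDSCache(logger, &PipelineGatewayDetails{})
	g.Expect(err).To(BeNil())

	// Single-replica server snapshot, copied from the addVersionedRoute helper above.
	server := &store.ServerSnapshot{
		Name: "server",
		Replicas: map[int]*store.ServerReplica{
			1: store.NewServerReplica("0.0.0.0", 9000, 9001, 1, store.NewServer("server", false), nil, 100, 100, 0, nil, 100),
		},
	}

	// Adding a route stages its HTTP and gRPC clusters for the next CDS update.
	c.AddClustersForRoute("r1", "m1", "m1_http", "m1_grpc", 1, []int{1}, server)
	c.AddRouteClusterTraffic("r1", "m1", "m1_http", "m1_grpc", 1, 100, false, false)
	_, ok := c.clustersToAdd["m1_http"]
	g.Expect(ok).To(BeTrue())

	// Removing the last route that references a cluster stages its removal.
	g.Expect(c.RemoveRoute("r1")).To(BeNil())
	_, ok = c.clustersToRemove["m1_http"]
	g.Expect(ok).To(BeTrue())
	g.Expect(c.shouldRemoveCluster("m1_http")).To(BeTrue())
}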
diff --git a/scheduler/pkg/envoy/xdscache/tls.go b/scheduler/pkg/envoy/xdscache/tls.go
index fe33c3ceb1..98ef56e6cd 100644
--- a/scheduler/pkg/envoy/xdscache/tls.go
+++ b/scheduler/pkg/envoy/xdscache/tls.go
@@ -17,10 +17,6 @@ import (
 	"github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"
 )
 
-const (
-	xdsClusterName = "xds_cluster"
-)
-
 func MakeSecretResource(name string, validationName string, certStore tls.CertificateStoreHandler) []*tlsv3.Secret {
 	var secrets []*tlsv3.Secret
 
@@ -54,22 +50,8 @@ func MakeSecretResource(name string, validationName string, certStore tls.Certif
 }
 
 var configSource = &core.ConfigSource{
-	ResourceApiVersion: core.ApiVersion_V3,
-	ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
-		ApiConfigSource: &core.ApiConfigSource{
-			ApiType:             core.ApiConfigSource_GRPC,
-			TransportApiVersion: core.ApiVersion_V3,
-			GrpcServices: []*core.GrpcService{
-				{
-					TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
-						EnvoyGrpc: &core.GrpcService_EnvoyGrpc{
-							ClusterName: xdsClusterName,
-						},
-					},
-				},
-			},
-		},
-	},
+	ResourceApiVersion:    core.ApiVersion_V3,
+	ConfigSourceSpecifier: &core.ConfigSource_Ads{},
 }
 
 func createDownstreamTransportSocket(serverSecret *Secret) *core.TransportSocket {
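The tls.go change above is the SDS side of serving xDS directly from the incremental processor: secrets are now requested over ADS rather than through a dedicated gRPC config source, so the xds_cluster constant and the explicit ApiConfigSource wiring are no longer needed. For context, here is a minimal sketch (assumed usage with go-control-plane, not taken from this PR; the package name and helper are hypothetical) of how such an ADS-backed config source is referenced from an SDS secret config.

// Sketch only: an ADS-backed SDS secret reference built with go-control-plane.
package example

import (
	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
)

// adsConfigSource mirrors the new configSource above: secrets are fetched over
// the aggregated xDS stream, so no dedicated xds_cluster is required.
var adsConfigSource = &core.ConfigSource{
	ResourceApiVersion:    core.ApiVersion_V3,
	ConfigSourceSpecifier: &core.ConfigSource_Ads{},
}

// sdsSecretConfig points Envoy at a named secret served by the control plane.
func sdsSecretConfig(name string) *tlsv3.SdsSecretConfig {
	return &tlsv3.SdsSecretConfig{
		Name:      name,
		SdsConfig: adsConfigSource,
	}
}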
diff --git a/scheduler/pkg/kafka/gateway/worker_test.go b/scheduler/pkg/kafka/gateway/worker_test.go
index 1cfb6089fd..a21d5356ae 100644
--- a/scheduler/pkg/kafka/gateway/worker_test.go
+++ b/scheduler/pkg/kafka/gateway/worker_test.go
@@ -200,7 +200,8 @@ func createInferWorkerWithMockConn(
 	logger log.FieldLogger,
 	serverConfig *InferenceServerConfig,
 	modelConfig *KafkaModelConfig,
-	g *WithT) (*InferKafkaHandler, *InferWorker) {
+	g *WithT,
+) (*InferKafkaHandler, *InferWorker) {
 	conn, _ := grpc.NewClient("passthrough://", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
 		return grpcServer.listener.Dial()
 	}), grpc.WithTransportCredentials(insecure.NewCredentials()))
@@ -446,7 +447,7 @@ func TestProcessRequest(t *testing.T) {
 			timeout:   util.InferTimeoutDefault,
 		},
 		{
-			name: "grpc request with json header treated as json", //TODO maybe fail in this case as it will fail at server
+			name: "grpc request with json header treated as json", // TODO maybe fail in this case as it will fail at server
 			job: &InferWork{
 				modelName: "foo",
 				headers:   map[string]string{HeaderKeyType: HeaderValueJsonReq},