Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(envoy): add clusters before updating routes (2) #6145

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ the Change License after the Change Date as each is defined in accordance with t
package main

import (
"context"
"flag"
"math/rand"
"os"
"os/signal"
"syscall"
"time"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoyServerControlPlaneV3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
log "github.com/sirupsen/logrus"

kafka_config "github.com/seldonio/seldon-core/components/kafka/v2/pkg/config"
Expand Down Expand Up @@ -166,9 +163,6 @@ func main() {
defer eventHub.Close()
go makeSignalHandler(logger, done)

// Create a cache
xdsCache := cache.NewSnapshotCache(true, cache.IDHash{}, logger)

tracer, err := tracing.NewTraceProvider("seldon-scheduler", &tracingConfigPath, logger)
if err != nil {
logger.WithError(err).Fatalf("Failed to configure otel tracer")
Expand All @@ -189,7 +183,7 @@ func main() {
}

// Create envoy incremental processor
_, err = processor.NewIncrementalProcessor(xdsCache, nodeID, logger, ss, es, ps, eventHub, &pipelineGatewayDetails, cleaner)
incrementalProcessor, err := processor.NewIncrementalProcessor(nodeID, logger, ss, es, ps, eventHub, &pipelineGatewayDetails, cleaner)
if err != nil {
log.WithError(err).Fatalf("Failed to create incremental processor")
}
Expand Down Expand Up @@ -266,11 +260,8 @@ func main() {
// extra wait to allow routes state to get created
time.Sleep(xDSWaitTimeout)

// Start envoy xDS server, this is done after the scheduler is ready
// so that the xDS server can start sending valid updates to envoy.
ctx := context.Background()
srv := envoyServerControlPlaneV3.NewServer(ctx, xdsCache, nil)
xdsServer := processor.NewXDSServer(srv, logger)
// create the processor separately, so it receives all updates
xdsServer := processor.NewXDSServer(incrementalProcessor, logger)
err = xdsServer.StartXDSServer(envoyPort)
if err != nil {
log.WithError(err).Fatalf("Failed to start envoy xDS server")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
goos: linux
goarch: amd64
pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor
cpu: 12th Gen Intel(R) Core(TM) i7-12800H
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 112 10239625 ns/op 5885041 B/op 82877 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 98 12020676 ns/op 11498158 B/op 148125 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 28 46546621 ns/op 129625468 B/op 1804667 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 3 412763452 ns/op 1168723194 B/op 17179528 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 99 12240680 ns/op 53776819 B/op 819509 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 37 33453806 ns/op 170619219 B/op 2398968 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 4 317292666 ns/op 1696447430 B/op 24553734 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 2133085451 ns/op 6621219192 B/op 99532927 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 10000 133839 ns/op 437098 B/op 3280 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 10 102003642 ns/op 164723263 B/op 2314898 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 115990545 ns/op 239089298 B/op 3302609 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 2 532899776 ns/op 1594935896 B/op 22465678 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 10 102020346 ns/op 203750677 B/op 3057694 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 114302062 ns/op 239748963 B/op 3458251 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 301749396 ns/op 926810037 B/op 11268314 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 2109146258 ns/op 4159537240 B/op 58957736 allocs/op
PASS
ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 32.254s
driev marked this conversation as resolved.
Show resolved Hide resolved
80 changes: 14 additions & 66 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@ the Change License after the Change Date as each is defined in accordance with t
package processor

import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"time"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
Expand All @@ -40,10 +34,7 @@ const (
)

type IncrementalProcessor struct {
cache cache.SnapshotCache
nodeID string
// snapshotVersion holds the current version of the snapshot.
snapshotVersion int64
nodeID string
logger logrus.FieldLogger
xdsCache *xdscache.SeldonXDSCache
mu sync.RWMutex
Expand All @@ -64,22 +55,24 @@ type pendingModelVersion struct {
}

func NewIncrementalProcessor(
cache cache.SnapshotCache,
nodeID string,
log logrus.FieldLogger,
logger logrus.FieldLogger,
modelStore store.ModelStore,
experimentServer experiment.ExperimentServer,
pipelineHandler pipeline.PipelineHandler,
hub *coordinator.EventHub,
pipelineGatewayDetails *xdscache.PipelineGatewayDetails,
versionCleaner cleaner.ModelVersionCleaner,
) (*IncrementalProcessor, error) {
xdsCache, err := xdscache.NewSeldonXDSCache(logger, pipelineGatewayDetails)
if err != nil {
return nil, err
}

ip := &IncrementalProcessor{
cache: cache,
nodeID: nodeID,
snapshotVersion: rand.Int63n(1000),
logger: log.WithField("source", "IncrementalProcessor"),
xdsCache: xdscache.NewSeldonXDSCache(log, pipelineGatewayDetails),
logger: logger.WithField("source", "IncrementalProcessor"),
xdsCache: xdsCache,
modelStore: modelStore,
experimentServer: experimentServer,
pipelineHandler: pipelineHandler,
Expand All @@ -90,11 +83,6 @@ func NewIncrementalProcessor(
batchTriggerManual: nil,
}

err := ip.init()
if err != nil {
return nil, err
}

hub.RegisterModelEventHandler(
modelEventHandlerName,
pendingSyncsQueueSize,
Expand Down Expand Up @@ -185,57 +173,17 @@ func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg
}()
}

// init prepares the xDS cache while holding the processor's write lock: it
// sets up TLS first and, only if that succeeds, adds the permanent listeners
// and the permanent clusters.
//
// It returns the error reported by SetupTLS unchanged, or nil on success.
func (p *IncrementalProcessor) init() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	// No permanent resources are added when TLS setup fails.
	if tlsErr := p.xdsCache.SetupTLS(); tlsErr != nil {
		return tlsErr
	}

	p.xdsCache.AddPermanentListeners()
	p.xdsCache.AddPermanentClusters()
	return nil
}

// newSnapshotVersion advances the processor's snapshotVersion by one and
// returns the new value formatted as a base-10 string.
//
// When the stored version is already math.MaxInt64 the counter restarts from
// zero before being advanced, so the value returned in that case is "1".
func (p *IncrementalProcessor) newSnapshotVersion() string {
	next := p.snapshotVersion
	if next == math.MaxInt64 {
		// Wrap around instead of overflowing.
		next = 0
	}
	next++

	p.snapshotVersion = next
	return strconv.FormatInt(next, 10)
}

func (p *IncrementalProcessor) updateEnvoy() error {
logger := p.logger.WithField("func", "updateEnvoy")
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(
p.newSnapshotVersion(), // version
map[rsrc.Type][]types.Resource{
rsrc.ClusterType: p.xdsCache.ClusterContents(), // clusters
rsrc.RouteType: p.xdsCache.RouteContents(), // routes
rsrc.ListenerType: p.xdsCache.ListenerContents(), // listeners
rsrc.SecretType: p.xdsCache.SecretContents(), // Secrets
})
// sequencing is important here - send clusters before updating routes and removing old clusters
err := p.xdsCache.AddClusters()
if err != nil {
return err
}

if err := snapshot.Consistent(); err != nil {
return err
driev marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Debugf("will serve snapshot %+v", snapshot)

// Add the snapshot to the cache
if err := p.cache.SetSnapshot(context.Background(), p.nodeID, snapshot); err != nil {
err = p.xdsCache.UpdateRoutes(p.nodeID)
if err != nil {
return err
}

return nil
return p.xdsCache.RemoveClusters()
driev marked this conversation as resolved.
Show resolved Hide resolved
}

// This function does not call `updateEnvoy` directly and therefore callers should make sure
Expand Down
14 changes: 12 additions & 2 deletions scheduler/pkg/envoy/processor/incremental_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"testing"
"time"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -125,7 +124,6 @@ func benchmarkModelUpdate(
memoryStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
pipelineStore := pipeline.NewPipelineStore(logger, eventHub, memoryStore)
ip, err := NewIncrementalProcessor(
cache.NewSnapshotCache(false, cache.IDHash{}, logger),
"some node",
logger,
memoryStore,
Expand Down Expand Up @@ -163,12 +161,15 @@ func benchmarkModelUpdate(
// BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms runs
// benchmarkModelUpdate with the arguments (10, 1, 1, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms(b *testing.B) {
	const (
		models   = 10
		replicas = 1
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms runs
// benchmarkModelUpdate with the arguments (100, 1, 1, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms(b *testing.B) {
	const (
		models   = 100
		replicas = 1
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms runs
// benchmarkModelUpdate with the arguments (1_000, 1, 1, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms(b *testing.B) {
	const (
		models   = 1_000
		replicas = 1
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms runs
// benchmarkModelUpdate with the arguments (10_000, 1, 1, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms(b *testing.B) {
	const (
		models   = 10_000
		replicas = 1
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}
Expand All @@ -177,12 +178,15 @@ func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms(b *testing.B) {
// BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms runs
// benchmarkModelUpdate with the arguments (10, 1, 10, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms(b *testing.B) {
	const (
		models   = 10
		replicas = 10
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms runs
// benchmarkModelUpdate with the arguments (100, 1, 10, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms(b *testing.B) {
	const (
		models   = 100
		replicas = 10
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms runs
// benchmarkModelUpdate with the arguments (1_000, 1, 10, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms(b *testing.B) {
	const (
		models   = 1_000
		replicas = 10
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms runs
// benchmarkModelUpdate with the arguments (10_000, 1, 10, 10).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms(b *testing.B) {
	const (
		models   = 10_000
		replicas = 10
		batch    = 10
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}
Expand All @@ -191,12 +195,15 @@ func BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms(b *testing.B) {
// BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms runs
// benchmarkModelUpdate with the arguments (10, 1, 1, 100).
//
// The first argument must match the model count in the benchmark name, as it
// does in every sibling benchmark; passing 1 here would measure a different
// scenario than the one the name reports.
// NOTE(review): going by the benchmark name, the arguments are presumably the
// model count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms(b *testing.B) {
	benchmarkModelUpdate(b, 10, 1, 1, 100)
}

// BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms runs
// benchmarkModelUpdate with the arguments (100, 1, 1, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms(b *testing.B) {
	const (
		models   = 100
		replicas = 1
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms runs
// benchmarkModelUpdate with the arguments (1_000, 1, 1, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms(b *testing.B) {
	const (
		models   = 1_000
		replicas = 1
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms runs
// benchmarkModelUpdate with the arguments (10_000, 1, 1, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms(b *testing.B) {
	const (
		models   = 10_000
		replicas = 1
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}
Expand All @@ -205,12 +212,15 @@ func BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms(b *testing.B) {
// BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms runs
// benchmarkModelUpdate with the arguments (10, 1, 10, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms(b *testing.B) {
	const (
		models   = 10
		replicas = 10
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms runs
// benchmarkModelUpdate with the arguments (100, 1, 10, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms(b *testing.B) {
	const (
		models   = 100
		replicas = 10
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms runs
// benchmarkModelUpdate with the arguments (1_000, 1, 10, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms(b *testing.B) {
	const (
		models   = 1_000
		replicas = 10
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}

// BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms runs
// benchmarkModelUpdate with the arguments (10_000, 1, 10, 100).
// NOTE(review): going by the benchmark name, these are presumably the model
// count, an unnamed setting, the replica count and the batch window in
// milliseconds - confirm against benchmarkModelUpdate.
func BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms(b *testing.B) {
	const (
		models   = 10_000
		replicas = 10
		batch    = 100
	)
	benchmarkModelUpdate(b, models, 1, replicas, batch)
}
Loading
Loading