Skip to content

Commit

Permalink
Merge pull request containerd#111 from klihub/devel/split-sync-messages
Browse files Browse the repository at this point in the history
fixes: use multiple sync messages if we hit ttrpc max message limit.
  • Loading branch information
fuweid authored Oct 11, 2024
2 parents 43b5c7f + 159f575 commit 229236e
Show file tree
Hide file tree
Showing 26 changed files with 867 additions and 621 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ module github.com/containerd/nri
go 1.21

require (
github.com/containerd/ttrpc v1.2.3
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287
github.com/moby/sys/mountinfo v0.6.2
github.com/onsi/ginkgo/v2 v2.19.1
github.com/onsi/gomega v1.34.0
github.com/opencontainers/runtime-spec v1.0.3-0.20220825212826-86290f6a00fb
github.com/opencontainers/runtime-tools v0.9.0
github.com/sirupsen/logrus v1.8.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
golang.org/x/sys v0.21.0
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.34.1
k8s.io/cri-api v0.25.3
sigs.k8s.io/yaml v1.3.0
)

require (
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
Expand All @@ -31,7 +33,6 @@ require (
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect
google.golang.org/grpc v1.57.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
12 changes: 9 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/containerd/ttrpc v1.2.3 h1:4jlhbXIGvijRtNC8F/5CpuJZ7yKOBFGFOOXg1bkISz0=
github.com/containerd/ttrpc v1.2.3/go.mod h1:ieWsXucbb8Mj9PH0rXCw1i8IunRbbAiDkpXkbfflWBM=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287 h1:zwv64tCdT888KxuXQuv5i36cEdljoXq3sVqLmOEbCQI=
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -48,11 +50,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI=
Expand Down Expand Up @@ -86,6 +90,7 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -115,6 +120,7 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8X
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/cri-api v0.25.3 h1:YaiQ05CM4+5L2DAz0KoSa4sv4/VlQvLbf3WHKICPSXs=
Expand Down
100 changes: 92 additions & 8 deletions pkg/adaptation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/containerd/nri/pkg/net"
"github.com/containerd/nri/pkg/net/multiplex"
"github.com/containerd/ttrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -414,19 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
ctx, cancel := context.WithTimeout(ctx, getPluginRequestTimeout())
defer cancel()

req := &SynchronizeRequest{
Pods: pods,
Containers: containers,
}
rpl, err := p.stub.Synchronize(ctx, req)
if err != nil {
p.close()
return nil, err
var (
podsToSend = pods
ctrsToSend = containers
podsPerMsg = len(pods)
ctrsPerMsg = len(containers)

rpl *SynchronizeResponse
err error
)

for {
req := &SynchronizeRequest{
Pods: podsToSend[:podsPerMsg],
Containers: ctrsToSend[:ctrsPerMsg],
More: len(podsToSend) > podsPerMsg || len(ctrsToSend) > ctrsPerMsg,
}

log.Debugf(ctx, "sending sync message, %d/%d, %d/%d (more: %v)",
len(req.Pods), len(podsToSend), len(req.Containers), len(ctrsToSend), req.More)

rpl, err = p.stub.Synchronize(ctx, req)
if err == nil {
if !req.More {
break
}

if len(rpl.Update) > 0 || rpl.More != req.More {
p.close()
return nil, fmt.Errorf("plugin does not handle split sync requests")
}

podsToSend = podsToSend[podsPerMsg:]
ctrsToSend = ctrsToSend[ctrsPerMsg:]

if podsPerMsg > len(podsToSend) {
podsPerMsg = len(podsToSend)
}
if ctrsPerMsg > len(ctrsToSend) {
ctrsPerMsg = len(ctrsToSend)
}
} else {
podsPerMsg, ctrsPerMsg, err = recalcObjsPerSyncMsg(podsPerMsg, ctrsPerMsg, err)
if err != nil {
p.close()
return nil, err
}

log.Debugf(ctx, "oversized message, retrying in smaller chunks")
}
}

return rpl.Update, nil
}

func recalcObjsPerSyncMsg(pods, ctrs int, err error) (int, int, error) {
const (
minObjsPerMsg = 8
)

if status.Code(err) != codes.ResourceExhausted {
return pods, ctrs, err
}

if pods+ctrs <= minObjsPerMsg {
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
}

var e *ttrpc.OversizedMessageErr
if !errors.As(err, &e) {
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
}

maxLen := e.MaximumLength()
msgLen := e.RejectedLength()

if msgLen == 0 || maxLen == 0 || msgLen <= maxLen {
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
}

factor := float64(maxLen) / float64(msgLen)
if factor > 0.9 {
factor = 0.9
}

pods = int(float64(pods) * factor)
ctrs = int(float64(ctrs) * factor)

if pods+ctrs < minObjsPerMsg {
pods = minObjsPerMsg / 2
ctrs = minObjsPerMsg / 2
}

return pods, ctrs, nil
}

// Relay CreateContainer request to plugin.
func (p *plugin) createContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
if !p.events.IsSet(Event_CREATE_CONTAINER) {
Expand Down
Loading

0 comments on commit 229236e

Please sign in to comment.