Skip to content

Commit

Permalink
kube: prepare for k8s v1.32.0 deps (#51798)
Browse files Browse the repository at this point in the history
* kube: prepare for k8s v1.32.0 deps

This PR updates the Kubernetes proxy and server mock to align with the default content-type change introduced in version v1.32.0 of client-go.

In `k8s.io/client-go`, the client's default content type was changed from `application/json` to `application/vnd.kubernetes.protobuf`.

Kube mock was not equipped to decode protobuf messages, leading to multiple failures.
Additionally, there was a typo where both request content-types were inconsistently mixed.

Signed-off-by: Tiago Silva <[email protected]>

* handle review comments

---------

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato authored Feb 4, 2025
1 parent 6a3093f commit 62bc8c8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 24 deletions.
26 changes: 21 additions & 5 deletions lib/kube/proxy/resource_deletecollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ func (f *Forwarder) handleDeleteCollectionReq(req *http.Request, sess *clusterSe

const internalErrStatus = http.StatusInternalServerError
// get content-type value
contentType := responsewriters.GetContentTypeHeader(memWriter.Header())
encoder, decoder, err := newEncoderAndDecoderForContentType(contentType, newClientNegotiator(sess.codecFactory))
deleteRequestContentType := responsewriters.GetContentTypeHeader(req.Header)
deleteRequestEncoder, deleteRequestDecoder, err := newEncoderAndDecoderForContentType(
deleteRequestContentType,
newClientNegotiator(sess.codecFactory),
)
if err != nil {
return internalErrStatus, trace.Wrap(err)
}

deleteOptions, err := parseDeleteCollectionBody(req.Body, decoder)
deleteOptions, err := parseDeleteCollectionBody(req.Body, deleteRequestDecoder)
if err != nil {
return internalErrStatus, trace.Wrap(err)
}
Expand All @@ -142,7 +145,14 @@ func (f *Forwarder) handleDeleteCollectionReq(req *http.Request, sess *clusterSe
// decode memory rw body.
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
obj, err := decodeAndSetGVK(decoder, memWriter.Buffer().Bytes(), nil /* defaults GVK */)
_, listRequestDecoder, err := newEncoderAndDecoderForContentType(
responsewriters.GetContentTypeHeader(memWriter.Header()),
newClientNegotiator(sess.codecFactory),
)
if err != nil {
return internalErrStatus, trace.Wrap(err)
}
obj, err := decodeAndSetGVK(listRequestDecoder, memWriter.Buffer().Bytes(), nil /* defaults GVK */)
if err != nil {
return internalErrStatus, trace.Wrap(err)
}
Expand Down Expand Up @@ -478,8 +488,14 @@ func (f *Forwarder) handleDeleteCollectionReq(req *http.Request, sess *clusterSe
}
// reset the memory buffer.
memWriter.Buffer().Reset()
// set the content type to the response writer to match the delete
// request content type instead of the list request content type.
memWriter.Header().Set(
responsewriters.ContentTypeHeader,
deleteRequestContentType,
)
// encode the filtered response into the memory buffer.
if err := encoder.Encode(obj, memWriter.Buffer()); err != nil {
if err := deleteRequestEncoder.Encode(obj, memWriter.Buffer()); err != nil {
return internalErrStatus, trace.Wrap(err)
}
// copy the output into the user's ResponseWriter and return.
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/testing/kube_server/clusterrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *KubeMockServer) getClusterRole(w http.ResponseWriter, req *http.Request

func (s *KubeMockServer) deleteClusterRole(w http.ResponseWriter, req *http.Request, p httprouter.Params) (any, error) {
name := p.ByName("name")
deleteOpts, err := parseDeleteCollectionBody(req.Body)
deleteOpts, err := parseDeleteCollectionBody(req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/testing/kube_server/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ const (
func (s *KubeMockServer) deleteTeleportRole(w http.ResponseWriter, req *http.Request, p httprouter.Params) (any, error) {
namespace := p.ByName("namespace")
name := p.ByName("name")
deleteOpts, err := parseDeleteCollectionBody(req.Body)
deleteOpts, err := parseDeleteCollectionBody(req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
81 changes: 65 additions & 16 deletions lib/kube/proxy/testing/kube_server/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package kubeserver

import (
"encoding/json"
"io"
"mime"
"net/http"
"path/filepath"
"sort"
Expand All @@ -29,8 +29,11 @@ import (
"github.com/julienschmidt/httprouter"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/kube/proxy/responsewriters"
)

var podList = corev1.PodList{
Expand Down Expand Up @@ -109,7 +112,7 @@ func (s *KubeMockServer) getPod(w http.ResponseWriter, req *http.Request, p http
func (s *KubeMockServer) deletePod(w http.ResponseWriter, req *http.Request, p httprouter.Params) (any, error) {
namespace := p.ByName("namespace")
name := p.ByName("name")
deleteOpts, err := parseDeleteCollectionBody(req.Body)
deleteOpts, err := parseDeleteCollectionBody(req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -131,27 +134,73 @@ func (s *KubeMockServer) deletePod(w http.ResponseWriter, req *http.Request, p h
return nil, trace.NotFound("pod %q not found", filepath.Join(namespace, name))
}

func (s *KubeMockServer) DeletedPods(reqID string) []string {
s.mu.Lock()
key := deletedResource{kind: types.KindKubePod, requestID: reqID}
deleted := make([]string, len(s.deletedResources[key]))
copy(deleted, s.deletedResources[key])
s.mu.Unlock()
sort.Strings(deleted)
return deleted
}

// parseDeleteCollectionBody parses the request body targeted to pod collection
// endpoints.
func parseDeleteCollectionBody(r io.Reader) (metav1.DeleteOptions, error) {
func parseDeleteCollectionBody(r *http.Request) (metav1.DeleteOptions, error) {
into := metav1.DeleteOptions{}
data, err := io.ReadAll(r)
data, err := io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
return into, trace.Wrap(err)
}
if len(data) == 0 {
return into, nil
}
err = json.Unmarshal(data, &into)
return into, trace.Wrap(err)
decoder, err := newDecoderForContentType(r.Header.Get(responsewriters.ContentTypeHeader))
if err != nil {
return into, trace.Wrap(err)
}
objI, _, err := decoder.Decode(data, nil /* defaults */, &into)
if err != nil {
return into, trace.Wrap(err)
}
obj, ok := objI.(*metav1.DeleteOptions)
if !ok {
return into, trace.BadParameter("expected DeleteOptions, got %T", objI)
}
return *obj, trace.Wrap(err)
}

func newDecoderForContentType(contentType string) (runtime.Decoder, error) {
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, trace.Wrap(
err,
"unable to parse %q header %q",
responsewriters.ContentTypeHeader,
contentType,
)
}
negotiator := newClientNegotiator()
dec, err := negotiator.Decoder(mediaType, params)
if err != nil {
return nil, trace.Wrap(err)
}
return dec, nil
}

// newClientNegotiator creates a negotiator that based on `Content-Type` header
// from the Kubernetes API response is able to create a different encoder/decoder.
// Supported content types:
// - "application/json"
// - "application/yaml"
// - "application/vnd.kubernetes.protobuf"
func newClientNegotiator() runtime.ClientNegotiator {
return runtime.NewClientNegotiator(
kubeCodecs.WithoutConversion(),
schema.GroupVersion{
// create a serializer for Kube API v1
Version: "v1",
},
)
}

func (s *KubeMockServer) DeletedPods(reqID string) []string {
s.mu.Lock()
key := deletedResource{kind: types.KindKubePod, requestID: reqID}
deleted := make([]string, len(s.deletedResources[key]))
copy(deleted, s.deletedResources[key])
s.mu.Unlock()
sort.Strings(deleted)
return deleted
}
2 changes: 1 addition & 1 deletion lib/kube/proxy/testing/kube_server/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *KubeMockServer) getSecret(w http.ResponseWriter, req *http.Request, p h
func (s *KubeMockServer) deleteSecret(w http.ResponseWriter, req *http.Request, p httprouter.Params) (any, error) {
namespace := p.ByName("namespace")
name := p.ByName("name")
deleteOpts, err := parseDeleteCollectionBody(req.Body)
deleteOpts, err := parseDeleteCollectionBody(req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down

0 comments on commit 62bc8c8

Please sign in to comment.