Skip to content

Commit

Permalink
This PR implements AdmissionOptions.ApplyTo
Browse files Browse the repository at this point in the history
ApplyTo adds the admission chain to the server configuration the method lazily initializes a generic plugin
that is appended to the list of pluginInitializers.

apiserver.Config will hold an instance of SharedInformerFactory to ensure we only have once instance.
The field will be initialized in apisever.SecureServingOptions
  • Loading branch information
p0lyn0mial committed May 14, 2017
1 parent 4807e6c commit 8cea69a
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 37 deletions.
2 changes: 2 additions & 0 deletions cmd/kube-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func NewServerRunOptions() *ServerRunOptions {
}
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
// Set the default for admission plugins names
s.Admission.PluginNames = []string{"AlwaysAdmit"}
return &s
}

Expand Down
26 changes: 14 additions & 12 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,39 +359,41 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}

genericConfig.AdmissionControl, err = BuildAdmission(s,
s.Admission.Plugins,
pluginInitializer, err := BuildAdmissionPluginInitializer(
s,
client,
sharedInformers,
genericConfig.Authorizer,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}

err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
}
return genericConfig, sharedInformers, insecureServingOptions, nil
}

// BuildAdmission constructs the admission chain
func BuildAdmission(s *options.ServerRunOptions, plugins *admission.Plugins, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.Interface, error) {
admissionControlPluginNames := strings.Split(s.Admission.Control, ",")
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
var cloudConfig []byte
var err error

if s.CloudProvider.CloudConfigFile != "" {
var err error
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}

// TODO: use a dynamic restmapper. See https://github.com/kubernetes/kubernetes/pull/42615.
restMapper := api.Registry.RESTMapper()
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to read plugin config: %v", err)
}
return plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
return pluginInitializer, nil
}

// BuildAuthenticator constructs the authenticator
Expand Down
2 changes: 2 additions & 0 deletions federation/cmd/federation-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func NewServerRunOptions() *ServerRunOptions {
}
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
// Set the default for admission plugins names
s.Admission.PluginNames = []string{"AlwaysAdmit"}
return &s
}

Expand Down
15 changes: 6 additions & 9 deletions federation/cmd/federation-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
Expand Down Expand Up @@ -185,21 +184,20 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
return fmt.Errorf("invalid Authorization Config: %v", err)
}

admissionControlPluginNames := strings.Split(s.Admission.Control, ",")
var cloudConfig []byte

if s.CloudProvider.CloudConfigFile != "" {
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}

pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
}
admissionController, err := kubeapiserveradmission.Plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)

err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer,
)
if err != nil {
return fmt.Errorf("failed to initialize plugins: %v", err)
}
Expand All @@ -208,7 +206,6 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
genericConfig.Version = &kubeVersion
genericConfig.Authenticator = apiAuthenticator
genericConfig.Authorizer = apiAuthorizer
genericConfig.AdmissionControl = admissionController
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, api.Scheme)
genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
Expand Down
8 changes: 8 additions & 0 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
Expand Down Expand Up @@ -101,6 +103,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
TLSClientConfig: &tls.Config{},
})

clientset, err := kubernetes.NewForConfig(config.GenericConfig.LoopbackClientConfig)
if err != nil {
t.Fatalf("unable to create client set due to %v", err)
}
config.GenericConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.GenericConfig.LoopbackClientConfig.Timeout)

return server, *config, assert.New(t)
}

Expand Down
13 changes: 13 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"
certutil "k8s.io/client-go/util/cert"

Expand Down Expand Up @@ -109,6 +110,8 @@ type Config struct {
// Will default to a value based on secure serving info and available ipv4 IPs.
ExternalAddress string

// SharedInformerFactory provides shared informers for resources
SharedInformerFactory informers.SharedInformerFactory
//===========================================================================
// Fields you probably don't care about changing
//===========================================================================
Expand Down Expand Up @@ -405,6 +408,16 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
s.postStartHooks[k] = v
}

genericApiServerHookName := "generic-apiserver-start-informers"
if c.SharedInformerFactory != nil && !s.isHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
c.SharedInformerFactory.Start(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
Expand Down
10 changes: 10 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
)

Expand All @@ -38,6 +40,11 @@ func TestNewWithDelegate(t *testing.T) {
delegateConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
delegateConfig.LoopbackClientConfig = &rest.Config{}
delegateConfig.SwaggerConfig = DefaultSwaggerConfig()
clientset := fake.NewSimpleClientset()
if clientset == nil {
t.Fatal("unable to create fake client set")
}
delegateConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, delegateConfig.LoopbackClientConfig.Timeout)

delegateHealthzCalled := false
delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error {
Expand Down Expand Up @@ -66,6 +73,7 @@ func TestNewWithDelegate(t *testing.T) {
wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
wrappingConfig.LoopbackClientConfig = &rest.Config{}
wrappingConfig.SwaggerConfig = DefaultSwaggerConfig()
wrappingConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, wrappingConfig.LoopbackClientConfig.Timeout)

wrappingHealthzCalled := false
wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error {
Expand Down Expand Up @@ -102,6 +110,7 @@ func TestNewWithDelegate(t *testing.T) {
"/healthz/delegate-health",
"/healthz/ping",
"/healthz/poststarthook/delegate-post-start-hook",
"/healthz/poststarthook/generic-apiserver-start-informers",
"/healthz/poststarthook/wrapping-post-start-hook",
"/healthz/wrapping-health",
"/swaggerapi/"
Expand All @@ -110,6 +119,7 @@ func TestNewWithDelegate(t *testing.T) {
checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok
[-]wrapping-health failed: reason withheld
[-]delegate-health failed: reason withheld
[+]poststarthook/generic-apiserver-start-informers ok
[+]poststarthook/delegate-post-start-hook ok
[+]poststarthook/wrapping-post-start-hook ok
healthz check failed
Expand Down
16 changes: 10 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
)

Expand Down Expand Up @@ -90,6 +92,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
config.LegacyAPIGroupPrefixes = sets.NewString("/api")
config.LoopbackClientConfig = &restclient.Config{}

clientset := fake.NewSimpleClientset()
if clientset == nil {
t.Fatal("unable to create fake client set")
}
config.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.LoopbackClientConfig.Timeout)

// TODO restore this test, but right now, eliminate our cycle
// config.OpenAPIConfig = DefaultOpenAPIConfig(testGetOpenAPIDefinitions, runtime.NewScheme())
// config.OpenAPIConfig.Info = &spec.Info{
Expand Down Expand Up @@ -300,17 +308,13 @@ func TestPrepareRun(t *testing.T) {
defer etcdserver.Terminate(t)

assert.NotNil(config.SwaggerConfig)
// assert.NotNil(config.OpenAPIConfig)

server := httptest.NewServer(s.Handler.GoRestfulContainer.ServeMux)
defer server.Close()
done := make(chan struct{})

s.PrepareRun()

// openapi is installed in PrepareRun
// resp, err := http.Get(server.URL + "/swagger.json")
// assert.NoError(err)
// assert.Equal(http.StatusOK, resp.StatusCode)
s.RunPostStartHooks(done)

// swagger is installed in PrepareRun
resp, err := http.Get(server.URL + "/swaggerapi/")
Expand Down
9 changes: 8 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/server/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
}
}

// isHookRegistered checks whether a given hook is registered
func (s *GenericAPIServer) isHookRegistered(name string) bool {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
_, exists := s.postStartHooks[name]
return exists
}

func runPostStartHook(name string, entry postStartHookEntry, context PostStartHookContext) {
var err error
func() {
Expand All @@ -117,7 +125,6 @@ func runPostStartHook(name string, entry postStartHookEntry, context PostStartHo
if err != nil {
glog.Fatalf("PostStartHook %q failed: %v", name, err)
}

close(entry.done)
}

Expand Down
51 changes: 44 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/server/options/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,70 @@ limitations under the License.
package options

import (
"fmt"
"strings"

"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
)

// AdmissionOptions holds the admission options
type AdmissionOptions struct {
Control string
ControlConfigFile string
Plugins *admission.Plugins
PluginNames []string
ConfigFile string
Plugins *admission.Plugins
}

// NewAdmissionOptions creates a new instance of AdmissionOptions
func NewAdmissionOptions(plugins *admission.Plugins) *AdmissionOptions {
return &AdmissionOptions{
Plugins: plugins,
Control: "AlwaysAdmit",
Plugins: plugins,
PluginNames: []string{},
}
}

// AddFlags adds flags related to admission for a specific APIServer to the specified FlagSet
func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.Control, "admission-control", a.Control, ""+
fs.StringSliceVar(&a.PluginNames, "admission-control", a.PluginNames, ""+
"Ordered list of plug-ins to do admission control of resources into cluster. "+
"Comma-delimited list of: "+strings.Join(a.Plugins.Registered(), ", ")+".")

fs.StringVar(&a.ControlConfigFile, "admission-control-config-file", a.ControlConfigFile,
fs.StringVar(&a.ConfigFile, "admission-control-config-file", a.ConfigFile,
"File with admission control configuration.")
}

// ApplyTo adds the admission chain to the server configuration
// the method lazily initializes a generic plugin that is appended to the list of pluginInitializers
// note this method uses:
// genericconfig.LoopbackClientConfig
// genericconfig.SharedInformerFactory
// genericconfig.Authorizer
func (a *AdmissionOptions) ApplyTo(serverCfg *server.Config, pluginInitializers ...admission.PluginInitializer) error {
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(a.PluginNames, a.ConfigFile)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
}

clientset, err := kubernetes.NewForConfig(serverCfg.LoopbackClientConfig)
if err != nil {
return err
}
genericInitializer, err := initializer.New(clientset, serverCfg.SharedInformerFactory, serverCfg.Authorizer)
if err != nil {
return err
}
initializersChain := admission.PluginInitializers{}
pluginInitializers = append(pluginInitializers, genericInitializer)
initializersChain = append(initializersChain, pluginInitializers...)

admissionChain, err := a.Plugins.NewFromPlugins(a.PluginNames, pluginsConfigProvider, initializersChain)
if err != nil {
return err
}

serverCfg.AdmissionControl = admissionChain
return nil
}
9 changes: 9 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/options/serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/server"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
certutil "k8s.io/client-go/util/cert"
)

Expand Down Expand Up @@ -167,6 +169,13 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error {
c.SecureServingInfo.SNICerts[server.LoopbackClientServerNameOverride] = &tlsCert
}

// create shared informers
clientset, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return err
}
c.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, c.LoopbackClientConfig.Timeout)

return nil
}

Expand Down
1 change: 0 additions & 1 deletion test/integration/federation/framework/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func startServer(t *testing.T, runOptions *options.ServerRunOptions, stopChan <-
}

runOptions.InsecureServing.BindPort = port

err = app.NonBlockingRun(runOptions, stopChan)
if err != nil {
t.Logf("Error starting the %s: %v", apiNoun, err)
Expand Down
Loading

0 comments on commit 8cea69a

Please sign in to comment.