Skip to content

Commit

Permalink
Add initial implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Yury Kulazhenkov <[email protected]>
  • Loading branch information
ykulazhenkov committed Sep 19, 2023
1 parent fbf7cff commit f43fa15
Show file tree
Hide file tree
Showing 12 changed files with 801 additions and 56 deletions.
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,73 @@
# network-operator-init-container
Init container for NVIDIA Network Operator

The network-operator-init-container container has two required command line arguments:

- `--config` path to the configuration file
- `--node-name` name of the k8s node on which this app runs

The configuration file should be in JSON format:

```
{
"safeDriverLoad": {
"enable": true,
"annotation": "some-annotation"
}
}
```

- `safeDriverLoad` - contains settings related to safeDriverLoad feature
- `safeDriverLoad.enable` - enable the safeDriverLoad feature
- `safeDriverLoad.annotation` - annotation to use for safeDriverLoad feature


If the `safeDriverLoad` feature is enabled, the network-operator-init-container container sets the annotation
provided in `safeDriverLoad.annotation` on the Kubernetes Node object identified by `--node-name`.
The container exits with code 0 when the annotation is removed from the Node object.

If the `safeDriverLoad` feature is disabled, the container exits immediately with code 0.

```
NVIDIA Network Operator init container
Usage:
network-operator-init-container [flags]
Config flags:
--config string
path to the configuration file
--node-name string
name of the k8s node on which this app runs
Logging flags:
--log-flush-frequency duration
Maximum number of seconds between log flushes (default 5s)
--log-json-info-buffer-size quantity
[Alpha] In JSON format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can
be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi). Enable the LoggingAlphaOptions feature gate to use this.
--log-json-split-stream
[Alpha] In JSON format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout. Enable the LoggingAlphaOptions feature gate to use
this.
--logging-format string
Sets the log format. Permitted formats: "json" (gated by LoggingBetaOptions), "text". (default "text")
-v, --v Level
number for the log level verbosity
--vmodule pattern=N,...
comma-separated list of pattern=N settings for file-filtered logging (only works for text log format)
General flags:
-h, --help
print help and exit
--version
print version and exit
Kubernetes flags:
--kubeconfig string
Paths to a kubeconfig. Only required if out-of-cluster.
```
5 changes: 5 additions & 0 deletions Taskfile.dist.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ vars:
IMAGE_NAME_FULL: "{{.IMAGE_REGISTRY}}/{{.IMAGE_REPOSITORY}}:{{.IMAGE_TAG}}"
# Coverage related vars
COVER_PROFILE: "{{.PROJECT_DIR}}/network-operator-init-container.cover"
# Test related vars
ENVTEST_K8S_VERSION: 1.27.1

includes:
version: ./taskfiles/Version.yaml
Expand Down Expand Up @@ -65,11 +67,14 @@ tasks:

test:
desc: run unit tests
deps:
- install:setup-envtest
vars:
COVER_MODE: atomic
GO_PKGS:
sh: go list ./... | grep -v ".*/mocks"
cmd: |
export KUBEBUILDER_ASSETS=$({{.LOCAL_BIN}}/setup-envtest use {{.ENVTEST_K8S_VERSION}} -p path);
go test -covermode={{.COVER_MODE}} -coverprofile={{.COVER_PROFILE}} {{.GO_PKGS | catLines}}
lint:
Expand Down
165 changes: 154 additions & 11 deletions cmd/network-operator-init-container/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,37 @@ package app
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
// register json format for logger
_ "k8s.io/component-base/logs/json/register"

"github.com/Mellanox/network-operator-init-container/pkg/utils/signals"
"github.com/Mellanox/network-operator-init-container/pkg/utils/version"

"github.com/Mellanox/network-operator-init-container/cmd/network-operator-init-container/app/options"
configPgk "github.com/Mellanox/network-operator-init-container/pkg/config"
"github.com/Mellanox/network-operator-init-container/pkg/utils/version"
)

// NewNetworkOperatorInitContainerCommand creates a new command
func NewNetworkOperatorInitContainerCommand() *cobra.Command {
opts := options.New()
ctx := signals.SetupShutdownSignals()
ctx := ctrl.SetupSignalHandler()

cmd := &cobra.Command{
Use: "network-operator-init-container",
Expand All @@ -46,9 +57,11 @@ func NewNetworkOperatorInitContainerCommand() *cobra.Command {
if err := opts.Validate(); err != nil {
return fmt.Errorf("invalid config: %w", err)
}
klog.EnableContextualLogging(true)

return RunNetworkOperatorInitContainer(klog.NewContext(ctx, klog.NewKlogr()), opts)
conf, err := ctrl.GetConfig()
if err != nil {
return fmt.Errorf("failed to read config for k8s client: %v", err)
}
return RunNetworkOperatorInitContainer(logr.NewContext(ctx, klog.NewKlogr()), conf, opts)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
Expand Down Expand Up @@ -76,9 +89,139 @@ func NewNetworkOperatorInitContainerCommand() *cobra.Command {
}

// RunNetworkOperatorInitContainer runs init container main loop.
//
// It loads the init container configuration from opts.ConfigPath and returns
// nil right away when safe driver loading is disabled. Otherwise it sets the
// configured annotation (value "true") on the Node named opts.NodeName, starts
// a controller manager whose NodeReconciler watches that Node, and blocks until
// the reconciler reports a result on the error channel or ctx is cancelled.
//
// The returned error is nil when the reconciler reports that the annotation is
// gone, the reconciler/manager error otherwise, or "waiting canceled" when ctx
// is done first.
// NOTE(review): the next line appears to be the pre-change signature retained
// by the diff view; the line after it is the current signature.
func RunNetworkOperatorInitContainer(ctx context.Context, opts *options.Options) error {
func RunNetworkOperatorInitContainer(ctx context.Context, config *rest.Config, opts *options.Options) error {
logger := logr.FromContextOrDiscard(ctx)
// NOTE(review): the next line appears to be the pre-change start message
// retained by the diff view; the current one follows the context setup below.
logger.Info("start network-operator-init-container", "Options", opts)
// Derive a cancellable context so the manager goroutine can be stopped from
// this function once a result is available.
ctx, cFunc := context.WithCancel(ctx)
defer cFunc()
logger.Info("start network-operator-init-container",
"Options", opts, "Version", version.GetVersionString())
ctrl.SetLogger(logger)

initContCfg, err := configPgk.FromFile(opts.ConfigPath)
if err != nil {
logger.Error(err, "failed to read configuration")
return err
}
logger.Info("network-operator-init-container configuration", "config", initContCfg.String())

// Nothing to wait for when the feature is off.
if !initContCfg.SafeDriverLoad.Enable {
logger.Info("safe driver loading is disabled, exit")
return nil
}

// The manager's cache is limited by a field selector to the single Node whose
// metadata.name equals opts.NodeName. Metrics BindAddress "0" presumably
// disables the metrics endpoint - confirm against controller-runtime docs.
mgr, err := ctrl.NewManager(config, ctrl.Options{
Metrics: metricsserver.Options{BindAddress: "0"},
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&corev1.Node{}: {Field: fields.ParseSelectorOrDie(
fmt.Sprintf("metadata.name=%s", opts.NodeName))}}},
})
if err != nil {
logger.Error(err, "unable to start manager")
return err
}

// A separate client is built for the Get/Patch calls below, which run before
// mgr.Start; presumably the manager's own client cannot serve reads until the
// manager is running - TODO confirm.
k8sClient, err := client.New(config,
client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
logger.Error(err, "failed to create k8sClient client")
return err
}

// Capacity 1 together with the non-blocking writeCh means only the first
// reported result is kept; later ones are dropped.
errCh := make(chan error, 1)

if err = (&NodeReconciler{
ErrCh: errCh,
SafeLoadAnnotation: initContCfg.SafeDriverLoad.Annotation,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "Node")
return err
}

// Read the Node first so a missing or inaccessible Node fails fast, then set
// the annotation with a JSON merge patch that touches only that one key.
node := &corev1.Node{}
err = k8sClient.Get(ctx, types.NamespacedName{Name: opts.NodeName}, node)
if err != nil {
logger.Error(err, "failed to read node object from the API", "node", opts.NodeName)
return err
}
err = k8sClient.Patch(ctx, node, client.RawPatch(
types.MergePatchType, []byte(
fmt.Sprintf(`{"metadata":{"annotations":{%q: %q}}}`,
initContCfg.SafeDriverLoad.Annotation, "true"))))
if err != nil {
logger.Error(err, "unable to set annotation for node", "node", opts.NodeName)
return err
}

logger.Info("wait for annotation to be removed",
"annotation", initContCfg.SafeDriverLoad.Annotation, "node", opts.NodeName)

// Run the manager in the background; a failure of mgr.Start is reported
// through the same channel the reconciler uses.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if err := mgr.Start(ctx); err != nil {
logger.Error(err, "problem running manager")
writeCh(errCh, err)
}
}()
// Deferred calls run last-in-first-out, so wg.Wait executes before the
// deferred cFunc above. Every return path below must therefore leave ctx
// already cancelled, otherwise wg.Wait would block on the running manager.
defer wg.Wait()
select {
case <-ctx.Done():
return fmt.Errorf("waiting canceled")
case err = <-errCh:
// Cancel explicitly so the manager goroutine stops before wg.Wait runs.
cFunc()
return err
}
}

// NodeReconciler reconciles Node object and reports the outcome on ErrCh.
type NodeReconciler struct {
// ErrCh receives the reconciliation outcome: Reconcile sends nil once the
// annotation is no longer set, or the error it got while reading the Node.
ErrCh chan error
// SafeLoadAnnotation is the annotation key that Reconcile looks up on the Node.
SafeLoadAnnotation string
// Client is embedded; Reconcile uses it to read the Node object.
client.Client
// Scheme is supplied by the creator of the reconciler; the methods visible
// in this file do not read it.
Scheme *runtime.Scheme
}

// Reconcile reads the Node named in req and inspects SafeLoadAnnotation on it.
//
// If the Node cannot be read, the error is sent to ErrCh and returned. If the
// annotation value is empty (absent or set to ""), nil is sent to ErrCh to
// signal that loading may proceed. Otherwise the request is requeued after
// five seconds.
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	reqLog := log.FromContext(ctx).WithValues("annotation", r.SafeLoadAnnotation)

	var node corev1.Node
	if getErr := r.Client.Get(ctx, req.NamespacedName, &node); getErr != nil {
		// Both failure flavours are reported the same way; only the log differs.
		if apiErrors.IsNotFound(getErr) {
			reqLog.Info("Node object not found, exit")
		} else {
			reqLog.Error(getErr, "failed to get Node object from the cache")
		}
		writeCh(r.ErrCh, getErr)
		return ctrl.Result{}, getErr
	}

	if value := node.GetAnnotations()[r.SafeLoadAnnotation]; value != "" {
		reqLog.Info("annotation still present, waiting")
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	reqLog.Info("annotation removed, unblock loading")
	writeCh(r.ErrCh, nil)
	return ctrl.Result{}, nil
}

// writeCh sends err on ch without blocking. When the send cannot proceed
// immediately (the buffer is full or no receiver is waiting), err is dropped.
func writeCh(ch chan error, err error) {
	select {
	case ch <- err:
		return
	default:
		// Send would block: discard err so the caller is never stalled.
		return
	}
}

return nil
// SetupWithManager registers r with mgr as the reconciler of a controller
// built for corev1.Node objects and returns the error from building it.
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	nodeController := ctrl.NewControllerManagedBy(mgr).For(&corev1.Node{})
	return nodeController.Complete(r)
}
64 changes: 64 additions & 0 deletions cmd/network-operator-init-container/app/app_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2023, NVIDIA CORPORATION & AFFILIATES
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app_test

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
)

var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
cFunc context.CancelFunc
ctx context.Context
)

// TestApp is the "go test" entry point for the suite: it registers Ginkgo's
// Fail as the Gomega failure handler and then runs all specs in the package.
func TestApp(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Network Operator Init Container Suite")
}

// Suite setup: start the envtest environment, build a client against it and
// create the Node used by the specs.
var _ = BeforeSuite(func() {
	By("bootstrapping test environment")
	ctx, cFunc = context.WithCancel(context.Background())
	testEnv = &envtest.Environment{}

	// cfg and k8sClient are package-level variables; assign with "=" so the
	// specs see the values set here instead of shadowed locals.
	var startErr error
	cfg, startErr = testEnv.Start()
	Expect(startErr).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	var clientErr error
	k8sClient, clientErr = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(clientErr).NotTo(HaveOccurred())
	Expect(k8sClient).NotTo(BeNil())

	createNode(testNodeName)
})

// Suite teardown: cancel the shared suite context first, then stop the
// envtest environment and fail the suite if stopping reports an error.
var _ = AfterSuite(func() {
	cFunc()
	By("tearing down the test environment")
	Expect(testEnv.Stop()).NotTo(HaveOccurred())
})
Loading

0 comments on commit f43fa15

Please sign in to comment.