Skip to content

Commit

Permalink
Merge pull request #2956 from google/containerd-cri
Browse files Browse the repository at this point in the history
Integrate CRI and containerd file system stats
  • Loading branch information
bobbypage authored Oct 11, 2021
2 parents 167d846 + c379491 commit 0ab27c2
Show file tree
Hide file tree
Showing 28 changed files with 73,931 additions and 79 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ docker-build:
presubmit: vet
@echo ">> checking go formatting"
@./build/check_gofmt.sh
@echo ">> checking CRI definitions"
@./build/check_cri_defs.sh
@echo ">> checking go mod tidy"
@./build/check_gotidy.sh
@echo ">> checking file boilerplate"
Expand Down
2 changes: 1 addition & 1 deletion build/boilerplate/boilerplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def file_passes(filename, refs, regexs):
def file_extension(filename):
return os.path.splitext(filename)[1].split(".")[-1].lower()

skipped_dirs = ['Godeps', 'vendor', 'third_party', '_gopath', '_output', '.git']
skipped_dirs = ['Godeps', 'vendor', 'third_party', '_gopath', '_output', '.git', 'cri-api']
def normalize_files(files):
newfiles = []
for pathname in files:
Expand Down
41 changes: 41 additions & 0 deletions build/check_cri_defs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env bash

# Copyright 2021 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Presubmit script that ensures that https://github.com/kubernetes/cri-api
# definitions that are copied into cAdvisor codebase exactly match the ones upstream.

set -e

# check_git_dirty aborts the script with exit status 1 when the git working
# tree has uncommitted changes, or when the tree state cannot be determined.
function check_git_dirty() {
  local porcelain
  # Capture the status on its own so that a failing `git status` (for example
  # when invoked outside a repository) is not mistaken for a clean tree: a
  # failed command substitution inside `[ ... ]` would just yield "".
  if ! porcelain="$(git status --porcelain)"; then
    echo ">>> unable to determine git status"
    exit 1
  fi
  if [ -n "${porcelain}" ]; then
    echo ">>> working tree is not clean"
    echo ">>> git status:"
    git status
    exit 1
  fi
}

# Repository root, derived from this script's location under build/.
GIT_ROOT="$(dirname "${BASH_SOURCE[0]}")/.."

# Start from a clean tree so that any change seen afterwards can only have
# been produced by the update step below.
check_git_dirty

printf '%s\n' ">>> updating k8s CRI definitions..."
"${GIT_ROOT}/build/update_cri_defs.sh"

# Re-running the update must be a no-op: if the copied CRI definitions were
# edited by hand or differ from upstream, the tree is now dirty and the
# check fails.
printf '%s\n' ">>> checking git tree clean after updating k8s CRI definitions..."
check_git_dirty
30 changes: 30 additions & 0 deletions build/update_cri_defs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env bash

# Copyright 2021 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

# Refreshes the copy of the Kubernetes CRI API kept under cri-api/ by cloning
# the pinned upstream branch, rewriting its Go import paths to the cAdvisor
# module path, and copying its pkg/ tree into the repository.

# Upstream cri-api branch to copy from.
CRI_VERSION="release-1.22"
GIT_ROOT="$(dirname "${BASH_SOURCE[0]}")/.."

TMP_DIR="$(mktemp -d)"
# Remove the scratch checkout on every exit path. Without the trap, any
# failure below (clone, sed, cp) would abort via `set -e` and leak TMP_DIR.
trap 'rm -rf "${TMP_DIR}"' EXIT

git clone --depth=1 https://github.com/kubernetes/cri-api.git --branch="${CRI_VERSION}" "${TMP_DIR}"

# rewrite import paths (the dot is escaped so it only matches a literal ".")
find "${TMP_DIR}" -type f -name "*.go" -exec sed -i 's:k8s\.io/cri-api:github.com/google/cadvisor/cri-api:g' {} +

mkdir -p "${GIT_ROOT}/cri-api"
# NOTE(review): cp only adds/overwrites files; files deleted upstream would
# remain in cri-api/pkg — confirm whether that directory should be cleared first.
cp -r "${TMP_DIR}/pkg" "${GIT_ROOT}/cri-api"
2 changes: 1 addition & 1 deletion cmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/mesos/mesos-go v0.0.7-0.20180413204204-29de6ff97b48
github.com/pquerna/ffjson v0.0.0-20171002144729-d49c2bc1aa13 // indirect
github.com/prometheus/client_golang v1.8.0
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
google.golang.org/api v0.34.0
gopkg.in/olivere/elastic.v2 v2.0.12
Expand Down
3 changes: 2 additions & 1 deletion cmd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,9 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
129 changes: 83 additions & 46 deletions container/common/fsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,21 @@ type FsUsage struct {
InodeUsage uint64
}

// FsUsageProvider abstracts the source of filesystem usage figures.
type FsUsageProvider interface {
	// Usage returns the current filesystem usage.
	Usage() (*FsUsage, error)
	// Targets returns where the filesystem usage metric is collected from; it
	// may be a directory, a file, or some information about the snapshotter
	// (for containerd).
	Targets() []string
}

type realFsHandler struct {
sync.RWMutex
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
rootfs string
extraDir string
fsInfo fs.FsInfo
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
usageProvider FsUsageProvider
// Tells the container to stop.
stopChan chan struct{}
}
Expand All @@ -58,56 +64,33 @@ const DefaultPeriod = time.Minute

var _ FsHandler = &realFsHandler{}

func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler {
func NewFsHandler(period time.Duration, provider FsUsageProvider) FsHandler {
return &realFsHandler{
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
rootfs: rootfs,
extraDir: extraDir,
fsInfo: fsInfo,
stopChan: make(chan struct{}, 1),
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
usageProvider: provider,
stopChan: make(chan struct{}, 1),
}
}

func (fh *realFsHandler) update() error {
var (
rootUsage, extraUsage fs.UsageInfo
rootErr, extraErr error
)
// TODO(vishh): Add support for external mounts.
if fh.rootfs != "" {
rootUsage, rootErr = fh.fsInfo.GetDirUsage(fh.rootfs)
}

if fh.extraDir != "" {
extraUsage, extraErr = fh.fsInfo.GetDirUsage(fh.extraDir)
usage, err := fh.usageProvider.Usage()

if err != nil {
return err
}

// Wait to handle errors until after all operations are run.
// An error in one will not cause an early return, skipping others
fh.Lock()
defer fh.Unlock()
fh.lastUpdate = time.Now()
if fh.rootfs != "" && rootErr == nil {
fh.usage.InodeUsage = rootUsage.Inodes
fh.usage.BaseUsageBytes = rootUsage.Bytes
fh.usage.TotalUsageBytes = rootUsage.Bytes
}
if fh.extraDir != "" && extraErr == nil {
if fh.rootfs != "" {
fh.usage.TotalUsageBytes += extraUsage.Bytes
} else {
// rootfs is empty, totalUsageBytes use extra usage bytes
fh.usage.TotalUsageBytes = extraUsage.Bytes
}
}

// Combine errors into a single error to return
if rootErr != nil || extraErr != nil {
return fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
}
fh.usage.InodeUsage = usage.InodeUsage
fh.usage.BaseUsageBytes = usage.BaseUsageBytes
fh.usage.TotalUsageBytes = usage.TotalUsageBytes

return nil
}

Expand All @@ -130,7 +113,8 @@ func (fh *realFsHandler) trackUsage() {
// if the long duration is persistent either because of slow
// disk or lots of containers.
longOp = longOp + time.Second
klog.V(2).Infof("fs: disk usage and inodes count on following dirs took %v: %v; will not log again for this container unless duration exceeds %v", duration, []string{fh.rootfs, fh.extraDir}, longOp)
klog.V(2).Infof(`fs: disk usage and inodes count on targets took %v: %v; `+
`will not log again for this container unless duration exceeds %v`, duration, fh.usageProvider.Targets(), longOp)
}
select {
case <-fh.stopChan:
Expand All @@ -153,3 +137,56 @@ func (fh *realFsHandler) Usage() FsUsage {
defer fh.RUnlock()
return fh.usage
}

// fsUsageProvider is the FsUsageProvider returned by NewGeneralFsUsageProvider.
// It measures up to two directories through fs.FsInfo.
type fsUsageProvider struct {
	// fsInfo performs the directory usage lookups.
	fsInfo fs.FsInfo
	// rootFs is the root filesystem directory to measure; empty means it is skipped.
	rootFs string
	// extraDir is a directory consumed by the container but located outside
	// rootFs, e.g. the directory where logs are saved; empty means it is skipped.
	extraDir string
}

// NewGeneralFsUsageProvider returns an FsUsageProvider that measures rootFs
// and extraDir through fsInfo. Either path may be empty, in which case that
// path is not measured.
func NewGeneralFsUsageProvider(fsInfo fs.FsInfo, rootFs, extraDir string) FsUsageProvider {
	provider := new(fsUsageProvider)
	provider.fsInfo = fsInfo
	provider.rootFs = rootFs
	provider.extraDir = extraDir
	return provider
}

// Targets returns the two configured paths, rootFs first and extraDir second.
// Unset (empty) paths are included as-is.
func (f *fsUsageProvider) Targets() []string {
	targets := make([]string, 0, 2)
	targets = append(targets, f.rootFs, f.extraDir)
	return targets
}

// Usage measures rootFs and extraDir (each only when non-empty) and returns
// the combined figures: InodeUsage and BaseUsageBytes come from rootFs alone,
// while TotalUsageBytes is the rootFs bytes plus the extraDir bytes.
//
// Both directories are always measured before any error is reported. If
// either lookup fails, no usage is returned and the error carries both
// results.
func (f *fsUsageProvider) Usage() (*FsUsage, error) {
	var rootUsage, extraUsage fs.UsageInfo
	var rootErr, extraErr error

	hasRoot := f.rootFs != ""
	hasExtra := f.extraDir != ""

	if hasRoot {
		rootUsage, rootErr = f.fsInfo.GetDirUsage(f.rootFs)
	}
	if hasExtra {
		extraUsage, extraErr = f.fsInfo.GetDirUsage(f.extraDir)
	}

	// Report failures only after both lookups have run, folding them into a
	// single error.
	if rootErr != nil || extraErr != nil {
		return nil, fmt.Errorf("failed to obtain filesystem usage; rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
	}

	result := &FsUsage{}
	if hasRoot {
		result.InodeUsage = rootUsage.Inodes
		result.BaseUsageBytes = rootUsage.Bytes
		result.TotalUsageBytes = rootUsage.Bytes
	}
	if hasExtra {
		result.TotalUsageBytes += extraUsage.Bytes
	}
	return result, nil
}
42 changes: 42 additions & 0 deletions container/containerd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ import (
"time"

containersapi "github.com/containerd/containerd/api/services/containers/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshots/v1"
tasksapi "github.com/containerd/containerd/api/services/tasks/v1"
versionapi "github.com/containerd/containerd/api/services/version/v1"
types "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/dialer"
ptypes "github.com/gogo/protobuf/types"
criapi "github.com/google/cadvisor/cri-api/pkg/apis/runtime/v1alpha2"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
Expand All @@ -36,12 +39,17 @@ type client struct {
containerService containersapi.ContainersClient
taskService tasksapi.TasksClient
versionService versionapi.VersionClient
snapshotService snapshotapi.SnapshotsClient
criService criapi.RuntimeServiceClient
}

type ContainerdClient interface {
LoadContainer(ctx context.Context, id string) (*containers.Container, error)
TaskPid(ctx context.Context, id string) (uint32, error)
Version(ctx context.Context) (string, error)
SnapshotMounts(ctx context.Context, snapshotter, key string) ([]*types.Mount, error)
ContainerStatus(ctx context.Context, id string) (*criapi.ContainerStatus, error)
ContainerStats(ctx context.Context, id string) (*criapi.ContainerStats, error)
}

var once sync.Once
Expand Down Expand Up @@ -92,6 +100,8 @@ func Client(address, namespace string) (ContainerdClient, error) {
containerService: containersapi.NewContainersClient(conn),
taskService: tasksapi.NewTasksClient(conn),
versionService: versionapi.NewVersionClient(conn),
snapshotService: snapshotapi.NewSnapshotsClient(conn),
criService: criapi.NewRuntimeServiceClient(conn),
}
})
return ctrdClient, retErr
Expand Down Expand Up @@ -125,6 +135,38 @@ func (c *client) Version(ctx context.Context) (string, error) {
return response.Version, nil
}

// SnapshotMounts asks the snapshots service for the mounts of the snapshot
// identified by key under the given snapshotter. Errors from the call are
// converted with errdefs.FromGRPC before being returned.
func (c *client) SnapshotMounts(ctx context.Context, snapshotter, key string) ([]*types.Mount, error) {
	req := &snapshotapi.MountsRequest{
		Snapshotter: snapshotter,
		Key:         key,
	}
	resp, err := c.snapshotService.Mounts(ctx, req)
	if err != nil {
		return nil, errdefs.FromGRPC(err)
	}
	return resp.Mounts, nil
}

// ContainerStatus queries the CRI runtime service for the status of the
// container with the given id, using a non-verbose request. Errors from the
// call are returned unchanged.
func (c *client) ContainerStatus(ctx context.Context, id string) (*criapi.ContainerStatus, error) {
	req := &criapi.ContainerStatusRequest{
		ContainerId: id,
		Verbose:     false,
	}
	resp, err := c.criService.ContainerStatus(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.Status, nil
}

// ContainerStats queries the CRI runtime service for the stats of the
// container with the given id. Errors from the call are returned unchanged.
func (c *client) ContainerStats(ctx context.Context, id string) (*criapi.ContainerStats, error) {
	req := &criapi.ContainerStatsRequest{ContainerId: id}
	resp, err := c.criService.ContainerStats(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.Stats, nil
}

func containerFromProto(containerpb containersapi.Container) *containers.Container {
var runtime containers.RuntimeInfo
if containerpb.Runtime != nil {
Expand Down
Loading

0 comments on commit 0ab27c2

Please sign in to comment.