Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #839 from Random-Liu/cherrypick-#761
Browse files Browse the repository at this point in the history
Backport #761 to release/1.0
  • Loading branch information
Random-Liu committed Jul 11, 2018
2 parents 40007b3 + 8347d40 commit 1895a01
Show file tree
Hide file tree
Showing 13 changed files with 569 additions and 108 deletions.
5 changes: 5 additions & 0 deletions docs/config.md
Expand Up @@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows:
# enable_tls_streaming enables the TLS streaming support.
enable_tls_streaming = false

# max_container_log_line_size is the maximum log line size in bytes for a container.
# Log line longer than the limit will be split into multiple lines. -1 means no
# limit.
max_container_log_line_size = 16384

# "plugins.cri.containerd" contains config related to containerd
[plugins.cri.containerd]

Expand Down
2 changes: 1 addition & 1 deletion hack/verify-lint.sh
Expand Up @@ -20,7 +20,7 @@ set -o pipefail
for d in $(find . -type d -a \( -iwholename './pkg*' -o -iwholename './cmd*' \) -not -iwholename './pkg/api*'); do
echo for directory ${d} ...
gometalinter \
--exclude='error return value not checked.*(Close|Log|Print).*\(errcheck\)$' \
--exclude='error return value not checked.*(Close|Log|Print|Fprint).*\(errcheck\)$' \
--exclude='.*_test\.go:.*error return value not checked.*\(errcheck\)$' \
--exclude='duplicate of.*_test.go.*\(dupl\)$' \
--exclude='.*/mock_.*\.go:.*\(golint\)$' \
Expand Down
113 changes: 113 additions & 0 deletions integration/container_log_test.go
@@ -0,0 +1,113 @@
/*
Copyright 2018 The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func TestLongContainerLog(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log")
require.NoError(t, err)
defer os.RemoveAll(testPodLogDir)

t.Log("Create a sandbox with log directory")
sbConfig := PodSandboxConfig("sandbox", "long-container-log",
WithPodLogDirectory(testPodLogDir),
)
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()

const (
testImage = "busybox"
containerName = "test-container"
)
t.Logf("Pull test image %q", testImage)
img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
}()

t.Log("Create a container with log path")
config, err := CRIConfig()
require.NoError(t, err)
maxSize := config.MaxContainerLogLineSize
shortLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize-1, "a")
maxLenLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize, "b")
longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c")
cnConfig := ContainerConfig(
containerName,
"busybox",
WithCommand("sh", "-c",
fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)),
WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)

t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))

t.Log("Wait for container to finish running")
require.NoError(t, Eventually(func() (bool, error) {
s, err := runtimeService.ContainerStatus(cn)
if err != nil {
return false, err
}
if s.GetState() == runtime.ContainerState_CONTAINER_EXITED {
return true, nil
}
return false, nil
}, time.Second, 30*time.Second))

t.Log("Check container log")
content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName))
assert.NoError(t, err)
checkContainerLog(t, string(content), []string{
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("a", maxSize-1)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("b", maxSize)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, strings.Repeat("c", maxSize)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, "c"),
})
}

func checkContainerLog(t *testing.T, log string, messages []string) {
lines := strings.Split(strings.TrimSpace(log), "\n")
require.Len(t, lines, len(messages), "log line number should match")
for i, line := range lines {
parts := strings.SplitN(line, " ", 2)
require.Len(t, parts, 2)
_, err := time.Parse(time.RFC3339Nano, parts[0])
assert.NoError(t, err, "timestamp should be in RFC3339Nano format")
assert.Equal(t, messages[i], parts[1], "log content should match")
}
}
20 changes: 10 additions & 10 deletions integration/container_update_resources_test.go
Expand Up @@ -37,7 +37,7 @@ func checkMemoryLimit(t *testing.T, spec *runtimespec.Spec, memLimit int64) {
}

func TestUpdateContainerResources(t *testing.T) {
t.Logf("Create a sandbox")
t.Log("Create a sandbox")
sbConfig := PodSandboxConfig("sandbox", "update-container-resources")
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
Expand All @@ -46,7 +46,7 @@ func TestUpdateContainerResources(t *testing.T) {
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()

t.Logf("Create a container with memory limit")
t.Log("Create a container with memory limit")
cnConfig := ContainerConfig(
"container",
pauseImage,
Expand All @@ -57,48 +57,48 @@ func TestUpdateContainerResources(t *testing.T) {
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)

t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
container, err := containerdClient.LoadContainer(context.Background(), cn)
require.NoError(t, err)
spec, err := container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 2*1024*1024)

t.Logf("Update container memory limit after created")
t.Log("Update container memory limit after created")
err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{
MemoryLimitInBytes: 4 * 1024 * 1024,
})
require.NoError(t, err)

t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 4*1024*1024)

t.Logf("Start the container")
t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))
task, err := container.Task(context.Background(), nil)
require.NoError(t, err)

t.Logf("Check memory limit in cgroup")
t.Log("Check memory limit in cgroup")
cgroup, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(task.Pid())))
require.NoError(t, err)
stat, err := cgroup.Stat(cgroups.IgnoreNotExist)
require.NoError(t, err)
assert.Equal(t, uint64(4*1024*1024), stat.Memory.Usage.Limit)

t.Logf("Update container memory limit after started")
t.Log("Update container memory limit after started")
err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{
MemoryLimitInBytes: 8 * 1024 * 1024,
})
require.NoError(t, err)

t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 8*1024*1024)

t.Logf("Check memory limit in cgroup")
t.Log("Check memory limit in cgroup")
stat, err = cgroup.Stat(cgroups.IgnoreNotExist)
require.NoError(t, err)
assert.Equal(t, uint64(8*1024*1024), stat.Memory.Usage.Limit)
Expand Down
84 changes: 64 additions & 20 deletions integration/test_utils.go
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package integration

import (
"context"
"encoding/json"
"flag"
"fmt"
"os/exec"
Expand All @@ -27,12 +29,15 @@ import (
"github.com/containerd/containerd"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/remote"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"

api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/client"
criconfig "github.com/containerd/cri/pkg/config"
"github.com/containerd/cri/pkg/constants"
"github.com/containerd/cri/pkg/util"
)
Expand Down Expand Up @@ -97,6 +102,7 @@ func ConnectDaemons() error {
// Opts sets specific information in pod sandbox config.
type PodSandboxOpts func(*runtime.PodSandboxConfig)

// Set host network.
func WithHostNetwork(p *runtime.PodSandboxConfig) {
if p.Linux == nil {
p.Linux = &runtime.LinuxPodSandboxConfig{}
Expand All @@ -111,6 +117,13 @@ func WithHostNetwork(p *runtime.PodSandboxConfig) {
}
}

// Add pod log directory.
func WithPodLogDirectory(dir string) PodSandboxOpts {
return func(p *runtime.PodSandboxConfig) {
p.LogDirectory = dir
}
}

// PodSandboxConfig generates a pod sandbox config for test.
func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandboxConfig {
config := &runtime.PodSandboxConfig{
Expand All @@ -134,52 +147,59 @@ func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandb
type ContainerOpts func(*runtime.ContainerConfig)

func WithTestLabels() ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Labels = map[string]string{"key": "value"}
return func(c *runtime.ContainerConfig) {
c.Labels = map[string]string{"key": "value"}
}
}

func WithTestAnnotations() ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Annotations = map[string]string{"a.b.c": "test"}
return func(c *runtime.ContainerConfig) {
c.Annotations = map[string]string{"a.b.c": "test"}
}
}

// Add container resource limits.
func WithResources(r *runtime.LinuxContainerResources) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
if cf.Linux == nil {
cf.Linux = &runtime.LinuxContainerConfig{}
return func(c *runtime.ContainerConfig) {
if c.Linux == nil {
c.Linux = &runtime.LinuxContainerConfig{}
}
cf.Linux.Resources = r
c.Linux.Resources = r
}
}

// Add container command.
func WithCommand(c string, args ...string) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Command = []string{c}
cf.Args = args
func WithCommand(cmd string, args ...string) ContainerOpts {
return func(c *runtime.ContainerConfig) {
c.Command = []string{cmd}
c.Args = args
}
}

// Add pid namespace mode.
func WithPidNamespace(mode runtime.NamespaceMode) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
if cf.Linux == nil {
cf.Linux = &runtime.LinuxContainerConfig{}
return func(c *runtime.ContainerConfig) {
if c.Linux == nil {
c.Linux = &runtime.LinuxContainerConfig{}
}
if cf.Linux.SecurityContext == nil {
cf.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{}
if c.Linux.SecurityContext == nil {
c.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{}
}
if cf.Linux.SecurityContext.NamespaceOptions == nil {
cf.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{}
if c.Linux.SecurityContext.NamespaceOptions == nil {
c.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{}
}
cf.Linux.SecurityContext.NamespaceOptions.Pid = mode
c.Linux.SecurityContext.NamespaceOptions.Pid = mode
}

}

// Add container log path.
func WithLogPath(path string) ContainerOpts {
return func(c *runtime.ContainerConfig) {
c.LogPath = path
}
}

// ContainerConfig creates a container config given a name and image name
// and additional container config options
func ContainerConfig(name, image string, opts ...ContainerOpts) *runtime.ContainerConfig {
Expand Down Expand Up @@ -244,3 +264,27 @@ func PidOf(name string) (int, error) {
}
return strconv.Atoi(output)
}

// CRIConfig gets current cri config from containerd.
func CRIConfig() (*criconfig.Config, error) {
addr, dialer, err := kubeletutil.GetAddressAndDialer(*criEndpoint)
if err != nil {
return nil, errors.Wrap(err, "failed to get dialer")
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer))
if err != nil {
return nil, errors.Wrap(err, "failed to connect cri endpoint")
}
client := runtime.NewRuntimeServiceClient(conn)
resp, err := client.Status(ctx, &runtime.StatusRequest{Verbose: true})
if err != nil {
return nil, errors.Wrap(err, "failed to get status")
}
config := &criconfig.Config{}
if err := json.Unmarshal([]byte(resp.Info["config"]), config); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal config")
}
return config, nil
}
19 changes: 12 additions & 7 deletions pkg/config/config.go
Expand Up @@ -96,6 +96,10 @@ type PluginConfig struct {
SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// Log line longer than the limit will be split into multiple lines. Non-positive
// value means no limit.
MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"`
}

// Config contains all configurations for cri server.
Expand Down Expand Up @@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig {
Root: "",
},
},
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
MaxContainerLogLineSize: 16 * 1024,
Registry: Registry{
Mirrors: map[string]Mirror{
"docker.io": {
Expand Down

0 comments on commit 1895a01

Please sign in to comment.