Skip to content

Commit

Permalink
fix duplicated log collect for host path bind
Browse files Browse the repository at this point in the history
  • Loading branch information
quanzhao.cqz committed Mar 23, 2018
1 parent 918d3e6 commit 3a6c450
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 54 deletions.
10 changes: 7 additions & 3 deletions Dockerfile.filebeat
Expand Up @@ -11,17 +11,21 @@ RUN go install

FROM alpine:3.6

ENV FILEBEAT_VERSION=6.1.1
ENV FILEBEAT_VERSION=6.1.1-2
COPY assets/glibc/glibc-2.26-r0.apk /tmp/
RUN apk update && \
apk add python && \
apk add ca-certificates && \
apk add wget && \
update-ca-certificates && \
wget http://acs-logging.oss-cn-hangzhou.aliyuncs.com/beats/filebeat/filebeat-${FILEBEAT_VERSION}-linux-x86_64.tar.gz -P /tmp/ && \
mkdir -p /usr/local/filebeat && \
mkdir -p /etc/filebeat /var/lib/filebeat /var/log/filebeat && \
tar zxf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64.tar.gz -C /tmp/ && \
cp -rf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64/* /usr/local/filebeat/ && \
cp -rf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64/filebeat /usr/bin/ && \
cp -rf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64/fields.yml /etc/filebeat/ && \
cp -rf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64/kibana /etc/filebeat/ && \
cp -rf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64/module /etc/filebeat/ && \
cp -rf /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64/modules.d /etc/filebeat/ && \
apk add --allow-untrusted /tmp/glibc-2.26-r0.apk && \
rm -rf /var/cache/apk/* /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64.tar.gz /tmp/filebeat-${FILEBEAT_VERSION}-linux-x86_64 /tmp/glibc-2.26-r0.apk

Expand Down
1 change: 1 addition & 0 deletions assets/filebeat/config.filebeat
Expand Up @@ -4,6 +4,7 @@ set -e

FILEBEAT_CONFIG=/etc/filebeat/filebeat.yml
if [ -f "$FILEBEAT_CONFIG" ]; then
echo "$FILEBEAT_CONFIG has been existed"
exit
fi

Expand Down
2 changes: 1 addition & 1 deletion examples/pilot-elastisearch-kubernetes-2.yml
Expand Up @@ -19,7 +19,7 @@ spec:
hostNetwork: true
containers:
- name: log-pilot
image: registry.cn-hangzhou.aliyuncs.com/acs-sample/log-pilot:0.9.1-filebeat
image: registry.cn-hangzhou.aliyuncs.com/acs-sample/log-pilot:0.9-filebeat
env:
- name: "FILEBEAT_OUTPUT"
value: "elasticsearch"
Expand Down
22 changes: 22 additions & 0 deletions examples/tomcat.yml
@@ -0,0 +1,22 @@
apiVersion: v1
kind: Pod
metadata:
name: tomcat
spec:
tolerations:
- key: "node-role.kubernetes.io/master"
effect: "NoSchedule"
containers:
- name: tomcat
image: "tomcat:7.0"
env:
- name: aliyun_logs_catalina
value: "stdout"
- name: aliyun_logs_access
value: "/usr/local/tomcat/logs/catalina.*.log"
volumeMounts:
- name: tomcat-log
mountPath: /usr/local/tomcat/logs
volumes:
- name: tomcat-log
emptyDir: {}
141 changes: 95 additions & 46 deletions pilot/filebeat_wrapper.go
Expand Up @@ -10,28 +10,36 @@ import (
"os/exec"
"path/filepath"
"time"
"regexp"
"io/ioutil"
"strings"
)

const PILOT_FILEBEAT = "filebeat"
const FILEBEAT_HOME = "/usr/local/filebeat"
const FILEBEAT_EXEC_BIN = "/usr/bin/filebeat"
const FILEBEAT_CONF_HOME = "/etc/filebeat"
const FILEBEAT_CONF_DIR = FILEBEAT_CONF_HOME + "/prospectors.d"
const FILEBEAT_CONF_FILE = FILEBEAT_CONF_HOME + "/filebeat.yml"
const FILEBEAT_EXEC_BIN = FILEBEAT_HOME + "/filebeat"
const FILEBEAT_REGISTRY_FILE = "/var/lib/filebeat/registry"
const FILEBEAT_LOG_DIR = "/var/log/filebeat"

const DOCKER_HOME_PATH = "/var/lib/docker/"
const KUBELET_HOME_PATH = "/var/lib/kubelet/"

var filebeat *exec.Cmd

type FilebeatPiloter struct {
name string
base string
watchDone chan bool
watchDuration time.Duration
watchContainer map[string]string
}

func NewFilebeatPiloter() (Piloter, error) {
func NewFilebeatPiloter(base string) (Piloter, error) {
return &FilebeatPiloter{
name: PILOT_FILEBEAT,
base: base,
watchDone: make(chan bool),
watchContainer: make(map[string]string, 0),
watchDuration: 60 * time.Second,
Expand Down Expand Up @@ -86,65 +94,106 @@ func (p *FilebeatPiloter) scan() error {
return nil
}

configPaths := p.loadConfigPaths()
for container := range p.watchContainer {
confPath := p.ConfPathOf(container)
if _, err := os.Stat(confPath); err != nil && os.IsNotExist(err) {
log.Infof("log config %s.yml has removed and ignore", container)
log.Infof("log config %s.yml has been removed and ignore", container)
delete(p.watchContainer, container)
continue
}

c, err := yaml.NewConfigWithFile(confPath, configOpts...)
if err != nil {
log.Errorf("read %s.yml log config error: %v", container, err)
continue
} else if p.canRemoveConf(container, states, configPaths) {
log.Infof("try to remove log config %s.yml", container)
if err := os.Remove(confPath); err != nil {
log.Errorf("remove log config %s.yml fail: %v", container, err)
} else {
delete(p.watchContainer, container)
}
}
}
return nil
}

var config Config
if err := c.Unpack(&config); err != nil {
log.Errorf("parse %s.yml log config error: %v", container, err)
continue
}
func (p *FilebeatPiloter) canRemoveConf(container string, registry map[string]RegistryState,
configPaths map[string]string) bool {
config, err := p.loadConfig(container)
if err != nil {
return false
}

finished := true
for _, path := range config.Paths {
log.Debugf("scan %s log path: %s", container, path)
files, _ := filepath.Glob(path)
for _, file := range files {
info, err := os.Stat(file)
if err != nil && os.IsNotExist(err) {
log.Infof("%s->%s not exist", container, file)
continue
}
if _, ok := states[file]; !ok {
log.Infof("%s->%s registry not exist", container, file)
continue
}
if states[file].Offset < info.Size() {
log.Infof("%s->%s has not read finished", container, file)
finished = false
break
}
log.Infof("%s->%s has read finished", container, file)
for _, path := range config.Paths {
autoMount := p.isAutoMountPath(filepath.Dir(path))
logFiles, _ := filepath.Glob(path)
for _, logFile := range logFiles {
info, err := os.Stat(logFile)
if err != nil && os.IsNotExist(err) {
continue
}
if !finished {
break
if _, ok := registry[logFile]; !ok {
log.Warnf("%s->%s registry not exist", container, logFile)
continue
}
if registry[logFile].Offset < info.Size() {
if autoMount { // ephemeral logs
log.Infof("%s->%s does not finish to read", container, logFile)
return false
} else if _, ok := configPaths[path]; !ok { // host path bind
log.Infof("%s->%s does not finish to read and not exist in other config",
container, logFile)
return false
}
}
}
}
return true
}

if !finished {
log.Infof("ignore to remove log config %s.yml", container)
continue
func (p *FilebeatPiloter) loadConfig(container string) (*Config, error) {
confPath := p.ConfPathOf(container)
c, err := yaml.NewConfigWithFile(confPath, configOpts...)
if err != nil {
log.Errorf("read %s.yml log config error: %v", container, err)
return nil, err
}

var config Config
if err := c.Unpack(&config); err != nil {
log.Errorf("parse %s.yml log config error: %v", container, err)
return nil, err
}
return &config, nil
}

func (p *FilebeatPiloter) loadConfigPaths() map[string]string {
paths := make(map[string]string, 0)
confs, _ := ioutil.ReadDir(p.ConfHome())
for _, conf := range confs {
container := strings.TrimRight(conf.Name(), ".yml")
if _, ok := p.watchContainer[container]; ok {
continue // ignore removed container
}

log.Infof("try to remove log config %s.yml", container)
if err := os.Remove(confPath); err != nil {
log.Errorf("remove log config failure %s.yml", container)
config, err := p.loadConfig(container)
if err != nil || config == nil {
continue
}
delete(p.watchContainer, container)

for _, path := range config.Paths {
if _, ok := paths[path]; !ok {
paths[path] = container
}
}
}
return nil
return paths
}

func (p *FilebeatPiloter) isAutoMountPath(path string) bool {
dockerVolumePattern := fmt.Sprintf("^%s.*$", filepath.Join(p.base, DOCKER_HOME_PATH))
if ok, _ := regexp.MatchString(dockerVolumePattern, path); ok {
return true
}

kubeletVolumePattern := fmt.Sprintf("^%s.*$", filepath.Join(p.base, KUBELET_HOME_PATH))
ok, _ := regexp.MatchString(kubeletVolumePattern, path)
return ok
}

func (p *FilebeatPiloter) getRegsitryState() (map[string]RegistryState, error) {
Expand Down
3 changes: 1 addition & 2 deletions pilot/pilot.go
Expand Up @@ -98,7 +98,7 @@ func New(tplStr string, baseDir string) (*Pilot, error) {

piloter, _ := NewFluentdPiloter()
if os.Getenv(ENV_PILOT_TYPE) == PILOT_FILEBEAT {
piloter, _ = NewFilebeatPiloter()
piloter, _ = NewFilebeatPiloter(baseDir)
}

logPrefix := []string{"aliyun"}
Expand Down Expand Up @@ -511,7 +511,6 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}
}


format := info.children["format"]
if format == nil || format.value == "none" {
format = newLogInfoNode("nonex")
Expand Down
2 changes: 2 additions & 0 deletions quickstart/filebeat/es.yml
Expand Up @@ -21,6 +21,8 @@ services:
privileged: true
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /var/lib/filebeat:/var/lib/filebeat
- /var/log/filebeat:/var/log/filebeat
- /:/host
environment:
PILOT_TYPE: filebeat
Expand Down
2 changes: 2 additions & 0 deletions quickstart/filebeat/run
Expand Up @@ -45,9 +45,11 @@ done

blue "\nCleanup"
docker-compose -p tomcat -f tomcat.yml down
docker-compose -p tomcat2 -f tomcat2.yml down

blue "Starting tomcat"
docker-compose -p tomcat -f tomcat.yml up -d
docker-compose -p tomcat2 -f tomcat2.yml up -d

pwd=$(pwd)
project=$(basename $pwd)
Expand Down
4 changes: 2 additions & 2 deletions quickstart/filebeat/tomcat.yml
@@ -1,10 +1,10 @@
tomcat:
image: tomcat
ports:
- "8080:8080"
- "8080"
restart: always
volumes:
- /usr/local/tomcat/logs
- /tmp/tomcat:/usr/local/tomcat/logs
labels:
aliyun.logs.catalina: stdout
aliyun.logs.catalina.tags: app=tomcat,stage=test
Expand Down
12 changes: 12 additions & 0 deletions quickstart/filebeat/tomcat2.yml
@@ -0,0 +1,12 @@
tomcat2:
image: tomcat
ports:
- "8080"
restart: always
volumes:
- /tmp/tomcat:/usr/local/tomcat/logs
labels:
aliyun.logs.catalina: stdout
aliyun.logs.catalina.tags: app=tomcat2,stage=test
custom.logs.access: /usr/local/tomcat/logs/catalina.*.log
custom.logs.access.tags: app=tomcat2,stage=test,index=tomcat,topic=tomcat

0 comments on commit 3a6c450

Please sign in to comment.