Skip to content

Commit

Permalink
Merge pull request #194 from chenquanzhao/master
Browse files Browse the repository at this point in the history
Check valid kafka topics when disable auto-create topic
  • Loading branch information
chenquanzhao committed Apr 26, 2019
2 parents 4a32add + eae859d commit f987d49
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 4 deletions.
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -51,6 +51,7 @@ container: clean-container .container-$(ARCH)
$(DOCKER) build --no-cache --pull -t $(MULTI_ARCH_IMG)-filebeat:$(TAG) -f Dockerfile.filebeat $(TEMP_DIR)/
ifeq ($(ARCH), amd64)
$(DOCKER) tag $(MULTI_ARCH_IMG)-filebeat:$(TAG) $(IMAGE)-filebeat:$(TAG)
$(DOCKER) tag $(MULTI_ARCH_IMG)-filebeat:$(TAG) $(IMGNAME):$(TAG)
endif

@echo "+ Building container image $(MULTI_ARCH_IMG)-fluentd:$(TAG)"
Expand Down
96 changes: 96 additions & 0 deletions examples/with-configmap/filebeat-pilot-kafka-kubernetes.yml
@@ -0,0 +1,96 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
name: log-pilot-configuration
data:
logging_output: "kafka"
kafka_brokers: "kafka1:9092,kafka2:9092"
# configure all valid topics in kafka
# when disable auto-create topic
kafka_topics: "topic1,topic2,topic3"
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: log-pilot
labels:
k8s-app: log-pilot
spec:
updateStrategy:
type: RollingUpdate
template:
metadata:
labels:
k8s-app: log-pilot
spec:
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule
containers:
- name: log-pilot
image: registry.cn-hangzhou.aliyuncs.com/acs/log-pilot:0.9.6-filebeat
env:
- name: "LOGGING_OUTPUT"
valueFrom:
configMapKeyRef:
name: log-pilot-configuration
key: logging_output
- name: "KAFKA_BROKERS"
valueFrom:
configMapKeyRef:
name: log-pilot-configuration
key: kafka_brokers
- name: "NODE_NAME"
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: sock
mountPath: /var/run/docker.sock
- name: logs
mountPath: /var/log/filebeat
- name: state
mountPath: /var/lib/filebeat
- name: root
mountPath: /host
readOnly: true
- name: localtime
mountPath: /etc/localtime
# configure all valid topics in kafka
# when disable auto-create topic
- name: config-volume
mountPath: /etc/filebeat/config
securityContext:
capabilities:
add:
- SYS_ADMIN
terminationGracePeriodSeconds: 30
volumes:
- name: sock
hostPath:
path: /var/run/docker.sock
type: Socket
- name: logs
hostPath:
path: /var/log/filebeat
type: DirectoryOrCreate
- name: state
hostPath:
path: /var/lib/filebeat
type: DirectoryOrCreate
- name: root
hostPath:
path: /
type: Directory
- name: localtime
hostPath:
path: /etc/localtime
type: File
# kubelet sync period
- name: config-volume
configMap:
name: log-pilot-configuration
items:
- key: kafka_topics
path: kafka_topics
5 changes: 5 additions & 0 deletions pilot/filebeat_piloter.go
Expand Up @@ -303,3 +303,8 @@ func (p *FilebeatPiloter) Name() string {
func (p *FilebeatPiloter) OnDestroyEvent(container string) error {
return p.feed(container)
}

// GetBaseConf returns plugin root directory
func (p *FilebeatPiloter) GetBaseConf() string {
return FILEBEAT_BASE_CONF
}
5 changes: 5 additions & 0 deletions pilot/fluentd_piloter.go
Expand Up @@ -142,3 +142,8 @@ func (p *FluentdPiloter) OnDestroyEvent(container string) error {
log.Info("refactor in the future!!!")
return nil
}

// GetBaseConf returns plugin root directory
func (p *FluentdPiloter) GetBaseConf() string {
return FLUENTD_BASE_CONF
}
31 changes: 31 additions & 0 deletions pilot/pilot.go
Expand Up @@ -505,6 +505,32 @@ func (p *Pilot) parseTags(tags string) (map[string]string, error) {
return tagMap, nil
}

func (p *Pilot) tryCheckKafkaTopic(topic string) error {
output := os.Getenv(ENV_LOGGING_OUTPUT)
if output != "kafka" {
return nil
}

topicPath := filepath.Join(p.piloter.GetBaseConf(), "config", "kafka_topics")
if _, err := os.Stat(topicPath); os.IsNotExist(err) {
log.Info("ignore checking the validity of kafka topic")
return nil
}

topics, err := ReadFile(topicPath, ",")
if err != nil {
return err
}

for _, t := range topics {
if t == topic {
return nil
}
}

return fmt.Errorf("invalid topic: %s, supported topics: %v", topic, topics)
}

func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) {
path := strings.TrimSpace(info.value)
if path == "" {
Expand Down Expand Up @@ -535,6 +561,11 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}
}

// try to check the validity of the target topic for kafka
if err := p.tryCheckKafkaTopic(tagMap["topic"]); err != nil {
return nil, err
}

format := info.children["format"]
if format == nil || format.value == "none" {
format = newLogInfoNode("nonex")
Expand Down
1 change: 1 addition & 0 deletions pilot/piloter.go
Expand Up @@ -21,6 +21,7 @@ type Piloter interface {
Reload() error
Stop() error

GetBaseConf() string
GetConfHome() string
GetConfPath(container string) string

Expand Down
16 changes: 16 additions & 0 deletions pilot/util.go
@@ -0,0 +1,16 @@
package pilot

import (
"io/ioutil"
"strings"
)

// ReadFile return string list separated by separator
func ReadFile(path string, separator string) ([]string, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}

return strings.Split(string(data), separator), nil
}
4 changes: 2 additions & 2 deletions quickstart/es.yml
Expand Up @@ -3,10 +3,10 @@ services:
elasticsearch:
ports:
- 9200:9200
image: elasticsearch
image: elasticsearch:5.5.1

kibana:
image: kibana
image: kibana:5.5.1
ports:
- 5601:5601
environment:
Expand Down
4 changes: 2 additions & 2 deletions quickstart/filebeat/es.yml
Expand Up @@ -3,10 +3,10 @@ services:
elasticsearch:
ports:
- 9200:9200
image: elasticsearch
image: elasticsearch:5.5.1

kibana:
image: kibana
image: kibana:5.5.1
ports:
- 5601:5601
environment:
Expand Down

0 comments on commit f987d49

Please sign in to comment.