log-pilot5配置kafka
原创大约 4 分钟
问题
提示配置文件没有kafka_username,
解决:把DaemonSet的配置字段删除(DaemonSet定义了但是配置已经被注释了找不到)
- name: "KAFKA_USERNAME" valueFrom: configMapKeyRef: name: log-pilot5-configuration key: kafka_username - name: "KAFKA_PASSWORD" valueFrom: configMapKeyRef: name: log-pilot5-configuration key: kafka_password
配置
---
apiVersion: v1
kind: ConfigMap
metadata:
name: log-pilot5-configuration
namespace: ns-elastic
data:
logging_output: "kafka" # 输出源
kafka_brokers: "10.0.1.201:9092"
kafka_version: "0.10.0"
kafka_topics: "tomcat-syslog,tomcat-access"
#kafka_username: "user"
#kafka_password: "bogeit"
---
apiVersion: v1
data:
filebeat.tpl: |+
{{range .configList}}
- type: log
enabled: true
paths:
- {{ .HostDir }}/{{ .File }}
scan_frequency: 5s
fields_under_root: true
{{if eq .Format "json"}}
json.keys_under_root: true
{{end}}
fields:
{{range $key, $value := .Tags}}
{{ $key }}: {{ $value }}
{{end}}
{{range $key, $value := $.container}}
{{ $key }}: {{ $value }}
{{end}}
tail_files: false
close_inactive: 2h
close_eof: false
close_removed: true
clean_removed: true
close_renamed: false
{{end}}
kind: ConfigMap
metadata:
name: filebeat5-tpl
namespace: ns-elastic
---
apiVersion: v1
data:
config.filebeat: |+
#!/bin/sh
set -e
FILEBEAT_CONFIG=/etc/filebeat/filebeat.yml
if [ -f "$FILEBEAT_CONFIG" ]; then
echo "$FILEBEAT_CONFIG has been existed"
exit
fi
mkdir -p /etc/filebeat/prospectors.d
assert_not_empty() {
arg=$1
shift
if [ -z "$arg" ]; then
echo "$@"
exit 1
fi
}
cd $(dirname $0)
base() {
cat >> $FILEBEAT_CONFIG << EOF
path.config: /etc/filebeat
path.logs: /var/log/filebeat
path.data: /var/lib/filebeat/data
filebeat.registry_file: /var/lib/filebeat/registry
filebeat.shutdown_timeout: ${FILEBEAT_SHUTDOWN_TIMEOUT:-0}
logging.level: ${FILEBEAT_LOG_LEVEL:-info}
logging.metrics.enabled: ${FILEBEAT_METRICS_ENABLED:-false}
logging.files.rotateeverybytes: ${FILEBEAT_LOG_MAX_SIZE:-104857600}
logging.files.keepfiles: ${FILEBEAT_LOG_MAX_FILE:-10}
logging.files.permissions: ${FILEBEAT_LOG_PERMISSION:-0600}
${FILEBEAT_MAX_PROCS:+max_procs: ${FILEBEAT_MAX_PROCS}}
setup.template.name: "${FILEBEAT_INDEX:-filebeat}"
setup.template.pattern: "${FILEBEAT_INDEX:-filebeat}-*"
filebeat.config:
prospectors:
enabled: true
path: \${path.config}/prospectors.d/*.yml
reload.enabled: true
reload.period: 10s
EOF
}
es() {
if [ -f "/run/secrets/es_credential" ]; then
ELASTICSEARCH_USER=$(cat /run/secrets/es_credential | awk -F":" '{ print $1 }')
ELASTICSEARCH_PASSWORD=$(cat /run/secrets/es_credential | awk -F":" '{ print $2 }')
fi
if [ -n "$ELASTICSEARCH_HOSTS" ]; then
ELASTICSEARCH_HOSTS=$(echo $ELASTICSEARCH_HOSTS|awk -F, '{for(i=1;i<=NF;i++){printf "\"%s\",", $i}}')
ELASTICSEARCH_HOSTS=${ELASTICSEARCH_HOSTS%,}
else
assert_not_empty "$ELASTICSEARCH_HOST" "ELASTICSEARCH_HOST required"
assert_not_empty "$ELASTICSEARCH_PORT" "ELASTICSEARCH_PORT required"
ELASTICSEARCH_HOSTS="\"$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT\""
fi
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.elasticsearch:
hosts: [$ELASTICSEARCH_HOSTS]
index: ${ELASTICSEARCH_INDEX:-filebeat}-%{+yyyy.MM.dd}
${ELASTICSEARCH_SCHEME:+protocol: ${ELASTICSEARCH_SCHEME}}
${ELASTICSEARCH_USER:+username: ${ELASTICSEARCH_USER}}
${ELASTICSEARCH_PASSWORD:+password: ${ELASTICSEARCH_PASSWORD}}
${ELASTICSEARCH_WORKER:+worker: ${ELASTICSEARCH_WORKER}}
${ELASTICSEARCH_PATH:+path: ${ELASTICSEARCH_PATH}}
${ELASTICSEARCH_BULK_MAX_SIZE:+bulk_max_size: ${ELASTICSEARCH_BULK_MAX_SIZE}}
EOF
}
default() {
echo "use default output"
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.console:
pretty: ${CONSOLE_PRETTY:-false}
EOF
}
file() {
assert_not_empty "$FILE_PATH" "FILE_PATH required"
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.file:
path: $FILE_PATH
${FILE_NAME:+filename: ${FILE_NAME}}
${FILE_ROTATE_SIZE:+rotate_every_kb: ${FILE_ROTATE_SIZE}}
${FILE_NUMBER_OF_FILES:+number_of_files: ${FILE_NUMBER_OF_FILES}}
${FILE_PERMISSIONS:+permissions: ${FILE_PERMISSIONS}}
EOF
}
logstash() {
assert_not_empty "$LOGSTASH_HOST" "LOGSTASH_HOST required"
assert_not_empty "$LOGSTASH_PORT" "LOGSTASH_PORT required"
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.logstash:
hosts: ["$LOGSTASH_HOST:$LOGSTASH_PORT"]
index: ${FILEBEAT_INDEX:-filebeat}-%{+yyyy.MM.dd}
${LOGSTASH_WORKER:+worker: ${LOGSTASH_WORKER}}
${LOGSTASH_LOADBALANCE:+loadbalance: ${LOGSTASH_LOADBALANCE}}
${LOGSTASH_BULK_MAX_SIZE:+bulk_max_size: ${LOGSTASH_BULK_MAX_SIZE}}
${LOGSTASH_SLOW_START:+slow_start: ${LOGSTASH_SLOW_START}}
EOF
}
redis() {
assert_not_empty "$REDIS_HOST" "REDIS_HOST required"
assert_not_empty "$REDIS_PORT" "REDIS_PORT required"
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.redis:
hosts: ["$REDIS_HOST:$REDIS_PORT"]
key: "%{[fields.topic]:filebeat}"
${REDIS_WORKER:+worker: ${REDIS_WORKER}}
${REDIS_PASSWORD:+password: ${REDIS_PASSWORD}}
${REDIS_DATATYPE:+datatype: ${REDIS_DATATYPE}}
${REDIS_LOADBALANCE:+loadbalance: ${REDIS_LOADBALANCE}}
${REDIS_TIMEOUT:+timeout: ${REDIS_TIMEOUT}}
${REDIS_BULK_MAX_SIZE:+bulk_max_size: ${REDIS_BULK_MAX_SIZE}}
EOF
}
kafka() {
assert_not_empty "$KAFKA_BROKERS" "KAFKA_BROKERS required"
KAFKA_BROKERS=$(echo $KAFKA_BROKERS|awk -F, '{for(i=1;i<=NF;i++){printf "\"%s\",", $i}}')
KAFKA_BROKERS=${KAFKA_BROKERS%,}
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.kafka:
hosts: [$KAFKA_BROKERS]
topic: '%{[topic]}'
codec.format:
string: '%{[message]}'
${KAFKA_VERSION:+version: ${KAFKA_VERSION}}
${KAFKA_WORKER:+worker: ${KAFKA_WORKER}}
${KAFKA_PARTITION_KEY:+key: ${KAFKA_PARTITION_KEY}}
${KAFKA_PARTITION:+partition: ${KAFKA_PARTITION}}
${KAFKA_CLIENT_ID:+client_id: ${KAFKA_CLIENT_ID}}
${KAFKA_METADATA:+metadata: ${KAFKA_METADATA}}
${KAFKA_BULK_MAX_SIZE:+bulk_max_size: ${KAFKA_BULK_MAX_SIZE}}
${KAFKA_BROKER_TIMEOUT:+broker_timeout: ${KAFKA_BROKER_TIMEOUT}}
${KAFKA_CHANNEL_BUFFER_SIZE:+channel_buffer_size: ${KAFKA_CHANNEL_BUFFER_SIZE}}
${KAFKA_KEEP_ALIVE:+keep_alive ${KAFKA_KEEP_ALIVE}}
${KAFKA_MAX_MESSAGE_BYTES:+max_message_bytes: ${KAFKA_MAX_MESSAGE_BYTES}}
${KAFKA_REQUIRE_ACKS:+required_acks: ${KAFKA_REQUIRE_ACKS}}
EOF
}
count(){
cat >> $FILEBEAT_CONFIG << EOF
$(base)
output.count:
EOF
}
if [ -n "$FILEBEAT_OUTPUT" ]; then
LOGGING_OUTPUT=$FILEBEAT_OUTPUT
fi
case "$LOGGING_OUTPUT" in
elasticsearch)
es;;
logstash)
logstash;;
file)
file;;
redis)
redis;;
kafka)
kafka;;
count)
count;;
*)
default
esac
kind: ConfigMap
metadata:
name: config5.filebeat
namespace: ns-elastic
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: log-pilot5
namespace: ns-elastic
labels:
k8s-app: log-pilot5
spec:
updateStrategy:
type: RollingUpdate
selector:
matchLabels:
k8s-app: log-pilot5
template:
metadata:
labels:
k8s-app: log-pilot5
spec:
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule
- effect: NoSchedule
key: ToBeDeletedByClusterAutoscaler
operator: Exists
containers:
- name: log-pilot5
image: williamguozi/log-pilot-filebeat:containerd # 大神提供的
imagePullPolicy: IfNotPresent
env:
- name: "LOGGING_OUTPUT"
valueFrom:
configMapKeyRef:
name: log-pilot5-configuration
key: logging_output
- name: "KAFKA_BROKERS"
valueFrom:
configMapKeyRef:
name: log-pilot5-configuration
key: kafka_brokers
- name: "KAFKA_VERSION"
valueFrom:
configMapKeyRef:
name: log-pilot5-configuration
key: kafka_version
- name: "NODE_NAME"
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: sock
mountPath: /var/run/containerd/containerd.sock
- name: logs
mountPath: /var/log/filebeat
- name: state
mountPath: /var/lib/filebeat
- name: root
mountPath: /host
readOnly: true
- name: localtime
mountPath: /etc/localtime
- name: config-volume
mountPath: /etc/filebeat/config
- name: filebeat-tpl
mountPath: /pilot/filebeat.tpl
subPath: filebeat.tpl
- name: config-filebeat
mountPath: /pilot/config.filebeat
subPath: config.filebeat
securityContext:
capabilities:
add:
- SYS_ADMIN
terminationGracePeriodSeconds: 30
volumes:
- name: sock
hostPath:
path: /var/run/containerd/containerd.sock
type: Socket
- name: logs
hostPath:
path: /var/log/filebeat
type: DirectoryOrCreate
- name: state
hostPath:
path: /var/lib/filebeat
type: DirectoryOrCreate
- name: root
hostPath:
path: /
type: Directory
- name: localtime
hostPath:
path: /etc/localtime
type: File
- name: config-volume
configMap:
name: log-pilot5-configuration
items:
- key: kafka_topics
path: kafka_topics
- name: filebeat-tpl
configMap:
name: filebeat5-tpl
- name: config-filebeat
configMap:
name: config5.filebeat
defaultMode: 0777重新部署
# kubectl -n ns-elastic apply -f log-pilot5.yaml
configmap/log-pilot5-configuration created
configmap/filebeat5-tpl created
configmap/config5.filebeat created
daemonset.apps/log-pilot5 created
# kubectl -n ns-elastic get pod -o wide
NAME READY STATUS RESTARTS AGE
log-pilot5-2wz5b 0/1 Pending 0 2s
log-pilot5-hz5gs 0/1 ContainerCreating 0 2s
log-pilot5-qz8l4 0/1 Pending 0 2s
log-pilot5-ttg6r 0/1 Pending 0 2s
# kubectl -n ns-elastic describe pod log-pilot5-8r4kl
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 86s default-scheduler Successfully assigned ns-elastic/log-pilot5-8r4kl to 10.0.1.202
Normal Pulled 76s kubelet Container image "williamguozi/log-pilot-filebeat:containerd" already present on machine
Normal Created 74s kubelet Created container log-pilot5
Normal Started 73s kubelet Started container log-pilot5
kubectl -n ns-elastic logs log-pilot5-8r4kllog-pilot5需要300M内存
kubectl -n ns-elastic describe pod log-pilot5-cjdcd
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 36s default-scheduler Successfully assigned ns-elastic/log-pilot5-cjdcd to 10.0.1.201
Warning Evicted 27s kubelet The node was low on resource: memory. Threshold quantity: 300Mi, available: 182784Ki.
Warning ExceededGracePeriod 15s kubelet Container runtime did not kill the pod within specified grace period.