为了便于查找和分析 Kubernetes 上运行的服务日志,将 Filebeat 以 daemonset 的形式部署在集群的各个节点上,收集指定服务的日志,并将日志暂存到 Kafka 里,再通过 Logstash 做后处理,并写入到 Elasticsearch 中。
整体的日志处理工作流为:Filebeat -> Kafka -> Logstash -> Elasticsearch
Filebeat 配置 Filebeat 主要负责读取宿主机上的服务日志,做简单的预处理后,写入 Kafka 对应的 Topic 里。相较于 Logstash,Filebeat 使用的系统资源较少,但后处理插件较少,如果需要比较复杂的处理过程推荐使用 Logstash。
Beats are lightweight data shippers that you install as agents on your servers to send specific types of operational data to Elasticsearch. Beats have a small footprint and use fewer system resources than Logstash.
Logstash has a larger footprint, but provides a broad array of input, filter, and output plugins for collecting, enriching, and transforming data from a variety of sources.
Filebeat 的 input/processors/output 配置
如何读取宿主机上的 Kubernetes Pod 日志,以及系统日志
Config Map input:
读取 /var/log/pods
下的 *.log
在消息中增加 hostname
字段,其值为环境变量 HOSTNAME
将嵌套的 [log][file][path]
字段拷贝一份,并命名为 log_file_path
如果仅需要采集指定路径/namespace 下的文件,或者不采集某些文件,可以配合 drop_event
增加 namespace
三个字段,便于后期处理(该步骤也可以在后续的 Logstash 中添加)
namespace:Pod 所在的 namespace
pod:哪个 Pod 打印的日志
service:哪个 Container 打印的日志
是否可以使用 script
processor 通过解析 log_file_path
获取 namespace
将消息暂存到 Kafka 的 log-collector
topic 中
1 kubectl apply -f filebeat_cm.yml -n <namespace>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 kind: ConfigMap apiVersion: v1 metadata: name: filebeat-config data: filebeat.yml: | filebeat.inputs: - type: filestream id: filebeat paths: - /var/log/pods/**/*.log - /var/log/syslog - /var/log/messages fields: hostname: ${HOSTNAME} fields_under_root: true processors: - if: or: - contains: log.file.path: "/namespace-name" - and: - contains: log.file.path: "xxx" - contains: log.file.path: "xxx" then: - copy_fields: fields: - from: log.file.path to: log_file_path - drop_fields: fields: ["input", "agent", "ecs", "log"] - add_fields: target: "" fields: namespace: "" pod: "" service: "" log_type: "pod" cluster: "xxx" else: if: or: - contains: log.file.path: "/messages" - contains: log.file.path: "/syslog" then: - copy_fields: fields: - from: log.file.path to: log_file_path - drop_fields: fields: ["input", "agent", "ecs", "log"] - add_fields: target:"" fields: log_type: "syslog" cluster: "xxx" else: - drop_event: {} output.kafka: hosts: ["kafka.ops.svc:9092"] topics: - topic: "log-collector" when.equals: log_type: "pod" - topic: "syslog" when.equals: log_type: "syslog"
如果希望将不同服务的日志写入到不同的 Kafka topic 中,可以参照如下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 kind: ConfigMap apiVersion: v1 metadata: name: filebeat-config data: filebeat.yml: | filebeat.inputs: - type: filestream id: filebeat paths: - /var/log/pods/**/*.log fields: hostname: ${HOSTNAME} fields_under_root: true processors: - if: or: - contains: log.file.path: "/namespace-1" - contains: log.file.path: "/namespace-2" then: - drop_event: when: contains: log.file.path: "daprd" - copy_fields: fields: - from: log.file.path to: log_file_path - drop_fields: fields: ["input" , "agent" , "ecs" , "log" ] - add_fields: target: "" fields: namespace: "" pod: "" service: "" else: - drop_event: {} output.kafka: hosts: ["kafka.ops.svc:9092" ] topics: - topic: "log-collector" when.contains: log_file_path: "/namespace-1" - topic: "log-collector-2" when.contains: log_file_path: "/namespace-2"
DaemonSet 配置 1 kubectl apply -f filebeat_ds.yml -n <namespace >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 kind: DaemonSet apiVersion: apps/v1 metadata: annotations: {} labels: app: filebeat name: filebeat-daemonset spec: selector: matchLabels: app: filebeat updateStrategy: rollingUpdate: maxUnavailable: 1 type: RollingUpdate template: metadata: labels: app: filebeat spec: tolerations: - effect: NoSchedule operator: Exists containers: - name: filebeat image: docker.elastic.co/beats/filebeat:8.5.1 imagePullPolicy: IfNotPresent readinessProbe: exec: command: - pgrep - filebeat initialDelaySeconds: 10 periodSeconds: 10 timeoutSeconds: 5 failureThreshold: 1 livenessProbe: exec: command: - pgrep - filebeat initialDelaySeconds: 20 periodSeconds: 30 timeoutSeconds: 5 failureThreshold: 3 resources: requests: cpu: 100m memory: 100Mi limits: cpu: 1000m memory: 200Mi env: - name: HOSTNAME valueFrom: fieldRef: fieldPath: spec.nodeName volumeMounts: - mountPath: /usr/share/filebeat/filebeat.yml subPath: filebeat.yml name: filebeat-config readOnly: true - mountPath: /var/log/pods name: pods-log readOnly: true - mountPath: /usr/share/filebeat/data name: filebeat-data volumes: - configMap: name: filebeat-config name: filebeat-config - hostPath: path: /var/log/pods type: Directory name: pods-log - hostPath: path: /var/lib/filebeat-data type: Directory name: filebeat-data
Kafka 消息格式
Filebeat 发往 Kafka 的消息格式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { "@timestamp" : "2023-03-10T08:04:57.458Z" , "@metadata" : { "beat" : "filebeat" , "type" : "_doc" , "version" : "8.5.1" } , "host" : { "name" : "filebeat-daemonset-1234" } , "log_file_path" : "/var/log/pods/<namespace>_<pod>_xxxx/<service>/0.log" , "message" : "2023-03-01T01:38:10.281346082+08:00 stdout F {\"@timestamp\":\"2023-03-01T01:38:10.281+08:00\",\"content\":\"detail message\",\"level\":\"info\"}" , "namespace" : "" , "pod" : "" , "service" : "" , "hostname" : "node123" }
Logstash 配置 Logstash 主要负责处理如下流程:
从 Kakfa 中读取消息
从 log_file_path
中提取 namespace
从 message
字段中移除 K8s 自动添加的日志前缀 2023-03-01T01:38:10.281346082+08:00 stdout F
,并将日志中的 json 字符串转换成 json 对象【如果 Pod 打印的不是 json 格式的,则不需要进行转换】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 input { kafka { id => "log-collector" type => "log-collector" bootstrap_servers => "kafka.ops.svc:9092" topics => ["log-collector" ] client_dns_lookup => "use_all_dns_ips" client_id => "log-collector" group_id => "log-collector" decorate_events => "basic" auto_offset_reset => "latest" consumer_threads => 4 } kafka { id => "syslog" type => "syslog" bootstrap_servers => "kafka.ops.svc:9092" topics => ["syslog" ] client_dns_lookup => "use_all_dns_ips" client_id => "syslog" group_id => "syslog" decorate_events => "basic" auto_offset_reset => "latest" consumer_threads => 4 } }
:从 Kafka 中读取消息,并将原始的消息转换为 json 格式。input
:从 log_file_path
中提取 namespace
:从 message
字段中移除 K8s 自动加的前缀信息,并将业务记录的日志记录到 message_content
:将提取的 message_content
转换成 json 格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 filter { if [type] == "pod" { json { source => "message" skip_on_invalid_json => true remove_field => ["event" ] } ruby { code => " log_file_path = event.get('log_file_path') log_file_name = log_file_path.sub('/var/log/pods/', '') namespace, pod = log_file_name.split('_')[0, 2] service = log_file_name.split('/')[-2] event.set('namespace', namespace) event.set('pod', pod) event.set('service', service) " } grok { match => { "message" => "%{TIMESTAMP_ISO8601} %{WORD} %{WORD} %{GREEDYDATA:message_content}" } } json { source => "message_content" remove_field => ["message_content" ] skip_on_invalid_json => true } } else if [type] == "syslog" { json { source => "message" skip_on_invalid_json => true remove_field => ["event" ] } grok { match => { "message" => "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:log_host} %{PROG:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:message_content}" } } } }
output 将处理后的消息写入 Elasticsearch,其中 index
字段为创建的 datastream
如果要为每一条写入 Elasticsearch 的数据记录其在 Kafka 中的信息,可以设置:document_id => "%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
。但由于 Filebeat 写入 Kafka 的消息默认含有 @metadata
是不生效的。可以在 filter / json 中配置 target => "doc"
,则 Filebeat 的 @metadata
字段会转换到 [doc][@metadata]
字段中,不会再覆盖 Kakfa 添加的 @metadata
。相应的,其他 Filebeat 消息也需要调整为从 [doc][...]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 output { if [type] == "pod" { elasticsearch { hosts => [ "${ES_HOST}" ] user => "${ES_USERNAME}" password => "${ES_PASSWORD}" index => "log-collector" action => "create" } } else if [type] == "syslog" { elasticsearch { hosts => [ "${ES_HOST}" ] user => "${ES_USERNAME}" password => "${ES_PASSWORD}" index => "syslog" action => "create" } } }
Elasticsearch 配置
创建 datastream 用于存储 Logstash 的发来的消息
datastream 需要依次创建
life cycle
component mappings & settings
index template