Collecting Kubernetes Pod Logs with Filebeat and Storing Them in Elasticsearch

To make it easier to search and analyze the logs of services running on Kubernetes, Filebeat is deployed as a DaemonSet on every node of the cluster. It collects the logs of the specified services and buffers them in Kafka; Logstash then post-processes the messages and writes them to Elasticsearch.

The overall log-processing pipeline is: Filebeat -> Kafka -> Logstash -> Elasticsearch

Filebeat Configuration

Filebeat reads the service logs on each host, performs some light preprocessing, and writes the messages to the corresponding Kafka topic. Compared with Logstash, Filebeat uses fewer system resources but also offers fewer processing plugins; if you need more complex processing, Logstash is recommended.

Beats are lightweight data shippers that you install as agents on your servers to send specific types of operational data to Elasticsearch. Beats have a small footprint and use fewer system resources than Logstash.

Logstash has a larger footprint, but provides a broad array of input, filter, and output plugins for collecting, enriching, and transforming data from a variety of sources.

This stage covers:

  • Filebeat input/processors/output configuration
  • How to read Kubernetes Pod logs and system logs from the host

ConfigMap

input:

  • Read the *.log files under /var/log/pods; /var/log/pods is a direct mount of the corresponding host directory
  • Add a hostname field to each message, taking its value from the HOSTNAME environment variable

processors:

  • Copy the nested [log][file][path] field into a new field named log_file_path for easier downstream processing
  • If you only want to collect files under specific paths/namespaces, or want to exclude certain files, combine this with drop_event
  • Add the namespace, pod, and service fields for downstream processing (this step could also be done later in Logstash)
    • namespace: the namespace the Pod runs in
    • pod: which Pod produced the log
    • service: which container produced the log

Could a script processor be used to extract namespace, pod, and service by parsing log_file_path?
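
A minimal sketch of that idea using Filebeat's script processor (JavaScript), placed after the copy_fields step so that log_file_path already exists. The path layout matches the Kafka message example shown later; this snippet is an illustration, not part of the deployed config:

processors:
  - script:
      lang: javascript
      source: >
        function process(event) {
            var path = event.Get("log_file_path");
            if (!path) {
                return event;
            }
            // path looks like /var/log/pods/<namespace>_<pod>_<uid>/<service>/0.log
            var parts = path.replace("/var/log/pods/", "").split("/");
            var podDir = parts[0].split("_");
            event.Put("namespace", podDir[0]);
            event.Put("pod", podDir[1]);
            event.Put("service", parts[1]);
            return event;
        }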

output:

  • Buffer the messages in the Kafka topic log-collector
kubectl apply -f filebeat_cm.yml -n <namespace>

filebeat_cm.yml

kind: ConfigMap
apiVersion: v1
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    filebeat.inputs:
      - type: filestream
        id: filebeat
        paths:
          - /var/log/pods/**/*.log
          - /var/log/syslog
          - /var/log/messages
        fields:
          hostname: ${HOSTNAME}
        fields_under_root: true
    processors:
      - if:
          or:
            - contains:
                log.file.path: "/namespace-name"
            - and:
                - contains:
                    log.file.path: "xxx"
                - contains:
                    log.file.path: "xxx"
        then:
          - copy_fields:
              fields:
                - from: log.file.path
                  to: log_file_path
          - drop_fields:
              fields: ["input", "agent", "ecs", "log"]
          - add_fields:
              target: ""
              fields:
                namespace: ""
                pod: ""
                service: ""
                log_type: "pod"
                cluster: "xxx"
        else:
          - if:
              or:
                - contains:
                    log.file.path: "/messages"
                - contains:
                    log.file.path: "/syslog"
            then:
              - copy_fields:
                  fields:
                    - from: log.file.path
                      to: log_file_path
              - drop_fields:
                  fields: ["input", "agent", "ecs", "log"]
              - add_fields:
                  target: ""
                  fields:
                    log_type: "syslog"
                    cluster: "xxx"
            else:
              - drop_event: {}
    output.kafka:
      hosts: ["kafka.ops.svc:9092"]
      topics:
        - topic: "log-collector"
          when.equals:
            log_type: "pod"
        - topic: "syslog"
          when.equals:
            log_type: "syslog"

If you want the logs of different services to be written to different Kafka topics, a configuration like the following can be used:

filebeat_cm.yml

kind: ConfigMap
apiVersion: v1
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    filebeat.inputs:
      - type: filestream
        id: filebeat
        paths:
          - /var/log/pods/**/*.log
        fields:
          hostname: ${HOSTNAME}
        fields_under_root: true

    processors:
      - if:
          or:
            - contains:
                log.file.path: "/namespace-1"
            - contains:
                log.file.path: "/namespace-2"
        then:
          - drop_event:
              when:
                contains:
                  log.file.path: "daprd"
          - copy_fields:
              fields:
                - from: log.file.path
                  to: log_file_path
          - drop_fields:
              fields: ["input", "agent", "ecs", "log"]
          - add_fields:
              target: ""
              fields:
                namespace: ""
                pod: ""
                service: ""
        else:
          - drop_event: {}

    output.kafka:
      hosts: ["kafka.ops.svc:9092"]
      topics:
        - topic: "log-collector"
          when.contains:
            log_file_path: "/namespace-1"
        - topic: "log-collector-2"
          when.contains:
            log_file_path: "/namespace-2"

DaemonSet Configuration

kubectl apply -f filebeat_ds.yml -n <namespace>

filebeat_ds.yml

kind: DaemonSet
apiVersion: apps/v1
metadata:
  annotations: {}
  labels:
    app: filebeat
  name: filebeat-daemonset
spec:
  selector:
    matchLabels:
      app: filebeat
  updateStrategy:
    rollingUpdate:
      maxUnavailable: 1
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: filebeat
    spec:
      tolerations:
        - effect: NoSchedule
          operator: Exists
      containers:
        - name: filebeat
          image: docker.elastic.co/beats/filebeat:8.5.1
          imagePullPolicy: IfNotPresent
          readinessProbe:
            exec:
              command:
                - pgrep
                - filebeat
            initialDelaySeconds: 10
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 1
          livenessProbe:
            exec:
              command:
                - pgrep
                - filebeat
            initialDelaySeconds: 20
            periodSeconds: 30
            timeoutSeconds: 5
            failureThreshold: 3
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
            limits:
              cpu: 1000m
              memory: 200Mi
          env:
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          volumeMounts:
            - mountPath: /usr/share/filebeat/filebeat.yml
              subPath: filebeat.yml
              name: filebeat-config
              readOnly: true
            - mountPath: /var/log/pods
              name: pods-log
              readOnly: true
            - mountPath: /usr/share/filebeat/data
              name: filebeat-data
      volumes:
        - configMap:
            name: filebeat-config
          name: filebeat-config
        - hostPath:
            path: /var/log/pods
            type: Directory
          name: pods-log
        - hostPath:
            path: /var/lib/filebeat-data
            type: Directory
          name: filebeat-data
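
After the DaemonSet is applied, a quick way to check that Filebeat is running on every node and picking up logs (reusing the namespace placeholder from the kubectl command above):

kubectl -n <namespace> get daemonset filebeat-daemonset
kubectl -n <namespace> logs daemonset/filebeat-daemonset --tail=20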

Kafka Message Format

The messages Filebeat sends to Kafka look like this:

{
  "@timestamp": "2023-03-10T08:04:57.458Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.5.1"
  },
  "host": {
    "name": "filebeat-daemonset-1234"
  },
  "log_file_path": "/var/log/pods/<namespace>_<pod>_xxxx/<service>/0.log",
  "message": "2023-03-01T01:38:10.281346082+08:00 stdout F {\"@timestamp\":\"2023-03-01T01:38:10.281+08:00\",\"content\":\"detail message\",\"level\":\"info\"}",
  "namespace": "",
  "pod": "",
  "service": "",
  "hostname": "node123"
}

Logstash Configuration

Logstash is responsible for the following steps:

  • Read messages from Kafka
  • Extract namespace, pod, and service from log_file_path
  • Strip the prefix that Kubernetes prepends to every log line (2023-03-01T01:38:10.281346082+08:00 stdout F) from the message field, and convert the JSON string in the log into a JSON object (if the Pod does not log in JSON format, this conversion is not needed)

input

input {
  kafka {
    id => "log-collector"
    # this value must match the [type] conditionals used in the filter/output below
    type => "pod"
    bootstrap_servers => "kafka.ops.svc:9092"
    topics => ["log-collector"]
    client_dns_lookup => "use_all_dns_ips"
    client_id => "log-collector"
    group_id => "log-collector"
    decorate_events => "basic"
    auto_offset_reset => "latest"
    consumer_threads => 4
  }
  kafka {
    id => "syslog"
    type => "syslog"
    bootstrap_servers => "kafka.ops.svc:9092"
    topics => ["syslog"]
    client_dns_lookup => "use_all_dns_ips"
    client_id => "syslog"
    group_id => "syslog"
    decorate_events => "basic"
    auto_offset_reset => "latest"
    consumer_threads => 4
  }
}

filter

  • json: parse the raw message read from Kafka into JSON (the input plugin delivers it as a plain string)
  • ruby: extract namespace, pod, and service from log_file_path and assign them to the corresponding fields
  • grok: strip the prefix added by Kubernetes from the message field and store the application's own log line in the message_content field
  • json: parse the extracted message_content as JSON
filter {
  if [type] == "pod" {
    json {
      source => "message"
      skip_on_invalid_json => true
      remove_field => ["event"]
    }

    ruby {
      code => "
        # log_file_path looks like /var/log/pods/<namespace>_<pod>_<uid>/<service>/0.log
        log_file_path = event.get('log_file_path')
        log_file_name = log_file_path.sub('/var/log/pods/', '')
        namespace, pod = log_file_name.split('_')[0, 2]
        service = log_file_name.split('/')[-2]
        event.set('namespace', namespace)
        event.set('pod', pod)
        event.set('service', service)
      "
    }

    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601} %{WORD} %{WORD} %{GREEDYDATA:message_content}"
      }
    }

    json {
      source => "message_content"
      remove_field => ["message_content"]
      skip_on_invalid_json => true
    }
  } else if [type] == "syslog" {
    json {
      source => "message"
      skip_on_invalid_json => true
      remove_field => ["event"]
    }

    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:log_host} %{PROG:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:message_content}"
      }
    }
  }
}

output

Write the processed messages to Elasticsearch; the index setting here is the name of the data stream created below.

If you want every document written to Elasticsearch to carry the Kafka partition/offset it came from, you can set document_id => "%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}". However, since the messages Filebeat writes to Kafka contain an @metadata field by default, the document_id above will not take effect with the configuration shown here. You can set target => "doc" in the json filter instead, so that Filebeat's @metadata ends up under [doc][@metadata] and no longer overwrites the @metadata added by the Kafka input; correspondingly, the other Filebeat fields then have to be read from [doc][...] as well. A sketch of this variant follows the output block below.

output {
  if [type] == "pod" {
    elasticsearch {
      hosts => [ "${ES_HOST}" ]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "log-collector"
      action => "create"
    }
  } else if [type] == "syslog" {
    elasticsearch {
      hosts => [ "${ES_HOST}" ]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "syslog"
      action => "create"
    }
  }
}
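
A minimal sketch of the document_id variant described above; only the parts that change are shown, and the target name "doc" is just an example:

filter {
  json {
    source => "message"
    target => "doc"   # Filebeat's fields, including its @metadata, now live under [doc]
    skip_on_invalid_json => true
  }
}

output {
  elasticsearch {
    hosts => [ "${ES_HOST}" ]
    user => "${ES_USERNAME}"
    password => "${ES_PASSWORD}"
    index => "log-collector"
    action => "create"
    # partition-offset of the source Kafka message, taken from the @metadata
    # added by the kafka input (decorate_events => "basic")
    document_id => "%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
  }
}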

Elasticsearch Configuration

  • Create a data stream to store the messages sent by Logstash
  • Setting up the data stream involves creating, in order (see the sketch after this list):
    • life cycle
    • component mappings & settings
    • index template
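
A minimal sketch of those three steps in Kibana Dev Tools syntax for the log-collector data stream; the policy/template names and the concrete settings below are examples, not the exact values used in this setup:

# 1. lifecycle policy
PUT _ilm/policy/log-collector-lifecycle
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_age": "7d", "max_primary_shard_size": "50gb" }
        }
      },
      "delete": { "min_age": "30d", "actions": { "delete": {} } }
    }
  }
}

# 2. component template with mappings & settings
PUT _component_template/log-collector-settings
{
  "template": {
    "settings": { "index.lifecycle.name": "log-collector-lifecycle" },
    "mappings": {
      "properties": {
        "namespace": { "type": "keyword" },
        "pod":       { "type": "keyword" },
        "service":   { "type": "keyword" },
        "hostname":  { "type": "keyword" }
      }
    }
  }
}

# 3. index template that turns "log-collector" into a data stream
PUT _index_template/log-collector
{
  "index_patterns": ["log-collector"],
  "data_stream": {},
  "composed_of": ["log-collector-settings"],
  "priority": 200
}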