使用 Vector 快速搭建日志采集系统

注:本博客所有文章禁止转载,将追究法律责任。

前言

在分布式系统中,可观测性(Observability)是十分重要的组成之一,如果没有可观测性系统,在排查问题时通常只能 SSH 到每一台机器上去查看分散的日志,排查问题的效率将变的非常低。

而可观测性系统通常由三部分组成:Logs、Metrics、Tracing:

  • Logs:日志采集监测系统,常见的技术栈为 ELK 全家桶(Elasticsearch、Logstash、Kibana);
  • Metrics:常见的技术栈为 Prometheus,数据量大的时候会加上时序数据库;
  • Tracing:常见的技术是在日志中打点 trace_id,或使用 eBPF 技术。

本文主要将精力放在 Logs,也就是日志采集监测系统搭建这里。

vector

技术选型

前面提到,日志采集系统最常见的技术栈为 ELK 全家桶,而随着 elastic 商业性的扩大,其将项目的 license 替换为了 Elastic License,能否自由地在商用软件中使用还需要仔细调研。且现在的 Kibana 默认界面启动后,会有很多需要购买 license 才能用的功能摆在界面上,让人觉得十分臃肿。

从 Uber 的这篇博客(Fast and Reliable Schema-Agnostic Log Analytics Platform)中,我们能看到越来越多的开发者愿意将 Elasticsearch 换到性价比更高的 Clickhouse 中。因为 Elasticsearch 的维护成本实在是太高了(前公司内部 ES 集群都是部署在 SSD 上的,维护成本十分昂贵),而 Clickhouse 在介绍中提到:

ClickHouse被设计用于工作在传统磁盘上的系统,它提供每GB更低的存储成本,但如果可以使用SSD和内存,它也会合理的利用这些资源。ClickHouse会使用服务器上一切可用的资源,从而以最自然的方式并行处理大型查询。

而 Logstash 官方并没有支持 Clickhouse 的 sink plugin,在一个开源实现(funcmike/logstash-output-clickhouse)中,原作者在 readme 中提到他转到了 Vector,我是在这里才了解到 Vector 这个项目的。Vector 官方支持了 Clickhouse、Loki 等多种 sink 方式,且 Vector 具有以下特性:

  • 配置灵活,可以非常自由地组装日志流,复用日志流中的任意一个组件;
  • 使用 Rust 语言编写,性能有保证,错误必须被显式处理(类似于 go 的 err 编程……);
  • 灵活的 VRL(Vector Remap Language),可以应付几乎所有日志处理的场景(例如正则匹配、常见日志格式匹配、条件判断、字段的增删改查、数值计算、字符串计算、数组处理等);
  • 占用资源小,内存仅占用 80MB 左右,相比于 Logstash 轻量了很多;
  • 由于轻量,所以更好调试;

综上,我们将使用 Vector + Clickhouse 来代替 Filebeat + Logstash + Elasticsearch 的方案。这里 Vector 将同时担当采集端(collector)和处理端(aggregator)的工作,其实 Logstash 也是支持采集端的配置的,但由于 Logstash 太重了,所以在采集端通常由更轻量级的 Filebeat 实现。

至于 Kibana 的替换方案,我们可以使用 Grafana + loki-adapter 实现。

Vector 安装与部署方式

关于安装我推荐大家直接看官方的说明(Install Vector),这里只提供 CentOS 下的安装说明,与使用容器部署的说明。

systemd

安装:

1
2
3
curl -1sLf 'https://repositories.timber.io/public/vector/cfg/setup/bash.rpm.sh' | sudo -E bash

sudo yum install vector

配置文件在 /etc/vector/vector.toml 下。

启动:

1
systemctl start vector

container

nerdctl 和 docker 都是一样的命令(注意 docker 的 -v 不能使用相对路径):

1
2
3
4
5
./nerdctl run -d -h $HOSTNAME --name vector \
-v ./vector.toml:/etc/vector/vector.toml \
-v /var/log:/log --network="host" \
-e TZ=$(timedatectl | grep 'Time zone' | awk '{print $3}') \
-p 127.0.0.1:514:514 timberio/vector:0.22.1-alpine

这里有几个参数需要说明:

  • 传入宿主机的 hostname 是因为 vector 在采集日志时,会自动带上 host 字段,如果不传入的话,就会被 container id 给取代;
  • 由于将 /var/log 挂载到了 /log,所以要注意配置文件里的路径得是容器内的路径;
  • 时区的传入也很重要,见附录;
  • 暴露 514 端口到本地是为了让宿主机上的 rsyslogd 服务能将事件传入至 vector,vector 自带 sylog 的解析功能;

Vector 功能介绍

我们直接来看 Vector 的配置文件长什么样,即可了解到它的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[sources.libvirtd]
type = "file"
include = [ "/var/log/libvirtd.log" ]
read_from = "beginning"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2}"
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2}"
multiline.mode = "halt_before"
multiline.timeout_ms = 1000

[transforms.remap_log_libvirtd]
type = "remap"
inputs = ["libvirtd"]
source = '''
. |= parse_regex!(.message, r'^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}\+\d{4}): (?P<pid>\d+): (?P<severity>\w+) : (?P<caller>[A-Za-z0-9_.-]+:\d+) : ((?P<event>\w+): )?(?P<message>(.|\n)*)$')
.timestamp = parse_timestamp!(.timestamp, "%Y-%m-%d %H:%M:%S.%3f%z")
'''

[sinks.print]
type = "console"
inputs = ["remap_log_libvirtd"]
encoding.codec = "json"

Vector 中将日志流分为 sources、transforms、sinks:

  • sources:日志来源,最常用的是 file 类型的日志源,如果你能确保日志都是单行的,则可不用配置 multiline,vector 还支持的源有 docker container logs、http(webhook)、metrics、kafka、logstash、syslog 等,当然了,还有 vector 自身;
  • transforms:日志转换规则,可通过正则的方式从日志中提取字段,也可以做一些简单的计算,能覆盖大部分场景,其中最强大的是 remap 功能,类似于 lua 脚本但有更简单的语法,会由 rust interpreter 执行;
  • sinks:日志下沉配置,支持 clickhouse、loki、es、kafka、file 等,还有 vector 自身。

sources

配置需要采集的日志源,这里仅说明最常用的 file 类型日志源(sources/File)。

常用的配置项只有 includeread_from

  • include:文件路径,支持通配符,例如 /var/log/**/*.log
  • read_from:默认为 begginning,从文件头开始读取,若配置为 end,则会从最后一行开始监听,监听文件的 checkpoint 记录默认存放在 /var/lib/vector/source_id 路径下,若想删除文件读取记录让 vector 从头开始读取文件,删除该路径下的数据即可。

我们可以在中看到这个组件的输出格式:一共四个字段:

  • file(文件路径)
  • host(主机名,所以前面提到容器启动时需要传入 hostname)
  • message(日志行)
  • timestamp(采集时间,不是日志时间)

transforms

而 transforms 的功能就是为了处理来自 sources 的事件,通过配置 inputs 来指定需要收到的数据流,inputs 里当然也可以放其他的 transform_id 来自由组装日志流。

而 transforms 的类型有很多,但官网似乎为了精简功能,将大部分 transforms 都隐藏了,能被 remap transform 实现的就不展示了,但类似于很简单的添加字段、删除字段这些需求,是没必要使用到 remap 的(remap 会启动一个 rust interpreter,在性能上可能会比使用简单的 transforms 来的低)。

在日志处理中,我们常用的组件会有:remap, filter, route, add_fileds, remove_fields 等。remap 即可满足几乎所有的需求:https://vector.dev/docs/reference/vrl/ ,但如果只是为了使用简单的添加、删除字段,可使用被官网隐藏起来的 transforms: https://assume-role-docs--vector-project.netlify.app/docs/reference/transforms/add_fields/

parse_timestamp 的过程中,vector 会根据当前时区做转换,详见附录。

sinks

将日志下沉到某个地方,这里以 clickhouse 为例:

1
2
3
4
5
6
7
8
[sinks.my_sink_id]
type = "clickhouse"
inputs = [ "my-source-or-transform-id" ]
database = "mydatabase"
endpoint = "http://localhost:8123"
table = "mytable"
compression = "gzip"
skip_unknown_fields = true

这样就可以把数据写入 Clickhouse 了,在 sink 中常用的配置项可能是 acknowledgements.enabled=true 当这条开启后,只有日志下沉成功了,日志才表示上传成功(否则日志会停留在 sources 被标记为发送失败,会重试发送)。

数据存储与查询

我们使用 Clickhouse 做为数据存储,建表语句通常如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE IF NOT EXISTS logs.libvirtd (
host String,
service String,
file String,
message String,
time DateTime,
severity String,
pid UInt32,
caller String,
event String
)
ENGINE = MergeTree()
ORDER BY time;

其中 DateTime 只支持到秒级,所以前面的 remap 中还需要添加 .time = format_timestamp!(.timestamp, "%Y-%m-%d %H:%M:%S)" 若要支持到更细精度的,可使用 DateTime64 存储。

到这里,整条日志流应该就可以运行起来了,也可以通过 Grafana 配置 Clickhouse DataSource 来查询日志,但效果还是差了些,不如 Grafana 的亲儿子 Loki 能做到类似于 Kibana 的查询效果。

有没有什么办法是可以既用到 Grafana Loki 的能力,又使用 Clickhouse 进行存储呢?我们可以使用 GitHub 上的这个组件:https://github.com/metrico/qryn

启用这个组件后,在 Vector 中将 sinks 改为 type="loki" 并配置需要写入的 labels,即可达到上面的目的,从而使用 Grafana + Clickhouse 的技术栈达到 Kibana 查询日志的使用体验。

基于日志的报警

这一步其实也很简单,通过使用 Alertmanager,我们将报警发送到它的 Webhook,即可实现基于日志的报警。

这里我们首先在日志流中加入 filter 组件,将需要报警的日志筛选出来:

1
2
3
4
[transforms.filter_event_udev]
type = "filter"
inputs = ["remap_log_udev"]
condition = '!is_empty(.event) ?? false'

然后将日志重组为 webhook 需要的 json 格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[transforms.remap_alert_udev]
type = "remap"
inputs = ["filter_event_udev"]
source = '''
. = {
"startsAt": .happen_time,
"labels": {
"hostname": .hostname,
"device": .data.pass_through_name,
"created_time": .timestamp,
"event": downcase!(.name),
"service": "udev"
}
}
if (.labels.event == "add_device") {
.labels.alertname = "event_add_device"
.labels.severity = "info"
} else if (.labels.event == "remove_device") {
.labels.alertname = "event_remove_device"
.labels.severity = "critical"
}
'''

最后通过 http sink 的能力发送出去即可:

1
2
3
4
5
6
7
[sinks.alertmanager]
type = "http"
inputs = ["remap_alert_*"]
uri = "http://alertmanager:9093/api/v2/alerts"
compression = "none"
encoding.codec = "json"
acknowledgements.enabled = true

将 Vector 拆分为 Collector 和 Aggregator

在实际场景中,我们通常将 sources 单独放在主机上,然后配置 sinks.vector 将所有采集的日志发送到一个单独的 vector 实例(aggregator)上。

通过 transforms.add_fieldstransforms.route 来实现,首先,我们在 collector 端配置 sources,并在每个 sources 后面跟着一个 transforms.add_fields 来标记这条日志是由什么服务产生的,并下沉到 vector aggregator 实例上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[sources.nginx_access]
type = "file"
include = [ "/log/nginx/access.log" ]
read_from = "end"

[transforms._nginx_access]
type = "add_fields"
inputs = ["nginx_access"]
fields.service = "nginx_access"

[sinks.vector]
type = "vector"
inputs = ["_*"]
address = "192.168.xxx.xxx:8687"
acknowledgements.enabled = true

同时需要在 aggregator 上配置 sources.vector 并开放监听端口,然后使用 transforms.route 将日志路由到不同监听组件的 inputs 下,这样在接下来的组件中就可以通过 inputs = ["router.xxx"] 的形式处理不同服务的日志了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[sources.vector]
type = "vector"
address = "0.0.0.0:8687"

[transforms.router]
type = "route"
inputs = ["vector"]
route.nginx = '.service == "nginx_access"'

[transforms.remap_log_nginx_access]
type = "remap"
inputs = ["router.nginx"]
source = '''
.severity = "info"
. |= parse_nginx_log!(.message, "combined")
'''

[sinks.loki]
type = "loki"
inputs = ["remap_log_*"]
endpoint = "http://loki-adapter:3100"
compression = "none"
encoding.codec = "json"
remove_label_fields = true
acknowledgements.enabled = true

[sinks.loki.labels]
host = "{{ host }}"
severity = "{{ severity }}"
service = "{{ service }}"

总结

Vector 中文参考资料还是比较少的,但好在官网文档说明已经很全面了,做为一个日志流处理工具,它是一把瑞士军刀,已经被 Datadog 收购,结合 Clickhouse,相信很快会在国内各大厂看到它的身影。

附录

日志的时区问题

在日志流中,时区问题通常是比较棘手的,不同的服务记录的日志格式各不相同,时间也通常有如下几种形式,以北京时间 UTC+8 为例,日志时间可能是:

  • UTC+8 的时间,不带时区信息;
  • UTC+8 的时间,且带时区信息;
  • UTC 时间,带时区信息;
    如果不妥善处理,查询出来的日志可能会有提前八小时或推后八小时的情况出现。

而在 Vector 中,对 timestamp 的处理我们都是通过正则匹配后,由 parse_timestamp 函数解决的,该函数在传入的时间不带时区信息的情况下,会将时间转换为当前时区,也就是说为了正确处理时区问题,我们要将所有时间正确地转换为 UTC 时间戳并交由 Vector 处理时区。

只消稍微琢磨一下,就知道处理方式了:将 Vector 实例设置为正确的时区(collector 和 aggregator 都要)、并在 parse_timestamp 时尽可能传入时区信息即可。

Vector 监测与调试方法

在 Vector 的配置文件中,可以通过配置开启 API(默认监听端口为 8686)来启动 vector api,然后就可以通过 vector top 来观察每个组件的 Events In、Events Out、Errors 等信息了,能够知道自己的日志处理中是否有出错的地方,可以通过 Vector 的日志看到 transforms 出错的原因。

1
2
[api]
enabled = true

配置文件过多的解决办法

如果采集的日志过多,配置文件也可能越来越复杂,Vector 在启动时可通过 -C, --config-dir 的方式扫描一个文件夹下的所有配置文件,并自行组装,这样一来我们就可以将每个服务的配置单独放到一个文件里。

Reference