官网: https://vector.dev/docs/setup/quickstart/
Vector 是一款轻量级、超高性能的日志、指标和事件收集工具(Observability Data Pipeline)。它通常用于替代 Logstash 或 Fluentd。
1. 安装 (二进制方式)
这是最通用的 Linux 安装方式,无需依赖包管理器。
# 1. 下载指定版本 (以 0.51.1 的 musl 版本为例)
curl -OL https://packages.timber.io/vector/0.51.1/vector-0.51.1-x86_64-unknown-linux-musl.tar.gz# 2. 解压
tar -xvf vector-0.51.1-x86_64-unknown-linux-musl.tar.gz# 3. 将文件复制到当前目录 (或你指定的安装目录)
cp -r vector-x86_64-unknown-linux-musl/* .# 4. 清理安装包
rm -rf vector-0.51.1-x86_64-unknown-linux-musl.tar.gz vector-x86_64-unknown-linux-musl/# 5. 验证安装
./bin/vector --version
2. 运行管理
2.1 临时调试 (命令行直接运行)
适用于开发环境或快速测试配置文件是否正确。
- 前台启动 (日志直接输出到屏幕,
Ctrl+C停止):
./bin/vector --config config/vector.yaml
后台启动 (nohup):
nohup ./bin/vector --config config/vector.yaml > vector.log 2>&1 &
停止 (后台进程):
# 查找进程 ID (PID)
ps -ef | grep vector
# 杀掉进程 (将 12345 替换为实际 PID)
kill 12345
2.2 生产环境部署 (Systemd 托管)
推荐在生产环境使用。支持开机自启、崩溃自动重启和日志管理。
1. 创建服务文件
创建 /etc/systemd/system/vector.service:
[Unit]
Description=Vector
Documentation=https://vector.dev
After=network-online.target
Requires=network-online.target[Service]
# 注意:修改下面的 User, Group 以及 ExecStart 的绝对路径
User=root
Group=root
ExecStart=/usr/local/bin/vector --config /etc/vector/vector.yaml
Restart=on-failure
RestartSec=10
LimitNOFILE=65536[Install]
WantedBy=multi-user.target
2. 管理命令
# 重载服务配置 (修改 .service 文件后执行)
sudo systemctl daemon-reload# 启动服务
sudo systemctl start vector# 停止服务
sudo systemctl stop vector# 重启服务 (修改 vector.toml 后执行)
sudo systemctl restart vector# 查看状态 (含实时日志片段)
sudo systemctl status vector# 设置开机自启
sudo systemctl enable vector
3. 配置文件详解
Vector 的核心在于配置文件 (通常是 .toml 或 .yaml 格式)。核心概念是一个流水线 (Pipeline),数据流向为:Sources -> Transforms -> Sinks。
3.1 基础配置结构 (vector.yaml)
# -----------------------------------------------------------
# 1. Sources (输入源): 数据从哪里来?
# -----------------------------------------------------------
sources:my_source_id:type: "file" # 类型:读取文件include: # 路径通配符 (列表)- "/var/log/nginx/*.log"ignore_older_secs: 600 # 忽略太旧的文件# -----------------------------------------------------------
# 2. Transforms (转换处理): 数据如何清洗/解析?
# -----------------------------------------------------------
transforms:my_transform_id:type: "remap" # 类型:VRL (Vector Remap Language)inputs: # 指定上一环节的 ID (列表)- "my_source_id"source: | # 使用管道符 | 表示多行字符串# 这里写解析逻辑,例如解析 JSON. = parse_json!(.message) # 添加字段.environment = "production"# -----------------------------------------------------------
# 3. Sinks (输出): 数据去哪里?
# -----------------------------------------------------------
sinks:my_sink_id:type: "console" # 类型:打印到控制台 (调试用)# type: "kafka" # 或者发送到 Kafkainputs: # 指定上一环节的 ID (列表)- "my_transform_id"encoding:codec: "json" # 输出格式
3.3 常用配置片段示例
示例:收集 Nginx 日志 -> 解析 -> 发送到 Elasticsearch
data_dir: "/data/nlu/xuanji-vector/config/tmp"# 添加配置文件
enrichment_tables:sampling_table:type: "file"file:path: "/data/nlu/xuanji-vector/config/sampling_rules.csv"encoding:type: "csv"include_headers: trueschema:service_id: "string"sample_rate: "string"service_name: "string"# 1. 输入源配置 (Sources)
# 作用:定义从哪里读取数据。这里配置的是读取本地文件。
# =================================================================================
sources:nginx_error:type: "file"include:- "/data/nlu/api_gateway_logs/error.log"# "beginning": 如果 Vector 第一次启动且没有记录位置,从文件头开始读(读历史数据)。# "end": 只读启动后新产生的数据。read_from: "beginning"fingerprint:strategy: "device_and_inode"max_read_bytes: 10485760# 读取顺序:false 表示优先读取最新的文件内容oldest_first: false# 多行合并配置 (非常重要)# 作用:处理 Nginx 报错时的堆栈信息(StackTrace),把多行报错合并成一条日志事件。multiline:# 匹配新日志开始的正则:以 "YYYY/MM/DD HH:MM:SS [" 开头的行被视为新的一条start_pattern: '^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \['# 模式:遇到了匹配 start_pattern 的行,就把之前累积的行打包发送mode: "halt_before"# 超时时间:如果 2秒 没遇到下一条新日志,也会强制把当前累积的行发送出去timeout_ms: 2000condition_pattern: '^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \['nginx_access:type: "file"include:- "/data/nlu/api_gateway_logs/access.log"# "beginning": 如果 Vector 第一次启动且没有记录位置,从文件头开始读(读历史数据)。# "end": 只读启动后新产生的数据。read_from: "beginning"# =================================================================================
# 2. 数据处理/转换配置 (Transforms)
# 作用:使用 VRL 语言清洗、解析数据,把文本变成 JSON 结构。
# =================================================================================
transforms:parse_nginx_access:type: "remap"inputs:- "nginx_access"source: |# 1. 初始时间兜底.@timestamp = now()# 2. 正则定义 (保持你提供的正则不变)pattern = r'^(?P<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?P<remote_user>\S+) \[(?P<time_local>\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] "(?P<request>(?:[A-Za-z]+ \S+ HTTP/[\d\.]+|.*?))" "(?P<http_host>.*?)" (?P<status>[0-9]+) (?P<body_bytes_sent>[0-9]+) "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" "(?P<request_time>.*?)" "(?P<ssl_protocol>.*?)" "(?P<ssl_cipher>.*?)" "(?P<http_x_forwarded_for>.*?)" "(?P<upstream_addr>.*?)" "(?P<upstream_status>.*?)" "(?P<upstream_response_length>.*?)" "(?P<upstream_response_time>.*?)" "(?P<http_x_gateway_serve>.*?)" "(?P<http_x_ai_gateway_app_id>.*?)"$'parsed, err = parse_regex(.message, pattern)if err == null {# --- 基础字段提取 ---.app_id = parsed.http_x_ai_gateway_app_id.body_bytes_sent = to_int!(parsed.body_bytes_sent).client_ip = parsed.remote_addr.forwarded_for = parsed.http_x_forwarded_for.http_host = parsed.http_host.http_request = parsed.request.referer = parsed.http_referer.user = parsed.remote_user.status = to_int(parsed.status) ?? 0.user_agent = parsed.http_user_agent.ssl_protocol = parsed.ssl_protocol.ssl_cipher = parsed.ssl_cipher.upstream_addr = parsed.upstream_addr.upstream_status = parsed.upstream_status.serve = parsed.http_x_gateway_serve.upstream_response_time = to_float(parsed.upstream_response_time) ?? 0.0.log_type = "nginx_access"# --- 复杂的 Request Line 拆解 ---req_parts = parse_regex(.http_request, r'(?P<method>\w+) (?P<uri>\S+) (?P<proto>HTTP/[\d\.]+)') ?? {}.method = req_parts.method.protocol = req_parts.proto.uri = req_parts.uri# =======================================================# 核心修复区:时间计算# =======================================================# 1. 解析日志记录时间 (End Time).log_timestamp = parse_timestamp!(parsed.time_local, format: "%d/%b/%Y:%H:%M:%S %z")# 2. 解析请求耗时 (Duration in Seconds)req_time_float = to_float(parsed.request_time) ?? 0.0.request_time = req_time_float# 3. 计算 Start Time (避免 calc_error 的最佳实践:转换为纳秒整数计算)# 逻辑:(日志时间戳_纳秒) - (请求耗时_秒 * 10亿) = 开始时间_纳秒log_ts_nano = to_unix_timestamp(.log_timestamp, unit: "nanoseconds")req_time_nano = to_int(req_time_float * 1_000_000_000)# 执行减法 (整数减整数,绝对安全)start_ts_nano = log_ts_nano - req_time_nano# 转回时间对象final_start_ts = from_unix_timestamp!(start_ts_nano, unit: "nanoseconds")# =======================================================# --- 最终赋值 ---# 1. 覆盖标准时间戳 (Elasticsearch索引使用,UTC格式).@timestamp = final_start_ts# 2. 保留专用字段 (UTC对象).request_start_time = final_start_ts# 3. 生成北京时间字符串 (人类阅读专用)# 例如: "2025-12-17 10:58:02".request_start_time_cn = format_timestamp!(final_start_ts, format: "%Y-%m-%d %H:%M:%S%.3f", timezone: "Asia/Shanghai")} else {# 解析失败处理.parse_error = true.error_message = err.log_type = "nginx_unknown".raw_message = .message}# 环境信息补充.host = get_hostname!()if exists(.file) { .source_file = string!(.file); del(.file) }apply_sampling:type: "remap"inputs: [ "parse_nginx_access" ]source: |# 这里的 .serve 就是正则提取出的 http_x_gateway_serve (例如 "1888645479")key = .serve# 默认采样率:如果不匹配,默认全采 (1.0)target_rate = 1.0# 查 CSV 表record, err = get_enrichment_table_record("sampling_table", {"service_id": key})if err == null && record != null {# CSV 读取出来是 String,必须转 Floattarget_rate = parse_float(record.sample_rate) ?? 1.0}# 记录最终使用的采样率,方便调试(生产环境可删除).sampling_rate_applied = target_rate# 判断请求的状态码,如果状态码非200 / 101 不抽样should_sample = (.status == 200 || .status == 101)# 执行采样丢弃# 如果产生的随机数 (0.0-1.0) 大于目标采样率,则丢弃该条日志if should_sample && random_float(0.0,1.0) > target_rate {abort}
# =================================================================================
# 3. 输出端配置 (Sinks)
# 作用:将处理好的数据发给 Elasticsearch。
# =================================================================================
sinks:elasticsearch_output:type: "elasticsearch"inputs:- "apply_sampling"endpoints:- "http://api-gateway-dashboard-1448-prd-0.es.dba.vivo.lan.:11403"auth:strategy: "basic"user: "elastic"password: "yBfV5ObBQmkl9kAS"bulk:index: "xuanji-sandbox-access-logs-%Y.%m.%d"action: "create"batch:max_events: 1000 # 每攒够 1000 条发一次timeout_secs: 5 # 或者每过 5 秒发一次(看谁先满足)compression: "gzip"# 缓冲区配置 (防止网络中断丢数据)buffer:type: "disk" # 缓存到磁盘(比内存缓存更安全)max_size: 268435488 # 最大缓冲大小:约 256MBwhen_full: "block" # 如果缓冲满了,阻塞源头读取(防止撑爆磁盘),或者选 drop_newest 丢弃healthcheck:enabled: trueuri: "/_cluster/health"request:timeout_secs: 60 # 请求超时时间 60秒retry_attempts: 5 # 失败重试 5 次retry_initial_backoff_secs: 1 # 第一次重试等 1秒retry_max_duration_secs: 300 # 最大重试持续时间 300秒
4. 调试技巧
当 pipeline 不工作时,可以使用以下命令进行排查:
- 检查配置文件语法:
这将检查文件格式是否正确,是否存在未定义的输入引用。
./bin/vector validate --config vector.yaml
查看实时处理情况 (Tap):
类似于 tcpdump,不用修改配置文件即可查看流经某个组件的数据。
# 查看经过 parse_nginx 这个 Transform 后的实时数据
./bin/vector tap parse_nginx
打印详细日志:
./bin/vector -v --config vector.yaml # Info 级别
./bin/vector -vv --config vector.yaml # Debug 级别
5. Enrichment Tables (数据富化表)
当日志里只有 ID(例如 service_id),但你希望在输出的日志里包含可读性更强的名称(例如 service_name)或获取动态配置(如 sample_rate)时,可以使用此功能。
5.1 配置示例 (YAML)
将此段加入配置文件的根级(与 sources, transforms, sinks 平级):
# -----------------------------------------------------------
# 全局数据表定义:加载外部 CSV 文件到内存
# -----------------------------------------------------------
enrichment_tables:# 1. 表的名称 (在 VRL 代码中引用时使用此名字)sampling_table:type: "file" # 数据源类型:本地文件file:path: "/data/nlu/xuanji-vector/config/sampling_rules.csv"encoding:type: "csv" # 文件格式delimiter: "," # (可选) 分隔符,默认逗号include_headers: true # 是否包含标题行 (如果是 true,schema 里的字段名要和 CSV 标题一致)# 2. 数据结构定义 (必须显式指定字段类型)schema:service_id: "string" # 这个通常作为查找的 Keysample_rate: "string" # 注意:CSV 读取进来默认多是字符串,后续脚本中可能需要 to_int/to_floatservice_name: "string" # 补充的描述信息
5.2 关联文件内容示例 (sampling_rules.csv)
文件路径:/data/nlu/xuanji-vector/config/sampling_rules.csv
service_id,sample_rate,service_name
1001,0.5,payment-service
1002,1.0,auth-service
5.3 在 VRL (Transforms) 中如何使用?
定义好 enrichment_tables 后,你需要在 transforms 阶段使用 get_enrichment_table_record 函数来查表。
配置示例:
transforms:enrich_log:type: "remap"inputs:- "my_source_id"# 这里通过 global_variable 声明要使用的表 (Vector 0.35+ 建议方式,部分旧版本不需要)# reroute table configuration might differ slightly by version, # but strictly speaking for VRL logical lookups:source: |# 假设日志里有个字段叫 .service_id,值为 "1001"# 1. 查表:在 'sampling_table' 中查找,匹配条件是 CSV 列 'service_id' 等于日志字段 .service_id# 注意:CSV 的查找 Key 必须匹配唯一行row, err = get_enrichment_table_record("sampling_table", { "service_id": .service_id })# 2. 如果查到了结果 (err 为 null)if err == null {# 将查出来的字段塞入日志.service_name = row.service_name.sampling_strategy = row.sample_rate} else {# 没查到,给默认值.service_name = "unknown"}
关键点总结
- 内存驻留:Vector 会监听文件变化。如果 CSV 文件更新,Vector 会自动重新加载数据到内存(Auto-reload),无需重启进程。
- Schema 匹配:
schema中定义的字段名必须和 CSV 文件的 Header 完全一致。 - 类型转换:CSV 中的数字通常被视为字符串。如果在 VRL 中需要做数学计算(如采样率判断),记得使用
to_float!(row.sample_rate)进行转换。
6. VRL (Vector Remap Language) 语法核心笔记
VRL 是一种专门为处理可观测性数据(日志/指标)设计的表达式语言,类似于简单的脚本语言,执行速度极快。它主要用在 transform 类型的组件中(即 type: "remap")。
6.1 基本概念
.(Dot): 表示当前处理的整条日志事件(Event)。- 字段访问: 使用
.加上字段名。.message: 访问根目录下的 message 字段。.http.status: 访问嵌套字段(JSON 结构)。.tags["host-ip"]: 访问包含特殊字符(如-或.)的字段名。
- 赋值: 使用
=修改或添加字段。 - 删除: 使用
del()函数。