Administrator
Administrator
发布于 2026-05-04 / 2 阅读
0
0

ELK日志分析系统

ELK 日志系统

一、ELK 是什么

ELK Stack 是一个缩略语,由三个热门开源项目组成:Elasticsearch、Logstash 和 Kibana。三者协同实现 “数据收集 → 处理 → 存储 → 可视化” 的全链路日志处理闭环。

1.1 Elasticsearch(核心存储与检索引擎)

Elasticsearch 是一个构建在 Apache Lucene 之上的分布式搜索和分析引擎,是整个 ELK 系统的“数据中枢”。

维度

说明

核心功能

毫秒级全文检索、聚合分析、结构化查询

存储方式

将非结构化日志转化为可索引的 JSON 文档

扩展能力

支持 PB 级数据存储与高并发查询,分布式自动分片和副本

通信方式

通过 RESTful Web 接口(HTTP API)交互,无需复杂客户端适配

💡 简单理解:Elasticsearch 就像一个专门为搜索和分析设计的“超级数据库”。你写入 JSON 格式的数据,它自动建立倒排索引,让查询变得飞快。

关键概念

  • 集群(Cluster):一个或多个 ES 节点组成的集合,协同工作

  • 节点(Node):单个 ES 实例,可配置为主节点(管理集群)、数据节点(存储数据)、协调节点(转发请求)

  • 索引(Index):类似数据库的“表”,是一组结构相似的文档集合

  • 文档(Document):ES 的最小数据单元,以 JSON 格式存储,一条日志就是一个文档

  • 分片(Shard):索引的分块,默认 5 个主分片,分布在不同节点实现并行处理

  • 副本(Replica):主分片的备份,默认 1 个副本,保证高可用和数据安全

1.2 Logstash(数据处理管道)

Logstash 是一个开源的数据收集与处理工具,支持从数百种来源采集数据,并内置丰富的过滤器进行解析、清洗、丰富后输出到目标存储。

维度

说明

架构

Input → Filter → Output 三阶段管道模型

输入插件

支持文件、Syslog、Kafka、JDBC、HTTP、TCP/UDP 等 200+ 种来源

过滤插件

支持 Grok、GeoIP、Date、Mutate、KV 等,用于解析和转换数据

输出插件

最常用 Elasticsearch,也支持文件、Kafka、MongoDB 等

三大组件详解

  • Input(输入):定义数据从哪里来,如从文件读取、从 TCP 端口接收

  • Filter(过滤):最核心的部分,对数据进行解析、转换和增强。按配置顺序依次执行

  • Output(输出):定义处理完的数据往哪里发送

💡 简单理解:Logstash 就像一个“数据加工厂”——原料(原始日志)从入口进来,经过各种机器(过滤器)加工处理,变成规整的产品(结构化 JSON),再从出口送走。

1.3 Kibana(数据可视化与探索界面)

Kibana 是一个数据可视化工具,与 Elasticsearch 紧密集成,提供 Web 界面让用户无需编写代码即可搜索、查看和分析 Elasticsearch 中的数据。

维度

说明

核心功能

提供仪表盘、柱状图、折线图、饼图、热力图、地理分布等多种可视化组件

查询语言

支持 KQL(Kibana Query Language)和 Lucene Query Syntax

典型应用

日志检索、时序数据分析、应用监控、安全分析

💡 简单理解:Kibana 就是 ELK 系统的“驾驶舱”——所有收集到的日志和分析结果,都通过它展示成直观的图表,让你一眼看清系统状态。

二、企业推荐版本

2.1 版本选择建议

场景

推荐版本

说明

生产环境(成熟稳定)

8.14.x ~ 8.17.x

功能完善,社区成熟,插件兼容性好,被广泛验证

生产环境(最新特性)

9.x 系列

最新功能和 AI 能力,但部分功能(如 Enterprise Search)已移除

学习/测试环境

8.x 最新版

功能完整,文档齐全,易于上手

2.2 版本兼容性原则

  • Elasticsearch、Kibana、Logstash 的大版本号必须保持一致,否则可能出现兼容性问题

  • Logstash 与 Kibana 之间没有直接的版本兼容性要求,但为了保证整个 ELK Stack 的稳定性,建议使用相同大版本

本笔记采用版本:8.14.0 — 兼顾功能完整性与稳定性,适合学习和生产参考

三、Docker Compose 安装 ELK(企业推荐配置)

3.1 环境准备

  • 已安装 Docker 和 Docker Compose

  • 内存建议:至少 4GB 空闲给 Docker(ES 分配 1GB,Kibana 约 512MB,Logstash 约 512MB)

  • 磁盘建议:20GB+ 空闲空间

3.2 创建目录结构

bash

mkdir -p ~/elk-stack/logstash/pipeline
cd ~/elk-stack
mkdir -p logstash/config

最终目录结构:

text

elk-stack/
├── docker-compose.yml
└── logstash/
    └── pipeline/
        └── logstash.conf

3.3 docker-compose.yml 配置文件

yaml

version: '3.8'
​
services:
  # Elasticsearch - 核心存储与检索引擎
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.14.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node        # 单节点模式(学习/测试用)
      - xpack.security.enabled=false      # 禁用安全认证(学习用,生产务必开启)
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"      # JVM 堆内存分配,建议至少 1GB
    volumes:
      - es-data:/usr/share/elasticsearch/data   # 数据持久化
    ports:
      - "9200:9200"                       # REST API 端口(HTTP 通信)
      - "9300:9300"                       # 集群通信端口(节点间内部通信)
    networks:
      - elk-net
​
  # Kibana - 数据可视化界面
  kibana:
    image: docker.elastic.co/kibana/kibana:8.14.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://192.168.10.2:9200   # 指向 ES 容器
      - I18N_LOCALE=zh-CN #支持中文
    ports:
      - "5601:5601"                       # Kibana Web 访问端口
    depends_on:
      - elasticsearch
    networks:
      - elk-net
​
  # Logstash - 数据处理管道
  logstash:
    image: docker.elastic.co/logstash/logstash:8.14.0
    container_name: logstash
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro   # 挂载管道配置
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro   #挂载logstash.yml文件
    ports:
      - "5000:5000/tcp"                   # TCP 输入端口
      - "5000:5000/udp"                   # UDP 输入端口
      - "5044:5044"                       # Beats 协议端口(Filebeat 默认使用)
    depends_on:
      - elasticsearch
    networks:
      - elk-net
​
# 数据卷 - 持久化 ES 数据
volumes:
  es-data:
    driver: local
​
# 网络 - 容器间通信
networks:
  elk-net:
    driver: bridge

3.4 配置文件关键参数说明

参数

含义

discovery.type

single-node

告诉 ES 这是一个单节点实例,无需进行集群节点发现

xpack.security.enabled

false

禁用安全认证(学习用简化配置)

ES_JAVA_OPTS

-Xms1g -Xmx1g

设置 ES 初始和最大堆内存,生产环境建议物理内存 50% 且不超过 32GB

volumes 挂载

./logstash/pipeline:/usr/share/logstash/pipeline:ro

将本地配置文件挂载到容器内,ro 表示只读

depends_on

- elasticsearch

定义服务启动顺序,但不会等待服务完全就绪

3.5 启动服务

bash

# 在 elk-stack 目录下执行
docker compose up -d
##################################启动之前需要配置logstash.conf文件
# 查看运行状态
docker compose ps
​
# 查看日志(排查问题)
docker compose logs -f
​
# 停止服务
docker compose down
​
# 停止并删除数据卷(完全清理)
docker compose down -v

3.6 验证安装

bash

# 1. 检查 Elasticsearch 是否正常运行
curl http://localhost:9200
​
# 预期返回 ES 版本信息
​
# 2. 检查 Kibana 是否可访问
# 浏览器打开 http://localhost:5601
​
# 3. 检查 Logstash 是否运行
docker logs logstash --tail 20

四、Logstash 配置文件详解 + Grok 解析 Nginx 日志

创建 logstash/config/logstash.yml 文件:

cat > logstash/config/logstash.yml << 'EOF'
pipeline.batch.size: 500
pipeline.batch.delay: 10
EOF

💡 解释:

  • pipeline.batch.size: 500 → 每批次最多处理 500 条事件(原默认 125)

  • pipeline.batch.delay: 10 → 最多等待 10 毫秒(原默认 50 毫秒)

    你可以根据自己服务器性能和日志量调整这两个值

4.1 logstash.conf 完整配置

创建文件 ~/elk-stack/logstash/pipeline/logstash.conf

ruby

# ==================== Input(输入层)====================
# 定义数据从哪里来
​
input {
    # 方式一:从 Filebeat 接收(生产推荐)
    beats {
        port => 5044
        client_inactivity_timeout => 3600
    }
    
    # 方式二:从 TCP 端口接收(直接发送)
    tcp {
        port => 5000
        codec => "json_lines"
    }
    
    # 方式三:从 UDP 端口接收
    udp {
        port => 5000
        codec => "json"
    }
}
​
# ==================== Filter 配置 ====================
# 这是 Logstash 最核心的部分
# 根据 Filebeat 传来的 tags 分别处理不同类型的日志
​
filter {   # <--- 添加这一行
​
# ---------- 1. Nginx 访问日志 ----------
if "nginx-access" in [tags] {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
        break_on_match => false
    }
    mutate {
        rename => {
            "clientip"    => "client_ip"
            "ident"       => "ident"
            "auth"        => "auth_user"
            "verb"        => "method"
            "request"     => "request_uri"
            "httpversion" => "http_version"
            "response"    => "status_code"
            "bytes"       => "bytes_sent"
            "referrer"    => "referrer"
            "agent"       => "user_agent"
        }
        remove_field => ["ident", "auth", "message", "host", "@version", "log", "input"]
    }
    if [request_uri] {
        grok {
            match => { "request_uri" => "(?<request_path>[^?]*)(?:\?(?<query_string>.*))?" }
        }
    }
    geoip {
        source => "client_ip"
        target => "geo_location"
        fields => ["city_name", "country_name", "continent_code", "location", "region_name"]
    }
    mutate {
        convert => {
            "bytes_sent"    => "integer"
            "status_code"   => "integer"
        }
    }
}
​
# ---------- 2. Nginx 错误日志(兼容多种格式) ----------
if "nginx-error" in [tags] {
    # 先尝试匹配你的实际格式:2026/04/17 08:08:26 [emerg] 13211#13211: message
    grok {
        match => { "message" => "%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME:time} \[%{LOGLEVEL:level}\] %{NUMBER:pid}#%{NUMBER:tid}: %{GREEDYDATA:error_message}" }
        tag_on_failure => []
    }
    # 如果匹配成功,拼接时间戳
    if [year] and [month] and [day] and [time] {
        mutate {
            add_field => { "timestamp" => "%{year}/%{month}/%{day} %{time}" }
        }
        date {
            match => ["timestamp", "yyyy/MM/dd HH:mm:ss"]
            timezone => "Asia/Shanghai"
            target => "@timestamp"
        }
        mutate {
            remove_field => ["year", "month", "day", "time", "timestamp"]
        }
    }
    # 删除原始 message 和无用字段(但保留原始 event.original 以备查看)
    mutate {
        remove_field => ["message", "host", "@version"]
    }
}
​
# ---------- 3. 系统日志(/var/log/messages, /var/log/secure) ----------
# 常见格式:日期 主机名 进程[pid]: 消息
# 例如:Apr 17 06:25:01 master3 systemd: Started Session 123 of user root.
if "system-log" in [tags] {
    grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{DATA:process}(?:\[%{NUMBER:pid}\])?: %{GREEDYDATA:message_detail}" }
        break_on_match => false
    }
    # 如果上面的模式失败,尝试更宽松的匹配
    if "_grokparsefailure" in [tags] {
        grok {
            match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{GREEDYDATA:message_detail}" }
        }
    }
    date {
        match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
    mutate {
        remove_field => ["message", "timestamp", "host"]
    }
}
​
# ---------- 4. Cron 日志(/var/log/cron) ----------
# 格式类似:Apr 17 06:01:01 master3 CROND[12345]: (root) CMD (run-parts /etc/cron.hourly)
if "cron-log" in [tags] {
    grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{DATA:process}(?:\[%{NUMBER:pid}\])?: %{GREEDYDATA:cron_command}" }
    }
    date {
        match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
    mutate {
        remove_field => ["message", "timestamp", "host"]
    }
}
​
# ---------- 5. 邮件日志(/var/log/maillog) ----------
# 格式类似系统日志,但消息内容可能包含邮件特定信息
if "mail-log" in [tags] {
    grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{DATA:process}(?:\[%{NUMBER:pid}\])?: %{GREEDYDATA:mail_message}" }
    }
    date {
        match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
    mutate {
        remove_field => ["message", "timestamp", "host"]
    }
}
​
# ---------- 6. 内核日志(/var/log/dmesg 或 /var/log/messages 中的 kernel) ----------
# 通常格式:时间戳 内核消息,例如:[    0.000000] Linux version 3.10.0-...
# 或者:Apr 17 06:25:01 master3 kernel: usb 1-1: new device
if "kernel-log" in [tags] {
    grok {
        match => { "message" => "\[%{NUMBER:kernel_time}\] %{GREEDYDATA:kernel_message}" }
    }
    # 如果不是方括号格式,尝试 syslog 格式
    if "_grokparsefailure" in [tags] {
        grok {
            match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} kernel: %{GREEDYDATA:kernel_message}" }
        }
        date {
            match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
            timezone => "Asia/Shanghai"
            target => "@timestamp"
        }
        mutate { remove_field => ["timestamp", "host"] }
    }
    mutate {
        remove_field => ["message", "host", "@version"]
    }
}
​
# ---------- 7. Yum 日志(/var/log/yum.log) ----------
# 格式:日期 时间 动作: 软件包-版本
# 例如:Apr 17 06:29:29 Installed: vsftpd-3.0.2-29.el7_9.x86_64
if "yum-log" in [tags] {
    # 直接提取时间戳、动作、软件包名和版本(用第一个 - 分隔)
    grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{WORD:action}: %{DATA:package_name}-%{GREEDYDATA:package_version}" }
    }
    date {
        match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
    mutate {
        remove_field => ["message", "timestamp", "host"]
    }
}
​
# ---------- 8. 自定义应用日志(/var/log/myapp/*.log) ----------
# 这里根据你的实际日志格式来写,下面是一个示例(假设日志格式:时间 级别 消息)
if "myapp-log" in [tags] {
    grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:app_message}" }
    }
    # 如果匹配失败,保留原始消息(修正 unless 语法)
    if "_grokparsefailure" not in [tags] {
        mutate {
            remove_field => ["message", "host", "@version"]
        }
    }
}
​
# ---------- 9. 其他未分类日志(默认) ----------
# 如果没有匹配的 tag,至少尝试提取时间戳,并保留原始消息
if "_grokparsefailure" not in [tags] and [tags] not in [["nginx-access"], ["nginx-error"], ["system-log"], ["cron-log"], ["mail-log"], ["kernel-log"], ["yum-log"], ["myapp-log"]] {
    # 尝试提取常见的时间戳
    grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}" }
        tag_on_failure => []
    }
    if [timestamp] {
        date {
            match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
            timezone => "Asia/Shanghai"
            target => "@timestamp"
        }
        mutate { remove_field => ["timestamp"] }
    }
}
​
}   # <--- 闭合 filter 块
​
# ==================== 输出配置(根据日志分类写入不同索引) ====================
output {
    # 如果这条日志的 tags 字段里包含 "nginx-access"
    # (说明是 Filebeat 采集的 Nginx 访问日志)
    if "nginx-access" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]                     # 发送到本机的 Elasticsearch
            index => "logstash-nginx-access-%{+YYYY.MM.dd}"   # 索引名:logstash-nginx-access-2026.04.17(按天分割)
        }
    }
    # 否则,如果 tags 里包含 "nginx-error"(Nginx 错误日志)
    else if "nginx-error" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-nginx-error-%{+YYYY.MM.dd}"    # 单独存放错误日志的索引
        }
    }
    # 否则,如果 tags 里包含 "system-log"(系统日志,如 /var/log/messages)
    else if "system-log" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-system-%{+YYYY.MM.dd}"         # 系统日志专用索引
        }
    }
    # 否则,如果 tags 里包含 "cron-log"(定时任务日志)
    else if "cron-log" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-cron-%{+YYYY.MM.dd}"           # cron 日志专用索引
        }
    }
    # 否则,如果 tags 里包含 "mail-log"(邮件日志)
    else if "mail-log" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-mail-%{+YYYY.MM.dd}"           # 邮件日志专用索引
        }
    }
    # 否则,如果 tags 里包含 "kernel-log"(内核日志)
    else if "kernel-log" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-kernel-%{+YYYY.MM.dd}"         # 内核日志专用索引
        }
    }
    # 否则,如果 tags 里包含 "yum-log"(yum 安装日志)
    else if "yum-log" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-yum-%{+YYYY.MM.dd}"            # yum 日志专用索引
        }
    }
    # 否则,如果 tags 里包含 "myapp-log"(你自己的应用日志)
    else if "myapp-log" in [tags] {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-myapp-%{+YYYY.MM.dd}"          # 应用日志专用索引
        }
    }
    # 如果以上条件都不满足(即 tags 里没有任何已知的标识)
    else {
        elasticsearch {
            hosts => ["192.168.10.2:9200"]
            index => "logstash-default-%{+YYYY.MM.dd}"        # 丢到一个默认索引,防止数据丢失
        }
    }
}

你看到的配置

大白话作用

input { beats ... tcp ... udp ... }

开好几个门,让不同来源的数据都能进来

if "nginx-access" in [tags]

区分日志类型,不同格式不同处理

grok { match => ... }

把一行文本拆成 IP、状态码、URL 等字段

mutate { rename => ... }

把字段名改成自己喜欢的名字

mutate { remove_field => ... }

扔掉没用的字段,省磁盘

grok { match => request_uri ... }

/api/users?id=1 拆成路径和参数

geoip { ... }

根据 IP 查出城市、国家,加进数据里

mutate { convert => ... }

把数字字符串转成真正的数字,才能做加减比较

elasticsearch { index => "...%{+YYYY.MM.dd}" }

按天存到 ES,方便删除旧数据

batch_size / batch_delay

攒一批再发,又快又不卡

4.2 Grok 解析详解

4.2.1 Grok 是什么

Grok 是 Logstash 中的过滤器,用于将非结构化日志解析为结构化和可查询的数据。它位于正则表达式之上,使用文本模式匹配日志文件中的行。

Grok 语法%{SYNTAX:SEMANTIC}

  • SYNTAX:匹配值的类型(如 IPNUMBERTIMESTAMP_ISO8601WORD

  • SEMANTIC:字段名称(如 client_ipstatus_code

4.2.2 常用 Grok 模式速查

模式

匹配内容

示例

%{IP:client_ip}

IPv4/IPv6 地址

116.236.167.58

%{IPORHOST:client}

IP 或主机名

localhost / 192.168.1.1

%{NUMBER:bytes}

整数或浮点数

19939 / 0.043

%{WORD:method}

单词(字母数字+下划线)

GET / POST

%{DATA:anything}

任意字符串(非贪婪)

anything

%{GREEDYDATA:everything}

任意字符串(贪婪)

everything...

%{TIMESTAMP_ISO8601:timestamp}

ISO8601 时间戳

2016-03-29T17:30:32+08:00

%{HTTPDATE:timestamp}

HTTP 日期格式

29/Mar/2016:17:30:32 +0800

%{URIPATHPARAM:uri}

URL 路径+参数

/get-data/?id=123

%{NOTSPACE:value}

非空格字符

anything_without_space

4.2.3 Nginx 日志匹配详解

Nginx combined 日志格式示例

text

116.236.167.58 - - [29/Mar/2016:17:30:32 +0800] "POST /get-screen-data/ HTTP/1.1" 200 19939 "http://example.com" "Mozilla/5.0..."

使用内置模式 %{COMBINEDAPACHELOG} 解析后得到以下字段:

字段

说明

clientip

116.236.167.58

客户端 IP 地址

ident

-

身份标识(通常为 -)

auth

-

认证用户(通常为 -)

timestamp

29/Mar/2016:17:30:32 +0800

请求时间

verb

POST

HTTP 方法

request

/get-screen-data/

请求 URI

httpversion

1.1

HTTP 版本

response

200

HTTP 状态码

bytes

19939

响应字节数

referrer

http://example.com

来源页面

agent

Mozilla/5.0...

用户代理

💡 调试技巧:推荐使用 Grok Debugger 在线调试 Grok 表达式,或在 Kibana 的 Developer Tools → Grok Debugger 中进行测试。

五、Filebeat — RPM 方式安装与配置

Filebeat 是 ELK Stack 中的轻量级日志收集器,部署在每台应用服务器上,负责读取日志文件并转发到 Logstash 或 Elasticsearch。

5.1 RPM 安装方式(推荐)

方法一:使用 YUM 仓库安装(最推荐)

bash

# 1. 添加 Elastic 官方 YUM 仓库(确保软件包来源可信)
sudo tee /etc/yum.repos.d/elastic-beats.repo << EOF
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# 2. 安装 Filebeat
sudo yum install filebeat -y

# 3. 启动并设置开机自启
sudo systemctl start filebeat
sudo systemctl enable filebeat

方法二:手动下载 RPM 包安装

bash

# 1. 下载对应版本的 Filebeat RPM 包
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.0-x86_64.rpm

# 2. 安装 RPM 包(-i 安装,-v 显示详情,-h 显示进度)
sudo rpm -ivh filebeat-8.14.0-x86_64.rpm

💡 RPM 安装的优势:自动处理依赖关系,通过系统包管理工具方便地进行安装、升级和维护,更好地与系统集成。

5.2 配置文件详解

Filebeat 的主配置文件位于 /etc/filebeat/filebeat.yml

yaml

# ==================== 输入配置(定义要采集的日志) ====================
filebeat.inputs:
  
  # 采集 Nginx 访问日志
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log*
    # 排除压缩文件(减少资源消耗)
    exclude_files: ['.gz$']
    # 忽略 72 小时前修改的文件
    ignore_older: 72h
    # 添加自定义 tag,便于 Logstash 区分日志类型
    tags: ["nginx-access"]
    
  # 采集 Nginx 错误日志
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log*
    exclude_files: ['.gz$']
    ignore_older: 72h
    tags: ["nginx-error"]
    
  # 采集系统日志(通用系统消息和登录安全日志)
  - type: log
    enabled: true
    paths:
      - /var/log/messages
      - /var/log/secure
    tags: ["system-log"]

  # ==================== 多行日志合并(处理 Java 堆栈等跨行日志) ====================
  # 当一条日志跨越多行时,需要配置 multiline 将它们合并成一个事件
  # 采集自定义应用日志(支持多行合并)
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/*.log
    # 匹配以日期时间开头的行作为新日志的开始
    multiline.pattern: '^\[?[0-9]{4}-[0-9]{2}-[0-9]{2}'
    multiline.negate: true
    multiline.match: after
    tags: ["myapp-log"]

  # ---------- 下面是你要求“加上”的更多日志,都用大白话写了注释 ----------

  # 采集定时任务日志(cron 计划任务执行的记录)  nstall vsftpd
  - type: log
    enabled: true
    paths:
      - /var/log/cron
    tags: ["cron-log"]

  # 采集邮件日志(发信、收信、邮件错误等)
  - type: log
    enabled: true
    paths:
      - /var/log/maillog
    tags: ["mail-log"]

  # 采集内核日志(系统内核产生的各种消息,比如硬件、驱动)
  - type: log
    enabled: true
    paths:
      - /var/log/dmesg
    tags: ["kernel-log"]

  # 采集 yum 日志(用 yum 安装、更新软件包的历史记录)
  - type: log
    enabled: true
    paths:
      - /var/log/yum.log
    tags: ["yum-log"]

# ==================== 输出配置(发送到 Logstash — 生产推荐) ====================
output.logstash:
  # Logstash 服务器地址和端口(Filebeat 默认使用 Beats 协议 5044 端口)
  hosts: ["192.168.10.2:5044"]
  
  # 可选:如果 Logstash 需要认证
  # username: "elastic"
  # password: "your_password"
  
  # SSL/TLS 配置(生产环境强烈推荐开启)
  # ssl.enabled: true
  # ssl.certificate_authorities: ["/etc/filebeat/certs/ca.crt"]
  # ssl.certificate: "/etc/filebeat/certs/client.crt"
  # ssl.key: "/etc/filebeat/certs/client.key"

# ==================== 输出到 Elasticsearch(直接发送,备选方案) ====================
# 如果不经过 Logstash,可直接发送到 Elasticsearch
# output.elasticsearch:
#   hosts: ["192.168.10.2:9200"]
#   index: "filebeat-nginx-%{+yyyy.MM.dd}"
#   # 如果启用了安全认证
#   # username: "elastic"
#   # password: "your_password"

# ==================== Kibana 配置(可选,用于设置仪表板) ====================
setup.kibana:
  host: "192.168.10.2:5601"

# ==================== 性能优化配置 ====================
# 批量发送的日志条数(提高吞吐量)
filebeat.config.modules.path: ${path.config}/modules.d/*.yml
# 设置文件资源管理器关闭时间(文件未更新时关闭句柄,默认 5 分钟)
# close_older: 5m

5.3 启动与管理

bash

# 1. 测试配置文件语法是否正确
sudo filebeat test config

# 预期输出:Config OK

# 2. 启动 Filebeat 服务
sudo systemctl start filebeat

# 3. 设置开机自启
sudo systemctl enable filebeat

# 4. 查看运行状态
sudo systemctl status filebeat

# 5. 查看 Filebeat 自身日志(排查问题)
sudo tail -f /var/log/filebeat/filebeat

# 6. 重新加载配置(修改配置后)
sudo systemctl restart filebeat

# 7. 停止服务
sudo systemctl stop filebeat

5.4 数据流验证

bash

# 1. 确保 Logstash 正在运行(在 ELK 服务器上)
docker ps | grep logstash

# 2. 查看 Filebeat 是否成功发送数据
sudo filebeat test output

# 3. 在 Kibana 中创建索引模式
# 浏览器访问 Kibana → Stack Management → Index Patterns → Create index pattern
# 输入索引名称:logstash-nginx-* 或 filebeat-*
# 选择时间字段:@timestamp

# 4. 在 Discover 中搜索日志
# 使用 KQL 搜索:tags: "nginx-access" AND status_code: 500

六、ELK 数据流完整示意图

七、常见问题排查

7.1 Logstash 无法连接到 Elasticsearch

bash

# 检查 ES 是否运行
curl http://elasticsearch:9200

# 检查 Logstash 日志
docker logs logstash --tail 50

7.2 Filebeat 无法连接到 Logstash

bash

# 测试网络连通性
telnet your-logstash-server 5044

# 检查 Filebeat 自身日志
sudo tail -f /var/log/filebeat/filebeat

# 验证输出配置
sudo filebeat test output

7.3 Grok 解析失败

bash

# 1. 在 Kibana 中使用 Grok Debugger 测试
# 路径:Kibana → Developer Tools → Grok Debugger

# 2. 在 Logstash 中启用调试输出(临时)
output {
    stdout { codec => rubydebug }
}

# 3. 使用 _grokparsefailure 标签查找未解析的日志
# 在 Kibana 中搜索:tags: "_grokparsefailure"

7.4 索引空间增长过快

yaml

# 在 logstash.conf 的 output 中设置索引生命周期
elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-nginx-%{+YYYY.MM.dd}"
    # 设置最大索引大小和文档数
    manage_template => true
    template_overwrite => true
}

# 配合 Elasticsearch ILM(索引生命周期管理)自动删除旧索引

7.5 性能优化建议

配置项

建议值

位置

ES 堆内存

物理内存 50%,不超过 32GB

ES_JAVA_OPTS

Logstash 批次大小

batch_size => 500

logstash.conf

Logstash 工作线程

pipeline.workers: 4

logstash.yml

Filebeat 批量发送

bulk_max_size: 2048

filebeat.yml

索引刷新间隔

index.refresh_interval: 30s

ES 索引设置

八、安全注意事项(生产环境必读)

本笔记中为学习方便禁用了安全功能(xpack.security.enabled=false),生产环境必须启用

  1. 启用 X-Pack 安全认证:配置用户名/密码认证

  2. 启用 TLS/SSL 加密通信:节点间通信和客户端访问都需要加密

  3. 配置防火墙规则:仅开放必要的端口(9200、5601、5044)

  4. 使用专用非 root 用户运行 ELK 相关进程

  5. 定期更新版本:关注 Elastic 官方安全公告


版本信息:本笔记基于 Elastic Stack 8.14.0 版本编写,建议在部署前查阅官方文档确认最新稳定版本。


评论