Logstash数据采集与清洗

51次阅读
一条评论

Logstash 完整的采集 + 清洗 + 输出配置,专门用来从 Kafka 拿 Nginx 日志 → 解析 → 存入 ES


一、input 模块:从 Kafka 拿数据

input { 
  kafka {
     bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
     group_id => "k8s-001"
     topics => ["oldboyedu-k8s-external-kafka"]
     auto_offset_reset => "earliest"
  }
}  

逐行翻译:

  • kafka {}:使用 kafka 输入插件,消费 Kafka 消息
  • bootstrap_serversKafka 集群地址
  • group_id消费者组名,同一个组不会重复消费
  • topics:要消费的 Kafka 主题
  • auto_offset_reset => earliest从头开始消费(从最早的消息开始读)

二、filter 模块:数据清洗、格式化

filter {
  json {
    source => "message"
  }

json {}:把 message 字段里的 JSON 字符串自动展开成字段

比如 {"a":1} → 变成 a:1

  mutate {
    remove_field => [ "tags","input","agent","@version","ecs" , "log", "host"]
  }

mutate + remove_field:删除没用的字段,减少 ES 存储压力

删掉了 Beats 自带的冗余字段

  grok {
    match => {
      "message" => "%{HTTPD_COMMONLOG}"
    }
  }

grok 正则解析

HTTPD_COMMONLOG:内置的 Nginx/Apache 通用日志格式

作用:把一行日志拆分成:IP、时间、状态码、请求路径等字段

  geoip {
     source => "clientip"
     database => "/root/GeoLite2-City_20250311/GeoLite2-City.mmdb"
     default_database_type => "City"
  }

IP 地理位置解析

source => clientip:从 clientip 字段拿 IP

自动生成:国家、省份、城市、经纬度

用于 Kibana 世界地图展示

  date {
     match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }

时间格式化

用日志里的真实时间覆盖 @timestamp

否则 ES 会用 Logstash 接收时间,导致时间不准

  useragent {
     source => "message"
     target => "oldboyedu-useragent"
  }

解析浏览器信息

从日志里拆出:浏览器、系统、设备、手机 / PC

结果放到 oldboyedu-useragent 字段里

三、output 模块:输出到 Elasticsearch

output { 
  elasticsearch {
    hosts => ["https://10.0.0.91:9200","https://10.0.0.92:9200","https://10.0.0.93:9200"]
    index => "oldboyedu-logstash-kafka-k8s"
    api_key => "b5UvKpsB7QqI5-7wVg-k:2fgJFWRORhycUJtVQh5Y9g"
    ssl => true
    ssl_certificate_verification => false
  }
}

逐行翻译:

  • elasticsearch {}:输出到 ES
  • hosts:ES 集群地址
  • indexES 索引名称
  • api_key:ES 安全认证(代替账号密码)
  • ssl => true:开启 HTTPS
  • ssl_certificate_verification => false不验证证书(自签名证书必须关)

整段配置的完整工作流

  1. 从 Kafka 读取 Nginx 日志
  2. 解析 JSON
  3. 删除无用字段
  4. Grok 拆分日志为结构化字段
  5. IP 解析成地理位置
  6. 修正日志时间
  7. 解析浏览器 / 设备信息
  8. 最终写入 Elasticsearch
  9. Kibana 可以直接做可视化看板

总结成 1 句话

Kafka 消费 → Nginx 日志解析 → 地理信息 + 浏览器信息 + 时间格式化 → 写入 ES 的完整生产配置!

正文完
 0
评论(一条评论)
2026-05-09 09:38:42 回复

大佬好厉害 看了你的笔记我解决了工作上的问题

 Windows  Edge  中国北京北京市联通