Logstash 完整的采集 + 清洗 + 输出配置,专门用来从 Kafka 拿 Nginx 日志 → 解析 → 存入 ES。
一、input 模块:从 Kafka 拿数据
input {
kafka {
bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
group_id => "k8s-001"
topics => ["oldboyedu-k8s-external-kafka"]
auto_offset_reset => "earliest"
}
}
逐行翻译:
kafka {}:使用 kafka 输入插件,消费 Kafka 消息bootstrap_servers:Kafka 集群地址group_id:消费者组名,同一个组不会重复消费topics:要消费的 Kafka 主题auto_offset_reset => earliest:从头开始消费(从最早的消息开始读)
二、filter 模块:数据清洗、格式化
filter {
json {
source => "message"
}
json {}:把 message 字段里的 JSON 字符串自动展开成字段
比如 {"a":1} → 变成 a:1
mutate {
remove_field => [ "tags","input","agent","@version","ecs" , "log", "host"]
}
mutate + remove_field:删除没用的字段,减少 ES 存储压力
删掉了 Beats 自带的冗余字段
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
}
grok 正则解析
HTTPD_COMMONLOG:内置的 Nginx/Apache 通用日志格式
作用:把一行日志拆分成:IP、时间、状态码、请求路径等字段
geoip {
source => "clientip"
database => "/root/GeoLite2-City_20250311/GeoLite2-City.mmdb"
default_database_type => "City"
}
IP 地理位置解析
source => clientip:从 clientip 字段拿 IP
自动生成:国家、省份、城市、经纬度
用于 Kibana 世界地图展示
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
时间格式化
用日志里的真实时间覆盖 @timestamp
否则 ES 会用 Logstash 接收时间,导致时间不准
useragent {
source => "message"
target => "oldboyedu-useragent"
}
解析浏览器信息
从日志里拆出:浏览器、系统、设备、手机 / PC
结果放到 oldboyedu-useragent 字段里
三、output 模块:输出到 Elasticsearch
output {
elasticsearch {
hosts => ["https://10.0.0.91:9200","https://10.0.0.92:9200","https://10.0.0.93:9200"]
index => "oldboyedu-logstash-kafka-k8s"
api_key => "b5UvKpsB7QqI5-7wVg-k:2fgJFWRORhycUJtVQh5Y9g"
ssl => true
ssl_certificate_verification => false
}
}
逐行翻译:
elasticsearch {}:输出到 EShosts:ES 集群地址index:ES 索引名称api_key:ES 安全认证(代替账号密码)ssl => true:开启 HTTPSssl_certificate_verification => false:不验证证书(自签名证书必须关)
整段配置的完整工作流
- 从 Kafka 读取 Nginx 日志
- 解析 JSON
- 删除无用字段
- Grok 拆分日志为结构化字段
- IP 解析成地理位置
- 修正日志时间
- 解析浏览器 / 设备信息
- 最终写入 Elasticsearch
- Kibana 可以直接做可视化看板
总结成 1 句话
Kafka 消费 → Nginx 日志解析 → 地理信息 + 浏览器信息 + 时间格式化 → 写入 ES 的完整生产配置!
正文完
大佬好厉害 看了你的笔记我解决了工作上的问题