闽公网安备 35020302035485号
输出:Elasticsearch 并非存储的唯一选择,Logstash 提供很多输出选择。
灵活性:具备多个案例场景。支持数字、文本、地理位置、结构化、非结构化,所有的数据类型都欢迎。


3.Logstash、Elasticsearch、Kibana 安装包,您可以在 此页面 下载。
tar -xzvf logstash-7.3.0.tar.gz显示更多简单用例测试,进入到解压目录,并启动一个将控制台输入输出到控制台的管道。
cd logstash-7.3.0
elk@elk:~/elk/logstash-7.3.0$ bin/logstash -e 'input { stdin {} } output { { stdout {} } }'
显示更多看到如下日志就意味着 Logstash 启动成功。
{
"@timestamp" => 2019-08-10T16:11:10.040Z,
"host" => "elk",
"@version" => "1",
"message" => "Hello Logstash"
}
tar -xzvf elasticsearch-7.3.0-linux-x86_64.tar.gz启动 Elasticsearch:
cd elasticsearch-7.3.0/ bin/elasticsearch在启动 Elasticsearch 的过程中我遇到了两个问题在这里列举一下,方便大家排查。


elk@elk:~$ curl http://localhost:9200
{
"name" : "elk",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "hqp4Aad0T2Gcd4QyiHASmA",
"version" : {
"number" : "7.3.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "de777fa",
"build_date" : "2019-07-24T18:30:11.767338Z",
"build_snapshot" : false,
"lucene_version" : "8.1.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
安装 Kibanatar -xzvf kibana-7.3.0-linux-x86_64.tar.gz修改配置文件 config/kibana.yml ,主要指定 Elasticsearch 的信息。
elasticsearch.hosts: "http://ip:9200" # 允许远程访问 server.host: "0.0.0.0" # Elasticsearch用户名 这里其实就是我在服务器启动Elasticsearch的用户名 elasticsearch.username: "es" # Elasticsearch鉴权密码 这里其实就是我在服务器启动Elasticsearch的密码 elasticsearch.password: "es"启动 Kibana:
cd kibana-7.3.0-linux-x86_64/bin ./kibana在浏览器中访问 http://ip:5601 ,若出现以下界面,则表示 Kibana 安装成功。

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<contextName>Logback For demo Mobile</contextName>
<property name="LOG_HOME" value="/log" />
<springProperty scope="context" name="appName" source="spring.application.name"
defaultValue="localhost" />
...
<appender name="ROLLING_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
...
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{25} ${appName} -%msg%n</pattern>
</encoder>
...
</appender>
...
</configuration>
以上内容省略了很多内容,您可以在源码中获取。在上面的配置中我们定义了一个名为 ROLLING_FILE 的 Appender 往日志文件中输出指定格式的日志。而上面的 pattern 标签正是具体日志格式的配置,通过上面的配置,我们指定输出了时间、线程、日志级别、logger(通常为日志打印所在类的全路径)以及服务名称等信息。# 打包命令 mvn package -Dmaven.test.skip=true # 部署命令 java -jar sb-elk-start-0.0.1-SNAPSHOT.jar查看日志文件, logback 配置文件中我将日志存放在 /log/sb-log.log 文件中,执行 more /log/sb-log.log 命令,出现以下结果表示部署成功。

# 堆代码 duidaima.com
input {
file {
path => [
# 这里填写需要监控的文件
"/log/sb-log.log"
]
}
}
output {
# 输出到redis
redis {
host => "10.140.45.190" # redis主机地址
port => 6379 # redis端口号
db => 8 # redis数据库编号
data_type => "channel" # 使用发布/订阅模式
key => "logstash_list_0" # 发布通道名称
}
}
其实 Logstash 的配置是与前面提到的 Logstash 管道中的三个部分(输入、过滤器、输出)一一对应的,只不过这里我们不需要过滤器所以就没有写出来。上面配置中 Input 使用的数据源是文件类型的,只需要配置上需要收集的本机日志文件路径即可。Output 描述数据如何输出,这里配置的是输出到 Redis。input {
redis {
host => "192.168.142.131" # redis主机地址
port => 6379 # redis端口号
db => 8 # redis数据库编号
data_type => "channel" # 使用发布/订阅模式
key => "sb-logback" # 发布通道名称
}
}
filter {
#定义数据的格式
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:time} \[%{NOTSPACE:threadName}\] %{LOGLEVEL:level} %{DATA:logger} %{NOTSPACE:applicationName} -(?:.*=%{NUMBER:timetaken}ms|)"}
}
}
output {
stdout {}
elasticsearch {
hosts => "localhost:9200"
index => "logback"
}
}
与 Shipper 不同的是,Indexer 的管道中我们定义了过滤器,也正是在这里将日志解析成结构化的数据。下面是我截取的一条 logback 的日志内容:2023-08-11 18:01:31.602 [http-nio-8080-exec-2] INFO c.i.s.aop.WebLogAspect sb-elk -接口日志
POST请求测试接口结束调用:耗时=11ms,result=BaseResponse{code=10000, message='操作成功'}
在 Filter 中我们使用 Grok 插件从上面这条日志中解析出了时间、线程名称、Logger、服务名称以及接口耗时几个字段。Grok 又是如何工作的呢?我们编写的解析字符串可以使用 Grok Debugger 来测试是否正确,这样避免了重复在真实环境中校验解析规则的正确性。
# 进入到 Logstash 的解压目录,然后执行下面的命令 bin/logstash -f indexer-logstash.conf
# 进入到 Logstash 的解压目录,然后执行下面的命令
bin/logstash -f shipper-logstash.conf
调用 Spring Boot 接口,此时应该已经有数据写入到 ES 中了。

192.168.142.1 - - [17/Aug/2023:21:31:43 +0800] "GET /weblog/get-test?name=elk HTTP/1.1" 200 3 "http://192.168.142.131/swagger-ui.html" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"同样,我们需要为此日志编写一个 Grok 解析规则,如下所示:
%{IPV4:ip} \- \- \[%{HTTPDATE:time}\] "%{NOTSPACE:method} %{DATA:requestUrl}
HTTP/%{NUMBER:httpVersion}" %{NUMBER:httpStatus} %{NUMBER:bytes}
"%{DATA:referer}" "%{DATA:agent}"
完成上面这些之后的关键点是 Indexer 类型的 Logstash 需要支持两种类型的输入、过滤器以及输出,如何支持呢?首先需要给输入指定类型,然后再根据不同的输入类型走不同的过滤器和输出,如下所示。input {
redis {
type => "logback"
...
}
redis {
type => "nginx"
...
}
}
filter {
if [type] == "logback" {
...
}
if [type] == "nginx" {
...
}
}
output {
if [type] == "logback" {
...
}
if [type] == "nginx" {
...
}
}
我的 Nginx 与 Spring Boot 项目部署在同一台机器上,所以还需修改 Shipper 类型的 Logstash 的配置以支持两种类型的日志输入和输出,其配置文件的内容可 点击这里获取。
[program:elasticsearch] environment=JAVA_HOME="/usr/java/jdk1.8.0_221/" directory=/home/elk/elk/elasticsearch user=elk command=/home/elk/elk/elasticsearch/bin/elasticsearch [program:logstash] environment=JAVA_HOME="/usr/java/jdk1.8.0_221/" directory=/home/elk/elk/logstash user=elk command=/home/elk/elk/logstash/bin/logstash -f /home/elk/elk/logstash/indexer-logstash.conf [program:kibana] environment=LS_HEAP_SIZE=5000m directory=/home/elk/elk/kibana user=elk command=/home/elk/elk/kibana/bin/kibana按照以上内容配置完成后,执行 sudo supervisorctl reload 即可完成整个 ELK 的启动,而且其默认是开机自启。当然,我们也可以使用 sudo supervisorctl start/stop [program_name] 来管理单独的应用。另外,欢迎关注公众号码猿技术专栏,后台回复“9527”,送你一份Spring Cloud Aliababa实战视频!