输出:Elasticsearch 并非存储的唯一选择,Logstash 提供很多输出选择。
灵活性:具备多个案例场景。支持数字、文本、地理位置、结构化、非结构化,所有的数据类型都欢迎。
3.Logstash、Elasticsearch、Kibana 安装包,您可以在 此页面 下载。
tar -xzvf logstash-7.3.0.tar.gz显示更多简单用例测试,进入到解压目录,并启动一个将控制台输入输出到控制台的管道。
cd logstash-7.3.0 elk@elk:~/elk/logstash-7.3.0$ bin/logstash -e 'input { stdin {} } output { { stdout {} } }'显示更多看到如下日志就意味着 Logstash 启动成功。
{ "@timestamp" => 2019-08-10T16:11:10.040Z, "host" => "elk", "@version" => "1", "message" => "Hello Logstash" }
tar -xzvf elasticsearch-7.3.0-linux-x86_64.tar.gz启动 Elasticsearch:
cd elasticsearch-7.3.0/ bin/elasticsearch在启动 Elasticsearch 的过程中我遇到了两个问题在这里列举一下,方便大家排查。
elk@elk:~$ curl http://localhost:9200 { "name" : "elk", "cluster_name" : "elasticsearch", "cluster_uuid" : "hqp4Aad0T2Gcd4QyiHASmA", "version" : { "number" : "7.3.0", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "de777fa", "build_date" : "2019-07-24T18:30:11.767338Z", "build_snapshot" : false, "lucene_version" : "8.1.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }安装 Kibana
tar -xzvf kibana-7.3.0-linux-x86_64.tar.gz修改配置文件 config/kibana.yml ,主要指定 Elasticsearch 的信息。
elasticsearch.hosts: "http://ip:9200" # 允许远程访问 server.host: "0.0.0.0" # Elasticsearch用户名 这里其实就是我在服务器启动Elasticsearch的用户名 elasticsearch.username: "es" # Elasticsearch鉴权密码 这里其实就是我在服务器启动Elasticsearch的密码 elasticsearch.password: "es"启动 Kibana:
cd kibana-7.3.0-linux-x86_64/bin ./kibana在浏览器中访问 http://ip:5601 ,若出现以下界面,则表示 Kibana 安装成功。
<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false"> <contextName>Logback For demo Mobile</contextName> <property name="LOG_HOME" value="/log" /> <springProperty scope="context" name="appName" source="spring.application.name" defaultValue="localhost" /> ... <appender name="ROLLING_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> ... <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{25} ${appName} -%msg%n</pattern> </encoder> ... </appender> ... </configuration>以上内容省略了很多内容,您可以在源码中获取。在上面的配置中我们定义了一个名为 ROLLING_FILE 的 Appender 往日志文件中输出指定格式的日志。而上面的 pattern 标签正是具体日志格式的配置,通过上面的配置,我们指定输出了时间、线程、日志级别、logger(通常为日志打印所在类的全路径)以及服务名称等信息。
# 打包命令 mvn package -Dmaven.test.skip=true # 部署命令 java -jar sb-elk-start-0.0.1-SNAPSHOT.jar查看日志文件, logback 配置文件中我将日志存放在 /log/sb-log.log 文件中,执行 more /log/sb-log.log 命令,出现以下结果表示部署成功。
# 堆代码 duidaima.com input { file { path => [ # 这里填写需要监控的文件 "/log/sb-log.log" ] } } output { # 输出到redis redis { host => "10.140.45.190" # redis主机地址 port => 6379 # redis端口号 db => 8 # redis数据库编号 data_type => "channel" # 使用发布/订阅模式 key => "logstash_list_0" # 发布通道名称 } }其实 Logstash 的配置是与前面提到的 Logstash 管道中的三个部分(输入、过滤器、输出)一一对应的,只不过这里我们不需要过滤器所以就没有写出来。上面配置中 Input 使用的数据源是文件类型的,只需要配置上需要收集的本机日志文件路径即可。Output 描述数据如何输出,这里配置的是输出到 Redis。
input { redis { host => "192.168.142.131" # redis主机地址 port => 6379 # redis端口号 db => 8 # redis数据库编号 data_type => "channel" # 使用发布/订阅模式 key => "sb-logback" # 发布通道名称 } } filter { #定义数据的格式 grok { match => { "message" => "%{TIMESTAMP_ISO8601:time} \[%{NOTSPACE:threadName}\] %{LOGLEVEL:level} %{DATA:logger} %{NOTSPACE:applicationName} -(?:.*=%{NUMBER:timetaken}ms|)"} } } output { stdout {} elasticsearch { hosts => "localhost:9200" index => "logback" } }与 Shipper 不同的是,Indexer 的管道中我们定义了过滤器,也正是在这里将日志解析成结构化的数据。下面是我截取的一条 logback 的日志内容:
2023-08-11 18:01:31.602 [http-nio-8080-exec-2] INFO c.i.s.aop.WebLogAspect sb-elk -接口日志 POST请求测试接口结束调用:耗时=11ms,result=BaseResponse{code=10000, message='操作成功'}在 Filter 中我们使用 Grok 插件从上面这条日志中解析出了时间、线程名称、Logger、服务名称以及接口耗时几个字段。Grok 又是如何工作的呢?
我们编写的解析字符串可以使用 Grok Debugger 来测试是否正确,这样避免了重复在真实环境中校验解析规则的正确性。
# 进入到 Logstash 的解压目录,然后执行下面的命令 bin/logstash -f indexer-logstash.conf
# 进入到 Logstash 的解压目录,然后执行下面的命令 bin/logstash -f shipper-logstash.conf调用 Spring Boot 接口,此时应该已经有数据写入到 ES 中了。
192.168.142.1 - - [17/Aug/2023:21:31:43 +0800] "GET /weblog/get-test?name=elk HTTP/1.1" 200 3 "http://192.168.142.131/swagger-ui.html" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"同样,我们需要为此日志编写一个 Grok 解析规则,如下所示:
%{IPV4:ip} \- \- \[%{HTTPDATE:time}\] "%{NOTSPACE:method} %{DATA:requestUrl} HTTP/%{NUMBER:httpVersion}" %{NUMBER:httpStatus} %{NUMBER:bytes} "%{DATA:referer}" "%{DATA:agent}"完成上面这些之后的关键点是 Indexer 类型的 Logstash 需要支持两种类型的输入、过滤器以及输出,如何支持呢?首先需要给输入指定类型,然后再根据不同的输入类型走不同的过滤器和输出,如下所示。
input { redis { type => "logback" ... } redis { type => "nginx" ... } } filter { if [type] == "logback" { ... } if [type] == "nginx" { ... } } output { if [type] == "logback" { ... } if [type] == "nginx" { ... } }我的 Nginx 与 Spring Boot 项目部署在同一台机器上,所以还需修改 Shipper 类型的 Logstash 的配置以支持两种类型的日志输入和输出,其配置文件的内容可 点击这里获取。
[program:elasticsearch] environment=JAVA_HOME="/usr/java/jdk1.8.0_221/" directory=/home/elk/elk/elasticsearch user=elk command=/home/elk/elk/elasticsearch/bin/elasticsearch [program:logstash] environment=JAVA_HOME="/usr/java/jdk1.8.0_221/" directory=/home/elk/elk/logstash user=elk command=/home/elk/elk/logstash/bin/logstash -f /home/elk/elk/logstash/indexer-logstash.conf [program:kibana] environment=LS_HEAP_SIZE=5000m directory=/home/elk/elk/kibana user=elk command=/home/elk/elk/kibana/bin/kibana按照以上内容配置完成后,执行 sudo supervisorctl reload 即可完成整个 ELK 的启动,而且其默认是开机自启。当然,我们也可以使用 sudo supervisorctl start/stop [program_name] 来管理单独的应用。另外,欢迎关注公众号码猿技术专栏,后台回复“9527”,送你一份Spring Cloud Aliababa实战视频!