Log analysis with ELK for Business Intelligence systems

In this post I'll show how to collect logs from several applications (Oracle OBIEE, Oracle Essbase, QlikView, Apache HTTP Server, Linux system logs) with the ELK (Elasticsearch, Logstash and Kibana) stack. ELK is a powerful open-source alternative to Splunk, and it handles multiline logs with ease.

Installing the ELK stack in Docker containers is fast, easy and flexible.

Client side (Filebeat)

Filebeat must be installed on every machine where the logs reside.

On Linux, using Docker, run:

docker run -d --restart=unless-stopped \
  -v "$PWD"/filebeat.yml:/filebeat.yml \
  -v "$PWD"/filebeat.template.json:/filebeat.template.json \
  -v "$PWD"/filebeat.template-es2x.json:/filebeat.template-es2x.json \
  -v /home/oracle/Oracle:/Oracle:ro \
  -v /var/log:/var/log:ro \
  --name filebeat prima/filebeat:latest

A sample filebeat.yml for collecting OBIEE logs:

filebeat.prospectors:
- input_type: log
  document_type: obiee_access_log
  paths:
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/bi_server1/logs/access.log
  exclude_lines: ["\t200\t[0-9]+$"]
  #ignore_older: 10m
- input_type: log
  document_type: obiee_log
  paths:
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/bi_server1/logs/bi_server1-diagnostic.log
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/obiccs1/logs/nqcluster.log
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/obiccs1/logs/nqscheduler.log
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/obijh1/logs/jh.log
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/obips1/logs/sawlog0.log
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/obis1/logs/obis1-diagnostic.log
    - /Oracle/Middleware/Oracle_Home/user_projects/domains/bi/servers/obisch1/logs/nqscheduler.log
  multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  ignore_older: 10m
  fields:
    level: debug
- input_type: log
  document_type: apache_httpd_log
  paths:
    - /var/log/apache2/*_log
  ignore_older: 10m
- input_type: log
  document_type: syslog
  paths:
    - /var/log/*.log
    - /var/log/messages
  exclude_lines: ["^DBG"]
  include_lines: ["^ERR", "^WARN", "Error", "error"]
  exclude_files: [".gz$", "vmware-vmsvc*.log"]
  fields:
    level: debug
    #review: 1
  ignore_older: 10m

name: zaffiro.it.pirelli.com
tags: ["obiee"]
fields:
  env: production

output.logstash:
  # The Logstash hosts
  hosts: ["ambra.redaelli.org:5044"]

processors:
- drop_event:
    when:
      or:
        - regexp:
            message: "/Info"
        - regexp:
            message: "NOTIFICATION"

logging.level: error
logging.selectors: ["*"]
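The multiline settings above stitch stack traces into a single event: with negate: true and match: after, any line that does not start with a `[YYYY-MM-DD` timestamp is appended to the previous event. A minimal Python sketch of that merging logic, using the exact pattern from the config (the sample log lines are invented for illustration):

```python
import re

# Same regex as multiline.pattern in the filebeat.yml above
pattern = re.compile(r'^\[[0-9]{4}-[0-9]{2}-[0-9]{2}')

lines = [
    "[2017-02-17T11:37:01.123+01:00] [obis1] [ERROR] query failed",  # starts an event
    "    at oracle.bi.SomeClass.method(SomeClass.java:42)",          # continuation line
    "[2017-02-17T11:37:02.456+01:00] [obis1] [NOTIFICATION] next event",
]

# negate: true + match: after => non-matching lines are appended
# to the most recent matching line
events = []
for line in lines:
    if pattern.match(line) or not events:
        events.append(line)
    else:
        events[-1] += " " + line

print(len(events))  # the stack-trace line was merged into the first event
```

This is why Logstash later strips `\n`/`\r` with gsub: each multiline event arrives as a single message.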

A sample filebeat.yml for QlikView logs:

filebeat.prospectors:
- input_type: log
  document_type: qlikview_log
  paths:
    - G:\QLIK_CONFIG\QVS\Logs\Events_PAPPAPP13IT_*.log
  include_lines: ["Error"]
  ignore_older: 10m
  fields:
    name: pappapp13it
- input_type: log
  document_type: qlikview_log
  paths:
    - G:\QLIK_CONFIG\QVS\Logs\Events_PAPPAPP18IT_*.log
  include_lines: ["Error"]
  ignore_older: 10m
  fields:
    name: pappapp18it
- input_type: log
  document_type: qlikview_log
  paths:
    - G:\QLIK_CONFIG\QVS\Logs\Events_PAPPAPP25IT_*.log
  include_lines: ["Error"]
  ignore_older: 10m
  fields:
    name: pappapp25it

#================================ General =====================================
tags: ["qlikview"]
fields:
  env: production

#================================ Outputs =====================================
output.logstash:
  # The Logstash hosts
  hosts: ["ambra.redaelli.org:5044"]

#================================ Logging =====================================
logging.level: error
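With include_lines, Filebeat ships only the lines that match at least one of the listed regexes; note the match is unanchored and case-sensitive, so "Error" catches "Error" but not "error". A Python sketch of that filtering (the sample lines are made up in the QlikView event-log format):

```python
import re

# include_lines from the QlikView config above: only matching lines are shipped
include_patterns = [re.compile(p) for p in ["Error"]]

sample = [
    "2017-02-19 18:01:29\t2017-02-21 00:00:16\t4\t700\tInformation\tDebug: CNTService::Handler(4)",
    "2017-02-19 18:01:29\t2017-02-21 00:01:02\t1\t100\tError\tDocument failed to reload",
]

# unanchored, case-sensitive search, like Filebeat's include_lines
shipped = [line for line in sample
           if any(p.search(line) for p in include_patterns)]

print(shipped)  # only the Error line survives
```

This keeps network traffic low: the routine Information events never leave the QlikView server.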

Server side (Elasticsearch, Logstash and Kibana)

On Linux, using Docker:

docker run -d --restart=unless-stopped --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -v /home/oracle/apps/docker_shares/elasticsearch_data:/usr/share/elasticsearch/data \
  elasticsearch

docker run -d --restart=unless-stopped --name logstash \
  -p 5044:5044 --link elasticsearch:elasticsearch \
  -v /home/oracle/apps/docker_shares/logstash:/logstash \
  logstash -f /logstash/logstash.conf

docker run -d --restart=unless-stopped --name kibana \
  -p 5601:5601 --link elasticsearch:elasticsearch \
  kibana

A sample logstash.conf:

input {
  beats {
    port => 5044
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGLINE}" }
    }
    date {
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
  else if [type] == "apache_httpd_log" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  }
  else if [type] == "essbase_log" {
    mutate {
      gsub => ['message', "\n", " "]
    }
    mutate {
      gsub => ['message', "\r", " "]
    }
    grok {
      match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[.*\] \[%{LOGLEVEL:loglevel}\:?\d*\] %{GREEDYDATA:log_message}"]
    }
    date {
      match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"]
    }
    # keep only error events: drop when the message contains neither "ERROR" nor "Error"
    if ([message] !~ "ERROR" and [message] !~ "Error") {
      drop {}
    }
  }
  else if [type] == "obiee_log" {
    mutate {
      gsub => ['message', "\n", " "]
    }
    mutate {
      gsub => ['message', "\r", " "]
    }
    grok {
      match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[.*\] \[%{LOGLEVEL:loglevel}\:?\d*\] %{GREEDYDATA:log_message}"]
    }
    date {
      match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"]
    }
    if ([message] !~ "\[ERROR") {
      drop {}
    }
  }
  else if [type] == "obiee_access_log" {
    # 2017-02-17 11:37:01 GET e82f1268-7238-4672-a983-03e62ebc9b9c-0000026f 0 /analytics/Missing_uicomponents/sensecomplete/sensecomplete.css 404 1164
    grok {
      match => { "message" => "%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day}\t%{TIME:time}\t%{WORD:method}\t%{NOTSPACE:dummy1}\t%{NUMBER:dummy2}\t%{URIPATH:uri}\t%{NUMBER:response}\t%{NUMBER:bytes}" }
    }
    mutate {
      add_field => {
        "timestamp" => "%{year}-%{month}-%{day} %{time}"
      }
    }
    mutate {
      gsub => ['message', "\t", " "]
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
      timezone => "Europe/Rome"
    }
    mutate {
      remove_field => [ "timestamp", "year", "month", "day", "time", "dummy1", "dummy2" ]
    }
  }
  else if [type] == "qlikview_log" {
    # Server Started Timestamp SeverityID EventID Severity Message
    # 2017-02-19 18:01:29 2017-02-21 00:00:16 4 700 Information Debug: CNTService::Handler(4)
    grok {
      match => { "message" => "%{YEAR:y}-%{MONTHNUM:m}-%{MONTHDAY:d} %{TIME:t}\t%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day} %{TIME:time}\t%{NUMBER:severityid}\t%{NUMBER:eventid}\t%{WORD:loglevel}\t%{GREEDYDATA:log_message}" }
    }
    mutate {
      add_field => {
        "timestamp" => "%{year}-%{month}-%{day} %{time}"
      }
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
      timezone => "Europe/Rome"
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}
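The obiee_access_log branch is the most involved: grok splits the tab-separated line into named fields, a mutate reassembles a timestamp from the date and time parts, and the date filter parses it in the Europe/Rome timezone. The same steps can be sketched in Python with an equivalent regex (the sample line is the one from the comment in the config, with tabs as separators):

```python
import re
from datetime import datetime

# The sample access-log line from the config comment, tab-separated
line = ("2017-02-17\t11:37:01\tGET\te82f1268-7238-4672-a983-03e62ebc9b9c-0000026f"
        "\t0\t/analytics/Missing_uicomponents/sensecomplete/sensecomplete.css\t404\t1164")

# Rough Python equivalent of the grok pattern used for obiee_access_log
pattern = re.compile(
    r"^(?P<date>\d{4}-\d{2}-\d{2})\t(?P<time>\d{2}:\d{2}:\d{2})\t(?P<method>\w+)"
    r"\t(?P<dummy1>\S+)\t(?P<dummy2>\d+)\t(?P<uri>/\S*)\t(?P<response>\d+)\t(?P<bytes>\d+)$"
)

m = pattern.match(line)

# Rebuild the timestamp exactly as the add_field + date filters do
timestamp = datetime.strptime(f"{m.group('date')} {m.group('time')}",
                              "%Y-%m-%d %H:%M:%S")

print(m.group('uri'), m.group('response'), timestamp)
```

Once the date filter has set @timestamp, the helper fields (year, month, day, time, dummy1, dummy2) carry no extra information, which is why the final mutate removes them before the event is indexed.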