Create a Robust Logstash configuration
This past week I was going over an ELK configuration for log processing at a customer. As always, there were some things to improve. Before diving into the configuration, I wanted to have a look at a few options that Logstash provides these days. I started a small experiment to investigate multiple pipelines and persistent queues. To make it run on my laptop, I decided to use a docker-compose configuration. In this blog post, you will learn about some of my findings to make this experiment work.
Context of the experiment
The main goal for the experiment is to make a more robust Logstash configuration for a project with a lot of steps in the filter part and some performance issues on the UDP input. Using the most recent version of Logstash (7.6 at the time of writing), I want to implement an example using multiple pipelines, one for the input of UDP messages, one for the output to elasticsearch using multiple indexes, and more than one for the filter part of different paths. The sub-goal for the experiment is gaining experience with Elastic X-Pack features that are available through Kibana concerning pipeline monitoring.
Used websites and libraries
The most interesting resource for this experiment was this GitHub repository: https://github.com/deviantony/docker-elk. It gave me a jump start in configuring the complete ELK stack. Of course, it was not exactly what I needed, but with some small changes I could easily work with it. Other resources are the documentation pages of Elastic:
- https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html
- https://www.elastic.co/guide/en/logstash/current/persistent-queues.html
- https://www.elastic.co/guide/en/logstash/current/tuning-logstash.html
Changes to the default docker-compose
One of the things with Docker and especially with docker-compose scripts is that every script you open seems to be different. I tried to stick as much as possible to the notations of the original author. One of the things I like is adding a volume to a local folder for the Elasticsearch data folder. This enables you to keep the data after a restart. Therefore I added this to the volumes part of elasticsearch config.
docker-compose.yml
      - type: bind
        source: ./elasticsearch/data
        target: /usr/share/elasticsearch/data
The Logstash part contains more changes, so I show it in full. I added the pipelines.yml file, the pipeline folder, and the data folder for the persistent queue data. I also added the port for the http input and enabled automatic reloading of the configuration by adding the command part.
docker-compose.yml
  logstash:
    build:
      context: logstash/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./logstash/config/logstash.yml
        target: /usr/share/logstash/config/logstash.yml
        read_only: true
      - type: bind
        source: ./logstash/config/pipelines.yml
        target: /usr/share/logstash/config/pipelines.yml
        read_only: true
      - type: bind
        source: ./logstash/pipeline
        target: /usr/share/logstash/pipeline
        read_only: true
      - type: bind
        source: ./logstash/data
        target: /var/lib/logstash/data
    ports:
      - "9101:5000/tcp"
      - "9101:5000/udp"
      - "8003:8003"
      - "9600:9600"
    command: --config.reload.automatic
    environment:
      LS_JAVA_OPTS: "-Xmx1g -Xms1g"
    networks:
      - elk
    depends_on:
      - elasticsearch
Using Persistent queues
A persistent queue between the input and the filter part is an interesting option when the number of incoming records fluctuates a lot. It is especially useful with a UDP input, which simply drops the overflow messages: the queue writes the messages that cannot be handled immediately to a file. Configuring the queue is as easy as setting the queue.type and path.queue options in the logstash.yml file. In our case, we configure queue.type per pipeline, but the path is configured in the logstash.yml file as “/var/lib/logstash/data/queue”. Yes, that is inside the data folder we mounted in the docker-compose.yml file. In the next part, we are going to configure the pipelines.
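As a minimal sketch, the relevant part of logstash.yml could look like the fragment below. Only the path.queue value comes from this post. The queue.max_bytes line is an assumption added for illustration: it is an existing Logstash setting that caps the disk space a queue may use, and 1024mb is its documented default.

```yaml
# logstash.yml (sketch, not the complete file)

# Location of the persistent queue files; this is inside the
# ./logstash/data folder mounted in docker-compose.yml.
path.queue: /var/lib/logstash/data/queue

# Assumption: explicit upper limit for the size of a queue on disk.
# 1024mb is the default, so leaving this line out changes nothing.
queue.max_bytes: 1024mb
```

The queue.type itself is left out here on purpose, because it is set per pipeline in pipelines.yml.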
Configuring the pipelines
Before diving into the code, there are a few things to take note of. First, be sure to mount the pipelines.yml file in your docker-compose configuration. If you forget this, all pipeline files in the pipeline folder are loaded automatically and everything becomes a mess. In the pipelines.yml file we configure the different pipelines. The main pipeline contains the inputs, TCP and http in the configuration shown here. It lowercases the name and splits it on the space character into a first name and a last name. If the last name is coenradie, the message is sent to the coenradie pipeline. In all other cases the message is sent to the other pipeline. Both these pipelines set the field index_group, which the final pipeline uses when sending the message to Elasticsearch: the name of the index is taken from the field index_group. The code blocks below show the different pipelines.
pipelines.yml
- pipeline.id: main_pipeline
  path.config: "/usr/share/logstash/pipeline/main.pipeline"
  queue.type: persisted
- pipeline.id: elasticsearch_pipeline
  path.config: "/usr/share/logstash/pipeline/elasticoutput.pipeline"
- pipeline.id: coenradie_pipeline
  path.config: "/usr/share/logstash/pipeline/coenradie.pipeline"
- pipeline.id: other_pipeline
  path.config: "/usr/share/logstash/pipeline/other.pipeline"
main.pipeline
input {
  http {
    id => "receiveMessagesHttp"
    port => 8003
    codec => "json"
  }
  tcp {
    id => "receiveMessagesTcp"
    port => 5000
    codec => "json"
  }
}
filter {
  mutate {
    lowercase => [ "name" ]
  }
  grok {
    id => "splitName"
    match => { "name" => "%{WORD:firstname} %{WORD:lastname}" }
  }
}
output {
  if [lastname] == "coenradie" {
    pipeline {
      id => "sendToCoenradie"
      send_to => coenradie_pipeline
    }
  } else {
    pipeline {
      id => "sendToOther"
      send_to => other_pipeline
    }
  }
}
coenradie.pipeline
input {
  pipeline {
    id => "readCoenradie"
    address => coenradie_pipeline
  }
}
filter {
  mutate {
    id => "addIndexGroupCoenradie"
    add_field => { "index_group" => "coenradiegroup" }
  }
}
output {
  pipeline {
    id => "fromCoenradieToElastic"
    send_to => elasticsearch_pipeline
  }
}
other.pipeline
input {
  pipeline {
    id => "readOther"
    address => other_pipeline
  }
}
filter {
  mutate {
    id => "addIndexGroupOther"
    add_field => { "index_group" => "othergroup" }
  }
}
output {
  pipeline {
    id => "fromOtherToElastic"
    send_to => elasticsearch_pipeline
  }
}
elasticoutput.pipeline
input {
  pipeline {
    address => elasticsearch_pipeline
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "%{index_group}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "changeme"
  }
}
That’s it, now you can run the sample. Send a few messages and go to the monitoring app in Kibana to see the messages coming in. Open the URL http://localhost:5601 and log in with username elastic and password changeme. Open the monitoring app (almost at the bottom). Of course, you can also use the Kibana console to inspect the indexes. If you are not sure how to send messages using curl or nc on the Mac, check the commands below. The code: https://github.com/jettro/docker-elk-blog
$ echo "{\"name\":\"Jettro Coenradie\"}" | nc 127.0.0.1 9101
$ curl -XPOST "http://localhost:8003" -H 'Content-Type: application/json' -d'{ "name": "Byron Voorbach"}'
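If you prefer the command line over Kibana, the small shell sketch below may help to check the result. It relies on a few assumptions: the Logstash monitoring API is reachable on port 9600 (published in the docker-compose.yml above), Elasticsearch is published on localhost:9200 as in the default docker-elk setup, and the elastic / changeme credentials are still in place. The function names are made up for this example.

```shell
#!/bin/sh
# Assumed endpoints; override them through the environment if yours differ.
LOGSTASH_API="${LOGSTASH_API:-http://localhost:9600}"
ES_URL="${ES_URL:-http://localhost:9200}"

# Hypothetical helper: build the JSON body the main pipeline expects.
# Assumes the name contains no double quotes or backslashes.
make_payload() {
  printf '{"name":"%s"}\n' "$1"
}

# Per-pipeline event counters and queue information from the
# Logstash node stats API.
pipeline_stats() {
  curl -s "$LOGSTASH_API/_node/stats/pipelines?pretty"
}

# List the indexes written by the elasticsearch pipeline
# (coenradiegroup-* and othergroup-*).
list_indexes() {
  curl -s -u elastic:changeme "$ES_URL/_cat/indices/*group-*?v"
}
```

After loading these functions into your shell, send a message with `make_payload "Jettro Coenradie" | nc 127.0.0.1 9101`, then call `pipeline_stats` and `list_indexes` to see whether the event passed through the pipelines and ended up in an index.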