Finding your blog abusers using Kibana 4 and logstash 1.5


A lot of us have a blog, just like the one you are reading right now. I believe WordPress powers around 60% of the sites that are built with a CMS. Nice to know if you are a hacker trying to break into websites; not so good if you are among those 60%. There are a lot of plugins available to help you secure your wordpress installation. Still, it makes you curious who is trying to get access to your blog.

In this blog post I use logstash to parse the access logs of my blog. Then I use Kibana 4 to answer the following questions:

  • Which urls are called that smell like abuse?
  • Who is abusing my blog?

Beware, this post is meant to show some capabilities of Logstash and Kibana; it is not a rigorous guide to securing your wordpress blog.

The tools

If you are new to the ELK stack (Elasticsearch, Logstash, Kibana) and you are just curious about what you can do with it, you can skip the installation and configuration sections. If you want to follow along with what I have done, then this is the section you need. I am not going into a lot of detail; I'll point to the provided documentation where appropriate.

Elasticsearch

Installing elasticsearch is not hard: just download the tar or zip and unpack it. There are some settings you might want to change, the most important ones being the cluster name, the discovery strategy and maybe some paths for config and data. Since I also want to experiment with Shield, the new security plugin provided by elasticsearch, I need to install that plugin as well. More information about shield can be found here. I already wrote a blog post about configuring shield, and since I am reusing that cluster you can read more about the steps I took to install shield and configure the users in that post: Elasticsearch shield first steps using java. For this demo I used elasticsearch 1.4.4.
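
A minimal sketch of those changes, assuming a tar based installation. The cluster name matches the one used later in the logstash output, the data path is just an illustration for my setup, and the plugin commands follow the elasticsearch 1.4.x syntax for shield and its license plugin.

# Example changes to config/elasticsearch.yml; the cluster name is the one
# used in the logstash output below, the path is only an illustration.
cat >> config/elasticsearch.yml <<'EOF'
cluster.name: jc-play
path.data: /data/jc-play
EOF

# Install the license and shield plugins (elasticsearch 1.4.x plugin syntax).
bin/plugin -i elasticsearch/license/latest
bin/plugin -i elasticsearch/shield/latest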

Logstash

I wanted to use the latest versions of all tools, therefore I am using logstash 1.5 beta 1. Installation is as easy as for elasticsearch: download the tar file, unpack it and you are ready to go. More information about the beta and where to obtain it can be found on the elasticsearch blog.
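
Just as a sketch; the archive name below is an assumption based on the beta 1 version, take the exact name from the download location mentioned on the blog.

# Unpack the downloaded archive and switch into the new directory.
tar -xzf logstash-1.5.0.beta1.tar.gz
cd logstash-1.5.0.beta1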

Kibana

The newest Kibana, version 4, got released last week. This is an impressive step forward if you compare it to the older Kibana 3. You no longer install it as a plugin or on a web server; it now comes as a node.js based server. Therefore you have to download the right package for your operating system. The packages can be found on the Kibana download page.

Since we are using elasticsearch with shield enabled, we need to configure Kibana with a user and password that has the rights to connect to elasticsearch and obtain information about the status of the cluster and the available indices. To do this, open the kibana.yml file in the config folder and look for the properties kibana_elasticsearch_username and kibana_elasticsearch_password. Choose a user with appropriate rights. In my case I just took an admin; that always works, but it is of course not the right thing to do in production. The shield documentation has a thorough section on the Kibana configuration.
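
A small sketch of that change, reusing the (made up) credentials that also appear in the logstash output later on; you can of course also edit the existing entries in the file directly.

# Set the shield credentials Kibana should use; pick a user that is allowed
# to read the cluster state and the log indices.
cat >> config/kibana.yml <<'EOF'
kibana_elasticsearch_username: myuser
kibana_elasticsearch_password: mypassword
EOF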

Import data using logstash

We have the access logs from our apache httpd based wordpress installation. Each line of those files looks similar to this one.

75.82.171.165 - - [22/Feb/2015:03:18:02 +0100] "GET /2010/09/08/my-first-steps-with-gradle-creating-a-multi-module-java-web-project-and-running-it-with-jetty/ HTTP/1.1" 200 58873 "https://www.google.com/" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.43 Safari/537.36"

Input

Logstash configuration consists of three parts: input, filter and output. For this sample we are importing files that we copied from the live environment, therefore we import the files from the beginning. All files begin with access-log and have a date stamp appended. The following code block shows how to import all of these files.

input {
    file {
        path => "/access_gridshore/access-log-*"
        type => "apache"
        start_position => "beginning"
    }
}

Output

The next logical step would be the filter, but I want to discuss sending the data to elasticsearch in the output first. The following code block shows the output configuration of logstash.

output {
    stdout {
        codec => rubydebug
    }
    if "_grokparsefailure" not in [tags] {
        elasticsearch {
            protocol => "transport"
            host => "localhost:9300"
            cluster => "jc-play"
            index => "gridshore-logs-%{+YYYY.MM.dd}"
            manage_template => false
            user => "myuser"
            password => "mypassword"
        }
    }
}

Notice that we have two output elements. All messages are also shown in the console. If the grok filter did its job and there are no errors, then we send the message to elasticsearch. We do not want logstash to manage the index template; we provide our own since we have some specific requirements. By default logstash provides a mapping that stores all string based fields analyzed as well as not_analyzed. I do not need that, therefore I have created a template that only stores the string based fields as not_analyzed.

Notice that I have provided the user and password to tell logstash to use the right credentials when connecting to elasticsearch.

Filter

Creating the filters is the important part for improving the quality of the data that we insert into elasticsearch. Below is a list of the filters we used and why, followed by the actual configuration.

  • grok message, a standard grok pattern to parse a message coming from apache log.
  • grok request, strip the parameters from the request field
  • geoip, add a lot of fields describing where the client is coming from based on its IP address
  • useragent, extract a number of fields from the standard useragent field as obtained through the first grok
  • date, take the timestamp from a field and store it in the message timestamp field to be used by Kibana

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
        remove_field => ["message"]
    }
    grok {
        match => { "request" => "%{URIPATH:request_noparam}"}
    }
    geoip {
        source => "clientip"
    }
    useragent {
        source => "agent"
        target => "useragent"
        remove_field => ["agent"]
    }
    date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
}
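
Before running the full configuration it can be handy to check that the COMBINEDAPACHELOG pattern actually matches a line from the logs. A quick sketch, assuming logstash is started from its installation directory and the logs are in the path used in the input block above:

# Feed a single log line through logstash on stdin and inspect the parsed
# fields with the rubydebug codec.
cat /access_gridshore/access-log-* | head -1 | \
  bin/logstash -e 'input { stdin { } }
                   filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } }
                   output { stdout { codec => rubydebug } }'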

Index templates are used to provide mappings for new indices. At the moment we create a new index every day; with the amount of logs on my blog this could be a monthly index as well. The script to add our index template is at the bottom of the post. We have chosen to override the default behaviour for strings.

Now we can start logstash and tell it where to obtain the configuration files we have created.

bin/logstash -f ../configs/check.conf

The result is a number of indices, one for each day we have logs for.
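
You can verify this with the cat API; since shield is enabled the request needs credentials, here the same (made up) user as in the logstash output.

# List the daily indices created by logstash.
curl --user myuser:mypassword 'http://localhost:9200/_cat/indices/gridshore-logs-*?v'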

Analyze the data using Kibana

Time to start Kibana. After changing the configuration as mentioned before, we can just call bin/kibana, that is enough. In the welcome screen we need to create an index pattern. We enter gridshore-logs-* and select the @timestamp field to indicate we are using a time based index pattern.

Time to start answering our questions.

Which urls are called that smell like abuse?

In Kibana we can use a terms aggregation on the request field. Below you can find the image showing the top urls.

The clear winner in the picture is the url wp-login.php. Since I am pretty much the only user of this blog, that seems a bit odd. We can have a look at the response codes that are returned for this url. To do that we create a pie chart containing the http response codes. Now we come to the best part: we add both items to the same dashboard. That way we can easily create filters that work on both visuals. The next screen shows that dashboard.
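
For those who prefer the raw request, the same numbers can be obtained straight from elasticsearch with a terms aggregation on the request field and the response codes nested below it; just a sketch, reusing the credentials from the logstash output.

# Top requested urls with a breakdown of the returned http response codes.
curl --user myuser:mypassword 'http://localhost:9200/gridshore-logs-*/_search?search_type=count' -d '{
    "aggs": {
        "top_urls": {
            "terms": { "field": "request", "size": 10 },
            "aggs": {
                "responses": { "terms": { "field": "response" } }
            }
        }
    }
}'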

By clicking on the /wp-login.php url, we add a filter. This filter is very useful: you can very easily change it from a positive filter (needs to have the term) to a negative filter (must not have the term). The following two images show the two stages of the filter.

(Images: kibana4-filter-inc and kibana4-filter-ex, showing the inclusion and exclusion stages of the filter.)

All nice, but why is half of it response 200 and the other half 503? I have a plugin running on my wordpress site called wordfence. This plugin traces users that try to log in too often and blocks them. The way they are blocked is that they get back a 503.

This is the moment the next question becomes interesting.

Who is abusing my blog?

I have created another dashboard to find out more about our users. I have used the geo coordinates that were obtained from the client ip to show all users on the map. The coordinates are added by the geoip logstash filter. The first image shows you the map with all requests for the current time period. Next I click on the wp-login.php url and select the top user. Watch and see what happens on the map in the second image.

So somewhere in the vicinity of Brussels there is a rascal trying to login to my site.
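
The equivalent question in query form: the client IPs that hit /wp-login.php most often, together with the country the geoip filter resolved for them. Again just a sketch, assuming the field names produced by the filters above.

# Top clients requesting /wp-login.php, with the geoip country per client.
curl --user myuser:mypassword 'http://localhost:9200/gridshore-logs-*/_search?search_type=count' -d '{
    "query": {
        "filtered": {
            "filter": { "term": { "request_noparam": "/wp-login.php" } }
        }
    },
    "aggs": {
        "abusers": {
            "terms": { "field": "clientip", "size": 5 },
            "aggs": {
                "countries": { "terms": { "field": "geoip.country_name" } }
            }
        }
    }
}'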

Concluding

Of course this is just the tip of the iceberg, but it is nice to see what you can do with data in elasticsearch using Kibana. Without a lot of effort you can check what people are doing on your site. Using appropriate filters you can dive deeper; if you filter out the login url you can start looking at other urls that might be of interest to you. Use the comments below if you would like to see something else.

References

The script to create an index template in elasticsearch for our parsed logs:

#!/bin/bash
curl -XPUT --user jettro:nopiforme 'http://localhost:9200/_template/gridshore-logs-custom' -d '{
    "template": "gridshore-logs-*",
    "order": 0,
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "index.refresh_interval": "5s"
    },
    "mappings": {
        "_default_": {
            "dynamic_templates": [
                {
                    "message_field": {
                        "mapping": {
                            "index": "analyzed",
                            "omit_norms": true,
                            "type": "string"
                        },
                        "match_mapping_type": "string",
                        "match": "message"
                    }
                },
                {
                    "string_fields": {
                        "mapping": {
                            "ignore_above": 256,
                            "index": "not_analyzed",
                            "type": "string"
                        },
                        "match_mapping_type": "string",
                        "match": "*"
                    }
                }
            ],
            "_all": {
               "enabled": true
            },
            "properties": {
                "geoip": {
                    "path": "full",
                    "dynamic": true,
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "geo_point"
                        }
                    }
                },
                "response": {
                    "type":"integer"
                },
                "bytes": {
                    "type":"long"
                },
                "@version": {
                    "index": "not_analyzed",
                    "type": "string"
                }
            }
        }
    }
}'
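
After running the script you can check that the template was actually stored:

# Retrieve the stored template to verify it was accepted.
curl --user jettro:nopiforme 'http://localhost:9200/_template/gridshore-logs-custom?pretty'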