Filesystem events are useful to monitor. They can indicate a security breach, and they can also help you understand how a complex system works by looking at the files it reads and writes.
When monitoring events, you can expect a lot of data to be generated quickly. Different systems might want to process the events at a different pace, and it would be nice if you could replay events from the start or from a specific moment. Enter Kafka. To get the filesystem events (written to an output file) into Kafka, the Kafka Connect FileStreamSource connector is used. To get the data from Kafka into Elasticsearch, the Kafka Connect ElasticsearchSinkConnector is used. Both connectors can be used without an Enterprise license.
Filesystem events
In order to obtain filesystem events, I’ve used inotify-tools. On Ubuntu-like systems it can be installed with:
sudo apt-get install inotify-tools
inotify-tools contains two CLI utilities: inotifywait, which can be used to output individual events to a file in a specific format, and inotifywatch, which can generate statistics. Since we’ll do our analysis in Kibana, we want individual events from inotifywait. Using the following command, the /home/developer directory is watched recursively. Events are written in a JSON format to a file called /tmp/inotify.txt:
inotifywait -r -m /home/developer -o /tmp/inotify.txt --timefmt "%FT%T%z" --format '{"time": "%T","watched": "%w","file":"%f","events":"%e"}'
Events can look like:
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"OPEN"}
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"ATTRIB"}
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"CLOSE_WRITE,CLOSE"}
{"time": "2019-02-26T13:52:17+0000","watched": "/home/developer/","file":"t","events":"DELETE"}
In the above example I touched /home/developer/t and then removed it. When watching your home folder, it is interesting to see what is happening there! I could not add a more precise timestamp for each event: the format string is printf-like and uses strftime for date/time formatting, and strftime does not support anything more fine-grained than seconds. The order of events, however, is correct. I also could not add extra information myself, since the allowed replacement variables are limited to a specific set. Want to know more? man inotifywait
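To quickly check that events are being captured, you can touch and remove a file in the watched directory from a second terminal and follow the output file. A minimal check, assuming the inotifywait command above is still running:
# in a second terminal: create and delete a file in the watched directory
touch /home/developer/t
rm /home/developer/t
# follow the events as they are appended to the output file
tail -f /tmp/inotify.txt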
Filesystem events to Kafka
To install the Confluent platform I did:
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -
add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
apt-get update
apt-get -y install confluent-platform-2.11
Starting Kafka and related services can be done with:
confluent start
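If you want to verify that everything came up, the Confluent CLI can also report the status of the individual services, and listing the topics is a quick sanity check (Confluent 5.1 ships Kafka 2.1, where kafka-topics still talks to ZooKeeper):
# status of ZooKeeper, Kafka, Schema Registry, Connect, etc.
confluent status
# list the topics on the broker
kafka-topics --zookeeper localhost:2181 --list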
Using the Kafka Connect FileStreamSource connector (available without an Enterprise license), it is relatively easy to monitor the file which is written by inotifywait. Kafka Connect can run in distributed mode and in standalone mode. Since it needs to save information on what it has already processed, storage is required. In standalone mode this can be a file; in distributed mode these are Kafka topics. I chose to go with standalone mode, since removing the file (in order to load the events again) is quite easy. A drawback of standalone mode is that the connector cannot be monitored from the Confluent Control Center. A benefit of running distributed is that it could easily be run in containers, since the connector itself is stateless; the state is in Kafka.
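As a small sketch of that replay, assuming the offset file configured in the worker.properties shown below: stop the standalone connector, remove its offset file, and start it again; the connector then re-reads /tmp/inotify.txt from the beginning.
# after stopping the standalone connector (Ctrl+C)
rm /tmp/example.offsets
# on the next start, the whole file is processed again
/usr/bin/connect-standalone worker.properties filesource.properties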
I used the following filesource.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/inotify.txt
topic=connect-test
And the following worker.properties:
offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=localhost:9092
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=localhost
rest.advertised.port=10082
rest.advertised.host.name=localhost
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
plugin.path=/usr/share/java
There is also a JsonConverter available, but that wrapped my JSON into a string (escaping characters) and added schema information (unless disabled). I wanted neither. I did not find a way to disable the behavior that turned my JSON message into an escaped string, and this caused issues on the Elasticsearch side. In my use case Kafka Connect should be a ‘dumb pipe’, and the StringConverter does a nice job at that!
To start this connector, I can do:
/usr/bin/connect-standalone worker.properties filesource.properties
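To check that the connector is running and that events reach the topic, you can query the standalone worker’s REST API (on the rest.port configured above) and read the topic with the console consumer. Since the StringConverter is used, the values on the topic are simply the raw JSON lines from /tmp/inotify.txt:
# connector status via the Kafka Connect REST API
curl http://localhost:10082/connectors/local-file-source/status
# read the events that were published to the connect-test topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning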
Elasticsearch and Kibana
In order to run Elasticsearch, you need to increase vm.max_map_count (see the Elasticsearch documentation):
sudo sysctl -w vm.max_map_count=262144
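This setting does not survive a reboot; to make it permanent you can, for example, add it to /etc/sysctl.conf:
echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.conf
# reload the settings
sudo sysctl -p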
For running Elasticsearch and Kibana, I’ve used the following docker-compose.yml file:
version: '3.3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.6.1
    container_name: elasticsearch
    environment:
      - node.name=es01
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    cap_add:
      - ALL
    privileged: true
    ports:
      - 9200:9200
      - 9300:9300
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:6.6.1
    container_name: kibana
    environment:
      SERVER_NAME: localhost
      ELASTICSEARCH_URL: http://elasticsearch:9200/
    ports:
      - 5601:5601
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    cap_add:
      - ALL
Starting it is easy with:
docker-compose up
Next you can access Kibana at http://localhost:5601
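Before wiring up the sink connector, a quick curl confirms that Elasticsearch itself is reachable on port 9200:
# Elasticsearch answers with its name, cluster and version information
curl http://localhost:9200
# list the indices that are currently present
curl http://localhost:9200/_cat/indices?v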
Getting data from Kafka to Elasticsearch
In order to get the data from the connect-test topic into Elasticsearch, we can again use a standalone Kafka Connect connector: the ElasticsearchSinkConnector, which is also available without an Enterprise license. You can configure it as follows:
elasticsearchsink.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test
key.ignore=true
schema.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
worker2.properties
offset.storage.file.filename=/tmp/example2.offsets
bootstrap.servers=localhost:9092
offset.flush.interval.ms=10000
rest.port=10083
rest.host.name=localhost
rest.advertised.port=10083
rest.advertised.host.name=localhost
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false
plugin.path=/usr/share/java
You can start the connector with:
/usr/bin/connect-standalone worker2.properties elasticsearchsink.properties
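Here too you can check the connector through the worker’s REST API (now on port 10083) and verify that an index has appeared in Elasticsearch; by default the ElasticsearchSinkConnector writes to an index named after the topic, so connect-test in this case:
# sink connector status
curl http://localhost:10083/connectors/elasticsearch-sink/status
# the connect-test index should show up with a growing document count
curl http://localhost:9200/_cat/indices?v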
Some differences with the FileStreamSource connector’s worker.properties are:
- a different port for the REST API is used
- a different offset.storage.file is used
- the key and value converters are the JsonConverter instead of the StringConverter
View the results in Kibana
Next you can view the results in Kibana
Also notice that in order for the data from Elasticsearch to be visible in Kibana, not only does the data need to be available, but an index needs to exist as well. Since a JSON document is offered, a default index (named after the topic) is created, and you can then define an index pattern for it in Kibana.
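If you want to inspect the documents outside Kibana first, you can query the index directly; a small check, assuming the default index name connect-test:
# show a few of the indexed filesystem events
curl 'http://localhost:9200/connect-test/_search?size=3&pretty'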