Event pattern detection with Apache Flink and Pravega

A sample application that simulates network intrusion and its detection using Apache Flink and Pravega.

This application is based on the streaming-state-machine example, slightly extended to demonstrate Pravega/Flink integration capabilities.

Events in streams (generated by devices and services such as firewalls, routers, and authentication services) are expected to occur in certain patterns. Any deviation from these patterns indicates an anomaly (an attempted intrusion) that the streaming system should recognize and that should trigger an alert.

The event patterns are tracked per interacting party (here simplified per source IP address) and are validated by a state machine. The state machine's states define what possible events may occur next, and what new states these events will result in.

The final aggregated results are grouped by network id, which acts as a network domain abstraction hosting multiple server machines.

The aggregated results can optionally be written to Elasticsearch for visualization in the Kibana user interface.

The following diagram depicts the state machine used in this example.

           +--<a>--> W --<b>--> Y --<e>---+
           |                    ^         |     +-----<g>---> TERM
   INITIAL-+                    |         |     |
           |                    |         +--> (Z)---<f>-+
           +--<c>--> X --<b>----+         |     ^        |
                     |                    |     |        |
                     +--------<d>---------+     +--------+
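
The transition table implied by the diagram can be expressed compactly in Java. The following is an illustrative sketch (not the sample's actual classes; event types a-g are taken from the diagram); an event whose type has no entry for the current state is an anomaly:

import java.util.Optional;

enum State {
    INITIAL, W, X, Y, Z, TERM;

    /** Returns the follow-up state, or empty if the event is invalid in this state. */
    Optional<State> transition(char eventType) {
        switch (this) {
            case INITIAL:
                if (eventType == 'a') return Optional.of(W);
                if (eventType == 'c') return Optional.of(X);
                break;
            case W:
                if (eventType == 'b') return Optional.of(Y);
                break;
            case X:
                if (eventType == 'b') return Optional.of(Y);
                if (eventType == 'd') return Optional.of(Z);
                break;
            case Y:
                if (eventType == 'e') return Optional.of(Z);
                break;
            case Z:
                if (eventType == 'f') return Optional.of(Z);
                if (eventType == 'g') return Optional.of(TERM);
                break;
            default: // TERM has no outgoing transitions
                break;
        }
        return Optional.empty(); // anomaly: no valid transition for this event
    }
}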

Pre-requisites

  1. Pravega running (see here for instructions)
  2. The pravega-samples repository built
  3. Apache Flink 1.12 running
  4. The ELK stack running (optional)

Execution

When built, the example program is copied to a distribution folder. Navigate to scenarios/anomaly-detection/build/install/pravega-flink-scenario-anomaly-detection/ for the steps below.

The example is split into three separate programs:

  1. A utility program to create a Pravega stream for use by the example code.
  2. An event generator producing simulated network events.
  3. An anomaly detector consuming events.

All programs share a configuration file (conf/app.json) and a startup script (bin/anomaly-detection). Usage:

bin/anomaly-detection --configDir <APP_CONFIG_DIR> --mode <RUN_MODE> [--stream <SCOPE/NAME>] [--controller <CONTROLLER_URI>]

where:

- `APP_CONFIG_DIR` is the directory that contains the `app.json` configuration file
- `RUN_MODE` is one of:
  - 1 = Create the configured Pravega scope and stream
  - 2 = Publish events to Pravega
  - 3 = Run anomaly detection
- `SCOPE/NAME` optionally overrides the Pravega scope and stream name. Defaults to `examples/NetworkPacket`
- `CONTROLLER_URI` is the URI of the Pravega controller. Defaults to `tcp://127.0.0.1:9090`
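
For example, to point the tools at a non-default stream and controller (the values below are illustrative):

$ bin/anomaly-detection --configDir conf/ --mode 1 --stream myScope/myStream --controller tcp://192.168.1.10:9090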

Update the Configuration

Update conf/app.json as appropriate (see the "Application Configuration" section below).

Create the Event Stream

Run the following command to create the Pravega-based event stream:

$ bin/anomaly-detection --configDir conf/ --mode 1
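
Under the hood, mode 1 amounts to creating the scope and a fixed-segment stream through the Pravega client API. The following is a minimal standalone sketch of that step (not the sample's actual code; the scope, stream name, and controller URI are the documented defaults):

import java.net.URI;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;

public class CreateStreamSketch {
    public static void main(String[] args) {
        // Connect to the Pravega controller (default endpoint from this README).
        try (StreamManager streamManager = StreamManager.create(URI.create("tcp://127.0.0.1:9090"))) {
            streamManager.createScope("examples");
            StreamConfiguration config = StreamConfiguration.builder()
                    .scalingPolicy(ScalingPolicy.fixed(1)) // segment count; cf. "parallelism" in app.json
                    .build();
            streamManager.createStream("examples", "NetworkPacket", config);
        }
    }
}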

Run the Event Generator

The event generator is a Flink program that can be run either in the local Flink environment or on a remote Flink cluster.

  • To run in the local environment:
$ bin/anomaly-detection --configDir conf/ --mode 2
  • To run on a cluster (given a pre-configured Flink client):
$ flink run -c io.pravega.anomalydetection.ApplicationMain lib/pravega-flink-scenario-anomaly-detection-<VERSION>-all.jar --configDir conf/ --mode 2

Leave the program running for a while; it generates a few events per second for a number of simulated network clients. Press CTRL-C to stop the generator (local environment only).

The generator can produce both valid and invalid state transitions. Invalid transitions occur in one of two ways, depending on the configuration (see the sketch after this list):

  1. Automatically, with some probability (as configured by errorProbFactor)
  2. Manually, by pressing ENTER on the console (if controlledEnv is true)
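
A sketch of how these two modes could be combined (hypothetical names; the sample's generator differs in detail):

import java.util.Random;

class ErrorInjector {
    private final Random random = new Random();
    private final double errorProbFactor; // e.g. 0.3, from app.json
    private final boolean controlledEnv;

    ErrorInjector(double errorProbFactor, boolean controlledEnv) {
        this.errorProbFactor = errorProbFactor;
        this.controlledEnv = controlledEnv;
    }

    /** Decides whether the next generated event should be an invalid transition. */
    boolean shouldEmitInvalidEvent(boolean userPressedEnter) {
        if (controlledEnv) {
            return userPressedEnter; // manual mode: errorProbFactor is ignored
        }
        return random.nextDouble() < errorProbFactor; // automatic mode
    }
}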

Hint: When running the application on a Flink cluster, set controlledEnv to false in the app.json configuration file (there is no console input). In addition, the --configDir parameter should be given the absolute path of the configuration directory.

Run the Anomaly Detector

The anomaly detector is another Flink program. The program groups events by source IP address, and maintains a state machine for each address as described above.

Alerts are aggregated using a tumbling window. Note that the Flink program uses event time to aggregate alerts.
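
The shape of that pipeline, reduced to a self-contained Flink 1.12 sketch (the Event type, its fields, the inline source, and the count aggregation are stand-ins; the real pipeline reads from Pravega and lives in io.pravega.anomalydetection):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DetectorSketch {

    /** Simplified stand-in for the sample's network event. */
    public static class Event {
        public String sourceAddress;
        public String networkId;
        public long timestamp;
        public boolean validTransition; // stands in for the per-key state-machine check
        public Event() {}
        public Event(String src, String net, long ts, boolean valid) {
            this.sourceAddress = src;
            this.networkId = net;
            this.timestamp = ts;
            this.validTransition = valid;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // cf. checkpointIntervalInMilliSec in app.json

        // In the real application, this stream is read from Pravega.
        DataStream<Event> events = env
                .fromElements(
                        new Event("10.0.0.1", "net-1", 1_000L, true),
                        new Event("10.0.0.2", "net-1", 2_000L, false))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> e.timestamp));

        events.filter(e -> !e.validTransition)        // alert on invalid transitions
              .map(e -> Tuple2.of(e.networkId, 1L))
              .returns(Types.TUPLE(Types.STRING, Types.LONG))
              .keyBy(t -> t.f0)                       // aggregate per network id
              .window(TumblingEventTimeWindows.of(Time.seconds(30))) // windowIntervalInSeconds
              .sum(1)                                 // alert count per window
              .print();

        env.execute("anomaly-detector-sketch");
    }
}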

  • To run in the local environment:
$ bin/anomaly-detection --configDir conf/ --mode 3
  • To run on a cluster:
$ flink run -c io.pravega.anomalydetection.ApplicationMain lib/pravega-flink-scenario-anomaly-detection-<VERSION>-all.jar --configDir conf/ --mode 3

Ensure that $FLINK_HOME/bin is on your PATH to use the flink command shown above.

When an anomalous event sequence is detected (i.e., an invalid event was generated by either of the means above), an alert is written to STDOUT and to Elasticsearch (if configured).

Testing State Recovery

The state machine associated with each source IP address is reliably stored in Flink's checkpoint state. In a failure scenario, Flink will automatically recover the job state, ensuring exactly-once processing of input events and their corresponding state transitions. Assuming that the event stream contains only valid events, no spurious alerts should be produced following the recovery.

Perform the steps below to test job state recovery. This works only when checkpointing is enabled and a reliable state backend is configured in flink-conf.yaml to store checkpoint/savepoint snapshots (a sketch follows the steps).

  1. Cancel the currently running job with the -s option, which creates a savepoint in the configured savepoint location (as defined in flink-conf.yaml):
bin/flink cancel -s <JOB_ID>
  2. To resume from the savepoint, run:
bin/flink run -s <SAVEPOINT_LOCATION> -c io.pravega.anomalydetection.ApplicationMain <PATH_TO_pravega-flink-scenario-anomaly-detection-<VERSION>-all.jar> --configDir <APP_CONFIG_DIR> --mode 3
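
For reference, a minimal state-backend configuration in flink-conf.yaml might look like the following (the paths are placeholders; use a durable location such as HDFS or S3 on a real cluster):

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints
state.savepoints.dir: file:///tmp/flink-savepoints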

The job should recover cleanly from the saved state. As mentioned above, you should not see any spurious alerts.

The alerts may be visualized using Kibana or by examining the TaskManager's 'stdout' (as shown in the Flink Web UI).

Application Configuration

{
  "name": "anomaly-detection",
  "producer": {
    "latencyInMilliSec": 2000,
    "capacity": 100,
    "errorProbFactor": 0.3,
    "controlledEnv": false
  },
  "pipeline": {
    "parallelism": 1,
    "checkpointIntervalInMilliSec": 1000,
    "disableCheckpoint": false,
    "watermarkOffsetInSec": 0,
    "windowIntervalInSeconds": 30,
    "elasticSearch": {
      "sinkResults": false,
      "host": "127.0.0.1",
      "port": 9300,
      "cluster": "elasticsearch",
      "index": "anomaly-index",
      "type":"anomalies"
    }
  }
}

latencyInMilliSec - the interval at which events are generated and published to Pravega

capacity - the number of initial events during which no error records are generated

errorProbFactor - the probability of simulating an error record; provide a value between 0.0 and 1.0

controlledEnv - when true, the errorProbFactor value is ignored and an error record is generated only when the user presses ENTER on the console

parallelism - used to define both the Pravega segment count (fixed scaling policy) and the Flink job parallelism

controllerUri - the Pravega controller endpoint

elasticSearch - final results are written to Elasticsearch if sinkResults is set to true

windowIntervalInSeconds - the tumbling window interval

watermarkOffsetInSec - the watermark offset for the event-time window

disableCheckpoint - disables checkpointing when true. If checkpointing is enabled, make sure the Flink cluster uses an appropriate state backend (checkpoint/savepoint) configuration

Steps to set up Elasticsearch/Kibana

These steps were tested on Ubuntu 16.x, Docker 1.13.1, JDK 8, Elasticsearch 5, and Kibana 5.3.0.

Install Elasticsearch and Kibana

Install Elasticsearch
sudo mkdir -p /elk-stack/esdata
sudo chmod -R 777 /elk-stack/
docker run -d --name elasticsearch  --net="host" -v /elk-stack/esdata:/usr/share/elasticsearch/data elasticsearch

Verify that Elasticsearch is running by executing the command:
curl -X GET http://localhost:9200

Update /etc/hosts with 127.0.0.1 elasticsearch (so that Kibana can look up Elasticsearch).

Install Kibana
docker run -d --name kibana --net="host" kibana

Verify by going to the URL: http://localhost:5601

Create the Elasticsearch index and define the schema

curl -XDELETE "http://localhost:9200/anomaly-index"

curl -XPUT "http://localhost:9200/anomaly-index"

curl -XPUT "http://localhost:9200/anomaly-index/_mapping/anomalies" -d'
{
 "anomalies" : {
   "properties" : {
     "count": {"type": "integer"},
     "location": {"type": "geo_point"},
     "minTimestampAsString": {"type": "date"},
     "maxTimestampAsString": {"type": "date"}
   }
 }
}'
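
Optionally, confirm that the mapping was applied by querying it back:

curl -X GET "http://localhost:9200/anomaly-index/_mapping"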

Visualize results in Kibana

- Create an index pattern for "anomaly-index" and set it as the default index
- Visualize the metrics using the "Tile Map" option and choose the "sum" aggregation on the field "count"
- Select a Geo-Coordinates bucket for the field "location"
- You can now visualize the total anomalies per geo location for the simulated window period

Kibana screenshot: final results filtered by lat/long coordinates.