Skip to content

ThatGuyHughesy/kastr

Repository files navigation

kastr

Clojure wrapper for Kafka Streams

Build Status Clojars Project

Installation

Declare kastr in your project.clj:

Clojars Project

Use kastr in your clojure code:

(require '[kastr.core :as kastr])

Configuration

Required

:application-id - Name of Kafka Streams application

:bootstrap-servers - Comma separated list of Kafka brokers

:zookeeper-connect - Comma separated list of Zookeeper servers

Optional

:num-stream-threads (Default: 1) - Number of threads the stream instance will run

:key-serde-class (Default: String) - Default key serializer/deserializer

:value-serde-class (Default: String) - Default value serializer/deserializer

:auto-offset-reset (Default: :earliest) - Kafka topic offset the stream instance will read from

Example:

{:kafka-streams {:application-id "threat-detection-1"
                 :bootstrap-servers "localhost:9092"
                 :zookeeper-connect "localhost:2181"
                 :num-stream-threads 8}
 :job {:input-topic "input-messages"
       :output-topic "output-messages"}}

Running

I would recommend using Component for managing the Kafka Streams runtime.

Example:

(defrecord KafkaStreams
  [configuration kafka-streams-topology kafka-streams]
  component/Lifecycle
  (start [component]
    (info "Starting Kafka Streams")
    (let [configuration (kafka-streams-configuration configuration)
          kafka-streams (init configuration kafka-streams-topology)]
      (clean kafka-streams)
      (start kafka-streams)
      (assoc component :kafka-streams kafka-streams)))
  (stop [component]
    (try
      (finally
        (info "Shutting down Kafka Streams")
        (if kafka-streams
          (stop kafka-streams))))
    (assoc component :kafka-streams nil)))

The kafka-streams-topology is where you carry out your logic.

Basic Example:

Take messages from the input topic and send them to the output topic.

(defn mapv-function
  [v]
  (+ 1 v))

(defn basic-kafka-streams-topology
  [configuration stream-builder]
  (let [{:keys [job]} configuration
        {:keys [input-topic output-topic]} job
        message-stream (stream stream-builder [input-topic])]
    (-> (map-v message-stream mapv-function)
        (ks/to output-topic))))

Documentation

I'm currently working on this so for now here is the official Developer Guide.

Development

Testing

Run tests

$ lein test

Contributing

Want to become a Kastr contributor?
Then checkout our code of conduct and contributing guidelines.

Copyright & License

Copyright (c) 2017 Conor Hughes - Released under the MIT license.

Releases

No releases published

Packages

No packages published