Kafka and Apama

Aims of this blog

My goal with this blog post is to show how to set up Apama with Kafka and how the two can be used together in a stream processing application. Kafka handles the transport and lets you set up the delivery system, while Apama is a high-performance event-processing engine with enough flexibility and throughput for most applications. This blog post isn’t going to be a tutorial on Kafka, and I have used a containerized Kafka cluster for simplicity. If you have an existing cluster, or you wish to install Kafka yourself, it is a simple matter of changing the configuration described in the body of the blog below.

The demo will connect to Twitter and use a stream of tweets to publish messages to the tweets channel in Kafka. Apama will subscribe to this channel and process each message, producing a dictionary of word frequencies, which it publishes to the frequencies channel in Kafka on a periodic basis. There is a GitHub repository containing the basic script that streams tweets into Kafka, the Apama project directory, and a monitor file that connects to the Kafka cluster, processes the messages, and outputs messages back to Kafka.
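To make the processing concrete before diving in: the word-frequency computation can be sketched in a few lines of Python. This is an illustration only; in the demo the real work is done in EPL inside the Apama monitor.

```python
from collections import Counter

def word_frequencies(tweets):
    """Return the relative frequency of each word across a list of tweet texts."""
    counts = Counter()
    for text in tweets:
        counts.update(text.lower().split())
    total = sum(counts.values())
    # Express each word count as a fraction of all words seen
    return {word: n / total for word, n in counts.items()}

# "gaga" makes up 3 of the 6 words here, so its frequency is 0.5
print(word_frequencies(["gaga gaga tour", "new gaga album"])["gaga"])
```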

What is Kafka?

Apache Kafka describes itself as a streaming platform, allowing the user to publish or subscribe to messages on defined topics. Kafka provides potentially fault-tolerant and durable message queues, and is used primarily for real-time data streams as an intermediary between the stream and the application. Each message on the queue has an identifier, a timestamp, and the message payload. Kafka is used by many large organizations including Cisco, Oracle, Twitter, Spotify, PayPal and Netflix.

Elements of the demo

This demo uses Python to connect to Twitter and to stream messages into the tweets channel in the Kafka cluster it connects to. You can get the sources for this demo from this GitHub repository.

The Python script at the top level of the repository, stream.py, creates the connections to Kafka and Twitter. It depends on a couple of third-party Python libraries, which you can install with pip.

You will also need to create an application in Twitter to obtain the keys required to connect to their API. When creating the application, Twitter may require you to add values for “Website URL” and “Callback URLs”, but the addresses you add are not important. Edit the script stream.py which is in the kafka directory, filling in the keys you got when you created your Twitter application (go to the “Keys and tokens” tab, and use the Generate buttons as required). Once that has been done, the script then connects and streams the tweets into Kafka.

Apama has a connectivity layer that includes a Kafka plug-in. The Kafka plug-in, coupled with Apama’s flexible system of configurable codecs, means you can concentrate on coding your domain-specific application rather than writing interfaces and handling connections. You can use the Apama project included in the GitHub repository if you wish, and skip the configuration section below.

Creating the Apama project

You can just use the Apama project, “KafkaProcessor”, that you got in the kafka directory when you cloned the GitHub repository. Alternatively, delete the KafkaProcessor directory and recreate it as follows. If you are going to do this, it will save you a moment or two if you first copy KafkaProcessor/monitors/kafka.mon somewhere else, so that you can copy it back in later rather than downloading it again.

If you want to create the project from scratch you can use Software AG Designer, but if you are primarily a Linux user, or prefer command-line tools to GUI applications, you can use the apama_project tool (new in 10.3.1) instead. There is a blog page about it here: Introducing new command line tool for creating Apama projects – ‘apama_project’
Whichever you start with, you can chop and change between them to modify the project: they are completely compatible.

I chose a root directory named kafka to contain all the elements of the demo, and created the Apama project inside it using the apama_project tool.
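The commands were along the following lines (exact bundle names can be listed with apama_project list bundles and may vary between Apama versions):

```
apama_project create KafkaProcessor
cd KafkaProcessor
apama_project add bundle Kafka
apama_project add bundle "Automatic onApplicationInitialized"
```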

The create command will drop out if the KafkaProcessor folder already exists, which usually means you have not yet deleted the one that came with the clone. Delete it and try again, optionally copying monitors/kafka.mon out of it first.

I added the Kafka bundle and the Automatic onApplicationInitialized bundle using the apama_project tool, although I have trimmed some of its output for brevity. When you run the add bundle command it will list the many options Apama provides for connecting to external sources of information, storage options, and message-format codecs. Finally, copy the kafka.mon file from the kafka/KafkaProcessor/monitors directory in the GitHub repository (or the copy you made of it earlier) into the monitors subdirectory of the newly created project.

The snippet below shows how subscription and publishing of messages between Apama and Kafka can be achieved by using channel names.
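In outline, the EPL looks something like this (the event definitions and names here are illustrative, not the actual contents of kafka.mon; the mapping of channel names to Kafka topics comes from the connectivity configuration):

```
event Tweet {
    string text;
}
event Frequencies {
    dictionary<string, float> freq;
}

monitor TweetProcessor {
    dictionary<string, float> frequencies;
    action onload() {
        // "tweets" is mapped to the Kafka topic of the same name
        monitor.subscribe("tweets");
        on all Tweet() as t {
            // ... update frequencies from t.text ...
        }
        on all wait(10.0) {
            // Periodically publish the summary back to Kafka
            send Frequencies(frequencies) to "frequencies";
        }
    }
}
```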

Running the tool and copying the kafka.mon file into the project should produce a directory structure like the one contained in the GitHub repository.
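Abridged (the tool also generates project metadata files not shown here), the layout is:

```
kafka/
└── KafkaProcessor/
    ├── config/
    │   └── connectivity/
    │       └── Kafka/
    │           └── Kafka.properties
    └── monitors/
        └── kafka.mon
```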

Configuration

The Kafka.properties file under config/connectivity/Kafka can be edited to contain the correct parameters for your Kafka cluster if required.
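For the single-container setup used here, the important setting is the address of the Kafka broker. The property keys are defined by the generated file itself, so treat the key name below as a placeholder; the value is what matters:

```
# Placeholder key name -- use the key defined in your generated Kafka.properties
bootstrapServers=localhost:9092
```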

Lastly, once the configuration is correct and the EPL monitor is in place, we need to deploy the application into a form that the Apama correlator can run.
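Apama provides the engine_deploy tool for this. From the kafka directory, something like the following writes a ready-to-run deployment into a directory (here called deployed, which is my choice of name):

```
engine_deploy --outputDeployDir deployed KafkaProcessor
```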

Next, start the Kafka server so that it is ready to receive the messages from Twitter produced by the stream.py script from the GitHub repository. If you are using your own cluster you can of course skip this step. The advertised host should be the machine you are running the Kafka cluster/container on.
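If you do not have a cluster, a single-container image such as the (now archived) spotify/kafka image is a quick route; that image takes the advertised host and port as environment variables:

```
docker run -p 2181:2181 -p 9092:9092 \
    --env ADVERTISED_HOST=mykafkahost \
    --env ADVERTISED_PORT=9092 \
    spotify/kafka
```

Replace mykafkahost with the address of your host machine.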

Now we can run Apama. If you append & to the command and press return after the header is output, you can issue the next command in the same shell.
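Assuming the deployed configuration ended up in a directory called deployed, starting the correlator with it looks like this:

```
correlator --config deployed &
```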

The last step is to run the stream.py script in a terminal to connect to Twitter; it should start putting tweets into the “tweets” channel. Again, append & to the command to keep using the same shell.

At this point we are processing the stream of tweets and Apama is posting summary messages back to Kafka. You can observe the summary messages using a command-line tool included in the Kafka installation on the running image. Because we are running in the image we can use localhost; if you are using your own installation or an existing cluster, run the script with the correct host and port.
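From a shell inside the running Kafka image, something like this tails the summary messages (the script lives in the bin directory of the Kafka installation, and flags vary slightly between Kafka versions):

```
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic frequencies
```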

This will show the messages as they arrive – for example, if we use the term “Lady Gaga” as our keyword then (at the time of writing) we see messages arriving like these:

The output is a JSON message containing the words that occurred with a calculated frequency of more than 5% (rounding means some words are shown at exactly 5%). I have pasted several messages to show how the frequencies change as more messages arrive. We could, for example, use this output to create a dynamic word cloud that changes over time as the words change. Clearly much could be done to normalize the incoming data to make this analysis more useful.

Finally…

I am sure that most of you can think of much better things to do with the data than analyse the most frequent words appearing in tweets containing the phrase “Lady Gaga”. The beauty of EPL is that it is very flexible and powerful, allowing for some very complex processing. Also, once you are happy with what the application is doing, you can activate LLVM mode, which will improve the performance. Check out this white paper for some details on LLVM and how it will accelerate your application.

–John