Spring Boot + Apache Kafka

This post will guide you to create a simple web application using Spring Boot and Apache Kafka. 

Apache Kafka is a distributed streaming platform, well known for reliably moving data between systems.

Spring Boot provides out-of-the-box integration with Apache Kafka, so you don't have to write a lot of boilerplate code to integrate Kafka into your web application.

Let’s get started!

Prerequisites

The only prerequisite is to have Kafka up and running in your environment. To do that, follow the link for your operating system:

  1. Install Kafka on windows
  2. Install Kafka on macOS

I generally prefer running all my system dependencies using Docker, which is an awesome tool for developers: it lets you focus on writing code instead of worrying about the system your code will run on. If you want to run Kafka using Docker, you can use the docker-compose.yaml file that I have added in the GitHub repo. Before running the application, just copy the file and run docker-compose up -d, which will spin up a single Kafka broker and ZooKeeper and will also create the required topic. This means you don't need to perform step 6.

Steps to create a Spring Boot + Apache Kafka web application:

Follow the steps below to create a Spring Boot application with which you can produce and consume Kafka messages using a REST client.

1) Creating the Web Application Template:

We’ll be using Spring Initializr to create the web application project structure and the easiest way to use Spring Initializr is to use its web interface.

1.1) Go to https://start.spring.io/:

1.2) Enter Group and Artifact details:

1.3) Add Spring Web & Spring for Apache Kafka dependencies:

Then, click on Generate Project

This will generate and download the kafka-spring-app.zip file which is your maven project structure.

1.4) Unzip the file and then import it in your favourite IDE.

After importing the project in your IDE (Intellij in my case), you’ll see a project structure like this:

2) Configure Kafka Producer and Consumer:

We can configure the Kafka producer and consumer either by creating configuration classes (classes annotated with @Configuration) for the producer and consumer, or by using the application.properties/application.yml file. In this tutorial, I'll demonstrate both for completeness. I personally prefer the latter approach (application.properties/application.yml) as it is quite handy, doesn't require the boilerplate code of configuration classes, and leverages the beauty of Spring Boot.

2.1) Creating configuration classes for Kafka consumer and producer:

Creating the consumer config class:

Create a class ConsumerConfig.java in package com.technocratsid.kafkaspringapp.config with the following content:

package com.technocratsid.kafkaspringapp.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class ConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServers;

    @Value("${spring.kafka.groupId}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> getConsumer() {
        Map<String, Object> configProp = new HashMap<>();
        configProp.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        configProp.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProp.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProp.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProp);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(getConsumer());
        return factory;
    }

}

Note: Here we are declaring ConsumerFactory and ConcurrentKafkaListenerContainerFactory as beans (using the @Bean annotation). This tells the Spring container to manage them for us. ConsumerFactory specifies the strategy to create Consumer instances, and ConcurrentKafkaListenerContainerFactory is used to create and configure containers for @KafkaListener annotated methods.

@Configuration is used to tell Spring that this is a Java-based configuration class that contains the bean definitions.

@EnableKafka annotation tells Spring that we want to talk to Kafka and allows Spring to detect the methods that are annotated with @KafkaListener.

@Value annotation is used to inject value from a properties file based on the property name.

The application.properties file for properties spring.kafka.bootstrap-servers and spring.kafka.groupId is inside src/main/resources and contains the following key value pairs:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.groupId=kafka-spring-app

If you wish to run the application with a remote Kafka cluster, then edit spring.kafka.bootstrap-servers to point to your remote brokers.

Creating the producer config class:

Create another class ProducerConfig.java in the same package com.technocratsid.kafkaspringapp.config with the following content:

package com.technocratsid.kafkaspringapp.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServers;

    @Bean
    public ProducerFactory<String, String> getProducer() {
        Map<String, Object> configProp = new HashMap<>();
        configProp.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        configProp.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProp.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProp);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(getProducer());
    }

}

Note: Here we are declaring ProducerFactory and KafkaTemplate as beans, where ProducerFactory specifies the strategy to create Producer instances and KafkaTemplate is a template for executing high-level operations such as sending messages to a Kafka topic.

The value of kafkaServers is injected from the property spring.kafka.bootstrap-servers of the application.properties file, just as in the ConsumerConfig class.

2.2) Configuring the Kafka consumer and producer using application.properties:

Open the application.properties file inside src/main/resources and add the following key value pairs:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-spring-app
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

That’s it!

The above properties configure the Kafka consumer and producer without you having to write a single line of configuration code, and that's the beauty of Spring Boot: the key-value pairs in application.properties alone tell Spring how to set everything up.
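For example, with only the properties above in place (and no configuration classes at all), Spring Boot's auto-configuration already exposes a KafkaTemplate bean that you can inject straight into a service. A minimal sketch, assuming the same topic used throughout this post (the class name here is just for illustration):

package com.technocratsid.kafkaspringapp.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class AutoConfiguredProducer {

    // Injected from Spring Boot's Kafka auto-configuration; no @Configuration class needed
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        kafkaTemplate.send("technocratsid-kafka-spring", message);
    }
}

The producer service in step 4 injects a KafkaTemplate in exactly the same way; it simply doesn't care whether the bean came from the ProducerConfig class or from auto-configuration.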

3) Creating a consumer service:

Create a class KafkaConsumer.java in package com.technocratsid.kafkaspringapp.service with the following content:

package com.technocratsid.kafkaspringapp.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class KafkaConsumer {

    public static List<String> messages = new ArrayList<>();
    private final static String topic = "technocratsid-kafka-spring";
    private final static String groupId = "kafka-spring-app";

    @KafkaListener(topics = topic, groupId = groupId)
    public void listen(String message) {
        messages.add(message);
    }
}

@KafkaListener allows a method to listen/subscribe to specified topics.

In our case, whenever a message is produced on the topic "technocratsid-kafka-spring", we add it to a List of Strings so that we can display the messages later. (Keeping messages in memory like this is not good practice; in the real world you would probably write them to a datastore.)

@Service tells Spring that this class performs a business service.

4) Creating a producer service:

Create a class KafkaProducer.java in package com.technocratsid.kafkaspringapp.service with the following content:

package com.technocratsid.kafkaspringapp.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic}")
    private String topic;

    public void produce(String message) {
        kafkaTemplate.send(topic, message);
    }

}

Here we are using KafkaTemplate's send() method to send messages to a particular topic. The topic name is injected from the property app.topic, which isn't part of the properties shown earlier, so add app.topic=technocratsid-kafka-spring (the topic used throughout this post) to application.properties.

@Autowired tells Spring to automatically wire or inject the value of a variable from the beans managed by the Spring container. So in our case the value of kafkaTemplate is injected from the kafkaTemplate() bean defined in the ProducerConfig class.
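Field injection with @Autowired works fine here; if you prefer constructor injection, which makes the dependency explicit and easier to test, the same service could look like this (just an alternative sketch, not part of the original code):

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic}")
    private String topic;

    // Constructor injection: Spring passes in the KafkaTemplate bean it manages
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produce(String message) {
        kafkaTemplate.send(topic, message);
    }
}

Since the class has a single constructor, Spring injects the KafkaTemplate without needing @Autowired on the constructor.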

@Service tells Spring that this class performs a business service.

5) Creating a rest controller:

Create a class KafkaController.java in package com.technocratsid.kafkaspringapp.controller with the following content:

package com.technocratsid.kafkaspringapp.controller;

import com.technocratsid.kafkaspringapp.service.KafkaConsumer;
import com.technocratsid.kafkaspringapp.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
public class KafkaController {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @RequestMapping(value="/send", method= RequestMethod.POST)
    public void send(@RequestBody String data) {
        producer.produce(data);
    }

    @RequestMapping(value="/receive", method=RequestMethod.GET)
    public List<String> receive() {
        return consumer.messages;
    }
}

This class registers two endpoints, /send and /receive, which send messages to and receive messages from Kafka respectively: /send is a POST request with a String body and /receive returns the messages consumed so far.

6) Create the required Kafka topic:

Before running the application create the following topic:

kafka-topics --bootstrap-server localhost:9092 --topic technocratsid-kafka-spring --create --partitions 1 --replication-factor 1
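Alternatively, if you'd rather not run this command by hand, Spring Kafka can create the topic for you at application startup: declare a NewTopic bean and the auto-configured KafkaAdmin will create it if it doesn't already exist (assuming the broker is reachable). A small sketch, with a hypothetical TopicConfig class:

package com.technocratsid.kafkaspringapp.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic appTopic() {
        // topic name, number of partitions, replication factor
        return new NewTopic("technocratsid-kafka-spring", 1, (short) 1);
    }
}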

7) Running the web application:

Either run the KafkaSpringAppApplication class as a Java Application from your IDE or use the following command:

mvn spring-boot:run

8) Testing the web app with a REST client:

To test the Spring Boot + Apache Kafka web application I am using Insomnia, which is my favourite REST client because of its simple interface. You can use any REST client.

Once your application is up and running, perform a POST request to the URL http://localhost:8080/send with the body {"key1": "value1"}:

Then, perform a GET request to the URL http://localhost:8080/receive and this is the response you'll get: a JSON array containing every message consumed so far, including the body you just posted.
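If you'd rather exercise the endpoints from code instead of a REST client, a quick throwaway sketch using Spring's RestTemplate (assuming the application is running locally on port 8080; the class name is just for illustration) could look like this:

import java.util.List;

import org.springframework.web.client.RestTemplate;

public class KafkaAppSmokeTest {

    public static void main(String[] args) throws InterruptedException {
        RestTemplate rest = new RestTemplate();

        // POST a message to /send
        rest.postForObject("http://localhost:8080/send", "{\"key1\": \"value1\"}", String.class);

        // Give the listener a moment to pick the message up from Kafka
        Thread.sleep(2000);

        // GET the consumed messages back from /receive
        List<?> messages = rest.getForObject("http://localhost:8080/receive", List.class);
        System.out.println(messages);
    }
}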

Congratulations on building a Spring Boot + Apache Kafka web application.

Hack into the github repo to see the complete code.

Install Kafka on Windows

This post is a step-by-step guide to install and run Apache Kafka on Windows.

Prerequisite

The only prerequisite for this setup is JRE.

Install Java (Skip if you already have it)

  1. Download Java 8 from here. (Java 8 is recommended by Apache Kafka)
  2. Run the installer and follow the instructions on Installation wizard.
  3. Please note/copy the Destination Folder location.
  4. Go to Control Panel -> System -> Advanced system settings -> Environment Variables.
  5. Create a new user variable named JAVA_HOME and paste the path copied from step 3 to the variable value and click OK.
  6. Now edit the PATH variable in User variables and add "%JAVA_HOME%\bin" at the end of the variable value. If it's an older Windows version, then add ";%JAVA_HOME%\bin;" at the end of the text. If the PATH variable doesn't exist, create it with the value "%JAVA_HOME%\bin".
  7. Open a command prompt and type "java -version" to validate the installation.
  8. If you get the following output in your command prompt you’re good to go:
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode)

Steps to install Kafka on Windows:

1) Download and extract Apache Kafka from here.

Note: At the time of writing this post the current stable version is 2.4.0. If you’re using Scala for development then make sure to select the Kafka version corresponding to your Scala version.

2) Go to the folder where you’ve extracted Kafka, open the file server.properties in the config folder and then edit the line log.dirs=/tmp/kafka-logs to log.dirs=C:\path_where_kafka_is_extracted\kafka-logs.

Note: If you won’t change log.dirs value, you’ll keep getting the following error:

java.util.NoSuchElementException: key not found: /tmp/kafka-logs

3) Start Zookeeper

Kafka uses ZooKeeper so before starting Kafka you have to make sure that ZooKeeper is up and running. We’ll be running the single node ZooKeeper instance packaged with Kafka. 

Go to your Kafka installation directory, open a command prompt, and start ZooKeeper using the following command:

> bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Once zookeeper is started you should see this in the command prompt:

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

4) Start Kafka Server

Open another command prompt in the Kafka installation directory and run the following command:

> bin\windows\kafka-server-start.bat config\server.properties

Once the Kafka server is started you should see something like this in the command prompt:

INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

5) Validate the Kafka setup

Create a Kafka topic:

Open a command prompt in the Kafka installation directory and run the following command:

> bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

The above command will create a topic "testTopic" with 1 partition and a replication factor of 1.

Produce messages to the created topic:

In the same command prompt start a kafka-console-producer and produce some messages:

> bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic testTopic
>Hello
>How are you?

Consume the messages produced by the kafka-console-producer:

Open another command prompt in the Kafka installation directory and start a kafka-console-consumer which subscribes to testTopic:

> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testTopic --from-beginning
Hello
How are you?

The messages which are produced by the kafka-console-producer will be visible on the kafka-console-consumer console.

Congratulations on completing a single broker Kafka setup on Windows!

Install Kafka on macOS

I really like Homebrew for installing stuff on macOS, so in this tutorial I'll be using Homebrew to install Apache Kafka.

Install Homebrew:

Open Terminal, paste the following command and then press Return key:

$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

The above command will install Homebrew. Once it is installed, follow the next steps:

1) Install Java 8:

After Homebrew is installed, you can just paste the following commands in Terminal and press Return to install java on macOS:

$ brew tap adoptopenjdk/openjdk
$ brew cask install adoptopenjdk8

The above commands will install openjdk 8. In order to validate the java installation, execute the following command in Terminal:

$ java -version

If you get the following output, it means that java is installed correctly:

openjdk version "1.8.0_212"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.212-b04, mixed mode)

2) Install Kafka:

To install Kafka on macOS: open Terminal, paste the following command and then press Return:

$ brew install kafka

The above command will install Kafka along with Zookeeper.

Start Zookeeper:

To start ZooKeeper, paste the following command in the Terminal and press Return:

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Once ZooKeeper is started you’ll see something like this in the console:

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Start Kafka:

To start Kafka, paste the following command in another Terminal and press Return:

$ kafka-server-start /usr/local/etc/kafka/server.properties

Once Kafka is started you’ll see something like this in the console:

INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Create a Kafka topic:

Open another Terminal and execute the following command to create a topic testTopic with 1 partition and 1 replication factor:

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

Produce messages to the created topic:

In the same Terminal prompt, start a kafka-console-producer and produce some messages:

$ kafka-console-producer --broker-list localhost:9092 --topic testTopic
>Hello
>How are you?

Consume the messages produced by the producer:

Open another Terminal and start a kafka-console-consumer which subscribes to testTopic:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning
Hello
How are you?

The messages which are produced by the kafka-console-producer will be visible on the kafka-console-consumer console.

Congratulations on completing a single broker Kafka setup on macOS!

Exploring Twitter Data using Elasticseach & Kibana’s Canvas

After thinking a lot about what to write next, I stumbled upon a very cool idea.

And this is what I thought I should do: discover something interesting using code.

Let's find out which is the most popular drink in the world among tea, coffee and beer, all using code!

Yes, you heard it right.

How do we do this?

First and foremost, we need data! And by data, I mean real-time data, because trends may change from day to day. Social media is full of data, and we should thank Twitter for writing a Java HTTP client for streaming real-time Tweets using Twitter's own Streaming API.

This client is known as the Hosebird Client (hbc). Though it was written by Twitter a long time back and some of its features have since been deprecated, it will work perfectly for our requirement.

Also, we need to store the streaming data in some datastore, and for this purpose we'll be using Elasticsearch.

Why Elasticsearch?

The sole purpose of using Elasticsearch is to use Kibana’s Canvas to further visualise the data.

Canvas is a whole new way of making data look amazing. Canvas combines data with colours, shapes, text, and your own imagination to bring dynamic, multi-page, pixel-perfect, data displays to screens large and small.

Elastic

In simple words, it is an application that lets you visualise data stored in Elasticsearch in a customised way and in real time (while the data is being ingested into Elasticsearch). At the time of writing, Canvas is in beta release.

You’ll be thrilled to see the end result using Elasticsearch Canvas.

Note: For the demonstration Elasticsearch & Kibana 6.5.2 are used.

Prerequisites:

  • Make sure Elasticsearch and Kibana are installed.

Let's get started. Cheers to the beginning!

Follow the steps below to implement the above concept:

1) Setting up a maven project:

1.1) Create a Maven Project (for the demonstration I am using Eclipse IDE, you can use any IDE):

1.2) Skip the archetype selection:

1.3) Add the Group Id, Artifact Id and Name, then click Finish:

2) Configuring the maven project:

2.1) Open the pom.xml and add the following dependencies:

<dependencies>
	<dependency>
		<groupId>com.twitter</groupId>
		<artifactId>hbc-core</artifactId>
		<version>2.2.0</version>
	</dependency>
	<dependency>
		<groupId>org.elasticsearch.client</groupId>
		<artifactId>transport</artifactId>
		<version>6.5.2</version>
	</dependency>
</dependencies>

These are the Java client libraries of Twitter and Elasticsearch.

2.2) Configuring the maven-compiler-plugin to use Java 8:

<project>
  [...]
  <build>
    [...]
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
    [...]
  </build>
  [...]
</project>

2.3) After this, update the Maven project:

Alternatively, you can press Alt+F5 after selecting the project.

3) Create an Application class:

3.1) Go to src/main/java and create a new class:

3.2) Add the Package and Name of the class then click Finish:

4) Configure the Twitter Java Client:

4.1) Create a static method createTwitterClient in the Application class and add the following lines of code:

public static Client createTwitterClient(BlockingQueue<String> msgQueue, List<String> terms) {
	Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
	StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
	hosebirdEndpoint.trackTerms(terms); // tweets with the specified terms
	Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
	ClientBuilder builder = new ClientBuilder().name("Twitter-Elastic-Client").hosts(hosebirdHosts)
				.authentication(hosebirdAuth).endpoint(hosebirdEndpoint)
				.processor(new StringDelimitedProcessor(msgQueue));
	Client hosebirdClient = builder.build();
	return hosebirdClient;
}

Notice that this method expects two arguments: a BlockingQueue, which is used as a message queue for the tweets, and a List of terms we want the tweets to be filtered by (in our case "tea", "coffee" and "beer"). So we are configuring our client to return real-time filtered tweets (tweets containing the terms "tea", "coffee" or "beer").
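For example, the client can be wired up like this (this is exactly how the main method of the complete Application class at the end of this post uses it):

// Queue that the Hosebird client fills with raw tweet JSON strings
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

// Terms we want the Streaming API to filter tweets on
List<String> terms = Lists.newArrayList("tea", "coffee", "beer");

Client client = createTwitterClient(msgQueue, terms);
client.connect();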

Notice the line of code shown below:

Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

The Twitter Java client uses OAuth to provide authorised access to the Streaming API, so to stream Twitter data you must have consumer API keys and access tokens.

4.2) Getting Twitter Consumer API/Access token keys:

Follow the link Getting Twitter Consumer API/Access token keys to obtain the keys and tokens.

After getting the Consumer API key, Consumer API secret key, Access token and Access token secret, add them as Strings in the Application class:

private final static String consumerKey = "xxxxxxxxxxxxxxxxxx";
private final static String consumerSecret = "xxxxxxxxxxxxxxxxxx";
private final static String token = "xxxxxxxxxxxxxxxxxx";
private final static String secret = "xxxxxxxxxxxxxxxxxx";

It is not advisable to put this information in the program itself; it should be read from a config file. But for brevity I am putting these values in the Application class as static final Strings.
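If you do want to keep the keys out of the source code, one simple option is a small properties file on the classpath loaded with java.util.Properties. A sketch, assuming a hypothetical src/main/resources/twitter.properties file with consumerKey, consumerSecret, token and secret entries:

// Load the credentials from twitter.properties instead of hardcoding them
// (requires java.util.Properties and java.io.InputStream imports; the enclosing method must handle IOException)
Properties twitterProps = new Properties();
try (InputStream in = Application.class.getResourceAsStream("/twitter.properties")) {
	twitterProps.load(in);
}
String consumerKey = twitterProps.getProperty("consumerKey");
String consumerSecret = twitterProps.getProperty("consumerSecret");
String token = twitterProps.getProperty("token");
String secret = twitterProps.getProperty("secret");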

5) Configure the Elasticsearch Transport Client:

5.1) Create a static method createElasticTransportClient in the Application class and add the following lines of code:

public static TransportClient createElasticTransportClient() throws UnknownHostException {
	TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
			.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
	return client;
}

The above method returns a Transport Client which talks to locally running Elasticsearch on port 9300.

If your Elasticsearch is running on some other host or port, change "localhost" and "9300" accordingly. If your Elasticsearch cluster name is different from the default "elasticsearch", then you need to create the client like this:

TransportClient client = new PreBuiltTransportClient(Settings.builder().put("cluster.name", "myClusterName").build())
				.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

6) Streaming data from twitter:

Once the client establishes a connection:

// establish a connection
client.connect();

The blocking queue will now start being filled with messages. However, we would like to read only the first 1000 messages from the queue:

int count = 0;
while (!client.isDone() && count != 1000) {
  String msg = msgQueue.take(); // reading a tweet
  // Segregating the tweet and writing result to elasticsearch
  count++;
}

7) Segregating tweets based on terms and then indexing the segregated result to Elasticsearch:

For brevity I am streaming the first 1000 tweets (containing the terms "tea", "coffee" or "beer"), segregating them one by one, and indexing the results into Elasticsearch.

Example: let's say a tweet contains the term " tea ", then I will index the following document into Elasticsearch:

{ "tweet" : "tea" }

One thing I would like to clarify: if a tweet contains both tea and coffee, I will consider only the first term. However, if you want to consider both terms, then hack into my repo linked at the end of this article.

This is what the complete Application class looks like:

package com.technocratsid.elastic;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

public class Application {

	private final static String consumerKey = "xxxxxxxxxxxxxxxxxx";
	private final static String consumerSecret = "xxxxxxxxxxxxxxxxxx";
	private final static String token = "xxxxxxxxxxxxxxxxxx";
	private final static String secret = "xxxxxxxxxxxxxxxxxx";
	private static Logger logger = Logger.getLogger(Application.class.getName());

	public static void main(String[] args) {
		BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
		List<String> terms = Lists.newArrayList("tea", "coffee", "beer");

		// Elasticsearch Transport Client
		TransportClient elasticClient = createElasticTransportClient();

		// Twitter HoseBird Client
		Client client = createTwitterClient(msgQueue, terms);
		client.connect();

		String msg = null;
		int count = 0;

		// Streaming 1000 tweets
		while (!client.isDone() && count != 1000) {
			try {
				msg = msgQueue.take();
				logger.log(Level.INFO, msg);

				// Segregating the tweets
				if (msg.contains(" tea ")) {
					insertIntoElastic(elasticClient, "tea");
				} else if (msg.contains(" coffee ")) {
					insertIntoElastic(elasticClient, "coffee");
				} else {
					insertIntoElastic(elasticClient, "beer");
				}
				count++;
			} catch (InterruptedException ex) {
				logger.log(Level.SEVERE, ex.getMessage());
				client.stop();
			}
		}
		
		// Closing the clients 
		client.stop();
		elasticClient.close();
	}

	public static Client createTwitterClient(BlockingQueue<String> msgQueue, List<String> terms) {
		Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
		StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
		hosebirdEndpoint.trackTerms(terms); // tweets with the specified terms
		Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
		ClientBuilder builder = new ClientBuilder().name("Twitter-Elastic-Client").hosts(hosebirdHosts)
				.authentication(hosebirdAuth).endpoint(hosebirdEndpoint)
				.processor(new StringDelimitedProcessor(msgQueue));
		Client hosebirdClient = builder.build();
		return hosebirdClient;
	}

	@SuppressWarnings("resource")
	public static TransportClient createElasticTransportClient() {
		TransportClient client = null;
		try {
			client = new PreBuiltTransportClient(Settings.EMPTY)
					.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
		} catch (UnknownHostException ex) {
			logger.log(Level.SEVERE, ex.getMessage());
		}
		return client;
	}

	public static void insertIntoElastic(TransportClient client, String tweet) {
		try {
			client.prepareIndex("drink-popularity", "_doc")
					.setSource(jsonBuilder().startObject().field("tweet", tweet).endObject()).get();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

8) Configuring Canvas in Kibana:

Make sure your Kibana server is running. Mine is running locally at http://localhost:5601.

8.1) Go to http://localhost:5601.

8.2) Go to Dev Tools and perform the following requests:

PUT drink-popularity

The above PUT request creates an index drink-popularity.

PUT drink-popularity/_mapping/_doc
{
  "properties": {
    "tweet" : {
      "type" : "keyword"
    }
  }
}

The above request adds a new field tweet to the _doc mapping type.
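If you would rather create the index and mapping from the Java side instead of Dev Tools, the Transport Client's admin API can do the same thing. A sketch, reusing the client from step 5 and assuming the same index and field names:

// Create the drink-popularity index with a keyword mapping for the "tweet" field
client.admin().indices().prepareCreate("drink-popularity")
		.addMapping("_doc", "tweet", "type=keyword")
		.get();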

8.3) Go to Canvas:


8.4) I have already created a Canvas workpad. You just need to download it from here and import it in your own Canvas by clicking on Import workpad JSON file and then selecting the downloaded JSON file.

8.5) Once you have imported the workpad, open it by clicking on Drink Popularity workpad from Canvas workpads list.

This is what you should see:

Now click on No of tweets metric:

On the side panel of Selected Layer click on Data:

Notice the Elasticsearch SQL query used to fetch total no of tweets. Looks familiar right?

The above Elasticsearch SQL counts the total number of documents in drink-popularity index.

Do the same for one of the Horizontal progress bars:

Notice the Data panel:

So the above query counts the number of tweets where tweet = 'tea' and divides it by the total number of tweets, i.e. 1000.

The same thing has been done for the other two progress bars.

9) Run the program to see live results in Canvas:

Before running the program the initial Canvas looks like this:

Now run the Application class and enable auto-refresh in Canvas to watch the workpad update live.

After some time:

In the end:

Cheers !!! Beer is the winner 🙂 You can also check the results for a specific location by filtering the tweets based on location.

I hope you guys like the concept.

Feel free to hack into the github repo.

Getting Twitter Consumer API/Access token keys

To get Twitter’s Consumer API keys and Access token keys you must have a Twitter account.

Once your Twitter Account is ready follow the steps below:

1) Go to https://developer.twitter.com/ and sign in to your Twitter Account.

2) Click on Apps as shown in the image below:

3) Then, click on Create an app (if you already have a Twitter App, then you can skip this step):

After clicking if you get a popup as shown below:

Then click on Apply, as a Twitter Developer account is mandatory to create new apps.

After that you’ll get a page as shown below:

Click on Continue.

Now you’ll see a page as shown below:

Select I am requesting access for my own personal use. After selecting this option, you'll be asked to provide an Account name and Primary country of operation.

You can enter whatever you want but for the demonstration I am entering Elastic Twitter Canvas as Account name and India as Primary country of operation (see image below).

After that click on Continue.

Now you’ll get a page shown in the below image. On the page, select Student project / Learning to code as your use case:

After that you have to describe a few points about your project:

After writing at least 300 characters in the description box, select No for Will your product, service, or analysis make Twitter content or derived information available to a government entity? and then click on Continue:

In the next page accept the terms and conditions and click on Submit.

Once you click on Submit you’ll receive an email:

After that you need to verify your email and once the verification is done your Twitter Developer account will be ready.

Once your Twitter Developer Account is ready, click on Apps and then click on Create an app.

Provide an App Name & Description:

Enter a valid website name in the Website URL field. Since our app is for personal use, this isn’t really applicable. I have just entered a sample value. Please enter valid details if you wish to host your application.

Write the description:

Then click on Create.

After that you’ll get a popup as shown below:

After reviewing the Developer terms, click on Create. 

Bingo! Your Twitter App is created!

4) Getting the Keys and Tokens:

Click on Apps:

Click on Details:

After that go to Keys and Tokens:

These are my Consumer API/Access keys and tokens. I have already changed the keys and tokens, so you have no chance of accessing my data 😉

Spring Boot + Apache Spark

This post will guide you to create a simple web application using Spring Boot and Apache Spark.

For the demonstration we are going to build a maven project with Spring Boot 2.1.2 using the Spring Initializr web-based interface.

Cheers to the beginning 🙂

Please follow the steps below to create the classic Apache Spark WordCount example with Spring Boot:

1) Creating the Web Application template:

We’ll be using Spring Initializr to create the web application project structure.

Spring Initializr is a web application used to generate a Spring Boot project structure either in Maven or Gradle project specification.

Spring Initializr can be used in several ways, including:

  1. A web-based interface
  2. Using Spring Tool Suite
  3. Using the Spring Boot CLI

For brevity we’ll be using the Spring initializr web interface.

  1. Go to https://start.spring.io/.

Note: By default, the project type is Maven Project and if you wish to select Gradle then just click on the Maven Project drop down and select Gradle Project.

2. Enter Group and Artifact details:

3. Type Web in Search for dependencies and select the Web option.

4. Now click on Generate Project:

This will generate and download the spring-spark-word-count.zip file which is your maven project structure.

5. Unzip the file and then import it in your favourite IDE.

After you’ve imported the project in your IDE (in my case Eclipse) the project structure looks as follows:

The package names are automatically generated with the combination of group and artifact details.

Moving forward I’ve changed the package names from com.technocratsid.spring.spark.springsparkwordcount to com.technocratsid for brevity.

You can even do this while generating the project using Spring Initializr web interface. You just have to switch to full version and there you’ll find the option to change the package name.

2) Adding the required dependencies in pom.xml:

Add the following dependencies in your project’s pom.xml

<dependency>
	<groupId>com.thoughtworks.paranamer</groupId>
	<artifactId>paranamer</artifactId>
	<version>2.8</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.12</artifactId>
	<version>2.4.0</version>
</dependency>

Note: You might be wondering why we need to add the paranamer dependency when the Spark core dependency already includes it. This is because JDK 8 is only compatible with paranamer version 2.8 or above, while Spark 2.4.0 uses paranamer version 2.7. So, if you don't add the 2.8 version, you'll get an error like this:

Request processing failed; nested exception is java.lang.ArrayIndexOutOfBoundsException: 10582

After this your complete pom.xml should look as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.2.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.technocratsid.spring.spark</groupId>
	<artifactId>spring-spark-word-count</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>Spring Spark Word Count</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>com.thoughtworks.paranamer</groupId>
			<artifactId>paranamer</artifactId>
			<version>2.8</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.12</artifactId>
			<version>2.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

3) Adding the Spark Config:

Create a class SparkConfig.java in package com.technocratsid.config.

Add the following content to SparkConfig.java:

package com.technocratsid.config;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfig {

	@Value("${spark.app.name}")
	private String appName;
	@Value("${spark.master}")
	private String masterUri;

	@Bean
	public SparkConf conf() {
		return new SparkConf().setAppName(appName).setMaster(masterUri);
	}

	@Bean
	public JavaSparkContext sc() {
		return new JavaSparkContext(conf());
	}

}

Note: Here we are declaring JavaSparkContext and SparkConf as beans (using the @Bean annotation). This tells the Spring container to manage them for us.

@Configuration is used to tell Spring that this is a Java-based configuration class that contains the bean definitions.

The @Value annotation is used to inject a value from a properties file based on the property name.

The application.properties file for properties spark.app.name and spark.master is inside src/main/resources and looks like this:

spark.app.name=Spring Spark Word Count Application
spark.master=local[2]

local[2] tells Spark to run locally with 2 worker threads.

If you wish to run the application against a remote Spark cluster, then edit spark.master to point to your remote cluster.

4) Creating a service for Word Count:

Create a class WordCountService.java in package com.technocratsid.service and add the following content:

package com.technocratsid.service;

import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class WordCountService {

	@Autowired
	JavaSparkContext sc;

	public Map<String, Long> getCount(List<String> wordList) {
		JavaRDD<String> words = sc.parallelize(wordList);
		Map<String, Long> wordCounts = words.countByValue();
		return wordCounts;
	}

}

Note: This class holds our business logic: it converts the list of words into a JavaRDD, counts the words by calling countByValue(), and returns the results.

@Service tells Spring that this class performs a business service.

@Autowired tells Spring to automatically wire or inject the value of a variable from the beans managed by the Spring container.

5) Register a REST Controller with an endpoint:

Create a class WordCountController.java in package com.technocratsid.controller and add the following content:

package com.technocratsid.controller;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.technocratsid.service.WordCountService;

@RestController
public class WordCountController {

	@Autowired
	WordCountService service;

	@RequestMapping(method = RequestMethod.POST, path = "/wordcount")
	public Map<String, Long> count(@RequestParam(required = true) String words) {
		List<String> wordList = Arrays.asList(words.split("\\|"));
		return service.getCount(wordList);
	}
}

Note: This class registers an endpoint /wordcount for a POST request with a mandatory query parameter words, which is a pipe-delimited string like "abc|pqr|xyz". We split the words on pipes (|) to generate a list of words, and then pass that list to our business service's getCount() method to get the word count.
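To make the flow concrete, here is roughly what happens for the sample input used in step 7 (a standalone illustration; the actual counting is done by Spark's countByValue() inside WordCountService):

String words = "Siddhant|Agnihotry|Technocrat|Siddhant|Sid";

// Split on the pipe character, exactly as the controller does
List<String> wordList = Arrays.asList(words.split("\\|"));
// wordList = [Siddhant, Agnihotry, Technocrat, Siddhant, Sid]

// WordCountService#getCount(wordList) then returns something like:
// {Siddhant=2, Agnihotry=1, Technocrat=1, Sid=1}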

6) Run the application:

Either run the SpringSparkWordCountApplication class as a Java Application from your IDE or use the following command:

mvn spring-boot:run

7) Test your application from a REST client:

For this demo I am using the Insomnia REST client, which is quite handy and has a simple interface. You can use any REST client you want, such as Postman or Paw.

Once your application is up and running, perform a POST request to the URL http://localhost:8080/wordcount with the query parameter words="Siddhant|Agnihotry|Technocrat|Siddhant|Sid".

The response you’ll get:

You’ve just created your first Spring Boot Application and integrated Apache Spark with it.

If you want to hack into the code, check out the GitHub link.

Recommended book to learn Apache Spark: https://amzn.to/3f7XpAT