Apache Kafka is open-source and widely used software for event stream platform. It is mainly used for service integration, data integration, creating data pipelines and real-time data analytics, and many. You can get more details from here Apache Kafka.
For development, it has libs for almost every development platform. In this post, I am going to explain how one can use Spring framework lib for Apache Kafka using Spring boot. Spring also provides lib for creating producers, consumers, and streaming apps.
In this example, I will create two sample apps using spring boot for Kafka producer and Kafka consumer. The producer will be a simulator agent for publishing weather(temperature) data to a Kafka Topic from worldwide and the consumer app will be used to process weather data and store it into Postgres monthly partitioned table.
You can refer to Installation and running Kafka post
Spring Boot Kafka Producer for Generating Weather Data
This Producer app is using world cities opensource MySQL db for city list with their geo infomation.
This can be found here countries-states-cities-database/world.sql at master · dr5hn/countries-states-cities-database (github.com)
In this example, MySQL has been used for the worlds’ cities list.
Spring Kafka Topic Config
For produce and consumer both we need to create one Spring Kafka topic config class, that we will automatically create topic(s).
The above code has a Kafka Admin client that will automatically create the Kafka topic not present in the Kafka cluster.
Kafka Producer Config
You also need to create one spring Kafka producer config. This will configure Kafka producer client bean. This will be used to stream weather information.
In the above config class, I used
StringSerializer for the producer key serialization and
JsonSerializer for value serialization.
KafkaTemplate bean will be used as producer client to publish weather information as json.
Service to publish weather data
Below snippet is implementation to publish city weather data to a Kafka topic, it will generate temp randomly between -20 to 50 degree celsius.
Spring boot application
Finally, I create a Spring boot application with a Schedule to publish all city data every hour.
Complete Spring boot Kafka producer code can be found on Github here
Spring Boot Kafka Consumer for Processing and storing Weather Data
In this section, I will tell how to create a simple Kafka consumer app using Spring boot. For storing weather data I used Postgres database partitioned table. This script will create Postgres partitioned table to store monthly data. For more details read Postgres Table Partitioning.
Spring Kafka Consumer Config
For the Consumer client, we need to create one spring boot Kafka consumer config. In this config, we will define the Kafka bootstrap server and deserializer config.
Most important this is to define Kafka’s consumer deserializer.
In the above code, we defined custom JSON deserialize as given below, we need to define the trusted package.
JsonDeserializer<WeatherInfo> deserializer = new JsonDeserializer<>(WeatherInfo.class); deserializer.setRemoveTypeHeaders(false); deserializer.addTrustedPackages("*"); deserializer.setUseTypeMapperForKey(true); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
We also defined kafkaListenerContainerFactory which will be used in listeners.
Spring Boot Service to Save Data to Postgres
Below service class will be used to save data to Postgres using Hibernate repository.
Spring Boot Application for Listener
Below is the code with Kafka topic listener.
In the above code, Listener will receive data from Kafka Consumer poller, which is taken care of by Spring library. This data will be saved to Postgres database partitioned table and partition will save monthly data in one partition.
You can run as many instances based on Kafka Topic partitions.
Complete Spring boot Kafka Consumer example can be found on my GIT repository.