Kafka Streams is a client library for processing a stream of messages from a Kafka topic and either storing the result within Kafka or sending it to another Kafka topic. You can find the complete Kafka Streams documentation on the Apache Kafka site.
In this post, I will show with one example how you can use Spring Boot and Kafka Streams to transform an incoming stream. For this example, I will use the producer from my previous post, Kafka Spring Boot Example of Producer and Consumer.
In that post, the producer simulates agents pushing city weather data to a Kafka topic.
WeatherInfo info = new WeatherInfo(city.getName(), city.getLatitude(), city.getLongitude(), temp);
kafkaTemplate.send("extreme-weather", "" + info.getLogDate().getTimeInMillis(), info);
The agents send weather data continuously to the Kafka topic “extreme-weather”. I will show how you can use the Kafka Streams library to filter out extreme weather and send it to another Kafka topic for further processing.
Spring Boot Kafka Stream Configuration
Here is a simple Kafka Streams example that filters extreme weather and sends the filtered weather data to another topic using stateless processing.
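A minimal sketch of such a configuration is shown below. The bean names streamsConfig() and kStreamJson() follow the description in this post; the target topic name "extreme-weather-filtered", the bootstrap server address, and the WeatherInfo accessor getTemp() are assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

    // First bean: stream configuration, registered under the default
    // bean name that Spring's Kafka Streams support looks for.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "weather-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // JsonSerde handles JSON <-> WeatherInfo (de)serialization
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new KafkaStreamsConfiguration(props);
    }

    // Second bean: the stream processing topology.
    @Bean
    public KStream<String, WeatherInfo> kStreamJson(StreamsBuilder builder) {
        KStream<String, WeatherInfo> stream = builder.stream("extreme-weather",
                Consumed.with(Serdes.String(), new JsonSerde<>(WeatherInfo.class)));

        stream
            // keep only extreme temperatures: above 43°C or below -13°C
            .filter((key, info) -> info.getTemp() > 43 || info.getTemp() < -13)
            // peek() lets you inspect each filtered record without changing it
            .peek((key, info) -> System.out.println("Extreme weather: " + info))
            // send the filtered stream to another topic (name assumed here)
            .to("extreme-weather-filtered",
                Produced.with(Serdes.String(), new JsonSerde<>(WeatherInfo.class)));

        return stream;
    }
}
```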
In the above code, you can see the @EnableKafkaStreams annotation, which enables the Kafka Streams configuration.
There are two beans. The first, streamsConfig(), configures the stream; it uses JsonSerde so the stream can deserialize JSON into WeatherInfo objects.
The second bean, kStreamJson(), does the actual processing of the stream. I used the KStream DSL method filter() to keep only extreme weather: records with a temperature higher than 43°C or lower than -13°C. After filtering, I used peek() to show how you can inspect each filtered record of the stream. Finally, I send the filtered stream to another Kafka topic.
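The predicate passed to filter() can be pulled out and exercised on its own. Here is a hypothetical standalone version of it (the class and method names are mine, not from the project):

```java
// Standalone version of the extreme-weather predicate used in filter()
public class ExtremeWeatherFilter {

    // A temperature is "extreme" if it is above 43°C or below -13°C
    static boolean isExtreme(double tempCelsius) {
        return tempCelsius > 43.0 || tempCelsius < -13.0;
    }

    public static void main(String[] args) {
        System.out.println(isExtreme(45.0));  // true
        System.out.println(isExtreme(20.0));  // false
        System.out.println(isExtreme(-20.0)); // true
    }
}
```

Records for which the predicate returns false are simply dropped from the stream; no tombstone or error record is produced.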
There are many more operations available on KStream (such as map(), groupBy(), and join()); you can explore them in the Kafka Streams documentation.
Check out the complete project on GitHub: weather-kafka-stream.