As an Apache Kafka-based application evolves, its consumer logic tends to grow more complex. Plain consumer code has no built-in support for operations such as aggregation and enrichment, so each one means writing substantial framework code by hand. Keeping state in memory is also risky: if a consumer crashes, that state is lost unless you build a persistence scheme around it. Kafka Streams, Apache Kafka's stream processing library, addresses these problems.
What is Kafka Streams?
Kafka Streams is a Java library for stream processing. It provides primitives such as filtering, grouping, aggregation, and joins, so you do not have to write that framework code yourself. It keeps state in local state stores backed by Kafka changelog topics, which lets it handle large amounts of state and restore that state after a failure. Because work is divided by input partition, you can scale out by running more instances of the same application across a cluster of machines. And because it is a library rather than a separate processing cluster, it embeds directly in a microservice: the same service can, for example, combine event streams to send notifications while also serving a REST API for synchronous queries.
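The scaling claim rests on key-based partitioning: every record with a given key lands in the same partition, and each partition is processed by exactly one task, so per-key state never has to be shared between instances. The stdlib-only sketch here models that idea; the class and method names are hypothetical and nothing in it uses the Kafka API. It is a simplification: it uses `Long.hashCode()` modulo the partition count, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2. Only the "same key, same partition" property matters for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of how keyed state is split across partitions (and thus tasks).
// Assumption: partition = floorMod(hash, n); real Kafka uses murmur2 over the serialized key.
class PartitionedCountModel {

    // The same key always maps to the same partition.
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }

    // Each partition keeps its own local count store; no store ever holds another partition's keys.
    static List<Map<Long, Long>> countByKey(long[] keys, int numPartitions) {
        List<Map<Long, Long>> stores = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            stores.add(new HashMap<>());
        }
        for (long key : keys) {
            stores.get(partitionFor(key, numPartitions)).merge(key, 1L, Long::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        long[] concertIds = {1L, 2L, 1L, 3L, 2L, 1L};
        List<Map<Long, Long>> stores = countByKey(concertIds, 2);
        for (int p = 0; p < stores.size(); p++) {
            System.out.println("partition " + p + ": " + stores.get(p));
        }
    }
}
```

This is also why the example re-keys the attendance stream by concert ID before grouping it: Kafka Streams writes the re-keyed records to an internal repartition topic so that every record for one concert reaches the same task.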
Stream API Example
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

// Concert, Attendance, Parser, and concertSerde are application classes not shown here.
StreamsBuilder builder = new StreamsBuilder();

// Parse raw concert data and re-key each record by concert ID
builder.stream("raw-concerts", Consumed.with(Serdes.Long(), Serdes.String()))
    .mapValues(Parser::parseConcert)
    .map((key, concert) -> new KeyValue<>(concert.getConcertId(), concert))
    .to("concerts", Produced.with(Serdes.Long(), concertSerde));

// Read the re-keyed topic back as a KTable (latest Concert per ID)
KTable<Long, Concert> concerts = builder.table("concerts", Materialized
    .<Long, Concert, KeyValueStore<Bytes, byte[]>>as("concerts-store")
    .withKeySerde(Serdes.Long())
    .withValueSerde(concertSerde));

// Parse raw attendance data and re-key each record by concert ID
KStream<Long, String> rawAttendance = builder.stream("raw-attendance", Consumed.with(Serdes.Long(), Serdes.String()));
KStream<Long, Attendance> attendance = rawAttendance.mapValues(Parser::parseAttendance)
    .map((key, attendanceRecord) -> new KeyValue<>(attendanceRecord.getConcertId(), attendanceRecord));

// Calculate average attendance per concert.
// map() changed the key, so groupByKey() repartitions; Grouped supplies the serdes for that topic.
KGroupedStream<Long, Integer> attendanceById = attendance
    .mapValues(Attendance::getNumberOfAttendees)
    .groupByKey(Grouped.with(Serdes.Long(), Serdes.Integer()));
KTable<Long, Long> attendanceCounts = attendanceById.count();
KTable<Long, Integer> attendanceSums = attendanceById.reduce(Integer::sum);
KTable<Long, Double> averageAttendance = attendanceSums.join(attendanceCounts,
    (sum, count) -> sum / count.doubleValue(),
    Materialized.<Long, Double, KeyValueStore<Bytes, byte[]>>as("average-attendance")
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.Double()));
averageAttendance.toStream().to("average-attendance", Produced.with(Serdes.Long(), Serdes.Double()));

// Join concert data with average attendance (inner KTable-KTable join on concert ID)
KTable<Long, String> ratedConcerts = averageAttendance.join(concerts,
    (avgAttendance, concert) -> concert.getName() + " - Average Attendance: " + avgAttendance);
ratedConcerts.toStream().to("rated-concerts", Produced.with(Serdes.Long(), Serdes.String()));
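This code processes raw data about rap concerts. It parses the concert and attendance records and re-keys both by concert ID, computes each concert's average attendance from a running count and sum (publishing it to the average-attendance topic), and then joins the concert data with those averages to produce rated concerts, which it writes to the rated-concerts output topic. To run the topology, pass builder.build() to a KafkaStreams instance configured with at least application.id and bootstrap.servers, then call start().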
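To make the table semantics concrete, here is a stdlib-only Java model of the aggregation and join steps. It is a simplified sketch, not Kafka Streams itself: plain HashMaps stand in for the state stores, each method call stands in for one incoming record, and the class and method names are hypothetical. It shows two things: each attendance record updates both the count and the sum for its key before the average is recomputed, and the inner join yields a rated concert only once both the average and the concert record exist for that key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, stdlib-only model of the count/sum/average/join steps in the topology.
// HashMaps stand in for Kafka Streams state stores; nothing here uses the Kafka API.
class ConcertRatingModel {
    private final Map<Long, Long> counts = new HashMap<>();         // like attendanceCounts (count())
    private final Map<Long, Integer> sums = new HashMap<>();        // like attendanceSums (reduce(Integer::sum))
    private final Map<Long, Double> averages = new HashMap<>();     // like averageAttendance (sums joined with counts)
    private final Map<Long, String> concertNames = new HashMap<>(); // like the concerts KTable

    // One concert record: a table keeps only the latest value per concert ID.
    void onConcert(long concertId, String name) {
        concertNames.put(concertId, name);
    }

    // One attendance record: both aggregates update, then that key's average is recomputed.
    double onAttendance(long concertId, int attendees) {
        long count = counts.merge(concertId, 1L, Long::sum);
        int sum = sums.merge(concertId, attendees, Integer::sum);
        double average = sum / (double) count;
        averages.put(concertId, average);
        return average;
    }

    // Inner-join semantics: a result exists only when both sides have a value for the key.
    Optional<String> ratedConcert(long concertId) {
        Double average = averages.get(concertId);
        String name = concertNames.get(concertId);
        if (average == null || name == null) {
            return Optional.empty();
        }
        return Optional.of(name + " - Average Attendance: " + average);
    }

    public static void main(String[] args) {
        ConcertRatingModel model = new ConcertRatingModel();
        model.onAttendance(7L, 100);
        System.out.println(model.ratedConcert(7L)); // no concert record yet, so no join result
        model.onConcert(7L, "Example Show");
        model.onAttendance(7L, 200);
        System.out.println(model.ratedConcert(7L));
    }
}
```

In the real topology the join result is pushed downstream as an update whenever either input table changes; the model computes it on demand, which gives the same current value per key.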