MTITEK.com
Spring Framework / Spring Kafka

Spring Kafka

This project covers Spring Kafka in two variants. mtitek-spring-kafka-json uses Spring Kafka's built-in JsonSerializer/JsonDeserializer, with producer/consumer factories auto-configured from application.yml. mtitek-spring-kafka-jackson uses Spring Kafka's JacksonJsonSerializer/JacksonJsonDeserializer and defines the producer/consumer factories explicitly as beans in a @Configuration class. Both projects demonstrate three consumption patterns: raw KafkaConsumer, ConsumerFactory, and @KafkaListener. A Kafka broker running on localhost:9092 is required; no embedded broker is included.

Maven Dependencies

<!-- Spring Boot starter for Kafka. No <version> is declared here;
     presumably it is inherited from a parent POM or BOM (confirm against the full pom.xml). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<!-- Jackson data-binding, likewise declared without an explicit <version>. -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Serializer Choice — JsonSerializer vs. JacksonJsonSerializer

Configuration — JSON project (application.yml)

# Kafka settings for the JSON variant of the project.
spring:
  kafka:
    # Broker list; a single local broker is configured.
    bootstrap-servers:
    - localhost:9092
    template:
      # Topic used when a send call does not name one explicitly.
      default-topic: mtitek-kafka-topic-1
    producer:
      # Both keys and values are serialized as JSON.
      # NOTE(review): no consumer key-deserializer is configured in this file;
      # confirm that the consumer's key deserializer matches JSON-encoded keys.
      keySerializer: org.springframework.kafka.support.serializer.JsonSerializer
      valueSerializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # "*" is a wildcard for the deserializer's trusted packages (presumably trusts all packages);
        # NOTE(review): consider narrowing this to the model package outside of demos.
        "[spring.json.trusted.packages]": "*"
  jackson:
    mapper:
      # Enum values are matched without regard to case.
      accept-case-insensitive-enums: true

Configuration — Jackson project (KafkaConfig)

/**
 * Creates the producer factory used to publish {@code AppProfile} messages.
 *
 * <p>The factory targets the broker at {@code localhost:9092}, serializes keys with
 * {@code StringSerializer} and values with {@code JacksonJsonSerializer}.
 *
 * @return a {@code DefaultKafkaProducerFactory} built from those settings
 */
@Bean
public ProducerFactory<String, AppProfile> appProfileProducerFactory() {
    final Map<String, Object> producerProps = new HashMap<>();

    // Serialization: plain string keys, JSON values.
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);

    // Connection.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    return new DefaultKafkaProducerFactory<>(producerProps);
}

/**
 * Creates the consumer factory used to read {@code AppProfile} messages.
 *
 * <p>Consumers created by this factory connect to {@code localhost:9092}, join the group
 * {@code mtitek-kafka-consumer-group-id-b}, read keys with {@code StringDeserializer} and
 * values with {@code JacksonJsonDeserializer} (default value type {@code AppProfile},
 * trusted package {@code com.mtitek.spring.model}). Auto-commit is disabled, each poll is
 * capped at 10 records, and the offset reset policy is {@code earliest}.
 *
 * @return a {@code DefaultKafkaConsumerFactory} built from those settings
 */
@Bean
public ConsumerFactory<String, AppProfile> appProfileConsumerFactory() {
    final Map<String, Object> consumerProps = new HashMap<>();

    // Connection and group membership.
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mtitek-kafka-consumer-group-id-b");

    // Deserialization: string keys, JSON values mapped to AppProfile.
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
    consumerProps.put(JacksonJsonDeserializer.VALUE_DEFAULT_TYPE, AppProfile.class);
    consumerProps.put(JacksonJsonDeserializer.TRUSTED_PACKAGES, "com.mtitek.spring.model");

    // Polling and offset handling.
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);

    return new DefaultKafkaConsumerFactory<>(consumerProps);
}

/**
 * Creates the {@code KafkaTemplate} for {@code AppProfile} messages.
 *
 * <p>The template is built on the given producer factory and is also handed the consumer
 * factory (presumably so that the template's receive operations can be used; confirm
 * against the Spring Kafka documentation).
 *
 * @param producerFactory factory the template uses to create producers
 * @param consumerFactory factory registered on the template via {@code setConsumerFactory}
 * @return the configured template
 */
@Bean
public KafkaTemplate<String, AppProfile> kafkaTemplate(
        ProducerFactory<String, AppProfile> producerFactory,
        ConsumerFactory<String, AppProfile> consumerFactory) {
    final KafkaTemplate<String, AppProfile> template =
            new KafkaTemplate<String, AppProfile>(producerFactory);
    template.setConsumerFactory(consumerFactory);
    return template;
}

Producer — KafkaTemplate

// single message: topic "mtitek-kafka-topic-1", partition 0, key "key0"
kafkaTemplate.send("mtitek-kafka-topic-1", 0, "key0", appProfile);

// batch — send() is non-blocking; each call returns a CompletableFuture.
// The returned futures are discarded here, so this snippet does not observe send failures.
// Every element is sent to partition 0 of TOPIC with the same key "key2".
appProfiles.forEach(ap -> kafkaTemplate.send(TOPIC, 0, "key2", ap));

Consumer — Three Patterns

Raw KafkaConsumer

/**
 * Polls the consumer once (waiting up to five seconds), commits the offsets of the
 * polled records, and returns their values.
 *
 * <p>For every partition present in the poll result, the committed offset is the offset
 * of that partition's last polled record plus one. The commit happens before the values
 * are returned to the caller.
 *
 * @return the values of all polled records; empty when the poll returned nothing
 */
public synchronized List<AppProfile> pollMessages() {
    final ConsumerRecords<String, AppProfile> polled = consumer.poll(Duration.ofSeconds(5));

    // Next offset to read per partition = last polled offset + 1.
    final Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
    for (TopicPartition partition : polled.partitions()) {
        final long lastOffset = polled.records(partition).getLast().offset();
        nextOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    }
    consumer.commitSync(nextOffsets);

    return StreamSupport.stream(polled.spliterator(), false)
            .map(ConsumerRecord::value)
            .toList();
}

ConsumerFactory

public AppProfileConsumerFactoryController(ConsumerFactory<String, AppProfile> consumerFactory) {
    this.consumer = consumerFactory.createConsumer();
    this.consumer.assign(List.of(new TopicPartition(TOPIC, 0)));
}

@KafkaListener

/**
 * Listener for topic {@code mtitek-kafka-topic-1} in consumer group
 * {@code mtitek-kafka-consumer-group-id-c}; prints each received message to standard output.
 *
 * @param appProfile     the message payload
 * @param consumerRecord the full record the payload came from
 */
@KafkaListener(topics = "mtitek-kafka-topic-1", groupId = "mtitek-kafka-consumer-group-id-c")
public void listen(AppProfile appProfile, ConsumerRecord<String, AppProfile> consumerRecord) {
    // ConsumerRecord exposes: topic, partition, offset, timestamp, key, value, headers
    final String line = "Received Message: " + appProfile + " - " + consumerRecord;
    System.out.println(line);
}

KafkaTemplate.receive() — Synchronous Single-Record Fetch

/**
 * Fetches a single record from the given topic, partition and offset and returns its
 * value as text.
 *
 * @param topic     topic to read from
 * @param partition partition within the topic
 * @param offset    offset of the record to fetch
 * @return the record value's {@code toString()}, or {@code "No message found"} when no
 *         record was returned or the record carries a null value
 */
@GetMapping("/messages/{topic}/{partition}/{offset}")
public String receiveMessage(@PathVariable String topic,
        @PathVariable int partition, @PathVariable long offset) {
    ConsumerRecord<String, AppProfile> record = kafkaTemplate.receive(topic, partition, offset);
    if (record == null) {
        return "No message found";
    }

    // A record can exist with a null value (e.g. a tombstone); calling toString() on it
    // directly would throw a NullPointerException.
    AppProfile value = record.value();
    if (value == null) {
        return "No message found";
    }
    return value.toString();
}