This project covers Spring Kafka in two variants: mtitek-spring-kafka-json uses Spring Kafka's built-in
JsonSerializer/JsonDeserializer with producer/consumer factories autoconfigured from
application.yml; mtitek-spring-kafka-jackson uses Spring Kafka's
JacksonJsonSerializer/JacksonJsonDeserializer and defines producer/consumer factories
explicitly as beans in a @Configuration class.
Both projects demonstrate three consumption patterns: raw KafkaConsumer, ConsumerFactory,
and @KafkaListener. A running Kafka broker on localhost:9092 is required — no embedded broker is included.
spring-boot-starter-kafka pulls in spring-kafka and the Apache Kafka client. jackson-databind is declared explicitly — it is required by both JsonSerializer and JacksonJsonSerializer but is not transitively guaranteed by the Kafka starter alone.<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
JsonSerializer/JsonDeserializer (spring-kafka built-in) embed a __TypeInfo header in each Kafka record containing the fully qualified class name of the serialized type. The consumer uses this header to determine the target deserialization type — so VALUE_DEFAULT_TYPE is optional when the header is present. The trusted.packages property is a security gate that must whitelist the producer's package; "*" trusts all packages (used in the JSON project).JacksonJsonSerializer/JacksonJsonDeserializer (also spring-kafka, Jackson-backed) do not embed type headers by default. The consumer must explicitly declare VALUE_DEFAULT_TYPE pointing to the target class, and TRUSTED_PACKAGES must whitelist it. Without VALUE_DEFAULT_TYPE, deserialization produces a LinkedHashMap instead of an AppProfile.spring.jackson.mapper.accept-case-insensitive-enums: true is set in both projects — it configures the shared Jackson ObjectMapper to accept enum values regardless of case in JSON payloads.application.yml — no @Configuration class is needed. Spring Boot's Kafka autoconfiguration reads spring.kafka.producer.* and spring.kafka.consumer.* and creates DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory beans automatically.spring.kafka.template.default-topic sets the topic used by kafkaTemplate.send(message) (no topic argument). It does not affect kafkaTemplate.send(topic, ...) calls that specify a topic explicitly."[spring.json.trusted.packages]" YAML key requires bracket quoting because the dots would otherwise be interpreted as nested YAML keys.spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
default-topic: mtitek-kafka-topic-1
producer:
keySerializer: org.springframework.kafka.support.serializer.JsonSerializer
valueSerializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
"[spring.json.trusted.packages]": "*"
jackson:
mapper:
accept-case-insensitive-enums: true
ProducerFactory and ConsumerFactory beans override Spring Boot's autoconfigured ones entirely — the application.yml producer/consumer properties are ignored when matching-type beans are present in the context.ENABLE_AUTO_COMMIT_CONFIG = false disables Kafka's automatic offset commit. Offsets must be committed manually via consumer.commitSync() — giving the application control over exactly-once or at-least-once semantics. Without this, offsets are committed on a background timer regardless of processing success.AUTO_OFFSET_RESET_CONFIG = "earliest" causes a new consumer group (or one with no committed offsets) to start reading from the beginning of the partition. The alternative "latest" skips all existing records and only reads new ones produced after the consumer starts."com.mtitek.spring.kafka.producerfactory": "producerfactory1" is a user-defined marker — Kafka ignores unknown properties, so this has no runtime effect. It appears to be a label for identification purposes.kafkaTemplate.setConsumerFactory(consumerFactory) is required to enable kafkaTemplate.receive(topic, partition, offset) — a synchronous single-record fetch used in AppProfileKafkaTemplateController. Without it, receive() throws because the template has no consumer factory to create a temporary consumer from.@Bean
public ProducerFactory<String, AppProfile> appProfileProducerFactory() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configurations);
}
@Bean
public ConsumerFactory<String, AppProfile> appProfileConsumerFactory() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, "mtitek-kafka-consumer-group-id-b");
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
configurations.put(JacksonJsonDeserializer.TRUSTED_PACKAGES, "com.mtitek.spring.model");
configurations.put(JacksonJsonDeserializer.VALUE_DEFAULT_TYPE, AppProfile.class);
configurations.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
configurations.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(configurations);
}
@Bean
public KafkaTemplate<String, AppProfile> kafkaTemplate(
ProducerFactory<String, AppProfile> producerFactory,
ConsumerFactory<String, AppProfile> consumerFactory) {
KafkaTemplate<String, AppProfile> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setConsumerFactory(consumerFactory);
return kafkaTemplate;
}
kafkaTemplate.send(topic, partition, key, value) specifies topic, partition (0), and key explicitly. Pinning partition 0 means all messages go to the same partition — appropriate for this single-partition demo topic but must be reconsidered for production throughput.AppProfileProducerFactoryController sends to mtitek-kafka-topic-2 while AppProfileKafkaMessagingService sends to mtitek-kafka-topic-1 — two separate topics consumed by different consumer group configurations.// single message
kafkaTemplate.send("mtitek-kafka-topic-1", 0, "key0", appProfile);
// batch — send() is non-blocking; each call returns a CompletableFuture
appProfiles.forEach(ap -> kafkaTemplate.send(TOPIC, 0, "key2", ap));
consumer.assign(List.of(new TopicPartition(TOPIC, 0))) manually assigns a partition without joining a consumer group. This bypasses group coordination and rebalancing — the consumer always reads partition 0 regardless of other consumers. subscribe() would use group-based assignment instead.pollMessages() method is synchronized — KafkaConsumer is not thread-safe, so concurrent HTTP requests to the endpoint would otherwise cause ConcurrentModificationException.records.records(tp).getLast().offset() + 1 computes the next offset to commit (committed offset = last processed offset + 1). Committing the last record's offset (without +1) would re-deliver that record on the next poll.public synchronized List<AppProfile> pollMessages() {
ConsumerRecords<String, AppProfile> records = consumer.poll(Duration.ofSeconds(5));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
records.partitions().forEach(tp ->
offsets.put(tp, new OffsetAndMetadata(records.records(tp).getLast().offset() + 1))
);
consumer.commitSync(offsets);
return StreamSupport.stream(records.spliterator(), false)
.map(ConsumerRecord::value)
.toList();
}
consumerFactory.createConsumer() instantiates a consumer from the factory's configuration — reusing the GROUP_ID_CONFIG, deserializer, and other settings defined in KafkaConfig. This avoids duplicating consumer properties between the factory and the controller.poll/commitSync pattern applies — the consumer is still a raw Consumer<String, AppProfile> instance under the hood.public AppProfileConsumerFactoryController(ConsumerFactory<String, AppProfile> consumerFactory) {
this.consumer = consumerFactory.createConsumer();
this.consumer.assign(List.of(new TopicPartition(TOPIC, 0)));
}
@KafkaListener runs in a Spring-managed listener container — offset commits, threading, and error handling are delegated to the container. The listener uses its own consumer group (mtitek-kafka-consumer-group-id-c), independent of the raw consumers above.ConsumerRecord: listen(AppProfile appProfile, ConsumerRecord<String, AppProfile> consumerRecord). Spring Kafka resolves method parameters by type — the payload is injected as the deserialized value, and ConsumerRecord provides access to offset, partition, key, headers, and timestamp. Alternatively, Message<AppProfile> can be used to access Kafka metadata as message headers (commented-out variant in the code).listen(String message) variant shows raw string consumption — requires no value deserializer configuration but receives the JSON string unparsed.@KafkaListener(topics = "mtitek-kafka-topic-1", groupId = "mtitek-kafka-consumer-group-id-c")
public void listen(AppProfile appProfile, ConsumerRecord<String, AppProfile> consumerRecord) {
System.out.println("Received Message: " + appProfile + " - " + consumerRecord);
// ConsumerRecord exposes: topic, partition, offset, timestamp, key, value, headers
}
kafkaTemplate.receive(topic, partition, offset) fetches a single record at a specific offset synchronously — bypassing the listener container and any consumer group. It requires kafkaTemplate.setConsumerFactory(...) to have been called; otherwise it throws IllegalStateException.null if no record exists at that offset (the offset is beyond the log end, or the record has been deleted by retention). The controller handles this with a null check.@GetMapping("/messages/{topic}/{partition}/{offset}")
public String receiveMessage(@PathVariable String topic,
@PathVariable int partition, @PathVariable long offset) {
ConsumerRecord<String, AppProfile> record = kafkaTemplate.receive(topic, partition, offset);
return record != null ? record.value().toString() : "No message found";
}