Updating Kafka Offset Immediately

I am developing a Spring Boot app that reacts to messages published to a Kafka topic.

The version is Spring Boot 2.0.5, Finchley.SR1.

The Kafka version is kafka_2.12-1.1.0

The issue I am facing is that sometimes when I restart the application it replays old messages. This doesn't always happen - the only pattern I have spotted is that it seems to be after a few days of inactivity (say on Monday morning, just after the weekend).

I stop and start the app many times a day during development and normally don't see the issue; it only occurs sporadically. It isn't linked to errors in the application either, as all processing completes cleanly.

I have configured my Kafka listener to use MANUAL_IMMEDIATE acknowledgement, and call ack.acknowledge() at the end of the listener method.

My Spring property file looks as follows:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    listener:
      ack-mode: MANUAL_IMMEDIATE
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      group-id: user-mgmt-app

My Listener class is defined as follows:

@org.springframework.kafka.annotation.KafkaListener(topics = "aggregate-event-topic")
public void receive(ConsumerRecord<?, ?> cr, Acknowledgment ack) {

   ...
   // Acknowledge this record once processing has finished
   ack.acknowledge();

}

I have one instance of the app running, so it's the leader in the consumer group each time.

I have used the Kafka tools to look at the offset for the consumer group. One thing I've noticed is that when I pause the app at a breakpoint on the acknowledge step, CURRENT-OFFSET is not updated; it only seems to update once all messages have been processed.

./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group user-mgmt-app --describe

My understanding from other posts was that MANUAL_IMMEDIATE would update the Kafka server straight away after calling acknowledge(), rather than at the end of the batch.

Is my understanding incorrect? If so, is there any way to get the behaviour I want, such as setting the batch size to 1 on each read from the partition (which I'm guessing may have performance implications)? If so, how do I do this? Any help gratefully accepted!

TIA

1 answer

  • answered 2018-10-17 12:46 Gary Russell

    The issue I am facing is that sometimes when I restart the application it replays old messages. This doesn't always happen - the only pattern I have spotted is that it seems to be after a few days of inactivity (say on Monday morning, just after the weekend).

    I am guessing you are not using a 2.0.0 broker, where the default retention for consumer offsets was increased from 24 hours to 7 days. Older brokers expire the offsets after only one day - classic problem if you don't have messages over the weekend.

    See Notable Changes in 2.0.0.

    KIP-186 increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config offsets.retention.minutes to 1440.
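
    To make the retention arithmetic concrete, here is a minimal sketch (not broker code) that compares an idle period against offsets.retention.minutes. The timestamps are hypothetical: a last commit on Friday evening and a restart on Monday morning. The class and method names are invented for illustration. Once the committed offset has expired, the auto-offset-reset: earliest setting from the question makes the consumer start again from the beginning of the topic, which would look like a replay.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class OffsetRetentionCheck {

    // Returns true if the time since the last commit exceeds the retention window.
    // retentionMinutes mirrors the broker setting offsets.retention.minutes.
    public static boolean isExpired(LocalDateTime lastCommit, LocalDateTime now, long retentionMinutes) {
        Duration idle = Duration.between(lastCommit, now);
        return idle.compareTo(Duration.ofMinutes(retentionMinutes)) > 0;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps: last commit Friday 18:00, restart Monday 09:00.
        LocalDateTime lastCommit = LocalDateTime.of(2018, 10, 12, 18, 0);
        LocalDateTime restart = LocalDateTime.of(2018, 10, 15, 9, 0);

        long oneDay = Duration.ofDays(1).toMinutes();    // 1440, the default before broker 2.0.0
        long sevenDays = Duration.ofDays(7).toMinutes(); // 10080, the default from broker 2.0.0

        // The idle period here is 63 hours (3780 minutes).
        System.out.println("1-day retention expired: " + isExpired(lastCommit, restart, oneDay));
        System.out.println("7-day retention expired: " + isExpired(lastCommit, restart, sevenDays));
    }
}
```

    With these assumed timestamps the offset outlives a 7-day window but not a 1-day one, which matches the "Monday morning" pattern described in the question.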

    I am not sure why you are not seeing the offsets update via the command line tool. AckMode.RECORD will update the offset after each record. MANUAL_IMMEDIATE will update when you call acknowledge() as long as the Spring Kafka version is >= 1.3 (Boot 2.0.x will pull in Spring Kafka 2.1.x).
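
    On the question of reading one record at a time: the Kafka consumer setting max.poll.records caps how many records a single poll returns, and Spring Boot exposes it as spring.kafka.consumer.max-poll-records. The sketch below only assembles plain consumer properties to show the keys involved; the class name is invented for illustration, deserializer settings are omitted, and whether a value of 1 is actually needed is an assumption, since MANUAL_IMMEDIATE should already commit on acknowledge(). Expect lower throughput with such a small poll size.

```java
import java.util.Properties;

public class SingleRecordPollConfig {

    // Builds plain Kafka consumer properties; the keys are standard consumer config names.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");  // value taken from the question
        props.put("group.id", "user-mgmt-app");        // value taken from the question
        props.put("enable.auto.commit", "false");      // offsets are committed via acknowledge()
        props.put("auto.offset.reset", "earliest");    // value taken from the question
        props.put("max.poll.records", "1");            // assumption: return one record per poll
        return props;
    }

    public static void main(String[] args) {
        // Print the assembled settings for inspection.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```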