Problem Statement

The current Gobblin Kafka High Level Consumer uses the Kafka 0.8 consumer APIs, and Gobblin support for them will be deprecated. The primary goal of the redesign is to replace old Kafka consumer APIs such as ConsumerConnector and MessageAndMetadata with a consumer abstraction, GobblinKafkaConsumerClient. Additionally, the old design uses Kafka's auto-commit feature, which can lose messages when offsets are committed and the system fails before the corresponding messages are processed.

Detailed design and implementation details can be found here

New Design & Details

GobblinKafkaConsumerClient

The new design uses GobblinKafkaConsumerClient, a simplified, generic wrapper client for communicating with Kafka. This interface does not depend on classes defined in the kafka-clients library, which allows the high level consumer to work with different versions of Kafka. Concrete classes implementing this interface use a specific version of the kafka-clients library; see Kafka09ConsumerClient.
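The separation described above can be illustrated with a minimal sketch. All names here (SimpleConsumerClient, InMemoryConsumerClient, MessageCounter) are hypothetical and are not part of Gobblin; the in-memory class merely stands in for a version-specific implementation such as Kafka09ConsumerClient. The point is that the consuming logic sees only JDK types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical version-neutral client: only JDK types appear in its signatures. */
interface SimpleConsumerClient {
  /** Returns the next batch of payloads, or an empty list if none are available. */
  List<String> poll();
}

/** Hypothetical stand-in for a version-specific implementation. */
final class InMemoryConsumerClient implements SimpleConsumerClient {
  private final Queue<String> messages = new ArrayDeque<>();

  InMemoryConsumerClient(List<String> seed) {
    messages.addAll(seed);
  }

  @Override
  public List<String> poll() {
    // Hand out everything queued so far, then empty the queue.
    List<String> batch = new ArrayList<>(messages);
    messages.clear();
    return batch;
  }
}

/** Consumer logic written against the interface only. */
final class MessageCounter {
  static int drain(SimpleConsumerClient client) {
    int count = 0;
    List<String> batch;
    // Keep polling until the client reports an empty batch.
    while (!(batch = client.poll()).isEmpty()) {
      count += batch.size();
    }
    return count;
  }
}
```

Swapping in a client built on a different Kafka library version would not require any change to MessageCounter in this sketch.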

Manual Offset Commit

The GobblinKafkaConsumerClient API has been enhanced to allow offsets to be committed manually.

  /**
   * Commit offsets manually to Kafka asynchronously
   */
  default void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
    return;
  }

  /**
   * Commit offsets manually to Kafka synchronously
   */
  default void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
    return;
  }

  /**
   * Returns the last committed offset for a KafkaPartition
   * @param partition the partition to look up
   * @return last committed offset or -1 for invalid KafkaPartition
   */
  default long committed(KafkaPartition partition) {
    return -1L;
  }

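The high level consumer records topic partitions and their offsets only after the messages are processed, and periodically commits them to Kafka. Because an offset is never committed before its message is processed, a failure can cause messages to be reprocessed but not skipped, which gives at-least-once delivery.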
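As a rough sketch of this bookkeeping, the following tracker records an offset only once its message is processed and hands a snapshot to a commit callback when flushed. PartitionKey, OffsetCommitter and ProcessedOffsetTracker are hypothetical names, not Gobblin classes; PartitionKey stands in for KafkaPartition, and the callback mirrors the shape of commitOffsetsAsync. Whether the committed value should be the processed offset or the next offset to read is left open here and is an assumption the real implementation would have to settle.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for KafkaPartition: a topic name plus a partition id. */
final class PartitionKey {
  final String topic;
  final int id;

  PartitionKey(String topic, int id) {
    this.topic = topic;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof PartitionKey)) {
      return false;
    }
    PartitionKey other = (PartitionKey) o;
    return id == other.id && topic.equals(other.topic);
  }

  @Override
  public int hashCode() {
    return 31 * topic.hashCode() + id;
  }
}

/** Hypothetical commit hook, shaped like commitOffsetsAsync. */
interface OffsetCommitter {
  void commit(Map<PartitionKey, Long> partitionOffsets);
}

/** Caches offsets of processed messages and commits them when flushed. */
final class ProcessedOffsetTracker {
  private final Map<PartitionKey, Long> pending = new HashMap<>();
  private final OffsetCommitter committer;

  ProcessedOffsetTracker(OffsetCommitter committer) {
    this.committer = committer;
  }

  /** Call only after the message at this offset has been fully processed. */
  synchronized void markProcessed(PartitionKey partition, long offset) {
    // Keep the highest processed offset seen for each partition.
    pending.merge(partition, offset, Long::max);
  }

  /** Commits a snapshot of the cached offsets, then clears the cache. */
  synchronized void flush() {
    if (pending.isEmpty()) {
      return;
    }
    committer.commit(new HashMap<>(pending));
    pending.clear();
  }
}
```

In this sketch, flush() would be invoked from a periodic task. A crash between markProcessed and flush leaves the offset uncommitted, so the message is read again after restart rather than lost.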

Additionally, APIs are provided to subscribe to a topic along with a GobblinConsumerRebalanceListener, which provides hooks that run when a consumer joins or leaves a consumer group. When that happens, the high level consumer commits the remaining offsets and clears its offset caches.

  /**
   * Subscribe to a topic
   * @param topic
   */
  default void subscribe(String topic) {
    return;
  }

  /**
   * Subscribe to a topic along with a GobblinConsumerRebalanceListener
   * @param topic
   * @param listener
   */
  default void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
    return;
  }
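The commit-and-clear behavior on rebalance might look roughly like the sketch below. RebalanceHooks and CommitOnRebalance are hypothetical names, and the two callback methods are an assumed shape; the actual GobblinConsumerRebalanceListener interface may differ. Partitions are keyed by plain strings to keep the example self-contained.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical listener shape; the real interface may define different methods. */
interface RebalanceHooks {
  void onPartitionsRevoked(Collection<String> partitions);

  void onPartitionsAssigned(Collection<String> partitions);
}

/** Commits any cached offsets and clears the cache when partitions are revoked. */
final class CommitOnRebalance implements RebalanceHooks {
  private final Map<String, Long> offsetCache;
  private final Consumer<Map<String, Long>> syncCommit;

  CommitOnRebalance(Map<String, Long> offsetCache, Consumer<Map<String, Long>> syncCommit) {
    this.offsetCache = offsetCache;
    this.syncCommit = syncCommit;
  }

  @Override
  public void onPartitionsRevoked(Collection<String> partitions) {
    synchronized (offsetCache) {
      if (!offsetCache.isEmpty()) {
        // Commit a copy so the callback never sees the cache being cleared.
        syncCommit.accept(new HashMap<>(offsetCache));
      }
      offsetCache.clear();
    }
  }

  @Override
  public void onPartitionsAssigned(Collection<String> partitions) {
    // Nothing to restore in this sketch; the cache starts empty for new assignments.
  }
}
```

A synchronous commit is used in the sketch on the assumption that offsets should be durable before the partitions move to another consumer.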