Class KafkaStringsConsumer

java.lang.Object
org.eso.ias.kafkautils.KafkaStringsConsumer
All Implemented Interfaces:
Runnable, org.apache.kafka.clients.consumer.ConsumerRebalanceListener
Direct Known Subclasses:
SimpleStringConsumer

public class KafkaStringsConsumer extends Object implements Runnable, org.apache.kafka.clients.consumer.ConsumerRebalanceListener
A generic kafka consumer that forwards all the strings received in each message to the listener. Kafka optimizes polling by returning a bunch of messages on each poll: objects of this class return all the records received in a poll to the listener. A listener that wants to process one message at a time should use SimpleStringConsumer instead. KafkaStringsConsumer runs the next poll after the listener finishes processing the records.
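
The difference between batch delivery and one-at-a-time delivery can be sketched with plain Java. BatchListener, SingleListener and the deliver method below are hypothetical stand-ins written for this illustration; they are not the signatures of KafkaStringsConsumer.StringsConsumer or SimpleStringConsumer, which are not shown on this page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Illustration only: all names here are hypothetical, none belongs to the IAS library. */
class BatchDeliverySketch {

    /** Stand-in for a listener that receives every string returned by one poll. */
    interface BatchListener {
        void onBatch(Collection<String> strings);
    }

    /** Stand-in for a listener that wants one string at a time. */
    interface SingleListener {
        void onString(String string);
    }

    /** Adapts a one-at-a-time listener so it can be fed whole batches. */
    static BatchListener oneAtATime(SingleListener single) {
        return strings -> strings.forEach(single::onString);
    }

    /**
     * Simulates the polling loop: each inner list plays the role of one poll result.
     * The next batch is handed over only after the listener has returned.
     */
    static void deliver(List<List<String>> polls, BatchListener listener) {
        for (List<String> batch : polls) {
            listener.onBatch(batch);
        }
    }

    public static void main(String[] args) {
        List<List<String>> polls = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));

        // Batch listener: invoked once per simulated poll
        deliver(polls, batch -> System.out.println("batch of " + batch.size()));

        // One-at-a-time listener: invoked once per string
        List<String> seen = new ArrayList<>();
        deliver(polls, oneAtATime(seen::add));
        System.out.println(seen);
    }
}
```

Because the loop calls the listener synchronously, a slow listener delays the next poll, which matches the behaviour described above.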

Kafka properties are fully customizable by calling setUp(Properties): default values are used for the missing ones.
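
A minimal sketch of the "user values first, defaults for the rest" rule, using only java.util.Properties. The helper mergeWithDefaults and the sample values are assumptions made for illustration; the page does not show how the class merges the properties internally. The keys bootstrap.servers and auto.offset.reset are standard Kafka consumer configuration names.

```java
import java.util.Properties;

/** Illustration only: mergeWithDefaults is a hypothetical helper, not part of the IAS library. */
class PropertiesMergeSketch {

    /** Returns a new Properties with every user entry plus the defaults the user did not set. */
    static Properties mergeWithDefaults(Properties userProps, Properties defaults) {
        Properties merged = new Properties();
        merged.putAll(userProps);
        for (String key : defaults.stringPropertyNames()) {
            // Keep the user value when present, otherwise fall back to the default
            merged.putIfAbsent(key, defaults.getProperty(key));
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092"); // sample value
        defaults.setProperty("auto.offset.reset", "latest");         // sample value

        Properties user = new Properties();
        user.setProperty("auto.offset.reset", "earliest");

        Properties merged = mergeWithDefaults(user, defaults);
        System.out.println(merged.getProperty("bootstrap.servers")); // taken from the defaults
        System.out.println(merged.getProperty("auto.offset.reset")); // taken from the user
    }
}
```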

Life cycle:
  • setUp() or setUp(Properties) must be called to initialize the object;
  • startGettingEvents(StringsConsumer, StreamPosition) must be called to start polling events from the kafka topic;
  • tearDown() must be called when finished using the object.

startGettingEvents(StringsConsumer, StreamPosition) returns when the consumer has been assigned to at least one partition. The partitions assigned to the consumer can be revoked and reassigned, for example when another consumer subscribes or disconnects, because the assignment of consumers to partitions is left to kafka in this version.
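
One common way to make a start method return only after the first partition assignment is to let the rebalance callback release a latch. The sketch below shows that pattern with the standard library only. It is an assumption about a possible design, not the actual implementation of this class: BlockingStartSketch and its methods are hypothetical, the callback takes a partition count instead of Kafka's collection of TopicPartition, and IllegalStateException stands in for KafkaUtilsException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustration only: a hypothetical class, not part of the IAS library. */
class BlockingStartSketch {

    /** Released the first time at least one partition is assigned. */
    private final CountDownLatch assigned = new CountDownLatch(1);

    /** Stand-in for the rebalance callback invoked from the polling thread. */
    void onPartitionsAssigned(int numOfPartitions) {
        if (numOfPartitions > 0) {
            assigned.countDown();
        }
    }

    /**
     * Starts the polling thread, then blocks until the first assignment.
     * Throws IllegalStateException (stand-in for KafkaUtilsException) on timeout.
     */
    void start(Runnable pollingLoop, long timeoutMs) {
        Thread thread = new Thread(pollingLoop, "polling-thread");
        thread.setDaemon(true);
        thread.start();
        try {
            if (!assigned.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timeout waiting for partition assignment");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted waiting for partition assignment", e);
        }
    }

    public static void main(String[] args) {
        BlockingStartSketch sketch = new BlockingStartSketch();
        // The simulated polling loop reports one assigned partition right away
        sketch.start(() -> sketch.onPartitionsAssigned(1), 2000);
        System.out.println("start returned after the first assignment");
    }
}
```

Since partitions can later be revoked and reassigned, a latch of this kind only covers the first assignment; readiness after a rebalance has to be checked again, for example with isReady().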

Author:
acaproni
  • Field Details

    • topicName

      protected final String topicName
      The name of the topic to get events from
    • WAIT_CONSUMER_READY_TIMEOUT_PROP_NAME

      public static final String WAIT_CONSUMER_READY_TIMEOUT_PROP_NAME
      The property that sets the time to wait until the consumer is ready (in msecs)
    • DEFAULT_CONSUMER_READY_TIMEOUT

      public static final long DEFAULT_CONSUMER_READY_TIMEOUT
      Default time to wait until the consumer is ready
    • MIN_CONSUMER_READY_TIMEOUT

      public static final long MIN_CONSUMER_READY_TIMEOUT
      Minimum allowed timeout to wait for the consumer to be ready
  • Constructor Details

    • KafkaStringsConsumer

      public KafkaStringsConsumer(String servers, String topicName, String consumerID)
      Constructor
      Parameters:
      servers - The kafka servers to connect to
      topicName - The name of the topic to get events from
      consumerID - the ID of the consumer
  • Method Details

    • getNumOfProcessedRecords

      public long getNumOfProcessedRecords()
      Returns:
      the number of records processed
    • setUp

      public void setUp(Properties userPros)
      Initializes the consumer with the passed kafka properties.

      Default values are used for the properties not found in the parameter.

      Parameters:
      userPros - The user defined kafka properties
    • startGettingEvents

      public void startGettingEvents(KafkaStringsConsumer.StringsConsumer listener, KafkaStringsConsumer.StreamPosition startReadingFrom) throws KafkaUtilsException
      Start polling events from the kafka channel.

      This method starts the thread that polls the kafka topic and returns after the consumer has been assigned to at least one partition.

      Parameters:
      listener - The listener of events published in the topic
      startReadingFrom - Starting position in the kafka partition
      Throws:
      KafkaUtilsException - in case of timeout subscribing to the kafka topic
    • setUp

      public void setUp()
      Initializes the consumer with default kafka properties
    • tearDown

      public void tearDown()
      Closes and cleans up the consumer
    • onPartitionsRevoked

      public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> parts)
      Called before the rebalancing starts and after the consumer stopped consuming events.
      Specified by:
      onPartitionsRevoked in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
      Parameters:
      parts - The list of partitions that were assigned to the consumer and now need to be revoked (may not include all currently assigned partitions, i.e. there may still be some partitions left)
    • onPartitionsAssigned

      public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> parts)
      Called after partitions have been reassigned but before the consumer starts consuming messages
      Specified by:
      onPartitionsAssigned in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
      Parameters:
      parts - The list of partitions that are now assigned to the consumer (previously owned partitions will NOT be included, i.e. this list will only include newly added partitions)
    • onPartitionsLost

      public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> parts)
      Specified by:
      onPartitionsLost in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
      Parameters:
      parts - The list of partitions that were assigned to the consumer and have now been reassigned to other consumers. With the current protocol this will always include all of the consumer's previously assigned partitions, but this may change in future protocols (i.e. there would still be some partitions left)
    • run

      public void run()
      The thread to poll data from the topic
      Specified by:
      run in interface Runnable
    • seekTo

      public boolean seekTo(KafkaStringsConsumer.StreamPosition pos)
      Seeks the consumer to the passed position. Kafka allows seeking only if a partition is assigned; otherwise the seek is ignored.
      Parameters:
      pos - The position to seek the consumer
      Returns:
      true if the seek has been done with an assigned partition; false otherwise
    • isReady

      public boolean isReady()
      Returns true if the consumer is ready. The Kafka API does not provide a way to know if the consumer is ready. This method uses KafkaConsumer.assignment() to determine if the consumer is ready: the consumer is ready if there are partitions assigned to it.
      Returns:
      true if the consumer is ready, false otherwise
    • waitUntilConsumerReady

      public void waitUntilConsumerReady()
      Waits until the consumer is ready or a timeout elapses. This method waits until the consumer is ready (delegating to isReady()) or a timeout elapses. If the consumer is not yet ready when the timeout expires, this method only logs a warning, because we do not want to block the process in case the consumer becomes ready later for whatever reason. Still, the situation is probably not normal, so the event is logged. The timeout can be set through the WAIT_CONSUMER_READY_TIMEOUT_PROP_NAME property; otherwise the default (DEFAULT_CONSUMER_READY_TIMEOUT) is used.
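
The wait-or-warn behaviour described above can be sketched as a polling loop with a deadline. Everything in the block is an assumption made for illustration: the property name and the numeric values are placeholders (the real constant values are not shown on this page), raising values below the minimum to the minimum is a guess at how MIN_CONSUMER_READY_TIMEOUT is applied, and the sketch returns a boolean for easier testing whereas the real method returns void.

```java
import java.util.function.BooleanSupplier;

/** Illustration only: hypothetical names and values, not part of the IAS library. */
class WaitUntilReadySketch {

    // Placeholder values: the real constants are not listed in this documentation
    static final String TIMEOUT_PROP_NAME = "example.consumer.ready.timeout";
    static final long DEFAULT_TIMEOUT_MS = 15_000L;
    static final long MIN_TIMEOUT_MS = 5_000L;

    /** Resolves the timeout from a raw property value, falling back to the default. */
    static long resolveTimeout(String rawValue) {
        if (rawValue == null) {
            return DEFAULT_TIMEOUT_MS;
        }
        try {
            // Assumption: values below the minimum are raised to the minimum
            return Math.max(MIN_TIMEOUT_MS, Long.parseLong(rawValue.trim()));
        } catch (NumberFormatException e) {
            return DEFAULT_TIMEOUT_MS;
        }
    }

    /**
     * Polls isReady until it returns true or the timeout elapses.
     * On timeout it only prints a warning and returns false; it never throws.
     */
    static boolean waitUntilReady(BooleanSupplier isReady, long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!isReady.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                System.err.println("WARNING: consumer not ready after " + timeoutMs + " ms");
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long timeout = resolveTimeout(System.getProperty(TIMEOUT_PROP_NAME));
        long start = System.currentTimeMillis();
        // Simulated readiness check: becomes true about 200 ms after start
        boolean ready = waitUntilReady(() -> System.currentTimeMillis() - start > 200, timeout, 50);
        System.out.println("ready=" + ready);
    }
}
```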