Class SimpleKafkaIasiosConsumer

java.lang.Object
org.eso.ias.kafkautils.SimpleKafkaIasiosConsumer
All Implemented Interfaces:
KafkaStringsConsumer.StringsConsumer
Direct Known Subclasses:
FilteredKafkaIasiosConsumer, KafkaIasiosConsumer

public class SimpleKafkaIasiosConsumer extends Object implements KafkaStringsConsumer.StringsConsumer
SimpleKafkaIasiosConsumer gets the strings from the passed IASIO kafka topic through the KafkaStringsConsumer and forwards the IASIOs to the listener. It checks the timestamps of the received IASIOs against a threshold. A timestamp that is too old means that the receiver is slow in processing events: the old IASIOs are discarded, a log is emitted and the consumer seeks to the end of the topic. To decide what old means, the user must set the SeekIfOlderThanProName property to the desired number of milliseconds; otherwise SeekIfOlderThanDefault is used.
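The fallback from a user-set property to a default can be sketched as follows. This is a minimal illustration, not the library's code: it assumes the threshold is read from a JVM system property, and the property name `example.seekIfOlderThan` and the default of 10000 ms are hypothetical placeholders for the values behind SeekIfOlderThanProName and SeekIfOlderThanDefault.

```java
/** Hypothetical helper, not part of the IAS API. */
final class ThresholdFromProperty {
    // Placeholder name and default: the real values are defined by the library.
    static final String PROP_NAME = "example.seekIfOlderThan";
    static final long DEFAULT_MS = 10_000L;

    private ThresholdFromProperty() {}

    /** Returns the threshold in ms from the system property, or the default if unset or unparsable. */
    static long resolveThreshold() {
        return Long.getLong(PROP_NAME, DEFAULT_MS);
    }

    public static void main(String[] args) {
        System.clearProperty(PROP_NAME);
        System.out.println(resolveThreshold()); // property not set: default is used

        System.setProperty(PROP_NAME, "2500");
        System.out.println(resolveThreshold()); // property set: user value is used
    }
}
```

With this approach the threshold could be chosen at launch time, for example with `-Dexample.seekIfOlderThan=2500` on the JVM command line.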
  • Field Details

    • SeekIfOlderThanProName

      public static final String SeekIfOlderThanProName
      The name of the property that sets the maximum age of a record, in milliseconds (the difference between the current time and the timestamp of the record), beyond which the consumer seeks to the end of the topic
    • seekIfOlderThan

      public final long seekIfOlderThan
      If the difference (in milliseconds) between the current time and the timestamp of the records read from the kafka topic is greater than seekIfOlderThan, then the consumer seeks to the end of the topic
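The comparison described for seekIfOlderThan can be sketched as a small predicate. The class and method names here are hypothetical and only illustrate the arithmetic; the actual check inside the consumer may differ in detail.

```java
/** Hypothetical helper, not part of the IAS API: illustrates the record age check. */
final class StalenessCheck {
    private StalenessCheck() {}

    /**
     * @param recordTimestampMs timestamp of the record, in ms since the epoch
     * @param nowMs             current time, in ms since the epoch
     * @param seekIfOlderThanMs age threshold, in ms
     * @return true if the record age is strictly greater than the threshold
     */
    static boolean isTooOld(long recordTimestampMs, long nowMs, long seekIfOlderThanMs) {
        return nowMs - recordTimestampMs > seekIfOlderThanMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isTooOld(now - 5_000, now, 2_000)); // 5 s old, 2 s threshold
        System.out.println(isTooOld(now - 500, now, 2_000));   // 0.5 s old, 2 s threshold
    }
}
```

In this sketch a record whose age equals the threshold exactly is still considered fresh, which matches the "greater than" wording above.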
  • Constructor Details

    • SimpleKafkaIasiosConsumer

      public SimpleKafkaIasiosConsumer(String servers, String topicName, String consumerID)
      Build a FilteredStringConsumer with no filters (i.e. all the strings read from the kafka topic are forwarded to the listener)
      Parameters:
      servers - The kafka servers to connect to
      topicName - The name of the topic to get events from
      consumerID - the ID of the consumer
  • Method Details

    • startGettingEvents

      public void startGettingEvents(KafkaStringsConsumer.StreamPosition startReadingFrom, SimpleKafkaIasiosConsumer.IasioListener listener) throws KafkaUtilsException
      Start processing the strings received by the SimpleStringConsumer from the kafka channel.

      This method starts the thread that polls the kafka topic and returns after the consumer has been assigned to at least one partition.

      Parameters:
      startReadingFrom - Starting position in the kafka partition
      listener - The listener of events published in the topic
      Throws:
      KafkaUtilsException - in case of timeout subscribing to the kafka topic
    • stringsReceived

      public void stringsReceived(Collection<String> strings)
      Receive the strings consumed from the kafka topic and forward the IASIOs to the listener
      Specified by:
      stringsReceived in interface KafkaStringsConsumer.StringsConsumer
      Parameters:
      strings - The strings read from the Kafka topic
    • accept

      protected boolean accept(IASValue<?> iasio)
      Accept all the IASValues. Objects that inherit from SimpleKafkaIasiosConsumer can implement their filtering by overriding this method.
      Parameters:
      iasio - The IASValue to accept or discard
      Returns:
      true if the value is accepted; false otherwise
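The override pattern described for accept can be sketched with stand-in types. Everything below is hypothetical: `BaseConsumer` is a simplified placeholder for SimpleKafkaIasiosConsumer and plain strings replace IASValue objects, so that the example runs without the IAS libraries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for the base consumer: accepts every value by default. */
class BaseConsumer<T> {
    private final List<T> forwarded = new ArrayList<>();

    /** Subclasses override this method to filter values. */
    protected boolean accept(T value) {
        return true;
    }

    /** Forwards only the accepted values (here they are simply collected). */
    public void valuesReceived(Collection<T> values) {
        for (T value : values) {
            if (accept(value)) {
                forwarded.add(value);
            }
        }
    }

    public List<T> forwarded() {
        return forwarded;
    }
}

/** Hypothetical subclass: accepts only identifiers that start with a given prefix. */
class PrefixFilteringConsumer extends BaseConsumer<String> {
    private final String prefix;

    PrefixFilteringConsumer(String prefix) {
        this.prefix = prefix;
    }

    @Override
    protected boolean accept(String id) {
        return id.startsWith(prefix);
    }

    public static void main(String[] args) {
        PrefixFilteringConsumer consumer = new PrefixFilteringConsumer("A");
        consumer.valuesReceived(Arrays.asList("A1", "B2", "A3"));
        System.out.println(consumer.forwarded());
    }
}
```

Keeping the filter in a single overridable method means the receiving and forwarding logic stays in the base class, and subclasses only state which values they want.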
    • setUp

      public void setUp(Properties userPros)
      Initializes the consumer with the passed kafka properties.

      The defaults are used if not found in the parameter

      Parameters:
      userPros - The user defined kafka properties
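One way to picture how user properties take precedence over defaults is the following sketch. It is an assumption about the merging behavior, not the library's implementation: the class name and the default values are hypothetical, while `bootstrap.servers` and `auto.offset.reset` are standard Kafka consumer configuration keys.

```java
import java.util.Properties;

/** Hypothetical helper, not part of the IAS API: merges user properties over defaults. */
final class KafkaPropsMerge {
    private KafkaPropsMerge() {}

    /** Returns a new Properties where user values override defaults and missing keys fall back to defaults. */
    static Properties merge(Properties defaults, Properties userProps) {
        Properties merged = new Properties();
        merged.putAll(defaults);   // start from the defaults
        merged.putAll(userProps);  // user-defined values win
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092"); // illustrative default
        defaults.setProperty("auto.offset.reset", "latest");         // illustrative default

        Properties user = new Properties();
        user.setProperty("bootstrap.servers", "kafka-host:9092");

        Properties merged = merge(defaults, user);
        System.out.println(merged.getProperty("bootstrap.servers")); // user value
        System.out.println(merged.getProperty("auto.offset.reset")); // default value
    }
}
```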
    • setUp

      public void setUp()
      Initializes the consumer with default kafka properties
    • tearDown

      public void tearDown()
      Close and cleanup the consumer
    • getNumOfProcessedRecords

      public long getNumOfProcessedRecords()
      Returns:
      the number of records processed