Package org.eso.ias.kafkautils
Class SimpleKafkaIasiosConsumer
java.lang.Object
org.eso.ias.kafkautils.SimpleKafkaIasiosConsumer
- All Implemented Interfaces:
KafkaStringsConsumer.StringsConsumer
- Direct Known Subclasses:
FilteredKafkaIasiosConsumer, KafkaIasiosConsumer
public class SimpleKafkaIasiosConsumer
extends Object
implements KafkaStringsConsumer.StringsConsumer
KafkaIasiosConsumer gets the strings read from the passed IASIO kafka topic by the KafkaStringsConsumer and forwards the IASIOs to the listener.
The SimpleKafkaIasiosConsumer checks the timestamps of the received IASIOs against a threshold.
If a timestamp is too old, it means that the receiver is slow in processing events: old IASIOs are discarded,
a log is emitted and the consumer seeks to the end of the topic.
To decide what "old" means, the user must set the SeekIfOlderThanProName property to the desired number of milliseconds; otherwise SeekIfOlderThanDefault is used.
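The check described above can be sketched as follows. This is a minimal illustration, not the IAS implementation: `TimedRecord`, `StalenessCheck`, `isTooOld` and `keepRecent` are hypothetical names, and the logging and the seek to the end of the topic are only noted in comments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a record that carries a timestamp. */
final class TimedRecord {
    final String payload;
    final long timestampMillis;

    TimedRecord(String payload, long timestampMillis) {
        this.payload = payload;
        this.timestampMillis = timestampMillis;
    }
}

/** Illustrative only: names and structure are assumptions. */
final class StalenessCheck {
    /** True if the record is older than the threshold at time nowMillis. */
    static boolean isTooOld(long recordTimestampMillis, long nowMillis, long seekIfOlderThanMillis) {
        return nowMillis - recordTimestampMillis > seekIfOlderThanMillis;
    }

    /**
     * Returns the records recent enough to be forwarded.
     * When something is discarded, a real consumer would also emit a log
     * and seek to the end of the topic (not shown here).
     */
    static List<TimedRecord> keepRecent(List<TimedRecord> batch, long nowMillis, long thresholdMillis) {
        List<TimedRecord> recent = new ArrayList<>();
        for (TimedRecord r : batch) {
            if (!isTooOld(r.timestampMillis, nowMillis, thresholdMillis)) {
                recent.add(r);
            }
        }
        return recent;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<TimedRecord> batch = Arrays.asList(
                new TimedRecord("stale", now - 10_000L),
                new TimedRecord("fresh", now - 10L));
        for (TimedRecord r : keepRecent(batch, now, 2_000L)) {
            System.out.println(r.payload);
        }
    }
}
```

The comparison is strict (`>`), matching the wording "greater than seekIfOlderThan" in the field description.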
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
static interface SimpleKafkaIasiosConsumer.IasioListener
- The listener to be notified of Iasios read from the kafka topic.
Field Summary
Fields (Modifier and Type, Field, Description):
final long seekIfOlderThan
- If the difference (in milliseconds) between the timestamp of the records read from the kafka topic and the actual time is greater than seekIfOlderThan, then the consumer seeks to the end of the topic
static final String SeekIfOlderThanProName
- The name of the property that sets the maximum difference, in milliseconds, between the actual time and the timestamp of the records; when it is exceeded the consumer seeks to the end of the topic
Constructor Summary
Constructors (Constructor, Description):
SimpleKafkaIasiosConsumer(String servers, String topicName, String consumerID)
- Build a FilteredStringConsumer with no filters (i.e. all the strings read from the kafka topic are forwarded to the listener)
Method Summary
Methods (Modifier and Type, Method, Description):
protected boolean accept
- Accept all the IASValues.
long getNumOfProcessedRecords()
void setUp()
- Initializes the consumer with default kafka properties
void setUp(Properties userPros)
- Initializes the consumer with the passed kafka properties.
void startGettingEvents(KafkaStringsConsumer.StreamPosition startReadingFrom, SimpleKafkaIasiosConsumer.IasioListener listener)
- Start processing the strings received by the SimpleStringConsumer from the kafka channel.
void stringsReceived(Collection<String> strings)
- Receive the strings consumed from the kafka topic and forward the IASIOs to the listener
void tearDown()
- Close and cleanup the consumer
Field Details
SeekIfOlderThanProName
The name of the property that sets the maximum difference, in milliseconds, between the actual time and the timestamp of the records; when it is exceeded the consumer seeks to the end of the topic
seekIfOlderThan
public final long seekIfOlderThan
If the difference (in milliseconds) between the timestamp of the records read from the kafka topic and the actual time is greater than seekIfOlderThan, then the consumer seeks to the end of the topic
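How the threshold might be resolved from a property with a fallback can be sketched as below. The property name `example.seekIfOlderThan`, the default of 5000 ms and the class `ThresholdConfig` are all hypothetical: this page does not show the actual values of SeekIfOlderThanProName and SeekIfOlderThanDefault, nor where the property is read from. Falling back to the default on a malformed value is also an assumption of this sketch.

```java
import java.util.Properties;

/** Illustrative only: the property name and default are assumptions. */
final class ThresholdConfig {
    /** Hypothetical property name, standing in for SeekIfOlderThanProName. */
    static final String PROP_NAME = "example.seekIfOlderThan";

    /** Hypothetical default, standing in for SeekIfOlderThanDefault. */
    static final long DEFAULT_MILLIS = 5000L;

    /** Returns the threshold from props, or the default if missing or malformed. */
    static long readThreshold(Properties props) {
        String raw = props.getProperty(PROP_NAME);
        if (raw == null) {
            return DEFAULT_MILLIS;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            // Design choice of this sketch: ignore malformed values
            return DEFAULT_MILLIS;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(PROP_NAME, "1500");
        System.out.println(readThreshold(props));
    }
}
```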
Constructor Details
SimpleKafkaIasiosConsumer
Build a FilteredStringConsumer with no filters (i.e. all the strings read from the kafka topic are forwarded to the listener)
- Parameters:
servers - The kafka servers to connect to
topicName - The name of the topic to get events from
consumerID - The ID of the consumer
Method Details
startGettingEvents
public void startGettingEvents(KafkaStringsConsumer.StreamPosition startReadingFrom, SimpleKafkaIasiosConsumer.IasioListener listener) throws KafkaUtilsException
Start processing the strings received by the SimpleStringConsumer from the kafka channel.
This method starts the thread that polls the kafka topic and returns after the consumer has been assigned to at least one partition.
- Parameters:
startReadingFrom - Starting position in the kafka partition
listener - The listener of events published in the topic
- Throws:
KafkaUtilsException - in case of timeout subscribing to the kafka topic
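The "start a polling thread, return once assigned, fail on timeout" behaviour described for startGettingEvents is commonly built on a latch. The sketch below shows that general pattern only; it is not taken from the IAS sources. `PollingStarter` is a hypothetical class, the partition assignment is simulated with a sleep, and `IllegalStateException` stands in for KafkaUtilsException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative only: simulates a consumer that signals when it is ready. */
final class PollingStarter {
    private final CountDownLatch assigned = new CountDownLatch(1);
    private final long assignDelayMillis;

    /** assignDelayMillis simulates how long the partition assignment takes. */
    PollingStarter(long assignDelayMillis) {
        this.assignDelayMillis = assignDelayMillis;
    }

    /** Starts the polling thread and blocks until ready or until the timeout expires. */
    void start(long timeoutMillis) {
        Thread poller = new Thread(() -> {
            try {
                Thread.sleep(assignDelayMillis); // simulated partition assignment
                assigned.countDown();            // unblock the caller of start()
                // A real consumer would keep polling the topic here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "poller");
        poller.setDaemon(true);
        poller.start();

        try {
            if (!assigned.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                poller.interrupt();
                // Stand-in for KafkaUtilsException
                throw new IllegalStateException("Timeout waiting for partition assignment");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for assignment", e);
        }
    }

    public static void main(String[] args) {
        new PollingStarter(10L).start(2_000L);
        System.out.println("started");
    }
}
```

With this pattern the caller knows that events can flow by the time `start` returns, which is the guarantee the method description gives.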
stringsReceived
Receive the strings consumed from the kafka topic and forward the IASIOs to the listener
- Specified by:
stringsReceived in interface KafkaStringsConsumer.StringsConsumer
- Parameters:
strings - The strings read from the Kafka topic
accept
Accept all the IASValues. Objects that inherit from SimpleKafkaIasiosConsumer can implement their filtering by overriding this method.
- Parameters:
iasio - The IASValue to accept or discard
- Returns:
true if the value is accepted; false otherwise
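The override hook described for accept can be illustrated with stand-in classes. In this sketch `ForwardingConsumer` and `PrefixFilteredConsumer` are hypothetical, and plain strings replace IASValue objects so that the example compiles without the IAS library; only the shape of the pattern (a base class that accepts everything, a subclass that narrows it) comes from the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical base class: forwards every value unless accept() is overridden. */
class ForwardingConsumer {
    /** Accepts everything by default; subclasses override to filter. */
    protected boolean accept(String value) {
        return true;
    }

    /** Returns the values that pass accept(), in arrival order. */
    final List<String> toForward(Collection<String> received) {
        List<String> accepted = new ArrayList<>();
        for (String value : received) {
            if (accept(value)) {
                accepted.add(value);
            }
        }
        return accepted;
    }
}

/** Hypothetical subclass: keeps only the values with a given prefix. */
class PrefixFilteredConsumer extends ForwardingConsumer {
    private final String prefix;

    PrefixFilteredConsumer(String prefix) {
        this.prefix = prefix;
    }

    @Override
    protected boolean accept(String value) {
        return value.startsWith(prefix);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("temp-1", "alarm-7", "temp-2");
        System.out.println(new ForwardingConsumer().toForward(input).size());
        System.out.println(new PrefixFilteredConsumer("temp-").toForward(input));
    }
}
```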
setUp
Initializes the consumer with the passed kafka properties. Defaults are used for the properties not found in the parameter.
- Parameters:
userPros - The user defined kafka properties
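One way to combine user properties with defaults, as described for setUp(Properties), is sketched below. The class `ConsumerProps` is hypothetical; `enable.auto.commit` and `auto.offset.reset` are standard Kafka consumer settings, but the default values chosen here are illustrative and are not claimed to be the ones the IAS uses.

```java
import java.util.Properties;

/** Illustrative only: default values are assumptions, not the IAS defaults. */
final class ConsumerProps {
    /** Defaults applied when the user does not provide a value. */
    static Properties defaults() {
        Properties d = new Properties();
        d.setProperty("enable.auto.commit", "true");
        d.setProperty("auto.offset.reset", "latest");
        return d;
    }

    /** User values win; defaults fill in whatever the user left out. */
    static Properties merge(Properties userProps) {
        Properties merged = new Properties();
        Properties d = defaults();
        for (String name : d.stringPropertyNames()) {
            merged.setProperty(name, d.getProperty(name));
        }
        for (String name : userProps.stringPropertyNames()) {
            merged.setProperty(name, userProps.getProperty(name));
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("auto.offset.reset", "earliest");
        Properties merged = merge(user);
        System.out.println(merged.getProperty("auto.offset.reset"));
        System.out.println(merged.getProperty("enable.auto.commit"));
    }
}
```

Copying through `stringPropertyNames()` rather than `putAll` also picks up values that the user supplied through a `Properties` object created with its own defaults.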
setUp
public void setUp()
Initializes the consumer with default kafka properties
tearDown
public void tearDown()
Close and cleanup the consumer
getNumOfProcessedRecords
public long getNumOfProcessedRecords()
- Returns:
the number of records processed