Class KafkaStringsConsumer
- All Implemented Interfaces:
Runnable, org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- Direct Known Subclasses:
SimpleStringConsumer
SimpleStringConsumer
instead.
KafkaStringsConsumer runs the next poll only after the listener has finished processing the records.
Kafka properties are fully customizable by calling setUp(Properties): default values are used for any property that is not set.
Life cycle:
- setUp() or setUp(Properties) must be called to initialize the object;
- tearDown() must be called when finished using the object;
- startGettingEvents(StringsConsumer, StreamPosition) must be called to start polling events from the kafka topic.
startGettingEvents(StringsConsumer, StreamPosition) returns when the consumer has been assigned to at least one partition. The partitions assigned to the consumer can be revoked and reassigned, for example when another consumer subscribes or disconnects, because the assignment of consumers to partitions is left to kafka in this version.
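The call order of the life cycle can be sketched as follows. This is a minimal illustration, not the class's own code: `LifeCycleConsumer` is a hypothetical stand-in that exposes only the three life-cycle calls (the real startGettingEvents takes a listener and a start position and throws KafkaUtilsException), and wrapping the work in try/finally is an assumption about how a caller might guarantee that tearDown() always runs.

```java
/** Hypothetical stand-in exposing only the three documented life-cycle calls. */
interface LifeCycleConsumer {
    void setUp();
    // The real method takes a listener and a start position; omitted in this sketch.
    void startGettingEvents();
    void tearDown();
}

final class LifeCycleSketch {
    private LifeCycleSketch() {}

    /**
     * Initializes the consumer, starts polling, runs the caller's work and
     * always tears the consumer down, even if start-up or the work fails.
     */
    static void runOnce(LifeCycleConsumer consumer, Runnable work) {
        consumer.setUp();
        try {
            consumer.startGettingEvents();
            work.run();
        } finally {
            consumer.tearDown();
        }
    }
}
```

With the real class, the same shape would apply: call setUp() once, start polling, and release the consumer in a finally block.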
- Author:
- acaproni
-
Nested Class Summary
Modifier and Type | Class | Description
static enum | KafkaStringsConsumer.StreamPosition | The start position when connecting to a kafka topic for reading strings
static interface | KafkaStringsConsumer.StringsConsumer | The interface for the listener of strings
-
Field Summary
Modifier and Type | Field | Description
static final long | DEFAULT_CONSUMER_READY_TIMEOUT | Default time to wait until the consumer is ready
static final long | MIN_CONSUMER_READY_TIMEOUT | Min allowed timeout to wait for the consumer ready
protected final String | topicName | The name of the topic to get events from
static final String | WAIT_CONSUMER_READY_TIMEOUT_PROP_NAME | The property to setup the waiting time until the consumer is ready (in msecs)
-
Constructor Summary
Constructor | Description
KafkaStringsConsumer(String servers, String topicName, String consumerID) | Constructor
-
Method Summary
Modifier and Type | Method | Description
long | getNumOfProcessedRecords() |
boolean | isReady() | Return true if the consumer is ready.
void | onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> parts) | Called after partitions have been reassigned but before the consumer starts consuming messages
void | onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> parts) |
void | onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> parts) | Called before the rebalancing starts and after the consumer stopped consuming events.
void | run() | The thread to poll data from the topic
boolean | seekTo | Seek the consumer to the passed position.
void | setUp() | Initializes the consumer with default kafka properties
void | setUp(Properties userPros) | Initializes the consumer with the passed kafka properties.
void | startGettingEvents(KafkaStringsConsumer.StringsConsumer listener, KafkaStringsConsumer.StreamPosition startReadingFrom) | Start polling events from the kafka channel.
void | tearDown() | Close and cleanup the consumer
void | waitUntilConsumerReady() | Waits until the consumer is ready or a timeout elapses.
-
Field Details
-
topicName
The name of the topic to get events from
-
WAIT_CONSUMER_READY_TIMEOUT_PROP_NAME
The property to setup the waiting time until the consumer is ready (in msecs)
-
DEFAULT_CONSUMER_READY_TIMEOUT
public static final long DEFAULT_CONSUMER_READY_TIMEOUT
Default time to wait until the consumer is ready
-
MIN_CONSUMER_READY_TIMEOUT
public static final long MIN_CONSUMER_READY_TIMEOUT
Min allowed timeout to wait for the consumer ready
-
-
Constructor Details
-
KafkaStringsConsumer
Constructor
- Parameters:
servers - The kafka servers to connect to
topicName - The name of the topic to get events from
consumerID - the ID of the consumer
-
-
Method Details
-
getNumOfProcessedRecords
public long getNumOfProcessedRecords()
- Returns:
the number of records processed
-
setUp
Initializes the consumer with the passed kafka properties. Default values are used for the properties not found in the parameter.
- Parameters:
userPros - The user defined kafka properties
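A minimal sketch of the "defaults for missing properties" behaviour, under stated assumptions: `mergeWithDefaults` is a hypothetical helper, not a method of this class, and the default values shown are illustrative only (the keys are standard Kafka consumer configuration names; the defaults this class actually applies are not listed here).

```java
import java.util.Properties;

final class PropsMergeSketch {
    private PropsMergeSketch() {}

    /** Returns a new Properties holding the defaults, overridden by every user-supplied entry. */
    static Properties mergeWithDefaults(Properties defaults, Properties userProps) {
        Properties merged = new Properties();
        merged.putAll(defaults);   // start from the defaults
        merged.putAll(userProps);  // user values win on conflicting keys
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("auto.offset.reset", "latest");  // assumed default, for illustration
        defaults.setProperty("enable.auto.commit", "true");   // assumed default, for illustration

        Properties user = new Properties();
        user.setProperty("auto.offset.reset", "earliest");

        System.out.println(mergeWithDefaults(defaults, user));
    }
}
```

Neither input is modified, so the same defaults object can be reused for several consumers.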
-
startGettingEvents
public void startGettingEvents(KafkaStringsConsumer.StringsConsumer listener, KafkaStringsConsumer.StreamPosition startReadingFrom) throws KafkaUtilsException
Start polling events from the kafka channel. This method starts the thread that polls the kafka topic and returns after the consumer has been assigned to at least one partition.
- Parameters:
listener - The listener of events published in the topic
startReadingFrom - Starting position in the kafka partition
- Throws:
KafkaUtilsException - in case of timeout subscribing to the kafka topic
-
setUp
public void setUp()
Initializes the consumer with default kafka properties
-
tearDown
public void tearDown()
Close and cleanup the consumer
-
onPartitionsRevoked
Called before the rebalancing starts and after the consumer stopped consuming events.
- Specified by:
onPartitionsRevoked in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- Parameters:
parts - The list of partitions that were assigned to the consumer and now need to be revoked (may not include all currently assigned partitions, i.e. there may still be some partitions left)
-
onPartitionsAssigned
Called after partitions have been reassigned but before the consumer starts consuming messages.
- Specified by:
onPartitionsAssigned in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- Parameters:
parts - The list of partitions that are now assigned to the consumer (previously owned partitions will NOT be included, i.e. this list will only include newly added partitions)
-
onPartitionsLost
- Specified by:
onPartitionsLost in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- Parameters:
parts - The list of partitions that were assigned to the consumer and now have been reassigned to other consumers. With the current protocol this will always include all of the consumer's previously assigned partitions, but this may change in future protocols (i.e. there would still be some partitions left)
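Because the three callbacks receive only the partitions that changed, a listener that wants to know its full assignment has to accumulate them. The following is a sketch of that bookkeeping under stated assumptions: `AssignmentTrackerSketch` is a hypothetical class, partitions are represented by plain integer ids instead of org.apache.kafka.common.TopicPartition so the example compiles without the Kafka client library, and it does not claim to reproduce what KafkaStringsConsumer does internally.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class AssignmentTrackerSketch {
    // Thread-safe set: callbacks and readers may run on different threads.
    private final Set<Integer> assigned = ConcurrentHashMap.newKeySet();

    /** Only newly added partitions are passed, so they are added to the current set. */
    void onPartitionsAssigned(Collection<Integer> parts) {
        assigned.addAll(parts);
    }

    /** Only the revoked partitions are passed; others may remain assigned. */
    void onPartitionsRevoked(Collection<Integer> parts) {
        assigned.removeAll(parts);
    }

    /** Lost partitions now belong to other consumers and are dropped as well. */
    void onPartitionsLost(Collection<Integer> parts) {
        assigned.removeAll(parts);
    }

    /** True while at least one partition is still assigned. */
    boolean hasAssignment() {
        return !assigned.isEmpty();
    }

    /** Immutable copy of the partitions currently tracked as assigned. */
    Set<Integer> snapshot() {
        return Set.copyOf(assigned);
    }
}
```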
-
run
public void run()
The thread to poll data from the topic
-
seekTo
Seek the consumer to the passed position. Kafka allows seeking only if a partition is assigned; otherwise the seek is ignored.
- Parameters:
pos - The position to seek the consumer to
- Returns:
true if the seek has been done with an assigned partition; false otherwise
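The meaning of the returned boolean can be illustrated with a small guard. This is a sketch under assumptions, not the class's implementation: `seekIfAssigned` is a hypothetical helper, partitions are plain integer ids, and the seek itself is passed in as a callback so the example runs without a Kafka client.

```java
import java.util.Set;
import java.util.function.Consumer;

final class GuardedSeekSketch {
    private GuardedSeekSketch() {}

    /**
     * Applies seekAction to every assigned partition.
     * Returns false, doing nothing, when no partition is assigned.
     */
    static boolean seekIfAssigned(Set<Integer> assignedPartitions, Consumer<Integer> seekAction) {
        if (assignedPartitions.isEmpty()) {
            return false; // nothing assigned: the seek is ignored
        }
        assignedPartitions.forEach(seekAction);
        return true;
    }
}
```

A caller that gets false back can retry once the consumer has been assigned a partition.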
-
isReady
public boolean isReady()
Return true if the consumer is ready. The Kafka API does not provide a way to know if the consumer is ready. This method uses KafkaConsumer.assignment() to understand if the consumer is ready: the consumer is ready if there are partitions assigned to it.
- Returns:
true if the consumer is ready, false otherwise
-
waitUntilConsumerReady
public void waitUntilConsumerReady()
Waits until the consumer is ready or a timeout elapses. This method waits until the consumer is ready (delegating to isReady()) or a timeout elapses. If the consumer is not yet ready when the timeout elapses, this method only logs a warning, because we do not want to block the process in case the consumer becomes ready later for whatever reason. Still, the situation is probably not normal, so the event is logged. The timeout can be set through the WAIT_CONSUMER_READY_TIMEOUT_PROP_NAME property; otherwise the default (DEFAULT_CONSUMER_READY_TIMEOUT) is used.
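The wait-with-timeout behaviour could look roughly like the sketch below. Everything named here is an assumption made for illustration: the property name, the default and minimum values, reading the timeout from a Properties object, and raising too-small values to the minimum are all hypothetical, since the actual constants and lookup are not given in this page. Unlike the documented void method, the sketch returns a boolean so the outcome can be checked.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class WaitReadySketch {
    static final String TIMEOUT_PROP = "example.consumer.ready.timeout"; // hypothetical property name
    static final long DEFAULT_TIMEOUT_MS = 15_000L;                       // assumed default
    static final long MIN_TIMEOUT_MS = 500L;                              // assumed minimum

    private WaitReadySketch() {}

    /** Reads the timeout from the properties, falling back to the default and enforcing the minimum. */
    static long resolveTimeout(Properties props) {
        String raw = props.getProperty(TIMEOUT_PROP);
        if (raw == null) {
            return DEFAULT_TIMEOUT_MS;
        }
        try {
            return Math.max(MIN_TIMEOUT_MS, Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return DEFAULT_TIMEOUT_MS; // unparsable value: use the default
        }
    }

    /** Polls isReady until it returns true or the timeout elapses; only warns on timeout. */
    static boolean waitUntilReady(BooleanSupplier isReady, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!isReady.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                System.err.println("WARN: consumer not ready after " + timeoutMs + " ms");
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return isReady.getAsBoolean();
            }
        }
        return true;
    }
}
```

With the real class, the readiness check passed as `isReady` would be the consumer's isReady() method.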
-