Consumer

org.eso.ias.kafkaneo.consumer.Consumer
See the Consumer companion object
class Consumer[K, V](val id: String, val groupId: String, val kafkaServers: String, val keyDeserializer: String, val valueDeserializer: String, val startReadingPos: StartReadingPos, val kafkaProperties: Map[String, String]) extends Thread, ConsumerRebalanceListener

Kafka consumer to get events from the BSDB.

This consumer can get events from several topics: unlike the consumers in the org.eso.ias.kafkautils package, a process needs to instantiate only one consumer to get events from multiple topics (for example IASIOs and commands).

Features:

  • multiple listeners for each topic
  • events are delivered to listeners in collections

Limitations:

  • the consumer uses only one group.id, as it belongs to only one group; it is therefore subject to the kafka strategy for assigning topic partitions to the consumers of a group. In short, if you want your consumer to get all events from topic 1 and all events from topic 2 (the most common case in IAS), you have to ensure that it is the only consumer in its group for both topics.
  • the serializer/deserializer is the same for all the topics, for example a StringSerializer/StringDeserializer (in that case the event is a JSON string that must be parsed to build the java object, like a command or an IASIO)
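
One possible way to satisfy the first limitation is to give every process its own group.id, so that no other consumer shares the group and kafka assigns all partitions of the subscribed topics to it. The sketch below only illustrates that idea: GroupIdSketch and uniqueGroupId are hypothetical names, not part of the IAS API.

```scala
import java.util.UUID

// Hypothetical helper, not part of org.eso.ias.kafkaneo
object GroupIdSketch {
  /** Builds a group.id that no other process is expected to share,
    * by appending a random UUID to a readable prefix. */
  def uniqueGroupId(prefix: String): String =
    s"$prefix-${UUID.randomUUID()}"
}
```

The resulting string could then be passed as the groupId constructor parameter. A random group.id also means kafka keeps no committed offsets across restarts, which may or may not be what a given tool wants.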

Type parameters

K

The type of the key

V

The type of the value

Value parameters

groupId

the group.id property for the kafka consumer

id

the id of the consumer for the kafka consumer

kafkaProperties

Other kafka properties to allow the user to pass custom properties

kafkaServers

the string of servers and ports to connect to kafka servers

keyDeserializer

the name of the class to deserialize the key

startReadingPos

The position in the kafka stream from which to start reading messages; defaults to the end of the stream

valueDeserializer

the name of the class to deserialize the value
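
To make the role of these parameters more concrete, the sketch below shows how they might be turned into the property map handed to a kafka consumer. The property keys are standard kafka consumer configuration names; everything else is an assumption made for illustration: ConsumerPropsSketch and buildProps are hypothetical, the mapping of id to client.id is a guess, a Boolean stands in for StartReadingPos, and letting kafkaProperties override the other values is only one possible precedence rule.

```scala
// Hypothetical helper, not part of org.eso.ias.kafkaneo
object ConsumerPropsSketch {
  def buildProps(
      id: String,
      groupId: String,
      kafkaServers: String,
      keyDeserializer: String,
      valueDeserializer: String,
      startFromEnd: Boolean, // stand-in for StartReadingPos
      kafkaProperties: Map[String, String]): Map[String, String] = {
    val base = Map(
      "client.id" -> id, // assumption: the id is used as client.id
      "group.id" -> groupId,
      "bootstrap.servers" -> kafkaServers,
      "key.deserializer" -> keyDeserializer,
      "value.deserializer" -> valueDeserializer,
      "auto.offset.reset" -> (if (startFromEnd) "latest" else "earliest"))
    // Assumption: user-supplied properties win over the values above
    base ++ kafkaProperties
  }
}
```

For string keys and values, both deserializer parameters would typically be "org.apache.kafka.common.serialization.StringDeserializer".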

Attributes

Constructor

create a new kafka consumer

Since

13.0

Companion
object
Supertypes
trait ConsumerRebalanceListener
class Thread
trait Runnable
class Object
trait Matchable
class Any

Members list

Type members

Classlikes

case class TopicEvent(key: Any, value: Any)

The events read from the topic.

This Consumer does not deal with serialization, so we do not want to force a type on keys and values at this stage.

Value parameters

key

The key

value

the value

Attributes

Supertypes
trait Serializable
trait Product
trait Equals
class Object
trait Matchable
class Any

Value members

Concrete methods

def addListener(listener: ConsumerListener[K, V]): Unit

Adds a listener (consumer) of events published in the topic

Value parameters

listener

the listener (consumer) of events

def close(): Unit

Stop getting events, close the connection with the BSDB and close the thread

def dispatchEvents(events: List[ConsumerRecord[K, V]]): Unit

Dispatch the events read from the kafka topic to the listeners

Value parameters

events

the events read from the topics
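
As an illustration of delivering collections of events to per-topic listeners, here is a minimal, library-free sketch. It is not the actual implementation: Record stands in for kafka's ConsumerRecord, and the Listener trait with its topic and onEvents members is a hypothetical simplification of ConsumerListener, whose real interface is not shown on this page.

```scala
import scala.util.control.NonFatal

// Hypothetical types, for illustration only
object DispatchSketch {
  final case class Record(topic: String, key: String, value: String)

  trait Listener {
    def topic: String
    def onEvents(events: List[Record]): Unit
  }

  /** Groups the polled events by topic and hands each listener
    * the whole batch for its topic in a single call. */
  def dispatch(events: List[Record], listeners: List[Listener]): Unit = {
    val byTopic = events.groupBy(_.topic)
    listeners.foreach { l =>
      byTopic.get(l.topic).foreach { batch =>
        // A failing listener must not prevent delivery to the others
        try l.onEvents(batch)
        catch { case NonFatal(e) => System.err.println(s"Listener failed: $e") }
      }
    }
  }
}
```

Delivering one batch per call keeps the per-event overhead low; the cost is that a slow listener delays the listeners that come after it in the same dispatch round.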

def init(): Unit

Start the consumer: connect to the BSDB and start getting events from the thread

override def onPartitionsAssigned(parts: Collection[TopicPartition]): Unit

Called after partitions have been reassigned but before the consumer starts consuming messages

Value parameters

parts

The list of partitions that are now assigned to the consumer (previously owned partitions will NOT be included, i.e. this list will only include newly added partitions)

Attributes

Definition Classes
ConsumerRebalanceListener
override def onPartitionsLost(parts: Collection[TopicPartition]): Unit

Value parameters

parts

The list of partitions that were assigned to the consumer and have now been reassigned to other consumers. With the current protocol this will always include all of the consumer's previously assigned partitions, but this may change in future protocols (i.e. there would still be some partitions left)

Attributes

Definition Classes
ConsumerRebalanceListener
override def onPartitionsRevoked(parts: Collection[TopicPartition]): Unit

Called before the rebalancing starts and after the consumer stopped consuming events.

Value parameters

parts

The list of partitions that were assigned to the consumer and now need to be revoked (may not include all currently assigned partitions, i.e. there may still be some partitions left)

Attributes

Definition Classes
ConsumerRebalanceListener
def removeListener(listener: ConsumerListener[K, V]): Boolean

Removes a listener (consumer) of events published in the topic

Value parameters

listener

the listener (consumer) to remove

Attributes

Returns

true if the listener was in the container, false otherwise
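
The add/remove contract described above (addListener returns nothing, removeListener reports whether the listener was registered) can be sketched with a thread-safe container from the JDK. ListenerRegistry is a hypothetical class, not the container used by Consumer; choosing a set, which silently ignores duplicate registrations, is an assumption.

```scala
import java.util.concurrent.CopyOnWriteArraySet

// Hypothetical registry, for illustration only
final class ListenerRegistry[L <: AnyRef] {
  // Copy-on-write suits listeners: rare updates, frequent iteration
  private val listeners = new CopyOnWriteArraySet[L]()

  def addListener(listener: L): Unit = {
    listeners.add(listener)
    ()
  }

  /** Returns true if the listener was in the container, false otherwise. */
  def removeListener(listener: L): Boolean = listeners.remove(listener)

  def size: Int = listeners.size()
}
```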

override def run(): Unit

The thread to poll data from the topic

Attributes

See also

java.lang.Runnable#run()

Definition Classes
Thread -> Runnable

The topics from which this consumer gets events (i.e. the topics for which there are listeners)

Inherited methods

def getContextClassLoader(): ClassLoader

Attributes

Inherited from:
Thread
def getId(): Long

Attributes

Inherited from:
Thread
final def getName(): String

Attributes

Inherited from:
Thread
final def getPriority(): Int

Attributes

Inherited from:
Thread
def getStackTrace(): Array[StackTraceElement]

Attributes

Inherited from:
Thread
def getState(): State

Attributes

Inherited from:
Thread
final def getThreadGroup(): ThreadGroup

Attributes

Inherited from:
Thread
def getUncaughtExceptionHandler(): UncaughtExceptionHandler

Attributes

Inherited from:
Thread
def interrupt(): Unit

Attributes

Inherited from:
Thread
final def isAlive(): Boolean

Attributes

Inherited from:
Thread
final def isDaemon(): Boolean

Attributes

Inherited from:
Thread
def isInterrupted(): Boolean

Attributes

Inherited from:
Thread
final def join(): Unit

Attributes

Inherited from:
Thread
final def join(x$0: Long, x$1: Int): Unit

Attributes

Inherited from:
Thread
final def join(x$0: Long): Unit

Attributes

Inherited from:
Thread
def setContextClassLoader(x$0: ClassLoader): Unit

Attributes

Inherited from:
Thread
final def setDaemon(x$0: Boolean): Unit

Attributes

Inherited from:
Thread
final def setName(x$0: String): Unit

Attributes

Inherited from:
Thread
final def setPriority(x$0: Int): Unit

Attributes

Inherited from:
Thread
def setUncaughtExceptionHandler(x$0: UncaughtExceptionHandler): Unit

Attributes

Inherited from:
Thread
def start(): Unit

Attributes

Inherited from:
Thread
def toString(): String

Returns a string representation of the object.

The default representation is platform dependent.

Attributes

Returns

a string representation of the object.

Inherited from:
Thread

Deprecated and Inherited methods

@Deprecated(since = "17", forRemoval = true)
final def checkAccess(): Unit

Attributes

Deprecated
true
Inherited from:
Thread
@Deprecated(since = "1.2", forRemoval = true)
def countStackFrames(): Int

Attributes

Deprecated
true
Inherited from:
Thread
@Deprecated(since = "1.2", forRemoval = true)
final def resume(): Unit

Attributes

Deprecated
true
Inherited from:
Thread
@Deprecated(since = "1.2")
final def stop(): Unit

Attributes

Deprecated
true
Inherited from:
Thread
@Deprecated(since = "1.2", forRemoval = true)
final def suspend(): Unit

Attributes

Deprecated
true
Inherited from:
Thread

Concrete fields

val consumer: KafkaConsumer[K, V]

The Kafka consumer getting events from the kafka topic

val groupId: String
val id: String
val isClosed: AtomicBoolean

Signal if the consumer has been closed

val isStarted: AtomicBoolean

Signal if the consumer has been started
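
The isStarted and isClosed flags, together with a shutdown thread, suggest a lifecycle in which init() and close() are safe to call more than once. The sketch below shows one common way to get that behaviour with AtomicBoolean.compareAndSet and a JVM shutdown hook. LifecycleSketch is hypothetical and its Boolean return values are added only to make the behaviour observable (the documented init() and close() return Unit); the actual Consumer may be implemented differently.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical class, for illustration only
final class LifecycleSketch {
  val isStarted = new AtomicBoolean(false)
  val isClosed = new AtomicBoolean(false)

  // Closes the object at JVM exit if the user did not do it
  private val shutDownThread = new Thread(new Runnable {
    override def run(): Unit = { close(); () }
  })

  /** Returns true only for the call that actually starts the object. */
  def init(): Boolean =
    if (isClosed.get()) false // a closed object cannot be restarted
    else if (isStarted.compareAndSet(false, true)) {
      Runtime.getRuntime.addShutdownHook(shutDownThread)
      true
    } else false

  /** Returns true only for the call that actually closes the object. */
  def close(): Boolean =
    if (isClosed.compareAndSet(false, true)) {
      // Resources (connection, polling thread) would be released here
      true
    } else false
}
```

Because close() is idempotent, it does not matter whether the user or the shutdown hook calls it first: the second call is a no-op.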

val kafkaProperties: Map[String, String]
val kafkaServers: String
val keyDeserializer: String
val shutDownThread: Thread

The thread to shut down the consumer if not done by the user

The statistics

val valueDeserializer: String