ConsumersStatistics

org.eso.ias.kafkaneo.consumer.ConsumersStatistics
See the ConsumersStatistics companion object
class ConsumersStatistics(val id: String, val timerPeriod: Int, val logLevel: Level) extends TimerTask

Build and publish the statistics for the consumers in the form of log messages.

ConsumersStatistics collects the number of events read from the topics and periodically publishes log messages with the statistics. Both the period for collecting and publishing the statistics and the log level of the messages are customizable.

ConsumersStatistics.start must be invoked to activate the timer task that periodically publishes the logs, and ConsumersStatistics.stop must be invoked to stop publishing the statistics. Once stopped, a ConsumersStatistics cannot be started again.
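The start/stop lifecycle described above can be sketched with a plain java.util.Timer. This is a minimal illustration, not the actual implementation: PeriodicReporter is a hypothetical stand-in class, and returning a Boolean from start and stop is an assumption made here so the outcome is visible (the documented methods return Unit).

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in that mirrors the documented lifecycle;
// it is NOT the real ConsumersStatistics class.
class PeriodicReporter(periodMinutes: Int) extends TimerTask {
  require(periodMinutes > 0, "The period must be greater than 0")

  val started = new AtomicBoolean(false)
  val stopped = new AtomicBoolean(false)

  // Daemon timer thread, so it never keeps the JVM alive on its own
  private val timer = new Timer("PeriodicReporter", true)

  /** Schedules the task; returns false if already started or already stopped. */
  def start(): Boolean = {
    if (stopped.get() || !started.compareAndSet(false, true)) false
    else {
      val periodMs = TimeUnit.MINUTES.toMillis(periodMinutes.toLong)
      timer.schedule(this, periodMs, periodMs)
      true
    }
  }

  /** Cancels the timer; returns false if it was already stopped. */
  def stop(): Boolean = {
    if (stopped.compareAndSet(false, true)) {
      timer.cancel()
      true
    } else false
  }

  // Invoked by the timer at every period: the statistics would be logged here
  override def run(): Unit = println("publishing statistics")
}
```

A cancelled java.util.Timer cannot schedule further tasks, which is one plausible reason why a stopped instance cannot be started again. The sketch is not hardened against start and stop racing on different threads.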

If incrementing the number of events read from the topics overflows, the counters are reset to 0. For synchronization, ConsumersStatistics uses AtomicLong counters. The sequence (increase -> checkOverflow -> reset) is not atomic, so the statistics may briefly report meaningless values when an overflow occurs; such an event is so rare that it is not worth adding more complexity to handle it.
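The (increase -> checkOverflow -> reset) sequence can be illustrated as follows. This is a sketch under assumptions: OverflowCounter and addWithReset are hypothetical names, and detecting the overflow through a negative value after wrap-around is one possible check, not necessarily the one used by the class.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper, not part of the documented API
object OverflowCounter {

  /** Adds delta (assumed non-negative) to the counter and resets it to 0
    * if the sum wrapped around. Each step is atomic on its own, but the
    * three steps together are not.
    */
  def addWithReset(counter: AtomicLong, delta: Long): Long = {
    val updated = counter.addAndGet(delta) // step 1: increase
    if (updated < 0) {                     // step 2: check overflow (wrap-around)
      counter.set(0L)                      // step 3: reset
      0L
    } else updated
  }
}
```

Between step 1 and step 3 another thread can read a negative value or add to the counter, which is the window of inconsistency the paragraph above accepts as a trade-off.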

Value parameters

id

The id for the consumer

logLevel

the level of the logs published for the statistics

timerPeriod

the period, in minutes, between two publications of the statistics

Attributes

Companion
object
Supertypes
class TimerTask
trait Runnable
class Object
trait Matchable
class Any

Members list

Type members

Classlikes

class Statistics()

The numbers collected for each topic

Value parameters

totMsgs

total number of messages (each message can contain multiple records)

totRecords

total number of events

totRecordsInPeriod

the number of records processed in the ongoing period; it is reset when the period elapses

Attributes

Supertypes
class Object
trait Matchable
class Any

Value members

Concrete methods

def newRecordsReceived(topic: IasTopic, numOfRecords: Int): Unit

A new message has been received with the passed number of records.

This function shall be invoked for every new message read from a topic (i.e. each time the Kafka poll returns).

Value parameters

numOfRecords

the number of records in the message

topic

the topic from where the records have been consumed
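The per-topic bookkeeping described by Statistics and newRecordsReceived can be sketched as below. This is an illustration under stated assumptions: TopicCounters and TopicStats are hypothetical stand-ins, topics are identified by a plain String instead of IasTopic, each call is assumed to count as exactly one message, and closePeriod is an invented name for the reset that happens when the period elapses.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical counterpart of the documented Statistics class
final class TopicCounters {
  val totMsgs = new AtomicLong(0L)            // messages received
  val totRecords = new AtomicLong(0L)         // records received since creation
  val totRecordsInPeriod = new AtomicLong(0L) // records received in the ongoing period
}

// Hypothetical holder of the counters, one entry per topic
class TopicStats {
  private val byTopic = new ConcurrentHashMap[String, TopicCounters]()

  /** Returns the counters of the topic, creating them on first use. */
  def countersFor(topic: String): TopicCounters =
    byTopic.computeIfAbsent(topic, _ => new TopicCounters)

  /** To be called once per message, with the number of records it carried. */
  def newRecordsReceived(topic: String, numOfRecords: Int): Unit = {
    require(numOfRecords >= 0, "The number of records cannot be negative")
    val counters = countersFor(topic)
    counters.totMsgs.incrementAndGet()
    counters.totRecords.addAndGet(numOfRecords.toLong)
    counters.totRecordsInPeriod.addAndGet(numOfRecords.toLong)
    ()
  }

  /** Returns the records counted in the period just ended and resets that counter. */
  def closePeriod(topic: String): Long =
    countersFor(topic).totRecordsInPeriod.getAndSet(0L)
}
```

Using getAndSet reads and resets the per-period counter in a single atomic step, so no record is lost between reading the value for the log and clearing it.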

def run(): Unit

The timer task that publishes the statistics

def start(): Unit

Start the timer task to publish the logs with statistics

def stop(): Unit

Stop the timer task to publish statistics

Inherited methods

def cancel(): Boolean

Attributes

Inherited from:
TimerTask

Attributes

Inherited from:
TimerTask

Concrete fields

val id: String
val logLevel: Level
val started: AtomicBoolean

Signal if the timer has been started

The statistics for each topic

val stopped: AtomicBoolean

Signal if the timer has been stopped

val timer: Timer

The timer to periodically publish the logs of statistics

val timerPeriod: Int