IasValueProcessor

org.eso.ias.sink.IasValueProcessor
See theIasValueProcessor companion object
class IasValueProcessor(val processorIdentifier: String, val listeners: List[ValueListener], val kafkaServersOpt: Option[String], hbProducerOpt: Option[HbProducer], cmdManagerOpt: Option[CommandManager], inputSubscriberOpt: Option[InputSubscriber], val iasDao: IasDao, val iasioDaos: List[IasioDao], val templateDaos: List[TemplateDao]) extends InputsListener, AutoCloseable

The IasValueProcessor gets all the IasValues published in the BSDB and sends them to the listener for further processing.

The processing is triggered when the buffer receivedValues contains at least minSizeOfValsToProcessAtOnce items. It is also periodically triggered every periodicProcessingTime msecs.

In this version the IasValueProcessor does not take any action if one of the listeners is too slow apart of logging messages. The slowness is detected when the queue of received and not yet processed values grows too much. In this case the IasValueProcessor logs a warning. To avoid submitting too many logs, the message is logged with a throttling.

The buffer is bounded by maxBufferSize: if threads do not consume values fast enough the oldest values in the buffer are removed to avoid out of memory.

The IasValueProcessor monitors the termination time of the threads and kill threads that do not terminate in killThreadAfter seconds. To kill a thread, its close method is invoked and it will be removed from the active listener.

The constructor builds the HB producer, the input subscriber and the command manager unless they are passed as optional parameters. This is useful for customization or for testing but normally, in operation they are expected to be empty.

Value parameters

cmdManagerOpt

Th eoptional command manager

hbProducerOpt

the optional HbProducer

iasDao

The configuration of the IAS read from the CDB

iasioDaos

The configuration of the IASIOs read from the CDB

inputSubscriberOpt

The optional input subscriber

kafkaServersOpt

The optional string with the kafka servers (if empty the default from KafkaHelper will be used

listeners

the processors of the IasValues read from the BSDB

processorIdentifier

the identifier of the value processor

templateDaos

The configuration of templates read from CDB

Attributes

Companion
object
Graph
Supertypes
trait AutoCloseable
class Object
trait Matchable
class Any

Members list

Value members

Constructors

def this(processorIdentifier: String, listeners: List[ValueListener], kafkaServers: String, iasDao: IasDao, iasioDaos: List[IasioDao], templateDaos: List[TemplateDao])

Constructor that build data structor to connect to kafka using the passed server list

Constructor that build data structor to connect to kafka using the passed server list

This constructor is supposed to be used in operation

Value parameters

iasDao

The configuration of the IAS read from the CDB

iasioDaos

The configuration of the IASIOs read from the CDB

kafkaServers

The string with the kafka servers

listeners

the processors of the IasValues read from the BSDB

processorIdentifier

the identifier of the value processor

templateDaos

The configuration of templates read from CDB

Attributes

Concrete methods

The active listeners are those that are actively processing events. When a listeners throws an exception, it is marked as broken and will stop processing events

The active listeners are those that are actively processing events. When a listeners throws an exception, it is marked as broken and will stop processing events

Attributes

Returns

the active (not broken) listeners

Attributes

Returns

The broken (i.e. not active) listeners

override def close(): Unit

Closes the processor

Closes the processor

Attributes

Definition Classes
AutoCloseable
def init(): Try[Unit]

Initialize the processor

Initialize the processor

Attributes

override def inputsReceived(iasios: Iterable[IASValue[_]]): Unit

An IASIO has been read from the BSDB

An IASIO has been read from the BSDB

IASVales are initially grouped in the received IasValues

Value parameters

iasios

the IasValues read from the BSDB

Attributes

Definition Classes
def isThereActiveListener: Boolean

Attributes

Returns

true if there is at leat one active listener; false otherwise

Concrete fields

val bufferSizeThreshold: Integer

If the size of the buffer is greater than bufferSizeThreshold the processor emits a warning because the listener are too slow processing vales read from the BSDB

If the size of the buffer is greater than bufferSizeThreshold the processor emits a warning because the listener are too slow processing vales read from the BSDB

Attributes

val closed: AtomicBoolean

Signal if the processor has been closed

Signal if the processor has been closed

Attributes

val commandManager: CommandManager

The command manager

The command manager

Attributes

val executorService: ExecutorCompletionService[String]

The executor service to async process the IasValues in the listeners

The executor service to async process the IasValues in the listeners

Attributes

The heartbeat Engine

The heartbeat Engine

Attributes

val iasDao: IasDao
val iasioDaos: List[IasioDao]
val iasioDaosMap: Map[String, IasioDao]

The map of IasioDao by ID to pass to the listeners

The map of IasioDao by ID to pass to the listeners

Attributes

val initialized: AtomicBoolean

Signal if the processor has been initialized

Signal if the processor has been initialized

Attributes

The consumer of IASIOs from the kafka tiopic

The consumer of IASIOs from the kafka tiopic

Attributes

val kafkaServersOpt: Option[String]
val killThreadAfter: Int

Kill threads that do not terminate in killThreadAfter seconds

Kill threads that do not terminate in killThreadAfter seconds

Attributes

val lastProcessingTime: AtomicLong

The point in time when the values has been proccessed for the last time

The point in time when the values has been proccessed for the last time

Attributes

val lastSubmittedWarningTime: AtomicLong

The point in time when the last warning log has been submitted

The point in time when the last warning log has been submitted

Attributes

val logThrottlingTime: Long

A log to warn about the size of the buffer is submitted only if the the last one was published more then logThrottlingTime milliseconds before

A log to warn about the size of the buffer is submitted only if the the last one was published more then logThrottlingTime milliseconds before

Attributes

val maxBufferSize: Int

The max allowed size of the buffer of received and not yet processed IASValues (receivedIasValues): if the buffer grows over this limit, oldest values are removed

The max allowed size of the buffer of received and not yet processed IASValues (receivedIasValues): if the buffer grows over this limit, oldest values are removed

Attributes

IasValues to process are buffered and sent to the listener when the size of the buffer reached minSizeOfValsToProceesAtOnce size

IasValues to process are buffered and sent to the listener when the size of the buffer reached minSizeOfValsToProceesAtOnce size

Attributes

val periodicScheduledExecutor: ScheduledExecutorService

The periodic executor for periodic processing of values

The periodic executor for periodic processing of values

Attributes

val processorIdentifier: String
val receivedIasValues: ListBuffer[IASValue[_]]

Values received from the BSDB are saved in this list until being processed by the listeners

Values received from the BSDB are saved in this list until being processed by the listeners

Attributes

val shutdownHookThread: Thread

The hook for a clean shutdown

The hook for a clean shutdown

Attributes

val stringProducerOpt: Option[SimpleStringProducer]

The kafka string producer is defined only if needed

The kafka string producer is defined only if needed

Attributes

val suppressedWarningMessages: AtomicLong

The number of warning messages suppressed by the throttling

The number of warning messages suppressed by the throttling

Attributes

val templateDaos: List[TemplateDao]

The thread factory for the executors

The thread factory for the executors

Attributes

val threadsRunning: AtomicBoolean

Signal if at least one thread of the processors is still running

Signal if at least one thread of the processors is still running

Attributes

Timeout (secs) waiting for termination of threads: if a timeout elapses a log is issued reporting the name of threads that did not yet terminate for investigation

Timeout (secs) waiting for termination of threads: if a timeout elapses a log is issued reporting the name of threads that did not yet terminate for investigation

Attributes