
See theIasValueProcessor companion object
class IasValueProcessor(val processorIdentifier: String, val listeners: List[ValueListener], val kafkaServersOpt: Option[String], hbProducerOpt: Option[HbProducer], cmdManagerOpt: Option[CommandManager], inputSubscriberOpt: Option[InputSubscriber], val iasDao: IasDao, val iasioDaos: List[IasioDao], val templateDaos: List[TemplateDao]) extends InputsListener, AutoCloseable

The IasValueProcessor gets all the IasValues published in the BSDB and sends them to the listener for further processing.

The processing is triggered when the buffer receivedValues contains at least minSizeOfValsToProcessAtOnce items. It is also periodically triggered every periodicProcessingTime msecs.

In this version the IasValueProcessor does not take any action if one of the listeners is too slow apart of logging messages. The slowness is detected when the queue of received and not yet processed values grows too much. In this case the IasValueProcessor logs a warning. To avoid submitting too many logs, the message is logged with a throttling.

The buffer is bounded by maxBufferSize: if threads do not consume values fast enough the oldest values in the buffer are removed to avoid out of memory.

The IasValueProcessor monitors the termination time of the threads and kill threads that do not terminate in killThreadAfter seconds. To kill a thread, its close method is invoked and it will be removed from the active listener.

The constructor builds the HB producer, the input subscriber and the command manager unless they are passed as optional parameters. This is useful for customization or for testing but normally, in operation they are expected to be empty.

Value parameters


Th eoptional command manager


the optional HbProducer


The configuration of the IAS read from the CDB


The configuration of the IASIOs read from the CDB


The optional input subscriber


The optional string with the kafka servers (if empty the default from KafkaHelper will be used


the processors of the IasValues read from the BSDB


the identifier of the value processor


The configuration of templates read from CDB


trait AutoCloseable
class Object
trait Matchable
class Any

Members list

Value members


def this(processorIdentifier: String, listeners: List[ValueListener], kafkaServers: String, iasDao: IasDao, iasioDaos: List[IasioDao], templateDaos: List[TemplateDao])

Constructor that build data structor to connect to kafka using the passed server list

Constructor that build data structor to connect to kafka using the passed server list

This constructor is supposed to be used in operation

Value parameters


The configuration of the IAS read from the CDB


The configuration of the IASIOs read from the CDB


The string with the kafka servers


the processors of the IasValues read from the BSDB


the identifier of the value processor


The configuration of templates read from CDB


Concrete methods

The active listeners are those that are actively processing events. When a listeners throws an exception, it is marked as broken and will stop processing events

The active listeners are those that are actively processing events. When a listeners throws an exception, it is marked as broken and will stop processing events



the active (not broken) listeners



The broken (i.e. not active) listeners

override def close(): Unit

Closes the processor

Closes the processor


Definition Classes
def init(): Try[Unit]

Initialize the processor

Initialize the processor


override def inputsReceived(iasios: Iterable[IASValue[_]]): Unit

An IASIO has been read from the BSDB

An IASIO has been read from the BSDB

IASVales are initially grouped in the received IasValues

Value parameters


the IasValues read from the BSDB


Definition Classes
def isThereActiveListener: Boolean



true if there is at leat one active listener; false otherwise

Concrete fields

val bufferSizeThreshold: Integer

If the size of the buffer is greater than bufferSizeThreshold the processor emits a warning because the listener are too slow processing vales read from the BSDB

If the size of the buffer is greater than bufferSizeThreshold the processor emits a warning because the listener are too slow processing vales read from the BSDB


val closed: AtomicBoolean

Signal if the processor has been closed

Signal if the processor has been closed


val commandManager: CommandManager

The command manager

The command manager


val executorService: ExecutorCompletionService[String]

The executor service to async process the IasValues in the listeners

The executor service to async process the IasValues in the listeners


The heartbeat Engine

The heartbeat Engine


val iasDao: IasDao
val iasioDaos: List[IasioDao]
val iasioDaosMap: Map[String, IasioDao]

The map of IasioDao by ID to pass to the listeners

The map of IasioDao by ID to pass to the listeners


val initialized: AtomicBoolean

Signal if the processor has been initialized

Signal if the processor has been initialized


The consumer of IASIOs from the kafka tiopic

The consumer of IASIOs from the kafka tiopic


val kafkaServersOpt: Option[String]
val killThreadAfter: Int

Kill threads that do not terminate in killThreadAfter seconds

Kill threads that do not terminate in killThreadAfter seconds


val lastProcessingTime: AtomicLong

The point in time when the values has been proccessed for the last time

The point in time when the values has been proccessed for the last time


val lastSubmittedWarningTime: AtomicLong

The point in time when the last warning log has been submitted

The point in time when the last warning log has been submitted


val logThrottlingTime: Long

A log to warn about the size of the buffer is submitted only if the the last one was published more then logThrottlingTime milliseconds before

A log to warn about the size of the buffer is submitted only if the the last one was published more then logThrottlingTime milliseconds before


val maxBufferSize: Int

The max allowed size of the buffer of received and not yet processed IASValues (receivedIasValues): if the buffer grows over this limit, oldest values are removed

The max allowed size of the buffer of received and not yet processed IASValues (receivedIasValues): if the buffer grows over this limit, oldest values are removed


IasValues to process are buffered and sent to the listener when the size of the buffer reached minSizeOfValsToProceesAtOnce size

IasValues to process are buffered and sent to the listener when the size of the buffer reached minSizeOfValsToProceesAtOnce size


val periodicScheduledExecutor: ScheduledExecutorService

The periodic executor for periodic processing of values

The periodic executor for periodic processing of values


val processorIdentifier: String
val receivedIasValues: ListBuffer[IASValue[_]]

Values received from the BSDB are saved in this list until being processed by the listeners

Values received from the BSDB are saved in this list until being processed by the listeners


val shutdownHookThread: Thread

The hook for a clean shutdown

The hook for a clean shutdown


val stringProducerOpt: Option[SimpleStringProducer]

The kafka string producer is defined only if needed

The kafka string producer is defined only if needed


val suppressedWarningMessages: AtomicLong

The number of warning messages suppressed by the throttling

The number of warning messages suppressed by the throttling


val templateDaos: List[TemplateDao]

The thread factory for the executors

The thread factory for the executors


val threadsRunning: AtomicBoolean

Signal if at least one thread of the processors is still running

Signal if at least one thread of the processors is still running


Timeout (secs) waiting for termination of threads: if a timeout elapses a log is issued reporting the name of threads that did not yet terminate for investigation

Timeout (secs) waiting for termination of threads: if a timeout elapses a log is issued reporting the name of threads that did not yet terminate for investigation
