DasuImpl

org.eso.ias.dasu.DasuImpl
See the DasuImpl companion object
class DasuImpl(dasuIdentifier: Identifier, dasuDao: DasuDao, outputPublisher: OutputPublisher, inputSubscriber: InputSubscriber, autoSendTimeInterval: Integer, validityThreshold: Integer) extends Dasu

The implementation of the DASU.

A DASU normally has 2 threads running:

  • the automatic sending of the output when no new input arrives or the output did not change
  • a thread (with throttling) to process the inputs and generate the output

As of #248, the DASU processes the inputs to generate the output in a thread that is executed immediately upon reception of the inputs. To avoid using 100% of the CPU, two consecutive generations of the output are separated by a throttling time interval. During that time the DASU continues to collect inputs.

Value parameters

autoSendTimeInterval

the time (seconds) to automatically resend the last calculated output

dasuDao

The CDB configuration of the DASU

dasuIdentifier

the identifier of the DASU

inputSubscriber

the subscriber getting events to be processed

outputPublisher

the publisher to send the output

validityThreshold

the max delay (secs) before declaring an input unreliable

Attributes

Companion
object
Supertypes
class Dasu
class Object
trait Matchable
class Any

Members list

Value members

Concrete methods

override def ack(alarmIdentifier: Identifier): Boolean

ACK the alarm if the ASCE that produces it runs in this DASU.

The DASU delegates the acknowledgment to the ASCE that produces the alarm.

Before forwarding the ACK to the ASCE, this function checks that the ID of the DASU in the alarm identifier matches its own identifier and that the ASCE (also taken from the identifier of the alarm) is actually running in this DASU.
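The two checks performed before delegating can be sketched as follows. This is only an illustration: `AlarmRef` is a hypothetical, simplified stand-in for the real `Identifier`, and the ASCEs are modelled as plain functions rather than `ComputingElement` instances.

```scala
object AckSketch {
  // Hypothetical stand-in for the alarm Identifier: only the IDs needed for the checks
  final case class AlarmRef(dasuId: String, asceId: String, alarmId: String)

  /** Forwards the ACK only if the alarm belongs to this DASU and to one of its ASCEs.
    * `asces` maps an ASCE ID to a function that ACKs an alarm and reports success.
    */
  def ack(thisDasuId: String, asces: Map[String, String => Boolean], alarm: AlarmRef): Boolean =
    alarm.dasuId == thisDasuId &&
      asces.get(alarm.asceId).exists(ackFn => ackFn(alarm.alarmId))
}
```

An alarm produced by another DASU, or by an ASCE that is not deployed in this DASU, yields `false` without any ASCE being invoked.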

Value parameters

alarmIdentifier

the identifier of the alarm to ACK

Attributes

Returns

true if the alarm has been ACKed, false otherwise


Calculate the validity of the output depending on its last update time. The DASU considers the IASIO valid if it was generated less than validityThreshold milliseconds ago.
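A minimal sketch of such a time-based check is shown below. The `Reliable`/`Unreliable` names and the inclusive boundary are assumptions made for the example, not taken from the implementation.

```scala
object ValiditySketch {
  // Hypothetical validity type used only in this example
  sealed trait Validity
  case object Reliable extends Validity
  case object Unreliable extends Validity

  /** The output is reliable while its age does not exceed the threshold.
    * Treating an age equal to the threshold as still reliable is an assumption.
    */
  def outputValidity(nowMs: Long, lastUpdateMs: Long, validityThresholdMs: Long): Validity =
    if (nowMs - lastUpdateMs <= validityThresholdMs) Reliable else Unreliable
}
```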

Attributes

override def cleanUp(): Unit

Release all the resources before exiting

Attributes

def enableAutoRefreshOfOutput(enable: Boolean): Unit

Enable/disable the automatic update of the output in case no new inputs arrive.

Most likely, the value of the output remains the same while the validity could change.

Attributes

def getAsceIds(): Set[String]

Attributes

Returns

the IDs of the ASCEs running in the DASU

def getInputIds(): Set[String]

Attributes

Returns

the IDs of the inputs of the DASU

def getInputsOfAsce(id: String): Set[String]

Attributes

Returns

the IDs of the inputs of the ASCE with the given ID

def hasInputsToProcess: Boolean

Check if there are inputs to process

Attributes

def hasScheduledTask: Boolean

Check if a task for processing inputs has been scheduled

Attributes

Returns

true if a task for processing inputs has been scheduled

override def inputsReceived(iasios: Iterable[IASValue[_]]): Unit

New inputs have been received from the BSDB. The recalculation of the output is performed by a thread, so the method returns immediately after scheduling the recalculation.

This method is not invoked while automatically re-sending the last computed value. Such value is in fact stored into lastSentOutput and does not trigger a recalculation by the DASU.

This method gets the inputs received from the BSDB and calculates the output by running a thread. If the throttling is in place, the calculation of the output is delayed by scheduling a task to be run after the end of the throttling window. If the throttling is not in place, the output is calculated concurrently by running a task immediately.

This method schedules the thread to recalculate the output only when new inputs are received and the thread is neither already scheduled nor running.

There is one case that is not covered here:

  • the thread is running and new inputs arrive before it terminates
  • no new inputs arrive after that

This is a corner case that will likely never happen when many inputs are continuously produced. However, if it happens, the inputs received in this situation would not be processed until a new input arrives. For this reason, when the thread that calculates the new output terminates, it checks if there are new inputs and, in that case, schedules a new execution of the output calculation.
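The schedule-once logic and the re-check at termination can be sketched as below. The sketch leaves out the throttling delay, and all names (`InputProcessorSketch`, `process`, `runAsync`) are hypothetical: in the real class the work is submitted to an executor service and tracked through `throttlingTask`.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

final class InputProcessorSketch(process: Map[String, Int] => Unit, runAsync: Runnable => Unit) {
  // Inputs not yet processed; every access goes through pending.synchronized
  private val pending = mutable.Map.empty[String, Int]
  // True while a calculation is scheduled or running
  private val scheduled = new AtomicBoolean(false)

  def hasInputsToProcess: Boolean = pending.synchronized { pending.nonEmpty }

  /** Stores the inputs and returns immediately after (possibly) scheduling a run. */
  def inputsReceived(inputs: Iterable[(String, Int)]): Unit = {
    pending.synchronized { pending ++= inputs }
    scheduleIfNeeded()
  }

  // Schedules a run only if there is work and no run is scheduled or running
  private def scheduleIfNeeded(): Unit =
    if (hasInputsToProcess && scheduled.compareAndSet(false, true)) runAsync(() => runOnce())

  private def runOnce(): Unit = {
    // Take a snapshot of the pending inputs and clear the buffer atomically
    val batch = pending.synchronized { val b = pending.toMap; pending.clear(); b }
    try process(batch)
    finally {
      scheduled.set(false)
      // Re-check: inputs that arrived while this run was in progress are not lost
      scheduleIfNeeded()
    }
  }
}
```

Without the final `scheduleIfNeeded()` call, inputs that arrive during a run would stay in the buffer until the next call to `inputsReceived`.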

Value parameters

iasios

the inputs received

Attributes

See also

InputsListener

def mustSendOutput(oldOutput: InOut[_], oldValidity: Validity, newOutput: InOut[_], newValidity: Validity): Boolean

Check if the new output must be sent to the BSDB, i.e. if its mode, value or validity has changed

Value parameters

newOutput

the new output

newValidity

the new validity

oldOutput

the last output sent to the BSDB

oldValidity

the last validity sent to the BSDB

Attributes

Returns

true if the new output changed from the last sent output and must be sent to the BSDB and false otherwise
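The comparison can be illustrated with simplified types. `Output` below is a hypothetical stand-in for `InOut[_]`, and the validity is modelled as a plain string; only the three compared properties are represented.

```scala
object MustSendSketch {
  // Hypothetical, reduced view of an output: just its value and operational mode
  final case class Output(value: Option[Any], mode: String)

  /** True if value, mode or validity differ between the last sent and the new output. */
  def mustSendOutput(oldOutput: Output, oldValidity: String,
                     newOutput: Output, newValidity: String): Boolean =
    oldOutput.value != newOutput.value ||
      oldOutput.mode != newOutput.mode ||
      oldValidity != newValidity
}
```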

Schedule the next recalculation of the output based on

  • the presence of new inputs
  • the point in time when the last calculation ended and the throttling time.

No recalculation is scheduled if there are no new inputs to process. The delay depends on the time when the last calculation ended and the throttling time.
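One plausible way to derive the delay from these two quantities is sketched below. The function name and the exact formula are assumptions made for illustration.

```scala
object RescheduleSketch {
  /** Delay in milliseconds before the next recalculation.
    * None: nothing to schedule because there are no new inputs.
    * Some(0): the throttling window has already elapsed, run immediately.
    */
  def nextRunDelay(hasNewInputs: Boolean, nowMs: Long,
                   calcEndMs: Long, throttlingMs: Long): Option[Long] =
    if (!hasNewInputs) None
    else Some(math.max(0L, calcEndMs + throttlingMs - nowMs))
}
```

For example, with a throttling of 250 ms and a calculation that ended 100 ms ago, the next run would be delayed by 150 ms.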

Attributes

def start(): Try[Unit]

Start getting events from the inputs subscriber to produce the output

Attributes

Concrete fields

val asceDaos: List[AsceDao]

The configuration of the ASCEs that run in the DASU

Attributes

The ASCE that produces the output

Attributes

val asces: Map[String, ComputingElement[_]]
val ascesInitedOk: Boolean
val autoSendTimerTask: AtomicReference[ScheduledFuture[_]]

The task that periodically sends the last computed output when no new inputs arrive. It is initially disabled and must be enabled by invoking enableAutoRefreshOfOutput.

If a new output is generated before this time interval elapses, this task is delayed by autoSendTimeInterval msecs.
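One way to obtain this postponement with a `ScheduledExecutorService` is to cancel the pending task and schedule a new one whenever a fresh output is published. The class below is a hypothetical sketch of that idea, not the actual implementation.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

final class AutoSendSketch(executor: ScheduledExecutorService, intervalMs: Long, resend: () => Unit) {
  // Holds the currently scheduled periodic task; null until restart() is first called
  private val task = new AtomicReference[ScheduledFuture[_]]()

  /** (Re)starts the periodic re-send: the first run happens one full interval from now.
    * Calling this after publishing a new output pushes the next automatic send forward.
    */
  def restart(): ScheduledFuture[_] = {
    val next: ScheduledFuture[_] =
      executor.scheduleAtFixedRate(() => resend(), intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    val previous = task.getAndSet(next)
    // Cancel the task that was pending, without interrupting it if it is running
    if (previous != null) previous.cancel(false)
    next
  }
}
```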

Attributes

val calcEndTime: AtomicLong

The point in time when the DASU finished calculating the output for the last time.

It is used to schedule the next generation of the output against the throttling time.

Together with calcStartTime, it can be used to calculate how long the DASU takes to generate the output.

Attributes

val calcStartTime: AtomicLong

The point in time when the DASU started calculating the output for the last time

Together with calcEndTime, it can be used to calculate how long the DASU takes to generate the output.
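A minimal sketch of how two such timestamps give the duration of the last calculation is shown below. The wrapper class, the `timed` helper and the injectable `clock` are hypothetical and exist only to keep the example self-contained.

```scala
import java.util.concurrent.atomic.AtomicLong

final class CalcTimingSketch(clock: () => Long = () => System.currentTimeMillis()) {
  val calcStartTime = new AtomicLong(0L)
  val calcEndTime = new AtomicLong(0L)

  /** Runs the calculation, recording when it started and when it ended. */
  def timed[T](body: => T): T = {
    calcStartTime.set(clock())
    try body
    finally calcEndTime.set(clock())
  }

  /** Duration in milliseconds of the most recent completed calculation. */
  def lastCalcDurationMs: Long = calcEndTime.get() - calcStartTime.get()
}
```

The value is meaningful only once a calculation has completed: while one is in progress, calcStartTime is already updated but calcEndTime still refers to the previous run.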

Attributes

val closed: AtomicBoolean

Closed: the DASU does not process inputs

Attributes

val dasuOutputId: String
val delayedUpdateTask: Runnable

The Runnable to update the output when it is delayed by the throttling

Attributes

val fullRunningIdsOfInputs: Map[String, String]

The full running IDs of the received inputs

Access to this map must be synchronized because it is used by several threads

Attributes

val lastCalculatedOutput: AtomicReference[Option[IASValue[_]]]

The last output calculated by the ASCEs

Attributes

val lastSentOutputAndValidity: AtomicReference[Option[(InOut[_], IasValidity)]]

The last sent output and validity. It is sent again if no new inputs arrive, the autoSendTimerTask is running, and the validity has not changed since the last sending.

Attributes

val lastSentTime: AtomicLong

The point in time when the output has been sent to the BSDB either due to new inputs or auto-refresh

It is better to initialize it to the current timestamp for the calculation of the throttling in inputsReceived

Attributes

val lastUpdateTime: AtomicLong

The point in time when the output was last generated and published

Attributes

val notYetProcessedInputs: Map[String, IASValue[_]]

Values that have been received in input from plugins or other DASUs (BSDB) and not yet processed by the ASCEs

Access to this map must be synchronized because it is used by several threads

Attributes

The thread executor service

Attributes

val started: AtomicBoolean

True if the DASU has been started

Attributes

The generator of statistics

Attributes

val throttlingTask: AtomicReference[Option[ScheduledFuture[_]]]

The task to delay the generation of the output when new inputs must be processed

It is defined only when a task is running or scheduled to run in the near future.

Attributes

val timelyRefreshing: AtomicBoolean

True if the automatic re-sending of the output has been enabled

Attributes

Inherited fields

Auto send time interval in milliseconds

Attributes

Inherited from:
Dasu
val fromTemplate: Boolean

True if the DASU has been generated from a template, False otherwise

Attributes

Inherited from:
Dasu
val id: String

The ID of the DASU

Attributes

Inherited from:
Dasu
lazy val templateInstance: Option[Int]

The number of the instance if the DASU has been generated from a template; empty otherwise

Attributes

Inherited from:
Dasu
val throttling: Long

The minimum allowed refresh rate when a flow of inputs arrives (i.e. the throttling). It is given by Dasu.DefaultMinAllowedRefreshRate, if not overridden by a Java property

Attributes

Inherited from:
Dasu

The validityThreshold in milliseconds

Attributes

Inherited from:
Dasu