Package org.eso.ias.command.kafka
Class CommandManagerKafkaImpl
java.lang.Object
org.eso.ias.command.CommandManager
org.eso.ias.command.kafka.CommandManagerKafkaImpl
All Implemented Interfaces:
Runnable, SimpleStringConsumer.KafkaConsumerListener
public class CommandManagerKafkaImpl
extends CommandManager
implements SimpleStringConsumer.KafkaConsumerListener, Runnable
CommandManagerKafkaImpl is the command executor: it subscribes as a consumer of the Kafka command topic to get commands, and acts as a producer on the Kafka reply topic to publish replies.
Its task is to receive the commands on behalf of the process where it runs, discarding the commands targeted to other processes.
Commands are forwarded to the listener for execution and the replies are published in the reply topic. SHUTDOWN and RESTART must be executed by the CommandManager because a reply must be sent before shutting down; the listener is nevertheless invoked to run customized code.
Commands are executed in a dedicated thread in FIFO order.
Received commands are queued; commands arriving when the queue is full are discarded, as we do not expect many commands.
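The queueing policy described above (bounded queue, FIFO order, drop when full) can be illustrated with a minimal sketch. It is not the source of CommandManagerKafkaImpl: the class name, the capacity of 3 and the use of plain strings as commands are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative sketch only; not the CommandManagerKafkaImpl implementation. */
class BoundedCommandQueueSketch {
    // Hypothetical capacity: the actual value of LENGTH_OF_CMD_QUEUE is not given here.
    static final int CAPACITY = 3;

    // Bounded FIFO queue of commands waiting to be processed.
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);

    /** Queues a command; returns false (command discarded) when the queue is full. */
    boolean enqueue(String command) {
        return queue.offer(command); // non-blocking: never waits for free space
    }

    /** Removes and returns all queued commands in arrival (FIFO) order. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        queue.drainTo(out);
        return out;
    }

    public static void main(String[] args) {
        BoundedCommandQueueSketch q = new BoundedCommandQueueSketch();
        for (String c : new String[] {"c1", "c2", "c3", "c4"}) {
            System.out.println(c + " accepted: " + q.enqueue(c));
        }
        System.out.println("processed in order: " + q.drain());
    }
}
```

Using a non-blocking offer keeps the consumer callback from stalling when commands arrive faster than they are executed, at the cost of losing the excess commands.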
Field Summary
Modifier and Type | Field | Description
static final int | LENGTH_OF_CMD_QUEUE | Max number of commands in the queue waiting to be processed.
Fields inherited from class org.eso.ias.command.CommandManager: id
Constructor Summary
Constructor | Description
CommandManagerKafkaImpl(String id, String servers, SimpleStringProducer repliesProducer) | Constructor
Method Summary
Modifier and Type | Method | Description
void | close() | Close the producer and the consumer and release all the allocated resources.
void | run() | Fetch commands from the queue and send them to the listener for execution.
void | start(CommandListener commandListener, AutoCloseable closeable) | Start getting events from the command topic and send them to the passed listener.
void | stringEventReceived(String event) | Get the JSON strings representing commands.
Field Details
LENGTH_OF_CMD_QUEUE
public static final int LENGTH_OF_CMD_QUEUE
Max number of commands in the queue waiting to be processed. Commands arriving when the queue is full are rejected.
Constructor Details
CommandManagerKafkaImpl
Constructor
Parameters:
id - The ID of the process
servers - The servers to connect to
repliesProducer - The producer of replies in the reply topic
Method Details
start
public void start(CommandListener commandListener, AutoCloseable closeable) throws KafkaUtilsException
Start getting events from the command topic and send them to the passed listener. The listener is usually an instance of DefaultCommandExecutor or an object extending DefaultCommandExecutor to customize the commands.
Specified by:
start in class CommandManager
Parameters:
commandListener - The listener of commands that executes all the commands
closeable - The closeable class to free the resources while exiting/restarting
Throws:
KafkaUtilsException
close
public void close()
Close the producer and the consumer and release all the allocated resources.
Specified by:
close in class CommandManager
stringEventReceived
public void stringEventReceived(String event)
Get the JSON strings representing commands. The string is converted into a CommandMessage and pushed in the cmdsToProcess queue only if the destination of the command has the same ID as the manager. Another thread (run()) is in charge of polling commands from the queue and sending them to the listener for execution.
Specified by:
stringEventReceived in interface SimpleStringConsumer.KafkaConsumerListener
Parameters:
event - The string received in the topic
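The destination check described for stringEventReceived can be sketched as follows. This is only an illustration under stated assumptions: the Cmd class stands in for an already-parsed CommandMessage (JSON parsing is omitted), and its fields, the class name and the method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Illustrative sketch only; not the CommandManagerKafkaImpl implementation. */
class DestinationFilterSketch {
    /** Hypothetical stand-in for an already-parsed CommandMessage. */
    static final class Cmd {
        final String destId; // ID of the process the command is addressed to
        final String name;   // name of the command

        Cmd(String destId, String name) {
            this.destId = destId;
            this.name = name;
        }
    }

    private final String id; // ID of the process this manager runs in
    private final Queue<Cmd> cmdsToProcess = new ArrayDeque<>();

    DestinationFilterSketch(String id) {
        this.id = id;
    }

    /** Queues the command only if it is addressed to this process. */
    boolean onCommand(Cmd cmd) {
        if (!id.equals(cmd.destId)) {
            return false; // addressed to another process: discard
        }
        return cmdsToProcess.offer(cmd);
    }

    /** Number of commands waiting to be processed. */
    int queued() {
        return cmdsToProcess.size();
    }

    public static void main(String[] args) {
        DestinationFilterSketch mgr = new DestinationFilterSketch("procA");
        System.out.println(mgr.onCommand(new Cmd("procA", "PING")));
        System.out.println(mgr.onCommand(new Cmd("procB", "PING")));
        System.out.println(mgr.queued());
    }
}
```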
run
public void run()
Fetch commands from the queue and send them to the listener for execution. The return code and properties provided by the listener are packed into the reply before being sent to the reply topic. This method catches errors raised by the listener implementation of a command and pushes a reply with an ERROR return code in the reply topic. Note: the only commands that are directly executed by this method are SHUTDOWN and RESTART, because in these cases the reply must be sent before shutting down the process.
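The behaviour described for run() can be sketched as below: a listener failure is turned into an ERROR reply, and for SHUTDOWN and RESTART the reply is published before the process is stopped. This is a simplified illustration, not the actual implementation: the Listener interface, the "OK" return code, the reply string format and the events list (standing in for both the reply topic and the shutdown action) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; not the CommandManagerKafkaImpl implementation. */
class ReplyOnErrorSketch {
    /** Hypothetical listener: returns a return code or throws on failure. */
    interface Listener {
        String execute(String command) throws Exception;
    }

    // Records what happens, in order; stands in for the reply topic.
    final List<String> events = new ArrayList<>();

    void process(String command, Listener listener, Runnable shutdown) {
        String returnCode;
        try {
            returnCode = listener.execute(command); // customized code runs here
        } catch (Exception e) {
            returnCode = "ERROR"; // a listener failure becomes an ERROR reply
        }
        // The reply is always published first...
        events.add("reply:" + command + ":" + returnCode);
        // ...and only then is the process stopped for SHUTDOWN and RESTART.
        if (command.equals("SHUTDOWN") || command.equals("RESTART")) {
            shutdown.run();
        }
    }

    public static void main(String[] args) {
        ReplyOnErrorSketch s = new ReplyOnErrorSketch();
        Runnable stop = () -> s.events.add("shutdown");
        s.process("PING", c -> "OK", stop);
        s.process("SET", c -> { throw new Exception("listener failed"); }, stop);
        s.process("SHUTDOWN", c -> "OK", stop);
        System.out.println(s.events);
    }
}
```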