IasCmdReply.IasCmdManagerKafka
index
/home/fedora/IasRoot/lib/python3.12/site-packages/IasCmdReply/IasCmdManagerKafka.py

 
Modules
       
time

 
Classes
       
IasKafkaUtils.IasKafkaConsumer.IasLogListener(builtins.object)
KafkaLogListener
builtins.object
IasCommandListener
cimpl.Producer
threading.Thread(builtins.object)
IasCmdManagerKafka

 
class IasCmdManagerKafka(threading.Thread)
    IasCmdManagerKafka(full_run_id, listener: IasCmdReply.IasCmdManagerKafka.IasCommandListener, kbrokers='localhost:9092', replyProducer: cimpl.Producer | None = None, kclient_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432', kgroup_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432')
 
This is the python equivalent of the java CommandManagerKafkaIMpl 
but with rduced functionalities as up to now there are no python clients that get comamnds
and send replies.
 
This class is the counter part of IasCommandsSender:
 -  gets commands from the kafka command topic, 
 -  forwards them to the listener for precessing
 -  sends replies to the kafka reply topic
 
 
Method resolution order:
IasCmdManagerKafka
threading.Thread
builtins.object

Methods defined here:
__init__(self, full_run_id, listener: IasCmdReply.IasCmdManagerKafka.IasCommandListener, kbrokers='localhost:9092', replyProducer: cimpl.Producer | None = None, kclient_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432', kgroup_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432')
Constructor
 
Params:
    full_run_id: the full runing ID of the process
    listener: the listener of commands
    replyProducer the producer of replies, if None a new one is built
    kbrokers Kafka brokers
    kclient_id The ID of the Kafka client
    kgroup_id The id of the Kafka group
close(self)
Stop getting commands
run(self)
The method executed by the tread that sends commands to the lsitener
and pushes replies in the topic
start(self)
Start getting commands

Methods inherited from threading.Thread:
__repr__(self)
Return repr(self).
getName(self)
Return a string used for identification purposes only.
 
This method is deprecated, use the name attribute instead.
isDaemon(self)
Return whether this thread is a daemon.
 
This method is deprecated, use the daemon attribute instead.
is_alive(self)
Return whether the thread is alive.
 
This method returns True just before the run() method starts until just
after the run() method terminates. See also the module function
enumerate().
join(self, timeout=None)
Wait until the thread terminates.
 
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
 
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
 
When the timeout argument is not present or None, the operation will
block until the thread terminates.
 
A thread can be join()ed many times.
 
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
setDaemon(self, daemonic)
Set whether this thread is a daemon.
 
This method is deprecated, use the .daemon property instead.
setName(self, name)
Set the name string for this thread.
 
This method is deprecated, use the name attribute instead.

Readonly properties inherited from threading.Thread:
ident
Thread identifier of this thread or None if it has not been started.
 
This is a nonzero integer. See the get_ident() function. Thread
identifiers may be recycled when a thread exits and another thread is
created. The identifier is available even after the thread has exited.
native_id
Native integral thread ID of this thread, or None if it has not been started.
 
This is a non-negative integer. See the get_native_id() function.
This represents the Thread ID as reported by the kernel.

Data descriptors inherited from threading.Thread:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object
daemon
A boolean value indicating whether this thread is a daemon thread.
 
This must be set before start() is called, otherwise RuntimeError is
raised. Its initial value is inherited from the creating thread; the
main thread is not a daemon thread and therefore all threads created in
the main thread default to daemon = False.
 
The entire Python program exits when only daemon threads are left.
name
A string used for identification purposes only.
 
It has no semantics. Multiple threads may be given the same name. The
initial name is set by the constructor.

 
class IasCommandListener(builtins.object)
    The listener of commands read from IAS cmd topic
 
  Methods defined here:
cmdReceived(self, cmd: IasCmdReply.IasCommand.IasCommand) -> IasCmdReply.IasCmdExitStatus.IasCmdExitStatus
The callback to notify of a new new command to process
 
The implementation must override this method
 
Args:
   cmd received from th ekafka topic
Returns:
    the exit code afetr processing the command to be set in the reply

Data descriptors defined here:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

 
class KafkaLogListener(IasKafkaUtils.IasKafkaConsumer.IasLogListener)
    KafkaLogListener(logs: queue.Queue, full_run_id: str)
 
The listener of comands from the kafka topic
 
The listener discards the logs that are not for this process (identified
by the full running ID).
It stores the accpted logs in the queue together with the timestamp.
The logs will be fetched and processed by the tread of the IasCommandManagerKafka object
 
 
Method resolution order:
KafkaLogListener
IasKafkaUtils.IasKafkaConsumer.IasLogListener
builtins.object

Methods defined here:
__init__(self, logs: queue.Queue, full_run_id: str)
Constructor
 
Params:
    logs: The queue to store IasCommands into
    full_run_id: the full runing ID of the process
iasLogReceived(self, log: str) -> None
The callback to notify new IasValues
received from the BSDB
 
The implementation must override this method

Data descriptors inherited from IasKafkaUtils.IasKafkaConsumer.IasLogListener:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

 
class Producer(builtins.object)
    Asynchronous Kafka Producer
 
.. py:function:: Producer(config)
 
  :param dict config: Configuration properties. At a minimum ``bootstrap.servers`` **should** be set
 
  Create a new Producer instance using the provided configuration dict.
 
 
.. py:function:: __len__(self)
 
  Producer implements __len__ that can be used as len(producer) to obtain number of messages waiting.
  :returns: Number of messages and Kafka protocol requests waiting to be delivered to broker.
  :rtype: int
 
  Methods defined here:
__bool__(self, /)
True if self else False
__init__(self, /, *args, **kwargs)
Initialize self.  See help(type(self)) for accurate signature.
__len__(self, /)
Return len(self).
abort_transaction(...)
.. py:function:: abort_transaction([timeout])
 
Aborts the current transaction.
This function should also be used to recover from non-fatal
abortable transaction errors when KafkaError.txn_requires_abort()
is True.
 
Any outstanding messages will be purged and fail with
_PURGE_INFLIGHT or _PURGE_QUEUE.
 
Note: This function will block until all outstanding messages
are purged and the transaction abort request has been
successfully handled by the transaction coordinator, or until
the timeout expires, which ever comes first. On timeout the
application may call the function again.
 
Note: Will automatically call purge() and flush()  to ensure
all queued and in-flight messages are purged before attempting
to abort the transaction.
 
:param float timeout: The maximum amount of time to block
     waiting for transaction to abort in seconds.
 
:raises: KafkaError: Use exc.args[0].retriable() to check if the
         operation may be retried.
         Treat any other error as a fatal error.
begin_transaction(...)
.. py:function:: begin_transaction()
 
Begin a new transaction.
 
init_transactions() must have been called successfully (once)
before this function is called.
 
Any messages produced or offsets sent to a transaction, after
the successful return of this function will be part of the
transaction and committed or aborted atomically.
 
Complete the transaction by calling commit_transaction() or
Abort the transaction by calling abort_transaction().
 
:raises: KafkaError: Use exc.args[0].retriable() to check if the
                     operation may be retried, else treat the
                     error as a fatal error.
commit_transaction(...)
.. py:function:: commit_transaction([timeout])
 
Commmit the current transaction.
Any outstanding messages will be flushed (delivered) before
actually committing the transaction.
 
If any of the outstanding messages fail permanently the current
transaction will enter the abortable error state and this
function will return an abortable error, in this case the
application must call abort_transaction() before attempting
a new transaction with begin_transaction().
 
Note: This function will block until all outstanding messages
are delivered and the transaction commit request has been
successfully handled by the transaction coordinator, or until
the timeout expires, which ever comes first. On timeout the
application may call the function again.
 
Note: Will automatically call flush() to ensure all queued
messages are delivered before attempting to commit the
transaction. Delivery reports and other callbacks may thus be
triggered from this method.
 
:param float timeout: The amount of time to block in seconds.
 
:raises: KafkaError: Use exc.args[0].retriable() to check if the
         operation may be retried, or
         exc.args[0].txn_requires_abort() if the current
         transaction has failed and must be aborted by calling
         abort_transaction() and then start a new transaction
         with begin_transaction().
         Treat any other error as a fatal error.
flush(...)
.. py:function:: flush([timeout])
 
   Wait for all messages in the Producer queue to be delivered.
   This is a convenience method that calls :py:func:`poll()` until :py:func:`len()` is zero or the optional timeout elapses.
 
  :param: float timeout: Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)
  :returns: Number of messages still in queue.
 
.. note:: See :py:func:`poll()` for a description on what callbacks may be triggered.
init_transactions(...)
.. py:function: init_transactions([timeout])
 
Initializes transactions for the producer instance.
 
This function ensures any transactions initiated by previous
instances of the producer with the same `transactional.id` are
completed.
If the previous instance failed with a transaction in progress
the previous transaction will be aborted.
This function needs to be called before any other transactional
or produce functions are called when the `transactional.id` is
configured.
 
If the last transaction had begun completion (following
transaction commit) but not yet finished, this function will
await the previous transaction's completion.
 
When any previous transactions have been fenced this function
will acquire the internal producer id and epoch, used in all
future transactional messages issued by this producer instance.
 
Upon successful return from this function the application has to
perform at least one of the following operations within 
`transaction.timeout.ms` to avoid timing out the transaction
on the broker:
produce() (et.al)
send_offsets_to_transaction()
commit_transaction()
abort_transaction()
 
:param float timeout: Maximum time to block in seconds.
 
:raises: KafkaError: Use exc.args[0].retriable() to check if the
                     operation may be retried, else treat the
                     error as a fatal error.
list_topics(...)
.. py:function:: list_topics([topic=None], [timeout=-1])
 
Request metadata from the cluster.
This method provides the same information as  listTopics(), describeTopics() and describeCluster() in  the Java Admin client.
 
:param str topic: If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
:param float timeout: The maximum response time before timing out, or -1 for infinite timeout.
:rtype: ClusterMetadata
:raises: KafkaException
poll(...)
.. py:function:: poll([timeout])
 
Polls the producer for events and calls the corresponding callbacks (if registered).
 
Callbacks:
 
- ``on_delivery`` callbacks from :py:func:`produce()`
- ...
 
:param float timeout: Maximum time to block waiting for events. (Seconds)
:returns: Number of events processed (callbacks served)
:rtype: int
produce(...)
.. py:function:: produce(topic, [value], [key], [partition], [on_delivery], [timestamp], [headers])
 
Produce message to topic.
This is an asynchronous operation, an application may use the ``callback`` (alias ``on_delivery``) argument to pass a function (or lambda) that will be called from :py:func:`poll()` when the message has been successfully delivered or permanently fails delivery.
 
Currently message headers are not supported on the message returned to the callback. The ``msg.headers()`` will return None even if the original message had headers set.
 
:param str topic: Topic to produce message to
:param str|bytes value: Message payload
:param str|bytes key: Message key
:param int partition: Partition to produce to, else uses the configured built-in partitioner.
:param func on_delivery(err,msg): Delivery report callback to call (from :py:func:`poll()` or :py:func:`flush()`) on successful or failed delivery
:param int timestamp: Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
 
:param dict|list headers: Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
:rtype: None
:raises BufferError: if the internal producer message queue is full (``queue.buffering.max.messages`` exceeded)
:raises KafkaException: for other errors, see exception code
:raises NotImplementedError: if timestamp is specified without underlying library support.
purge(...)
.. py:function:: purge([in_queue=True], [in_flight=True], [blocking=True])
 
 Purge messages currently handled by the producer instance.
 The application will need to call poll() or flush() afterwards to serve the delivery report callbacks of the purged messages.
 
:param: bool in_queue: Purge messages from internal queues. By default, true.
:param: bool in_flight: Purge messages in flight to or from the broker. By default, true.
:param: bool blocking: If set to False, will not wait on background thread queue purging to finish. By default, true.
send_offsets_to_transaction(...)
.. py:function:: send_offsets_to_transaction(positions, group_metadata, [timeout])
 
Sends a list of topic partition offsets to the consumer group
coordinator for group_metadata and marks the offsets as part
of the current transaction.
These offsets will be considered committed only if the
transaction is committed successfully.
 
The offsets should be the next message your application will
consume, i.e., the last processed message's offset + 1 for each
partition.
Either track the offsets manually during processing or use
consumer.position() (on the consumer) to get the current offsets
for the partitions assigned to the consumer.
 
Use this method at the end of a consume-transform-produce loop
prior to committing the transaction with commit_transaction().
 
Note: The consumer must disable auto commits
      (set `enable.auto.commit` to false on the consumer).
 
Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in
offsets will be ignored. If there are no valid offsets in
offsets the function will return successfully and no action
will be taken.
 
:param list(TopicPartition) offsets: current consumer/processing
                                     position(offsets) for the
                                     list of partitions.
:param object group_metadata: consumer group metadata retrieved
                              from the input consumer's
                              get_consumer_group_metadata().
:param float timeout: Amount of time to block in seconds.
 
:raises: KafkaError: Use exc.args[0].retriable() to check if the
         operation may be retried, or
         exc.args[0].txn_requires_abort() if the current
         transaction has failed and must be aborted by calling
         abort_transaction() and then start a new transaction
         with begin_transaction().
         Treat any other error as a fatal error.
set_sasl_credentials(...)
.. py:function:: set_sasl_credentials(username, password)
 
Sets the SASL credentials used for this client.
These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate.
This method will not disconnect existing broker connections that have been established with the old credentials.
This method is applicable only to SASL PLAIN and SCRAM mechanisms.

Static methods defined here:
__new__(*args, **kwargs)
Create and return a new object.  See help(type) for accurate signature.