- IasKafkaUtils.IasKafkaConsumer.IasLogListener(builtins.object)
    - KafkaLogListener
- builtins.object
    - IasCommandListener
    - cimpl.Producer
- threading.Thread(builtins.object)
    - IasCmdManagerKafka
class IasCmdManagerKafka(threading.Thread)
IasCmdManagerKafka(full_run_id, listener: IasCmdReply.IasCmdManagerKafka.IasCommandListener, kbrokers='localhost:9092', replyProducer: cimpl.Producer | None = None, kclient_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432', kgroup_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432')
This is the Python equivalent of the Java CommandManagerKafkaImpl,
but with reduced functionality because, so far, there are no Python clients
that get commands and send replies.
This class is the counterpart of IasCommandsSender:
- gets commands from the Kafka command topic,
- forwards them to the listener for processing,
- sends replies to the Kafka reply topic
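Example: a minimal usage sketch. The listener subclass, the run ID, and the
IasCmdExitStatus.OK member are illustrative assumptions, not taken from this
documentation:

    from IasCmdReply.IasCmdManagerKafka import IasCmdManagerKafka, IasCommandListener
    from IasCmdReply.IasCmdExitStatus import IasCmdExitStatus

    class MyListener(IasCommandListener):
        def cmdReceived(self, cmd):
            # Process the command and return the exit status for the reply
            return IasCmdExitStatus.OK  # OK is an assumed enum member

    # Build the manager, start getting commands, close at shutdown
    manager = IasCmdManagerKafka("my-full-run-id", MyListener(), kbrokers="localhost:9092")
    manager.start()
    # ... application runs; commands are forwarded to MyListener ...
    manager.close()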
- Method resolution order:
- IasCmdManagerKafka
- threading.Thread
- builtins.object
Methods defined here:
- __init__(self, full_run_id, listener: IasCmdReply.IasCmdManagerKafka.IasCommandListener, kbrokers='localhost:9092', replyProducer: cimpl.Producer | None = None, kclient_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432', kgroup_id='IasCmdReply.IasCmdManagerKafka2025-01-10T15:08:17.432')
- Constructor
Params:
full_run_id: the full running ID of the process
listener: the listener of commands
replyProducer: the producer of replies; if None, a new one is built
kbrokers: the Kafka brokers
kclient_id: the ID of the Kafka client
kgroup_id: the ID of the Kafka group
- close(self)
- Stop getting commands
- run(self)
- The method executed by the thread that forwards commands to the listener
and pushes replies into the reply topic
- start(self)
- Start getting commands
Methods inherited from threading.Thread:
- __repr__(self)
- Return repr(self).
- getName(self)
- Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
- isDaemon(self)
- Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
- is_alive(self)
- Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. See also the module function
enumerate().
- join(self, timeout=None)
- Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating-point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
- setDaemon(self, daemonic)
- Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
- setName(self, name)
- Set the name string for this thread.
This method is deprecated, use the name attribute instead.
Readonly properties inherited from threading.Thread:
- ident
- Thread identifier of this thread or None if it has not been started.
This is a nonzero integer. See the get_ident() function. Thread
identifiers may be recycled when a thread exits and another thread is
created. The identifier is available even after the thread has exited.
- native_id
- Native integral thread ID of this thread, or None if it has not been started.
This is a non-negative integer. See the get_native_id() function.
This represents the Thread ID as reported by the kernel.
Data descriptors inherited from threading.Thread:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
- daemon
- A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is
raised. Its initial value is inherited from the creating thread; the
main thread is not a daemon thread and therefore all threads created in
the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
- name
- A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The
initial name is set by the constructor.

class IasCommandListener(builtins.object)
The listener of commands read from the IAS command topic
Methods defined here:
- cmdReceived(self, cmd: IasCmdReply.IasCommand.IasCommand) -> IasCmdReply.IasCmdExitStatus.IasCmdExitStatus
- The callback invoked to notify of a new command to process
The implementation must override this method
Args:
cmd: the command received from the Kafka topic
Returns:
the exit code, after processing the command, to be set in the reply
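Example: a sketch of a concrete listener. The printing of cmd and the OK
member of IasCmdExitStatus are assumptions:

    from IasCmdReply.IasCmdManagerKafka import IasCommandListener
    from IasCmdReply.IasCommand import IasCommand
    from IasCmdReply.IasCmdExitStatus import IasCmdExitStatus

    class LoggingListener(IasCommandListener):
        def cmdReceived(self, cmd: IasCommand) -> IasCmdExitStatus:
            # Process the received command and report the outcome in the reply
            print("Command received:", cmd)
            return IasCmdExitStatus.OK  # assumed enum member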
Data descriptors defined here:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object

class Producer(builtins.object)
Asynchronous Kafka Producer
.. py:function:: Producer(config)
:param dict config: Configuration properties. At a minimum ``bootstrap.servers`` **should** be set
Create a new Producer instance using the provided configuration dict.
.. py:function:: __len__(self)
Producer implements __len__ that can be used as len(producer) to obtain number of messages waiting.
:returns: Number of messages and Kafka protocol requests waiting to be delivered to broker.
:rtype: int
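Example: a minimal sketch, assuming the confluent-kafka Python package, which
exposes this class as confluent_kafka.Producer:

    from confluent_kafka import Producer

    # bootstrap.servers is the only setting strictly required here
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    print(len(producer))  # messages and requests still waiting for delivery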
Methods defined here:
- __bool__(self, /)
- True if self else False
- __init__(self, /, *args, **kwargs)
- Initialize self. See help(type(self)) for accurate signature.
- __len__(self, /)
- Return len(self).
- abort_transaction(...)
- .. py:function:: abort_transaction([timeout])
Aborts the current transaction.
This function should also be used to recover from non-fatal
abortable transaction errors when KafkaError.txn_requires_abort()
is True.
Any outstanding messages will be purged and fail with
_PURGE_INFLIGHT or _PURGE_QUEUE.
Note: This function will block until all outstanding messages
are purged and the transaction abort request has been
successfully handled by the transaction coordinator, or until
the timeout expires, whichever comes first. On timeout the
application may call the function again.
Note: Will automatically call purge() and flush() to ensure
all queued and in-flight messages are purged before attempting
to abort the transaction.
:param float timeout: The maximum amount of time to block
waiting for transaction to abort in seconds.
:raises: KafkaError: Use exc.args[0].retriable() to check if the
operation may be retried.
Treat any other error as a fatal error.
- begin_transaction(...)
- .. py:function:: begin_transaction()
Begin a new transaction.
init_transactions() must have been called successfully (once)
before this function is called.
Any messages produced or offsets sent to a transaction, after
the successful return of this function will be part of the
transaction and committed or aborted atomically.
Complete the transaction by calling commit_transaction() or
Abort the transaction by calling abort_transaction().
:raises: KafkaError: Use exc.args[0].retriable() to check if the
operation may be retried, else treat the
error as a fatal error.
- commit_transaction(...)
- .. py:function:: commit_transaction([timeout])
Commit the current transaction.
Any outstanding messages will be flushed (delivered) before
actually committing the transaction.
If any of the outstanding messages fail permanently the current
transaction will enter the abortable error state and this
function will return an abortable error, in this case the
application must call abort_transaction() before attempting
a new transaction with begin_transaction().
Note: This function will block until all outstanding messages
are delivered and the transaction commit request has been
successfully handled by the transaction coordinator, or until
the timeout expires, whichever comes first. On timeout the
application may call the function again.
Note: Will automatically call flush() to ensure all queued
messages are delivered before attempting to commit the
transaction. Delivery reports and other callbacks may thus be
triggered from this method.
:param float timeout: The amount of time to block in seconds.
:raises: KafkaError: Use exc.args[0].retriable() to check if the
operation may be retried, or
exc.args[0].txn_requires_abort() if the current
transaction has failed and must be aborted by calling
abort_transaction() and then start a new transaction
with begin_transaction().
Treat any other error as a fatal error.
- flush(...)
- .. py:function:: flush([timeout])
Wait for all messages in the Producer queue to be delivered.
This is a convenience method that calls :py:func:`poll()` until :py:func:`len()` is zero or the optional timeout elapses.
:param float timeout: Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)
:returns: Number of messages still in queue.
.. note:: See :py:func:`poll()` for a description on what callbacks may be triggered.
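Example (the topic name is illustrative):
    producer.produce("my-topic", value=b"payload")
    remaining = producer.flush(10)  # block up to 10 seconds
    if remaining > 0:
        print(remaining, "message(s) not delivered before the timeout")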
- init_transactions(...)
.. py:function:: init_transactions([timeout])
Initializes transactions for the producer instance.
This function ensures any transactions initiated by previous
instances of the producer with the same `transactional.id` are
completed.
If the previous instance failed with a transaction in progress
the previous transaction will be aborted.
This function needs to be called before any other transactional
or produce functions are called when the `transactional.id` is
configured.
If the last transaction had begun completion (following
transaction commit) but not yet finished, this function will
await the previous transaction's completion.
When any previous transactions have been fenced this function
will acquire the internal producer id and epoch, used in all
future transactional messages issued by this producer instance.
Upon successful return from this function the application has to
perform at least one of the following operations within
`transaction.timeout.ms` to avoid timing out the transaction
on the broker:
* produce() (et al.)
* send_offsets_to_transaction()
* commit_transaction()
* abort_transaction()
:param float timeout: Maximum time to block in seconds.
:raises: KafkaError: Use exc.args[0].retriable() to check if the
operation may be retried, else treat the
error as a fatal error.
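Example: a sketch of the transactional lifecycle built from the calls
documented here (broker address, transactional.id and topic name are
illustrative):

    from confluent_kafka import Producer, KafkaException

    producer = Producer({
        "bootstrap.servers": "localhost:9092",
        "transactional.id": "my-transactional-id",
    })
    producer.init_transactions()
    producer.begin_transaction()
    producer.produce("my-topic", value=b"payload")
    try:
        producer.commit_transaction()
    except KafkaException as e:
        if e.args[0].txn_requires_abort():
            # The transaction cannot be committed; abort and start over
            producer.abort_transaction()
        else:
            raise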
- list_topics(...)
- .. py:function:: list_topics([topic=None], [timeout=-1])
Request metadata from the cluster.
This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
:param str topic: If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
:param float timeout: The maximum response time before timing out, or -1 for infinite timeout.
:rtype: ClusterMetadata
:raises: KafkaException
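Example:
    metadata = producer.list_topics(timeout=10)
    for name, topic in metadata.topics.items():
        print(name, "has", len(topic.partitions), "partition(s)")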
- poll(...)
- .. py:function:: poll([timeout])
Polls the producer for events and calls the corresponding callbacks (if registered).
Callbacks:
- ``on_delivery`` callbacks from :py:func:`produce()`
- ...
:param float timeout: Maximum time to block waiting for events. (Seconds)
:returns: Number of events processed (callbacks served)
:rtype: int
- produce(...)
- .. py:function:: produce(topic, [value], [key], [partition], [on_delivery], [timestamp], [headers])
Produce message to topic.
This is an asynchronous operation, an application may use the ``callback`` (alias ``on_delivery``) argument to pass a function (or lambda) that will be called from :py:func:`poll()` when the message has been successfully delivered or permanently fails delivery.
Currently message headers are not supported on the message returned to the callback. The ``msg.headers()`` will return None even if the original message had headers set.
:param str topic: Topic to produce message to
:param str|bytes value: Message payload
:param str|bytes key: Message key
:param int partition: Partition to produce to, else uses the configured built-in partitioner.
:param func on_delivery(err,msg): Delivery report callback to call (from :py:func:`poll()` or :py:func:`flush()`) on successful or failed delivery
:param int timestamp: Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
:param dict|list headers: Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
:rtype: None
:raises BufferError: if the internal producer message queue is full (``queue.buffering.max.messages`` exceeded)
:raises KafkaException: for other errors, see exception code
:raises NotImplementedError: if timestamp is specified without underlying library support.
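Example: a typical produce/poll/flush pattern (topic, key and value are
illustrative):

    def delivery_report(err, msg):
        # Invoked from poll() or flush() with the delivery outcome
        if err is not None:
            print("Delivery failed:", err)
        else:
            print("Delivered to", msg.topic(), "partition", msg.partition())

    producer.produce("my-topic", key=b"k", value=b"v", on_delivery=delivery_report)
    producer.poll(0)  # serve pending delivery callbacks without blocking
    producer.flush()  # wait for outstanding messages before shutting down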
- purge(...)
- .. py:function:: purge([in_queue=True], [in_flight=True], [blocking=True])
Purge messages currently handled by the producer instance.
The application will need to call poll() or flush() afterwards to serve the delivery report callbacks of the purged messages.
:param bool in_queue: Purge messages from internal queues. By default, true.
:param bool in_flight: Purge messages in flight to or from the broker. By default, true.
:param bool blocking: If set to False, will not wait on background thread queue purging to finish. By default, true.
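Example:
    producer.purge()   # drop queued and in-flight messages
    producer.flush(5)  # serve the delivery reports of the purged messages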
- send_offsets_to_transaction(...)
- .. py:function:: send_offsets_to_transaction(positions, group_metadata, [timeout])
Sends a list of topic partition offsets to the consumer group
coordinator for group_metadata and marks the offsets as part
of the current transaction.
These offsets will be considered committed only if the
transaction is committed successfully.
The offsets should be the next message your application will
consume, i.e., the last processed message's offset + 1 for each
partition.
Either track the offsets manually during processing or use
consumer.position() (on the consumer) to get the current offsets
for the partitions assigned to the consumer.
Use this method at the end of a consume-transform-produce loop
prior to committing the transaction with commit_transaction().
Note: The consumer must disable auto commits
(set `enable.auto.commit` to false on the consumer).
Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in
offsets will be ignored. If there are no valid offsets in
offsets the function will return successfully and no action
will be taken.
:param list(TopicPartition) positions: current consumer/processing
position(offsets) for the
list of partitions.
:param object group_metadata: consumer group metadata retrieved
from the input consumer's
consumer_group_metadata().
:param float timeout: Amount of time to block in seconds.
:raises: KafkaError: Use exc.args[0].retriable() to check if the
operation may be retried, or
exc.args[0].txn_requires_abort() if the current
transaction has failed and must be aborted by calling
abort_transaction() and then start a new transaction
with begin_transaction().
Treat any other error as a fatal error.
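Example: a sketch of one pass of a consume-transform-produce loop (broker,
group id, transactional id and topic names are illustrative; the group
metadata is obtained with the Python consumer's consumer_group_metadata()):

    from confluent_kafka import Consumer, Producer

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "enable.auto.commit": False,  # required for transactional offsets
    })
    consumer.subscribe(["input-topic"])
    producer = Producer({
        "bootstrap.servers": "localhost:9092",
        "transactional.id": "my-transactional-id",
    })
    producer.init_transactions()

    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        producer.begin_transaction()
        producer.produce("output-topic", value=msg.value())
        # Commit the consumed offsets atomically with the produced message
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata())
        producer.commit_transaction()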
- set_sasl_credentials(...)
- .. py:function:: set_sasl_credentials(username, password)
Sets the SASL credentials used for this client.
These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate.
This method will not disconnect existing broker connections that have been established with the old credentials.
This method is applicable only to SASL PLAIN and SCRAM mechanisms.
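Example (credentials are illustrative):
    producer.set_sasl_credentials("new-user", "new-password")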
Static methods defined here:
- __new__(*args, **kwargs)
- Create and return a new object. See help(type) for accurate signature.