Package org.eso.ias.kafkautils
Class SimpleStringProducer
java.lang.Object
org.eso.ias.kafkautils.SimpleStringProducer
SimpleStringProducer pushes strings to a Kafka topic.
The producer delegates to a KafkaProducer and can push data to different topics, so that a single instance of this publisher can be used instead of many SimpleStringProducer instances or equivalent.
According to the documentation, the KafkaProducer is in fact meant to be used this way: it is thread safe and generally more efficient than having multiple producers in the same process.
From a performance point of view, the producer has a saturation point beyond which performance degrades. Once the saturation point has been reached, the only way to increase performance is to add new producers.
USAGE: normally only one SimpleStringProducer is used for pushing strings to all the topics. If there are performance problems, instantiate more SimpleStringProducers.
Kafka properties are fully customizable by calling setUp(Properties): default values are used for the missing properties.
Author: acaproni
-
Field Summary
Modifier and Type | Field | Description
final String | bootstrapServers | A list of comma separated servers:port to connect to
-
Constructor Summary
-
Method Summary
Modifier and Type | Method | Description
void | flush() | Ensures all records have been delivered to the broker. Especially useful when sending records asynchronously and you want to be sure they have all been sent.
int | getNumOfStringsSent() |
void | push(String value, String topic, Integer partition, String key) | Asynchronously pushes a string to a Kafka topic.
protected void | push(String value, String topic, Integer partition, String key, boolean sync, int timeout, TimeUnit unit) | Pushes the passed string to the given partition with the given key.
void | push(String value, String topic, Integer partition, String key, int timeout, TimeUnit unit) | Synchronously pushes the passed string to the topic.
void | setUp() | Initializes the producer with default properties.
void | setUp(Properties props) | Initializes the producer with the given properties.
void | tearDown() | Closes the producer.
-
Field Details
-
bootstrapServers
A list of comma separated servers:port to connect to
-
-
Constructor Details
-
SimpleStringProducer
Constructor
Parameters:
servers - The list of Kafka servers to connect to
clientID - The unique identifier of this producer (mapped to Kafka client.id)
-
-
Method Details
-
setUp
public void setUp()
Initializes the producer with default properties.
-
setUp
Initializes the producer with the given properties.
The servers and ID passed in the constructor override those in the passed properties.
Parameters:
props - user defined properties
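The override rule (constructor values win over the passed properties) can be illustrated with plain java.util.Properties. This is only a minimal sketch, not the actual implementation of setUp(Properties): the class PropsOverrideSketch and the helper mergeProps are hypothetical names, and it assumes that the servers given to the constructor map to the standard Kafka key bootstrap.servers, in the same way the client ID maps to client.id.

```java
import java.util.Properties;

public class PropsOverrideSketch {

    /**
     * Hypothetical helper: copies the user properties, then forces the
     * servers and client ID that were given at construction time.
     */
    static Properties mergeProps(Properties userProps, String servers, String clientId) {
        Properties merged = new Properties();
        merged.putAll(userProps);
        // Constructor values win over whatever the caller put in the properties.
        // "bootstrap.servers" as the target key is an assumption of this sketch.
        merged.setProperty("bootstrap.servers", servers);
        merged.setProperty("client.id", clientId);
        return merged;
    }

    public static void main(String[] args) {
        Properties userProps = new Properties();
        userProps.setProperty("bootstrap.servers", "ignored-host:9092");
        userProps.setProperty("acks", "all");

        Properties merged = mergeProps(userProps, "localhost:9092", "my-producer");
        System.out.println(merged.getProperty("bootstrap.servers"));
        System.out.println(merged.getProperty("client.id"));
        System.out.println(merged.getProperty("acks"));
    }
}
```

Properties that the caller sets and the constructor does not cover (acks in this sketch) pass through unchanged.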
-
tearDown
public void tearDown()
Closes the producer.
-
push
public void push(String value, String topic, Integer partition, String key) throws KafkaUtilsException
Asynchronously pushes a string to a Kafka topic.
Parameters:
value - The string to publish in the topic; must be neither null nor empty
topic - The topic to push the string into; must be neither null nor empty
partition - The partition
key - The key
Throws:
KafkaUtilsException - in case of error sending the value
-
push
public void push(String value, String topic, Integer partition, String key, int timeout, TimeUnit unit) throws KafkaUtilsException
Synchronously pushes the passed string to the topic.
Parameters:
value - The string to publish in the topic; must be neither null nor empty
topic - The topic to push the string into; must be neither null nor empty
partition - The partition
key - The key
timeout - the time to wait for the send to complete
unit - the unit of the timeout
Throws:
KafkaUtilsException - in case of error or timeout sending the value
-
push
protected void push(String value, String topic, Integer partition, String key, boolean sync, int timeout, TimeUnit unit) throws KafkaUtilsException
Pushes the passed string to the given partition with the given key.
Kafka sending is asynchronous, i.e. this method returns before the value is actually published in the topic, unless the sync parameter is set to true, in which case the method waits until the value has actually been sent.
Parameters:
value - The string to publish in the topic; must be neither null nor empty
topic - The topic to push the string into; must be neither null nor empty
partition - The partition
key - The key
sync - If true, the method returns only after the value has actually been sent
timeout - the time to wait if sync is set
unit - the unit of the timeout
Throws:
KafkaUtilsException - in case of error or timeout sending the value
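The difference between the two modes can be sketched with a plain java.util.concurrent.Future, which is the type KafkaProducer.send returns. This is an illustration under assumptions, not the code of this class: SyncPushSketch, awaitSend and SendException are hypothetical names (SendException stands in for KafkaUtilsException), and a CompletableFuture replaces the real pending send so that no broker is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncPushSketch {

    /** Hypothetical stand-in for KafkaUtilsException. */
    static class SendException extends Exception {
        SendException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    /**
     * Waits on the pending send only when sync is true.
     * The Future stands in for the one returned by KafkaProducer.send(...).
     */
    static void awaitSend(Future<?> pendingSend, boolean sync, int timeout, TimeUnit unit)
            throws SendException {
        if (!sync) {
            return; // asynchronous: do not wait for the broker
        }
        try {
            pendingSend.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            throw new SendException("Error or timeout sending the value", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new SendException("Interrupted while sending the value", e);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();

        // Asynchronous mode returns at once even though the send is still pending
        awaitSend(neverCompletes, false, 10, TimeUnit.MILLISECONDS);
        System.out.println("async returned");

        // Synchronous mode gives up after the timeout
        try {
            awaitSend(neverCompletes, true, 10, TimeUnit.MILLISECONDS);
        } catch (SendException e) {
            System.out.println("sync timed out");
        }

        // Synchronous mode returns normally once the send has completed
        awaitSend(CompletableFuture.completedFuture("ok"), true, 10, TimeUnit.MILLISECONDS);
        System.out.println("sync completed");
    }
}
```

In asynchronous mode a failed send is not reported to the caller of this sketch; whether and how the real class reports such failures is not stated here.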
-
flush
public void flush()
Ensures all records have been delivered to the broker. Especially useful when sending records asynchronously and you want to be sure they have all been sent.
-
getNumOfStringsSent
public int getNumOfStringsSent()
-