Package org.eso.ias.kafkautils
Class SimpleStringProducer
java.lang.Object
org.eso.ias.kafkautils.SimpleStringProducer
SimpleStringProducer pushes strings to a Kafka topic.
The producer delegates to a KafkaProducer and can push data to different topics,
so a single instance of this publisher can be used
instead of many SimpleStringProducer instances or equivalent.
According to the Kafka documentation, the KafkaProducer is
in fact meant to be used this way: it is thread safe, and sharing one instance is generally more efficient than having
multiple producers in the same process.
From a performance point of view, there is a saturation point for the producer, after which performance degrades.
Once the saturation point has been reached, the only way to increase performance is to add new producers.
USAGE: normally only one SimpleStringProducer is used for pushing strings to all the topics.
If there are performance problems, instantiate more SimpleStringProducers.
Kafka properties are fully customizable by calling setUp(Properties):
default values are used for the missing properties.
- Author:
- acaproni
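The usage note above (one producer shared by all the topics) can be illustrated with a minimal sketch. Because the real class needs a running Kafka broker, the example uses `InMemoryProducer`, a hypothetical stand-in that only mirrors the method names documented on this page; the topic names, keys, partition numbers, and the way the counter is incremented are assumptions made for illustration, not the behaviour of the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SharedProducerSketch {

    /** Hypothetical stand-in that mirrors the method names of SimpleStringProducer. */
    static class InMemoryProducer {
        private final Map<String, List<String>> topics = new HashMap<>();
        private int sent = 0;
        private boolean ready = false;

        void setUp() { ready = true; }

        void push(String value, String topic, Integer partition, String key) {
            if (!ready) throw new IllegalStateException("setUp() not called");
            if (value == null || value.isEmpty()) throw new IllegalArgumentException("Invalid value");
            if (topic == null || topic.isEmpty()) throw new IllegalArgumentException("Invalid topic");
            // Assumption: the stand-in simply stores the value under its topic
            topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(value);
            sent++;
        }

        void flush() { /* nothing is buffered in this stand-in */ }

        void tearDown() { ready = false; }

        int getNumOfStringsSent() { return sent; }

        List<String> valuesIn(String topic) {
            return topics.getOrDefault(topic, Collections.emptyList());
        }
    }

    /** One producer instance serves two topics, then is flushed and closed. */
    static InMemoryProducer run() {
        InMemoryProducer producer = new InMemoryProducer();
        producer.setUp();
        producer.push("alarm-1", "alarms", 0, "key-a");
        producer.push("hb-1", "heartbeats", 0, "key-b");
        producer.push("alarm-2", "alarms", 0, "key-a");
        producer.flush();
        producer.tearDown();
        return producer;
    }

    public static void main(String[] args) {
        InMemoryProducer p = run();
        System.out.println(p.valuesIn("alarms") + " " + p.valuesIn("heartbeats"));
    }
}
```

With the real class, the same call order would apply (setUp, push, flush, tearDown); only the construction differs, since the constructor takes the server list and the client ID.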
-
Field Summary
Fields
Modifier and Type | Field | Description
final String | bootstrapServers | A list of comma separated servers:port to connect to
-
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
void | flush() | Ensures all the records have been delivered to the broker; especially useful when sending records asynchronously and you want to be sure they have all been sent.
int | getNumOfStringsSent() |
void | push(String value, String topic, Integer partition, String key) | Asynchronously pushes a string to a Kafka topic.
protected void | push(String value, String topic, Integer partition, String key, boolean sync, int timeout, TimeUnit unit) | Pushes the passed string in the given partition and key.
void | push(String value, String topic, Integer partition, String key, int timeout, TimeUnit unit) | Synchronously pushes the passed string to the topic.
void | setUp() | Initializes the producer with default properties.
void | setUp(Properties props) | Initializes the producer with the given properties.
void | tearDown() | Closes the producer.
-
Field Details
-
bootstrapServers
A list of comma separated servers:port to connect to
-
-
Constructor Details
-
SimpleStringProducer
Constructor
- Parameters:
servers - The list of Kafka servers to connect to
clientID - The unique identifier of this producer (mapped to Kafka client.id)
-
-
Method Details
-
setUp
public void setUp()
Initializes the producer with default properties.
-
setUp
Initializes the producer with the given properties.
The servers and ID passed in the constructor override those in the passed properties.
- Parameters:
props - user defined properties
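The precedence described above (defaults for missing properties, user properties on top, constructor values overriding both) can be sketched with plain `java.util.Properties`. The class `ProducerPropsSketch`, its `defaults()` and `merge()` helpers, and the default values chosen are hypothetical; only the property keys (`bootstrap.servers`, `client.id`, `acks`, `linger.ms`, `key.serializer`, `value.serializer`) are standard Kafka producer configuration names.

```java
import java.util.Properties;

final class ProducerPropsSketch {

    /** Hypothetical defaults, applied only to keys the caller leaves unset. */
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty("acks", "1");
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    /** Merge order: defaults first, then user properties, then constructor values. */
    static Properties merge(Properties user, String servers, String clientId) {
        Properties merged = defaults();
        merged.putAll(user);                               // user values replace defaults
        merged.setProperty("bootstrap.servers", servers);  // constructor values win
        merged.setProperty("client.id", clientId);
        return merged;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("linger.ms", "5");
        user.setProperty("bootstrap.servers", "ignored:9092"); // overridden below
        Properties merged = merge(user, "kafka1:9092,kafka2:9092", "my-producer");
        System.out.println(merged.getProperty("bootstrap.servers"));
        System.out.println(merged.getProperty("linger.ms"));
    }
}
```

A `Properties` object built like `user` above is what would be passed to setUp(Properties); setting `bootstrap.servers` or `client.id` in it has no effect because the constructor arguments take priority.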
-
tearDown
public void tearDown()
Closes the producer.
-
push
public void push(String value, String topic, Integer partition, String key) throws KafkaUtilsException
Asynchronously pushes a string to a Kafka topic.
- Parameters:
value - The string to publish in the topic; must be neither null nor empty
topic - The topic to push the string into; must be neither null nor empty
partition - The partition
key - The key
- Throws:
KafkaUtilsException - in case of error sending the value
-
push
public void push(String value, String topic, Integer partition, String key, int timeout, TimeUnit unit) throws KafkaUtilsException
Synchronously pushes the passed string to the topic.
- Parameters:
value - The string to publish in the topic; must be neither null nor empty
topic - The topic to push the string into; must be neither null nor empty
partition - The partition
key - The key
timeout - the maximum time to wait for the send to complete
unit - the unit of the timeout
- Throws:
KafkaUtilsException - in case of error or timeout sending the value
-
push
protected void push(String value, String topic, Integer partition, String key, boolean sync, int timeout, TimeUnit unit) throws KafkaUtilsException
Pushes the passed string in the given partition and key.
Kafka sending is asynchronous, i.e. this method returns before the value is actually published in the topic, unless the sync parameter is set to true, in which case send waits until the value has actually been sent.
- Parameters:
value - The string to publish in the topic; must be neither null nor empty
topic - The topic to push the string into; must be neither null nor empty
partition - The partition
key - The key
sync - If true, the method returns only after the value has actually been sent
timeout - the time to wait if sync is set
unit - the unit of the timeout
- Throws:
KafkaUtilsException - in case of error or timeout sending the value
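The difference between the asynchronous and the synchronous mode can be sketched with a plain `Future`, since `KafkaProducer.send` returns a `Future` for the pending record. This is not the implementation of this class: `SyncPushSketch`, its `complete()` helper and `SendException` (a stand-in for KafkaUtilsException) are hypothetical names, and a `CompletableFuture` plays the role of the pending send.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncPushSketch {

    /** Hypothetical stand-in for KafkaUtilsException. */
    static class SendException extends Exception {
        SendException(String msg, Throwable cause) { super(msg, cause); }
    }

    /** Waits for the pending send only when sync is true; otherwise returns at once. */
    static void complete(Future<?> pending, boolean sync, int timeout, TimeUnit unit)
            throws SendException {
        if (!sync) {
            return; // asynchronous mode: the caller does not wait
        }
        try {
            pending.get(timeout, unit); // blocks until done or until the timeout expires
        } catch (TimeoutException e) {
            throw new SendException("Timeout sending the value", e);
        } catch (ExecutionException e) {
            throw new SendException("Error sending the value", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new SendException("Interrupted while sending the value", e);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        complete(neverDone, false, 10, TimeUnit.MILLISECONDS); // returns immediately
        try {
            complete(neverDone, true, 10, TimeUnit.MILLISECONDS); // waits, then times out
        } catch (SendException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In asynchronous mode, errors that occur after the method has returned cannot be reported through the exception, which is one reason to call flush() before relying on delivery.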
-
flush
public void flush()
Ensures all the records have been delivered to the broker. This is especially useful when sending records asynchronously and you want to be sure they have all been sent.
-
getNumOfStringsSent
public int getNumOfStringsSent()
-