Class SimpleStringProducer

java.lang.Object
org.eso.ias.kafkautils.SimpleStringProducer

public class SimpleStringProducer extends Object
SimpleStringProducer pushes strings on a Kafka topic. The producer delegates to a KafkaProducer and allows to push data to different topics so that only one instance of this publisher can be used instead of many SimpleStringProducer or equivalent. According to the documentation, the KafkaProducer is, in fact thought to be used for this purpose: it is thread safe and generally more efficient than having multiple producers in the same process. From a performance point of view, there is a saturation point for th eproducer after which the preformances degrade. When the saturation point has been reached, the only way to increase performances is to add new producers. USAGE: normally only one SimpleStringProducer is used for pushing strings in all the topics. If there are performance problems then instantiate more SimpleStringProducers. Kafka properties are fully customizable by calling setUp(Properties): defaults values are used for the missing properties.
Author:
acaproni
  • Field Details

    • bootstrapServers

      public final String bootstrapServers
      A list of comma separated servers:port to connect to
  • Constructor Details

    • SimpleStringProducer

      public SimpleStringProducer(String servers, String clientID)
      Constructor
      Parameters:
      servers - The list of kafka servers to connect to
      clientID - The unique identifier of this producer (mapped to kafka client.id)
  • Method Details

    • setUp

      public void setUp()
      Initialize the producer with default properties
    • setUp

      public void setUp(Properties props)
      Initialize the producer with the given properties

      Servers and ID passed in the constructor override those in the passed properties

      Parameters:
      props - user defined properties
    • tearDown

      public void tearDown()
      Closes the producer
    • push

      public void push(String value, String topic, Integer partition, String key) throws KafkaUtilsException
      Asynchronously pushes a string in a kafka topic.
      Parameters:
      value - The not null nor empty string to publish in the topic
      topic - The not null nor empty topic to push the string into
      partition - The partition
      key - The key
      Throws:
      KafkaUtilsException - in case of error sending the value
    • push

      public void push(String value, String topic, Integer partition, String key, int timeout, TimeUnit unit) throws KafkaUtilsException
      Synchronously pushes the passed string in the topic
      Parameters:
      value - The not null nor empty string to publish in the topic
      topic - The not null nor empty topic to push the string into
      partition - The partition
      key - The key
      timeout - the time to wait if sync is set
      unit - the unit of the timeout
      Throws:
      KafkaUtilsException - in case of error or timeout sending the value
    • push

      protected void push(String value, String topic, Integer partition, String key, boolean sync, int timeout, TimeUnit unit) throws KafkaUtilsException
      Pushes the passed string in the given partition and key.

      Kafka sending is asynch. i.e. this methods returns before the actual value is effectively published in the topic unless the sync parameter is set to true in which case send waits for the effective sending

      Parameters:
      value - The not null nor empty string to publish in the topic
      topic - The not null nor empty topic to push the string into
      partition - The partition
      key - The key
      sync - If true the methods return
      timeout - the time to wait if sync is set
      unit - the unit of the timeout
      Throws:
      KafkaUtilsException - in case of error or timeout sending the value
    • flush

      public void flush()
      Ensures all the records have been delivered to the broker especially useful while sending records asynchronously and want be sure they have all beel sent.
    • getNumOfStringsSent

      public int getNumOfStringsSent()