Source: producer.js

/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

module.exports = Producer;

var Client = require('./client');

var util = require('util');
var Kafka = require('../librdkafka.js');
var ProducerStream = require('./producer-stream');
var LibrdKafkaError = require('./error');
var shallowCopy = require('./util').shallowCopy;

util.inherits(Producer, Client);

/**
 * Producer class for sending messages to Kafka
 *
 * This is the main entry point for writing data to Kafka. You
 * configure this like you do any other client, with a global
 * configuration and default topic configuration.
 *
 * Once you instantiate this object, you need to connect it first. This
 * allows you to fetch metadata and make sure the connection can be made
 * before you depend on it. After that, problems with the connection are
 * surfaced through polling, which you must either run yourself via poll()
 * or schedule with setPollInterval().
 *
 * @param {object} conf - Key value pairs to configure the producer
 * @param {object} topicConf - Key value pairs to create a default
 * topic configuration
 * @extends Client
 * @constructor
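 *
 * A minimal usage sketch follows; the broker address, topic name, key, and
 * payload are placeholders rather than values defined in this file.
 *
 * @example
 * var producer = new Producer({
 *   'metadata.broker.list': 'localhost:9092',
 *   'dr_cb': true
 * }, {});
 *
 * producer.connect();
 *
 * producer.on('ready', function() {
 *   producer.produce('my-topic', null, Buffer.from('hello'), 'a-key', Date.now());
 *   producer.poll(); // needed so the delivery report below is emitted
 * });
 *
 * producer.on('delivery-report', function(err, report) {
 *   // err is a LibrdKafkaError (or null); report describes the delivered message
 * });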
 */
function Producer(conf, topicConf) {
  if (!(this instanceof Producer)) {
    return new Producer(conf, topicConf);
  }

  conf = shallowCopy(conf);
  topicConf = shallowCopy(topicConf);

  /**
   * Producer message. This is sent to the wrapper, not received from it.
   *
   * @typedef {object} Producer~Message
   * @property {string|Buffer} message - The message payload to send to Kafka.
   * @property {string} topic - The Kafka topic to produce to.
   * @property {number} partition - The partition to produce to. Defaults to
   * letting the configured partitioner pick one.
   * @property {string} key - The key to associate with the message.
   * @see Consumer~Message
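   *
   * Shape sketch only; the field values below are illustrative.
   *
   * @example
   * var msg = {
   *   topic: 'my-topic',
   *   partition: 0,
   *   message: Buffer.from('payload'),
   *   key: 'a-key'
   * };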
   */

  var gTopic = conf.topic || false;
  var gPart = conf.partition || null;
  var dr_cb = conf.dr_cb || null;
  var dr_msg_cb = conf.dr_msg_cb || null;

  // delete keys we don't want to pass on
  delete conf.topic;
  delete conf.partition;

  delete conf.dr_cb;
  delete conf.dr_msg_cb;

  // client is an initialized producer object
  // @see NodeKafka::Producer::Init
  Client.call(this, conf, Kafka.Producer, topicConf);
  var self = this;

  // Keep the remaining config, plus the defaults extracted above, on the instance
  this.globalConfig = conf;
  this.topicConfig = topicConf;
  this.defaultTopic = gTopic || null;
  this.defaultPartition = gPart == null ? -1 : gPart;

  this.sentMessages = 0;

  this.pollInterval = undefined;

  if (dr_msg_cb || dr_cb) {
    this._client.onDeliveryReport(function onDeliveryReport(err, report) {
      if (err) {
        err = LibrdKafkaError.create(err);
      }
      self.emit('delivery-report', err, report);
    }, !!dr_msg_cb);

    if (typeof dr_cb === 'function') {
      self.on('delivery-report', dr_cb);
    }

  }
}

/**
 * Produce a message to Kafka synchronously.
 *
 * This is the primary method for sending data with this class. Use it to
 * produce a single message; the arguments are passed through to the
 * native client.
 *
 * When this is sent off, there is no guarantee it is delivered. If you need
 * guaranteed delivery, change your *acks* settings, or use delivery reports.
 *
 * @param {string} topic - The topic name to produce to.
 * @param {number|null} partition - The partition number to produce to.
 * @param {Buffer|null} message - The message to produce.
 * @param {string} key - The key associated with the message.
 * @param {number|null} timestamp - Timestamp to send with the message.
 * @param {object} opaque - An object you want passed along with this message, if provided.
 * @throws {LibrdKafkaError} - Throws a librdkafka error if it failed.
 * @return {boolean} - true if the message was queued successfully.
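 *
 * A sketch of a typical call; the topic name, key, and payload are
 * placeholders.
 *
 * @example
 * producer.produce(
 *   'my-topic',             // topic name (must be a string)
 *   null,                   // partition; null falls back to this.defaultPartition
 *   Buffer.from('hello'),   // message payload
 *   'a-key',                // key
 *   Date.now()              // timestamp in milliseconds
 * );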
 */
Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque) {
  if (!this._isConnected) {
    throw new Error('Producer not connected');
  }

  // I have removed support for using a topic object. It is going to be removed
  // from librdkafka soon, and it causes issues with shutting down
  if (!topic || typeof topic !== 'string') {
    throw new TypeError('"topic" must be a string');
  }

  this.sentMessages++;

  partition = partition == null ? this.defaultPartition : partition;

  return this._errorWrap(
    this._client.produce(topic, partition, message, key, timestamp, opaque));

};

/**
 * Create a write stream interface for a producer.
 *
 * This stream does not run in object mode. It only takes buffers of data.
 *
 * @param {object} conf - Key value pairs to configure the producer
 * @param {object} topicConf - Key value pairs to create a default
 * topic configuration
 * @param {object} streamOptions - Stream options
 * @return {ProducerStream} - returns the write stream for writing to Kafka.
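 *
 * A usage sketch; the broker address and topic name are placeholders.
 *
 * @example
 * var stream = Producer.createWriteStream(
 *   { 'metadata.broker.list': 'localhost:9092' },
 *   {},
 *   { topic: 'my-topic' }
 * );
 * stream.write(Buffer.from('a message'));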
 */
Producer.createWriteStream = function(conf, topicConf, streamOptions) {
  var producer = new Producer(conf, topicConf);
  return new ProducerStream(producer, streamOptions);
};

/**
 * Poll for events
 *
 * We need to run poll in order to learn about new events that have occurred.
 * This is no longer done automatically when we produce, so we need to run
 * it manually, or set the producer to automatically poll.
 *
 * @return {Producer} - returns itself.
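 *
 * @example
 * // Sketch: poll after producing so delivery reports and errors are emitted.
 * producer.produce('my-topic', null, Buffer.from('hi'), null, Date.now());
 * producer.poll();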
 */
Producer.prototype.poll = function() {
  if (!this._isConnected) {
    throw new Error('Producer not connected');
  }
  this._client.poll();
  return this;
};

/**
 * Set automatic polling for events.
 *
 * We need to run poll in order to learn about new events that have occurred.
 * If you would like this done on an interval, with disconnects and
 * reconnections handled for you, set the interval here.
 *
 * @param {number} interval - Interval, in milliseconds, to poll
 *
 * @return {Producer} - returns itself.
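 *
 * @example
 * // Poll every 100 ms (the interval value is illustrative):
 * producer.setPollInterval(100);
 * // Pass 0 to clear any previously set interval:
 * producer.setPollInterval(0);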
 */
Producer.prototype.setPollInterval = function(interval) {
  // If we already have a poll interval we need to stop it
  if (this.pollInterval) {
    clearInterval(this.pollInterval);
    this.pollInterval = undefined;
  }

  if (interval === 0) {
    // If the interval was set to 0, bail out. We don't want to process this.
    // If there was an interval previously set, it has been removed.
    return this;
  }

  var self = this;

  // Now we want to make sure we are connected.
  if (!this._isConnected) {
    // If we are not, execute this once the connection goes through.
    this.once('ready', function() {
      self.setPollInterval(interval);
    });
    return this;
  }

  // We know we are connected at this point.
  // Unref the interval so it does not keep the Node.js process alive
  this.pollInterval = setInterval(function() {
    try {
      self.poll();
    } catch (e) {
      // Errors here can be safely swallowed; poll() throws if the producer
      // is not connected, and disconnections are handled by the listener below.
    }
  }, interval).unref();

  // Handle disconnections
  this.once('disconnected', function() {
    // Just rerun this function. It will unset the original
    // interval and then bind to ready
    self.setPollInterval(interval);
  });

  return this;
};

/**
 * Flush the producer
 *
 * Flush everything in the internal librdkafka producer buffer. You should
 * usually do this before disconnecting.
 *
 * @param {number} timeout - Number of milliseconds to try to flush before giving up.
 * @param {function} callback - Callback to fire when the flush is done.
 *
 * @return {Producer} - returns itself.
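 *
 * A sketch; the 2000 ms timeout is illustrative.
 *
 * @example
 * producer.flush(2000, function(err) {
 *   if (err) {
 *     // err is a LibrdKafkaError; the flush did not complete cleanly
 *   }
 * });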
 */
Producer.prototype.flush = function(timeout, callback) {
  if (!this._isConnected) {
    throw new Error('Producer not connected');
  }

  if (timeout === undefined || timeout === null) {
    timeout = 500;
  }

  this._client.flush(timeout, function(err) {
    if (err) {
      err = LibrdKafkaError.create(err);
    }

    if (callback) {
      callback(err);
    }
  });
  return this;
};

/**
 * Save the base disconnect method here so we can overwrite it and add a flush
 */
Producer.prototype._disconnect = Producer.prototype.disconnect;

/**
 * Disconnect the producer
 *
 * Flush everything on the internal librdkafka producer buffer. Then disconnect
 *
 * @param {number} timeout - Number of milliseconds to try to flush before
 *                           giving up. Defaults to 5 seconds.
 * @param {function} cb - The callback to fire when the disconnect completes.
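 *
 * A sketch; the 10 second flush timeout is illustrative.
 *
 * @example
 * producer.disconnect(10000, function() {
 *   // the connection has been torn down once this fires
 * });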
 */
Producer.prototype.disconnect = function(timeout, cb) {
  var self = this;
  var timeoutInterval = 5000;

  if (typeof timeout === 'function') {
    cb = timeout;
  } else if (timeout !== undefined && timeout !== null) {
    // Only override the 5 second default when a timeout was actually provided
    timeoutInterval = timeout;
  }

  this.flush(timeoutInterval, function() {
    self._disconnect(cb);
  });
};