Source: kafka-consumer-stream.js

/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

'use strict';

// Export the constructor itself. The function declaration further down is
// hoisted, so it is already defined when this assignment runs.
module.exports = KafkaConsumerStream;

var Readable = require('stream').Readable;
var util = require('util');

// KafkaConsumerStream instances inherit the stream.Readable prototype.
util.inherits(KafkaConsumerStream, Readable);

/**
 * ReadableStream integrating with the Kafka Consumer.
 *
 * This class is used to read data off of Kafka in a streaming way. It is
 * useful if you'd like to have a way to pipe Kafka into other systems. You
 * should generally not make this class yourself, as it is not even exposed
 * as part of module.exports. Instead, you should use
 * KafkaConsumer.createReadStream.
 *
 * The stream implementation is slower than the continuous subscribe callback.
 * If you don't care so much about backpressure and would rather squeeze
 * out performance, use that method. Using the stream will ensure you read only
 * as fast as you write.
 *
 * The stream detects if Kafka is already connected. If it is, it will begin
 * reading. If it is not, it will connect and read when it is ready.
 *
 * By default this stream operates in objectMode and streams
 * {Consumer~Message} objects. With `objectMode: false` it streams only the
 * message values.
 *
 * @param {Consumer} consumer - The Kafka Consumer object.
 * @param {object|number} options - Options to configure the stream. A number
 * is treated as `{ waitInterval: options }` (which carries no `topics`, so it
 * is rejected by topic validation).
 * @param {number} options.waitInterval - Upper bound, in ms, of the randomized
 * wait used when Kafka reports an error or returns no messages. Defaults to
 * 1000; a falsy value retries on the next turn of the event loop.
 * @param {string|RegExp|array|function} options.topics - A topic name, a
 * pattern, an array of those, or a function that maps connection metadata
 * to the topics to subscribe to.
 * @param {number} [options.fetchSize=1] - Maximum messages per consume call.
 * @param {boolean} [options.objectMode=true] - Stream message objects (true)
 * or message values (false).
 * @param {boolean} [options.autoClose=true] - Destroy the stream on 'end'.
 * @param {object} [options.connectOptions={}] - Passed to consumer.connect.
 * @param {boolean} [options.streamAsBatch=false] - Push each consumed batch as
 * a single array chunk instead of one chunk per message.
 * @throws {TypeError} If `options` is neither a number nor an object, or if
 * `topics` is not a string, RegExp, array or function.
 * @constructor
 * @extends stream.Readable
 * @see Consumer~Message
 */
function KafkaConsumerStream(consumer, options) {
  if (!(this instanceof KafkaConsumerStream)) {
    return new KafkaConsumerStream(consumer, options);
  }

  if (options === undefined) {
    options = { waitInterval: 1000 };
  } else if (typeof options === 'number') {
    options = { waitInterval: options };
  } else if (options === null || typeof options !== 'object') {
    throw new TypeError('"options" argument must be a number or an object');
  }

  var topics = options.topics;

  // A topics function is resolved against the connection metadata when the
  // stream subscribes, so it is kept as-is. A single topic name or pattern is
  // wrapped in an array.
  if (typeof topics !== 'function' && !Array.isArray(topics)) {
    if (typeof topics !== 'string' && !(topics instanceof RegExp)) {
      throw new TypeError('"topics" argument must be a string, regex, or an array');
    }
    topics = [topics];
  }

  // Layer our defaults on a derived object so the caller's options object is
  // never mutated.
  options = Object.create(options);

  var fetchSize = options.fetchSize || 1;

  // Run in object mode by default.
  if (options.objectMode === null || options.objectMode === undefined) {
    options.objectMode = true;

    // If they did not explicitly set high water mark, and we are running
    // in object mode, set it to the fetch size + 2 to ensure there is room
    // for a standard fetch
    if (!options.highWaterMark) {
      options.highWaterMark = fetchSize + 2;
    }
  }

  // Object mode pushes whole messages; otherwise only message values.
  if (options.objectMode !== true) {
    this._read = this._read_buffer;
  } else {
    this._read = this._read_message;
  }

  Readable.call(this, options);

  this.consumer = consumer;
  this.topics = topics;
  this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
  this.waitInterval = options.waitInterval === undefined ? 1000 : options.waitInterval;
  this.fetchSize = fetchSize;
  this.connectOptions = options.connectOptions || {};
  this.streamAsBatch = options.streamAsBatch || false;

  // Messages fetched from Kafka but not yet pushed downstream.
  this.messages = [];

  var self = this;

  // End the stream when the consumer unsubscribes. The handler is named so
  // it can be detached once this stream closes; otherwise every stream
  // created on the same consumer leaves a listener (and a reference to
  // itself) behind on the consumer.
  function onUnsubscribed() {
    self.push(null);
  }

  consumer.on('unsubscribed', onUnsubscribed);

  // Registered before connect() so the cleanup is in place even if the
  // connection attempt closes the stream straight away.
  this.once('close', function() {
    consumer.removeListener('unsubscribed', onUnsubscribed);
  });

  // Call connect. Handles potentially being connected already
  this.connect(this.connectOptions);

  this.once('end', function() {
    if (this.autoClose) {
      this.destroy();
    }
  });

}

/**
 * Internal stream read method used in object mode. Pushes
 * {Consumer~Message} objects, or each consumed batch as a single array
 * when `streamAsBatch` is enabled.
 *
 * Messages left over from a previous consume call are pushed first; the
 * consumer is only asked for more once that local buffer is empty.
 *
 * @param {number} size - Advisory read size supplied by stream.Readable. It
 * caps the number of messages requested: the smaller of `size` and
 * `this.fetchSize` is used.
 * @private
 */
KafkaConsumerStream.prototype._read_message = function(size) {
  // Bail out before anything else once destroyed. Retry timers and 'ready'
  // callbacks call _read directly, and close() disconnects the consumer, so
  // falling through to the connection check would attach a 'ready' listener
  // to the consumer on behalf of a stream that will never read again.
  if (this.destroyed) {
    return;
  }

  if (this.messages.length > 0) {
    return this.push(this.messages.shift());
  }

  if (!this.consumer) {
    // Nothing to read from if the consumer reference has been cleared.
    return;
  }

  if (!this.consumer.isConnected()) {
    this.consumer.once('ready', function() {
      // This is the way Node.js does it
      // https://github.com/nodejs/node/blob/master/lib/fs.js#L1733
      this._read(size);
    }.bind(this));
    return;
  }

  var self = this;

  // Request the smaller of the advised size and the configured fetch size.
  var fetchSize = size >= this.fetchSize ? this.fetchSize : size;

  this.consumer.consume(fetchSize, onread);

  // Retry function. Waits a random fraction of waitInterval (on an unref'd
  // timer, so it does not keep the process alive). With no waitInterval it
  // retries on the next turn of the event loop.
  function retry() {
    if (!self.waitInterval) {
      setImmediate(function() {
        self._read(size);
      });
    } else {
      setTimeout(function() {
        self._read(size);
      }, self.waitInterval * Math.random()).unref();
    }
  }

  function onread(err, messages) {
    // Errors are always emitted. Without an 'error' listener this still
    // blows up the stream; with one, consumption carries on as normal.
    if (err) {
      self.emit('error', err);
    }

    // An error, EOF or a timeout all mean there is nothing to push: retry.
    if (err || messages.length < 1) {
      retry();
      return;
    }

    if (self.streamAsBatch) {
      self.push(messages);
      return;
    }

    // Queue the whole batch locally, then push the oldest message. Later
    // _read calls drain the rest before consuming again.
    for (var i = 0; i < messages.length; i++) {
      self.messages.push(messages[i]);
    }
    self.push(self.messages.shift());
  }
};

/**
 * Internal stream read method used when objectMode is off. Pushes the
 * `value` of each consumed message.
 *
 * Values left over from a previous consume call are pushed first; the
 * consumer is only asked for more once that local buffer is empty.
 *
 * @param {number} size - Advisory read size supplied by stream.Readable. It
 * caps the number of messages requested: the smaller of `size` and
 * `this.fetchSize` is used.
 * @private
 */
KafkaConsumerStream.prototype._read_buffer = function(size) {
  // Bail out before anything else once destroyed. Retry timers and 'ready'
  // callbacks call _read directly, and close() disconnects the consumer, so
  // falling through to the connection check would attach a 'ready' listener
  // to the consumer on behalf of a stream that will never read again.
  if (this.destroyed) {
    return;
  }

  if (this.messages.length > 0) {
    return this.push(this.messages.shift());
  }

  if (!this.consumer) {
    // Nothing to read from if the consumer reference has been cleared.
    return;
  }

  if (!this.consumer.isConnected()) {
    this.consumer.once('ready', function() {
      // This is the way Node.js does it
      // https://github.com/nodejs/node/blob/master/lib/fs.js#L1733
      this._read(size);
    }.bind(this));
    return;
  }

  var self = this;

  // Request the smaller of the advised size and the configured fetch size.
  var fetchSize = size >= this.fetchSize ? this.fetchSize : size;

  this.consumer.consume(fetchSize, onread);

  // Retry function. Waits a random fraction of waitInterval (on an unref'd
  // timer, so it does not keep the process alive). With no waitInterval it
  // retries on the next turn of the event loop.
  function retry() {
    if (!self.waitInterval) {
      setImmediate(function() {
        self._read(size);
      });
    } else {
      setTimeout(function() {
        self._read(size);
      }, self.waitInterval * Math.random()).unref();
    }
  }

  function onread(err, messages) {
    // Errors are always emitted. Without an 'error' listener this still
    // blows up the stream; with one, consumption carries on as normal.
    if (err) {
      self.emit('error', err);
    }

    // An error, EOF or a timeout all mean there is nothing to push: retry.
    if (err || messages.length < 1) {
      retry();
      return;
    }

    if (self.streamAsBatch) {
      // NOTE(review): this pushes an array of message objects into a
      // non-object-mode stream. Readable only accepts string, Buffer or
      // Uint8Array chunks there, so this combination errors the stream.
      // Confirm the intended batch representation for buffer mode.
      self.push(messages);
      return;
    }

    for (var i = 0; i < messages.length; i++) {
      var value = messages[i].value;
      // A message without a value (such as a tombstone) cannot be queued.
      // Pushing null signals end-of-stream to Readable and would end this
      // stream prematurely.
      if (value !== null && value !== undefined) {
        self.messages.push(value);
      }
    }

    if (self.messages.length === 0) {
      // The whole batch had no values. Nothing to push, so consume again.
      setImmediate(function() {
        self._read(size);
      });
      return;
    }

    // Push the oldest queued value. Later _read calls drain the rest before
    // consuming again.
    self.push(self.messages.shift());
  }
};

/**
 * Connect the consumer (unless it is already connected), subscribe to the
 * configured topics and start the flow of data.
 *
 * When `this.topics` is a function it is called as a method with the
 * connection metadata, and its return value is what gets subscribed. A
 * connection error, or any exception thrown while resolving topics or
 * subscribing, is emitted as 'error' and the stream is destroyed.
 *
 * @param {object} options - Passed to `consumer.connect` when a connection
 * has to be established.
 */
KafkaConsumerStream.prototype.connect = function(options) {
  var stream = this;

  function onConnected(err, metadata) {
    if (err) {
      stream.emit('error', err);
      stream.destroy();
      return;
    }

    // Resolving or subscribing may throw; either way the stream is unusable.
    try {
      var wanted = typeof stream.topics === 'function'
        ? stream.topics(metadata)
        : stream.topics;
      stream.consumer.subscribe(wanted);
    } catch (subscribeError) {
      stream.emit('error', subscribeError);
      stream.destroy();
      return;
    }

    // Kick off reading now that we are subscribed.
    stream.read();
  }

  if (this.consumer.isConnected()) {
    // Already connected: reuse the cached metadata, but stay asynchronous.
    setImmediate(function() {
      onConnected(null, stream.consumer._metadata);
    });
    return;
  }

  stream.consumer.connect(options, onConnected);
};

/**
 * Mark the stream as destroyed and close it. Calling this again once the
 * stream is destroyed does nothing.
 */
KafkaConsumerStream.prototype.destroy = function() {
  if (!this.destroyed) {
    this.destroyed = true;
    this.close();
  }
};

/**
 * Close the stream and emit 'close' once the consumer is done.
 *
 * - If the consumer is still connecting, waits for its 'ready' event and
 *   then closes again.
 * - If it is connected, unsubscribes and disconnects, emitting 'close' from
 *   the disconnect callback.
 * - Otherwise 'close' is emitted immediately.
 *
 * @param {function} [cb] - Optional listener attached to this stream's
 * 'close' event.
 */
KafkaConsumerStream.prototype.close = function(cb) {
  var stream = this;

  function emitClose() {
    stream.emit('close');
  }

  if (cb) {
    stream.once('close', cb);
  }

  var connecting = stream.consumer._isConnecting;
  var connected = stream.consumer._isConnected;

  if (connecting) {
    stream.consumer.once('ready', function() {
      // The callback is already attached above, so it is not passed again.
      stream.close();
    });
    return;
  }

  if (connected) {
    stream.consumer.unsubscribe();
    stream.consumer.disconnect(function() {
      emitClose();
    });
    return;
  }

  // Never connected (nor connecting): nothing to tear down.
  emitClose();
};