/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
'use strict';
module.exports = KafkaConsumerStream;
var Readable = require('stream').Readable;
var util = require('util');
util.inherits(KafkaConsumerStream, Readable);
/**
* ReadableStream integrating with the Kafka Consumer.
*
 * This class is used to read data off of Kafka in a streaming way. It is
 * useful if you'd like to have a way to pipe Kafka into other systems. You
 * should generally not instantiate this class yourself, as it is not exposed
 * as part of the public module exports. Instead, use
 * KafkaConsumer.createReadStream.
 *
 * The stream implementation is slower than the continuous subscribe callback.
 * If you don't care about backpressure and would rather squeeze out
 * performance, use that method instead. Using the stream ensures you consume
 * messages only as fast as downstream can process them.
*
* The stream detects if Kafka is already connected. If it is, it will begin
* reading. If it is not, it will connect and read when it is ready.
*
 * This stream operates in objectMode by default, streaming {Consumer~Message}
 * objects. With objectMode disabled, it streams raw message values instead.
*
* @param {Consumer} consumer - The Kafka Consumer object.
* @param {object} options - Options to configure the stream.
 * @param {number} options.waitInterval - Number of ms to wait before retrying
 * when Kafka reports a timeout or that no messages are currently available.
 * @param {array|string|RegExp|function} options.topics - Topic string, regex,
 * or array of topics; or a function that maps broker metadata to an array of
 * topics.
* @constructor
* @extends stream.Readable
* @see Consumer~Message
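 * @example
 * // A minimal usage sketch. The broker address, group id, and topic name
 * // are placeholders; prefer KafkaConsumer.createReadStream over
 * // constructing this class directly.
 * var Kafka = require('node-rdkafka');
 * var stream = Kafka.KafkaConsumer.createReadStream({
 *   'group.id': 'example-group',
 *   'metadata.broker.list': 'localhost:9092'
 * }, {}, { topics: ['example-topic'] });
 * stream.on('data', function(message) {
 *   console.log(message.value.toString());
 * });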
*/
function KafkaConsumerStream(consumer, options) {
if (!(this instanceof KafkaConsumerStream)) {
return new KafkaConsumerStream(consumer, options);
}
if (options === undefined) {
options = { waitInterval: 1000 };
} else if (typeof options === 'number') {
options = { waitInterval: options };
} else if (options === null || typeof options !== 'object') {
throw new TypeError('"options" argument must be a number or an object');
}
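  // Note: passing a bare number, e.g. new KafkaConsumerStream(consumer, 500),
  // is shorthand for { waitInterval: 500 }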
var topics = options.topics;
if (typeof topics === 'function') {
    // Topics will be resolved from metadata at connect time, so there is
    // nothing to validate here
} else if (!Array.isArray(topics)) {
if (typeof topics !== 'string' && !(topics instanceof RegExp)) {
throw new TypeError('"topics" argument must be a string, regex, or an array');
} else {
topics = [topics];
}
}
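  // Accepted forms for topics: 'events', /^events\..+/, ['a', 'b'], or a
  // function(metadata) returning an array, resolved at connect time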
options = Object.create(options);
var fetchSize = options.fetchSize || 1;
// Run in object mode by default.
if (options.objectMode === null || options.objectMode === undefined) {
options.objectMode = true;
// If they did not explicitly set high water mark, and we are running
// in object mode, set it to the fetch size + 2 to ensure there is room
// for a standard fetch
if (!options.highWaterMark) {
options.highWaterMark = fetchSize + 2;
}
}
if (options.objectMode !== true) {
this._read = this._read_buffer;
} else {
this._read = this._read_message;
}
Readable.call(this, options);
this.consumer = consumer;
this.topics = topics;
this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
this.waitInterval = options.waitInterval === undefined ? 1000 : options.waitInterval;
this.fetchSize = fetchSize;
this.connectOptions = options.connectOptions || {};
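  // When streamAsBatch is set, each consume() result is pushed downstream as
  // a single array chunk instead of message-by-message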
this.streamAsBatch = options.streamAsBatch || false;
// Hold the messages in here
this.messages = [];
var self = this;
this.consumer
.on('unsubscribed', function() {
// Invalidate the stream when we unsubscribe
self.push(null);
});
// Call connect. Handles potentially being connected already
this.connect(this.connectOptions);
this.once('end', function() {
if (this.autoClose) {
this.destroy();
}
});
}
/**
* Internal stream read method. This method reads message objects.
 * @param {number} size - Advisory number of messages to read; it is capped at
 * the configured fetch size.
* @private
*/
KafkaConsumerStream.prototype._read_message = function(size) {
if (this.messages.length > 0) {
return this.push(this.messages.shift());
}
if (!this.consumer) {
    // Nothing to read if the consumer has been torn down
return;
}
if (!this.consumer.isConnected()) {
this.consumer.once('ready', function() {
// This is the way Node.js does it
// https://github.com/nodejs/node/blob/master/lib/fs.js#L1733
this._read(size);
}.bind(this));
return;
}
if (this.destroyed) {
return;
}
var self = this;
  // Fetch the smaller of the advisory size from the stream machinery and
  // our configured fetch size
  var fetchSize = Math.min(size, this.fetchSize);
this.consumer.consume(fetchSize, onread);
  // Retry function. Waits a random fraction of the wait interval before
  // reading again, or retries immediately if no interval is configured.
  // The timer is unref'd so it will not keep the process alive.
function retry() {
if (!self.waitInterval) {
setImmediate(function() {
self._read(size);
});
} else {
setTimeout(function() {
self._read(size);
}, self.waitInterval * Math.random()).unref();
}
}
function onread(err, messages) {
    // If there was an error we still want to emit it.
    // If the user has not registered an error handler, this will cause
    // the stream to blow up; if one is provided, consumption moves on
    // as normal.
if (err) {
self.emit('error', err);
}
    // No messages means we reached EOF or timed out, so schedule a retry
    if (err || messages.length < 1) {
retry();
return;
} else {
if (self.streamAsBatch) {
self.push(messages);
} else {
for (var i = 0; i < messages.length; i++) {
self.messages.push(messages[i]);
}
        // Now that we have added them all to the inner messages buffer,
        // we can push the first one
self.push(self.messages.shift());
}
}
}
};
/**
* Internal stream read method. This method reads message buffers.
 * @param {number} size - Advisory number of messages to read; it is capped at
 * the configured fetch size.
* @private
*/
KafkaConsumerStream.prototype._read_buffer = function(size) {
if (this.messages.length > 0) {
return this.push(this.messages.shift());
}
if (!this.consumer) {
    // Nothing to read if the consumer has been torn down
return;
}
if (!this.consumer.isConnected()) {
this.consumer.once('ready', function() {
// This is the way Node.js does it
// https://github.com/nodejs/node/blob/master/lib/fs.js#L1733
this._read(size);
}.bind(this));
return;
}
if (this.destroyed) {
return;
}
var self = this;
  // Fetch the smaller of the advisory size from the stream machinery and
  // our configured fetch size
  var fetchSize = Math.min(size, this.fetchSize);
this.consumer.consume(fetchSize, onread);
  // Retry function. Waits a random fraction of the wait interval before
  // reading again, or retries immediately if no interval is configured.
  // The timer is unref'd so it will not keep the process alive.
function retry() {
if (!self.waitInterval) {
setImmediate(function() {
self._read(size);
});
} else {
setTimeout(function() {
self._read(size);
}, self.waitInterval * Math.random()).unref();
}
}
function onread(err, messages) {
    // If there was an error we still want to emit it.
    // If the user has not registered an error handler, this will cause
    // the stream to blow up; if one is provided, consumption moves on
    // as normal.
if (err) {
self.emit('error', err);
}
    // No messages means we reached EOF or timed out, so schedule a retry
    if (err || messages.length < 1) {
retry();
return;
} else {
if (self.streamAsBatch) {
self.push(messages);
} else {
for (var i = 0; i < messages.length; i++) {
self.messages.push(messages[i].value);
}
        // Now that we have added them all to the inner messages buffer,
        // we can push the first one
self.push(self.messages.shift());
}
}
}
};
KafkaConsumerStream.prototype.connect = function(options) {
var self = this;
function connectCallback(err, metadata) {
if (err) {
self.emit('error', err);
self.destroy();
return;
}
try {
      // Subscribe to the topics as well so we will be ready to consume.
      // If this throws, the stream is invalid.
      // When topics is a function, resolve it against the broker metadata
      // before subscribing.
if (typeof self.topics === 'function') {
var topics = self.topics(metadata);
self.consumer.subscribe(topics);
} else {
self.consumer.subscribe(self.topics);
}
} catch (e) {
self.emit('error', e);
self.destroy();
return;
}
// start the flow of data
self.read();
}
if (!this.consumer.isConnected()) {
self.consumer.connect(options, connectCallback);
} else {
// Immediately call the connect callback
setImmediate(function() {
connectCallback(null, self.consumer._metadata);
});
}
};
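// Example of a topics function, resolved against broker metadata at connect
// time. This is a sketch: it assumes the metadata shape passed to the connect
// callback, where each entry in metadata.topics has a `name` property.
//
//   KafkaConsumer.createReadStream(conf, topicConf, {
//     topics: function(metadata) {
//       return metadata.topics
//         .map(function(t) { return t.name; })
//         .filter(function(name) { return name.indexOf('logs.') === 0; });
//     }
//   });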
KafkaConsumerStream.prototype.destroy = function() {
if (this.destroyed) {
return;
}
this.destroyed = true;
this.close();
};
KafkaConsumerStream.prototype.close = function(cb) {
var self = this;
if (cb) {
this.once('close', cb);
}
if (!self.consumer._isConnecting && !self.consumer._isConnected) {
// If we aren't even connected just exit. We are done.
close();
return;
}
if (self.consumer._isConnecting) {
self.consumer.once('ready', function() {
// Don't pass the CB because it has already been passed.
self.close();
});
return;
}
if (self.consumer._isConnected) {
self.consumer.unsubscribe();
self.consumer.disconnect(function() {
close();
});
}
function close() {
self.emit('close');
}
};
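// Teardown sketch: with autoClose (the default) the stream destroys itself on
// 'end'. Otherwise, close it explicitly:
//
//   stream.close(function() {
//     // The consumer has disconnected and 'close' has fired
//   });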