/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
'use strict';
module.exports = ProducerStream;
var Writable = require('stream').Writable;
var util = require('util');
var ErrorCode = require('./error').codes;
util.inherits(ProducerStream, Writable);
/**
 * Writable stream integrating with the Kafka Producer.
 *
 * This class is used to write data to Kafka in a streaming way. It takes
 * buffers of data and puts them into the appropriate Kafka topic. If you need
 * finer control over partitions or keys, this is probably not the class for
 * you. In that situation just use the Producer itself.
 *
 * The stream detects whether the producer is already connected. If it is not,
 * the stream connects it for you, so you can safely begin writing right away.
 *
 * By default this stream does not operate in object mode and can only be
 * given buffers. Pass objectMode: true to write message objects instead.
 * A usage example is included after the constructor below.
 *
 * @param {Producer} producer - The Kafka Producer object.
 * @param {object} options - Stream options, including the topic to produce to
 * (required unless objectMode is true) and any standard stream.Writable
 * options.
 * @constructor
 * @extends stream.Writable
 */
function ProducerStream(producer, options) {
if (!(this instanceof ProducerStream)) {
return new ProducerStream(producer, options);
}
if (options === undefined) {
options = {};
} else if (typeof options === 'string') {
options = { encoding: options };
} else if (options === null || typeof options !== 'object') {
throw new TypeError('"streamOptions" argument must be a string or an object');
}
if (!options.objectMode && !options.topic) {
throw new TypeError('ProducerStreams not using objectMode must provide a topic to produce to.');
}
if (options.objectMode !== true) {
this._write = this._write_buffer;
} else {
this._write = this._write_message;
}
Writable.call(this, options);
this.producer = producer;
this.topicName = options.topic;
this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
this.connectOptions = options.connectOptions || {};
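  // Poll on an interval so delivery reports and producer events get serviced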
this.producer.setPollInterval(options.pollInterval || 1000);
if (options.encoding) {
this.setDefaultEncoding(options.encoding);
}
  // Connect the producer unless it is already connected
  if (!this.producer.isConnected()) {
    this.connect(this.connectOptions);
  }
this.once('finish', function() {
if (this.autoClose) {
this.close();
}
});
}
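/*
 * Usage example (a minimal sketch; the broker address, topic name, and the
 * require paths are assumptions for illustration):
 *
 *   var Producer = require('./producer');
 *   var ProducerStream = require('./producer-stream');
 *
 *   var producer = new Producer({ 'metadata.broker.list': 'localhost:9092' }, {});
 *
 *   // Buffer mode: every chunk is produced to the configured topic
 *   var stream = new ProducerStream(producer, { topic: 'topic-name' });
 *   stream.write(Buffer.from('hello world'));
 *
 * Or, in object mode, each write is a message object routed per message:
 *
 *   var stream = new ProducerStream(producer, { objectMode: true });
 *   stream.write({
 *     topic: 'topic-name',
 *     partition: null,            // let librdkafka choose the partition
 *     value: Buffer.from('hello world'),
 *     key: 'key'
 *   });
 */

/**
 * Connect the underlying producer.
 *
 * Called automatically by the constructor when the producer is not already
 * connected. Emits an 'error' event on the stream if the connection fails;
 * successful connection is signaled by the producer's 'ready' event.
 *
 * @param {object} options - Options passed through to Producer.connect.
 */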
ProducerStream.prototype.connect = function(options) {
this.producer.connect(options, function(err, data) {
if (err) {
this.emit('error', err);
return;
}
}.bind(this));
};
/**
* Internal stream write method for ProducerStream when writing buffers.
*
* This method should never be called externally. It has some recursion to
* handle cases where the producer is not yet connected.
*
 * @param {Buffer} data - Chunk of data to write.
 * @param {string} encoding - Encoding for the buffer.
* @param {Function} cb - Callback to call when the stream is done processing
* the data.
* @private
* @see https://github.com/nodejs/node/blob/master/lib/fs.js#L1901
*/
ProducerStream.prototype._write_buffer = function(data, encoding, cb) {
if (!(data instanceof Buffer)) {
this.emit('error', new Error('Invalid data. Can only produce buffers'));
return;
}
var self = this;
if (!this.producer.isConnected()) {
this.producer.once('ready', function() {
self._write(data, encoding, cb);
});
return;
}
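  // Produce to the configured topic with no explicit partition or key;
  // librdkafka picks the partition using its default partitioner.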
try {
this.producer.produce(self.topicName, null, data, null);
setImmediate(cb);
} catch (e) {
if (ErrorCode.ERR__QUEUE_FULL === e.code) {
// Poll for good measure
self.producer.poll();
      // Delay briefly and retry with the same arguments;
      // backpressure gets exerted on the stream this way.
setTimeout(function() {
self._write(data, encoding, cb);
}, 500);
} else {
if (self.autoClose) {
self.close();
}
setImmediate(function() {
cb(e);
});
}
}
};
/**
* Internal stream write method for ProducerStream when writing objects.
*
* This method should never be called externally. It has some recursion to
* handle cases where the producer is not yet connected.
*
* @param {object} message - Message to write.
 * @param {string} encoding - Encoding for the buffer; unused for message objects.
* @param {Function} cb - Callback to call when the stream is done processing
* the data.
* @private
* @see https://github.com/nodejs/node/blob/master/lib/fs.js#L1901
*/
ProducerStream.prototype._write_message = function(message, encoding, cb) {
var self = this;
if (!this.producer.isConnected()) {
this.producer.once('ready', function() {
self._write(message, encoding, cb);
});
return;
}
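  // Produce using the routing and payload fields supplied on the message object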
try {
this.producer.produce(message.topic, message.partition, message.value, message.key, message.timestamp, message.opaque);
setImmediate(cb);
} catch (e) {
if (ErrorCode.ERR__QUEUE_FULL === e.code) {
// Poll for good measure
self.producer.poll();
      // Delay briefly and retry with the same arguments;
      // backpressure gets exerted on the stream this way.
setTimeout(function() {
self._write(message, encoding, cb);
}, 500);
} else {
if (self.autoClose) {
self.close();
}
setImmediate(function() {
cb(e);
});
}
}
};
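/**
 * Produce a batch of chunks to Kafka, collecting the results into a single
 * callback.
 *
 * Buffer chunks are produced to the given topic; object chunks carry their
 * own topic, partition, key, and other fields. If the producer's queue fills
 * up, the remaining chunks are retried after a short delay.
 *
 * @param {Producer} producer - The Kafka Producer object.
 * @param {string} topic - Topic to use for Buffer chunks.
 * @param {array} chunks - Chunks to produce.
 * @param {Function} cb - Callback to call once all chunks are handled.
 * @private
 */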
function writev(producer, topic, chunks, cb) {
// @todo maybe a produce batch method?
var doneCount = 0;
var err = null;
var chunk = null;
function maybeDone(e) {
if (e) {
err = e;
}
doneCount ++;
if (doneCount === chunks.length) {
cb(err);
}
}
function retry(restChunks) {
// Poll for good measure
producer.poll();
    // Delay briefly and retry with the remaining chunks;
    // backpressure gets exerted on the stream this way.
setTimeout(function() {
writev(producer, topic, restChunks, cb);
}, 500);
}
for (var i = 0; i < chunks.length; i++) {
chunk = chunks[i];
try {
if (Buffer.isBuffer(chunk)) {
producer.produce(topic, null, chunk, null);
} else {
producer.produce(chunk.topic, chunk.partition, chunk.value, chunk.key, chunk.timestamp, chunk.opaque);
}
maybeDone();
} catch (e) {
if (ErrorCode.ERR__QUEUE_FULL === e.code) {
retry(chunks.slice(i));
} else {
cb(e);
}
break;
}
}
}
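/**
 * Internal stream writev method for ProducerStream.
 *
 * Handles batched writes by unwrapping the chunks the Writable machinery
 * hands us and delegating to the writev helper above. The stream is closed
 * if any chunk fails to be produced.
 *
 * @param {array} data - Array of { chunk, encoding } objects to write.
 * @param {Function} cb - Callback to call when the batch has been handled.
 * @private
 */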
ProducerStream.prototype._writev = function(data, cb) {
  var self = this;
  if (!this.producer.isConnected()) {
    // Wait for the producer's 'ready' event, as the _write_* methods do;
    // the stream itself does not emit 'ready'.
    this.producer.once('ready', function() {
      self._writev(data, cb);
    });
    return;
  }
  var len = data.length;
  var chunks = new Array(len);
  for (var i = 0; i < len; i++) {
    chunks[i] = data[i].chunk;
  }
writev(this.producer, this.topicName, chunks, function(err) {
if (err) {
self.close();
cb(err);
return;
}
cb();
});
};
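/**
 * Close the stream, disconnecting the underlying producer if it is connected,
 * or waiting for an in-flight connection to become ready before doing so.
 *
 * Emits a 'close' event when done.
 *
 * @param {Function} cb - Optional callback invoked once the stream has closed.
 */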
ProducerStream.prototype.close = function(cb) {
var self = this;
if (cb) {
this.once('close', cb);
}
  // Use the producer's internal connection-state flags here
if (self.producer._isConnected) {
self.producer.disconnect(function() {
// Previously this set the producer to null. I'm not sure there is any benefit
// to that other than I guess helping flag it for GC?
// https://github.com/Blizzard/node-rdkafka/issues/344
close();
});
  } else if (self.producer._isConnecting) {
self.producer.once('ready', function() {
// Don't pass CB this time because it has already been passed
self.close();
});
} else {
setImmediate(close);
}
function close() {
self.emit('close');
}
};