/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
'use strict';
module.exports = KafkaConsumer;
var Client = require('./client');
var util = require('util');
var Kafka = require('../librdkafka');
var KafkaConsumerStream = require('./kafka-consumer-stream');
var LibrdKafkaError = require('./error');
var TopicPartition = require('./topic-partition');
var shallowCopy = require('./util').shallowCopy;
util.inherits(KafkaConsumer, Client);
/**
* KafkaConsumer class for reading messages from Kafka
*
* This is the main entry point for reading data from Kafka. You
* configure this like you do any other client, with a global
* configuration and default topic configuration.
*
* Once you instantiate this object, connecting will open a socket.
* Data will not be read until you tell the consumer what topics
* you want to read from.
*
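 * A minimal usage sketch; the broker address, group id, and topic
 * name here are assumptions for illustration:
 *
 * @example
 * var Kafka = require('node-rdkafka');
 *
 * var consumer = new Kafka.KafkaConsumer({
 *   'metadata.broker.list': 'localhost:9092',
 *   'group.id': 'example-group'
 * }, {});
 *
 * consumer.connect();
 *
 * consumer.on('ready', function() {
 *   consumer.subscribe(['test']);
 *   consumer.consume();
 * });
 *
 * consumer.on('data', function(message) {
 *   console.log(message.value.toString());
 * });
 *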
* @param {object} conf - Key value pairs to configure the consumer
* @param {object} topicConf - Key value pairs to create a default
* topic configuration
* @extends Client
* @constructor
*/
function KafkaConsumer(conf, topicConf) {
if (!(this instanceof KafkaConsumer)) {
return new KafkaConsumer(conf, topicConf);
}
conf = shallowCopy(conf);
topicConf = shallowCopy(topicConf);
var onRebalance = conf.rebalance_cb;
var self = this;
  // If rebalance_cb is unset we don't want any part of this.
  // Setting it to true opts in to the default handling below.
if (onRebalance && typeof onRebalance === 'boolean') {
conf.rebalance_cb = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
self.emit('rebalance', err, assignment);
      // Perform the default assign/unassign handling
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
self.assign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
self.unassign();
}
} catch (e) {
// Ignore exceptions if we are not connected
if (self.isConnected()) {
self.emit('rebalance.error', e);
}
}
};
} else if (onRebalance && typeof onRebalance === 'function') {
/*
* Once this is opted in to, that's it. It's going to manually rebalance
* forever. There is no way to unset config values in librdkafka, just
* a way to override them.
*/
conf.rebalance_cb = function(err, assignment) {
// Create the librdkafka error
err = err ? LibrdKafkaError.create(err) : undefined;
self.emit('rebalance', err, assignment);
onRebalance.call(self, err, assignment);
};
}
// Same treatment for offset_commit_cb
var onOffsetCommit = conf.offset_commit_cb;
if (onOffsetCommit && typeof onOffsetCommit === 'boolean') {
conf.offset_commit_cb = function(err, offsets) {
      // Emit the event; this opt-in form does not forward the error
self.emit('offset.commit', offsets);
};
} else if (onOffsetCommit && typeof onOffsetCommit === 'function') {
conf.offset_commit_cb = function(err, offsets) {
// Emit the event
self.emit('offset.commit', offsets);
onOffsetCommit.call(self, err, offsets);
};
}
/**
* KafkaConsumer message.
*
* This is the representation of a message read from Kafka.
*
* @typedef {object} KafkaConsumer~Message
* @property {buffer} value - the message buffer from Kafka.
* @property {string} topic - the topic name
* @property {number} partition - the partition on the topic the
* message was on
* @property {number} offset - the offset of the message
* @property {string} key - the message key
* @property {number} size - message size, in bytes.
* @property {number} timestamp - message timestamp
*/
Client.call(this, conf, Kafka.KafkaConsumer, topicConf);
this.globalConfig = conf;
this.topicConfig = topicConf;
this._consumeTimeout = 1000;
}
/**
* Set the default consume timeout provided to c++land
* @param {number} timeoutMs - number of milliseconds to wait for a message to be fetched
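 *
 * A usage sketch; the 10ms value is an arbitrary illustration:
 * @example
 * consumer.setDefaultConsumeTimeout(10);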
*/
KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
this._consumeTimeout = timeoutMs;
};
/**
* Get a stream representation of this KafkaConsumer
*
 * @see KafkaConsumerStream
* @example
* var consumerStream = Kafka.KafkaConsumer.createReadStream({
* 'metadata.broker.list': 'localhost:9092',
* 'group.id': 'librd-test',
* 'socket.keepalive.enable': true,
* 'enable.auto.commit': false
* }, {}, { topics: [ 'test' ] });
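 *
 * // Messages are then emitted as 'data' events on the stream:
 * consumerStream.on('data', function(message) {
 *   console.log('Got message: ' + message.value.toString());
 * });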
*
* @param {object} conf - Key value pairs to configure the consumer
* @param {object} topicConf - Key value pairs to create a default
* topic configuration
* @param {object} streamOptions - Stream options
* @param {array} streamOptions.topics - Array of topics to subscribe to.
* @return {KafkaConsumerStream} - Readable stream that receives messages
* when new ones become available.
*/
KafkaConsumer.createReadStream = function(conf, topicConf, streamOptions) {
var consumer = new KafkaConsumer(conf, topicConf);
return new KafkaConsumerStream(consumer, streamOptions);
};
/**
* Get a current list of the committed offsets per topic partition
*
* Returns an array of objects in the form of a topic partition list
*
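 * A usage sketch, assuming a connected consumer with an active
 * assignment; passing null queries the current assignment:
 *
 * @example
 * consumer.committed(null, 5000, function(err, topicPartitions) {
 *   if (err) {
 *     // Handle the error, e.g. by logging it
 *     return;
 *   }
 *   // e.g. [ { topic: 'test', partition: 0, offset: 42 } ]
 *   console.log(topicPartitions);
 * });
 *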
* @param {TopicPartition[]} toppars - Topic partition list to query committed
* offsets for. Defaults to the current assignment
* @param {number} timeout - Number of ms to block before calling back
* and erroring
* @param {Function} cb - Callback method to execute when finished or timed
* out
* @return {Client} - Returns itself
*/
KafkaConsumer.prototype.committed = function(toppars, timeout, cb) {
// We want to be backwards compatible here, and the previous version of
// this function took two arguments
// If CB is not set, shift to backwards compatible version
if (!cb) {
cb = arguments[1];
timeout = arguments[0];
toppars = this.assignments();
} else {
toppars = toppars || this.assignments();
}
this._client.committed(toppars, timeout, function(err, topicPartitions) {
if (err) {
cb(LibrdKafkaError.create(err));
return;
}
cb(null, topicPartitions);
});
return this;
};
/**
* Seek consumer for topic+partition to offset which is either an absolute or
* logical offset.
*
 * Does not return anything, as it is asynchronous. There are special cases
 * with the timeout parameter (see below). The consumer must have previously
 * been assigned the topic and partition it is asked to seek on.
*
* @example
* consumer.seek({ topic: 'topic', partition: 0, offset: 1000 }, 0, function(err) {
* if (err) {
 *     // Handle the seek error, e.g. by logging it
* }
* });
*
* @param {TopicPartition} toppar - Topic partition to seek.
* @param {number} timeout - Number of ms to block before calling back
* and erroring. If the parameter is null or 0, the call will not wait
* for the seek to be performed. Essentially, it will happen in the background
* with no notification
* @param {Function} cb - Callback method to execute when finished or timed
* out. If the seek timed out, the internal state of the consumer is unknown.
* @return {Client} - Returns itself
*/
KafkaConsumer.prototype.seek = function(toppar, timeout, cb) {
this._client.seek(TopicPartition.create(toppar), timeout, function(err) {
if (err) {
cb(LibrdKafkaError.create(err));
return;
}
cb();
});
return this;
};
/**
* Assign the consumer specific partitions and topics
*
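 * A usage sketch; the topic name and partition are assumptions:
 *
 * @example
 * consumer.assign([{ topic: 'test', partition: 0 }]);
 *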
* @param {array} assignments - Assignments array. Should contain
* objects with topic and partition set.
* @return {Client} - Returns itself
*/
KafkaConsumer.prototype.assign = function(assignments) {
this._client.assign(TopicPartition.map(assignments));
return this;
};
/**
* Unassign the consumer from its assigned partitions and topics.
*
* @return {Client} - Returns itself
*/
KafkaConsumer.prototype.unassign = function() {
this._client.unassign();
return this;
};
/**
* Get the assignments for the consumer
*
* @return {array} assignments - Array of topic partitions
*/
KafkaConsumer.prototype.assignments = function() {
return this._errorWrap(this._client.assignments(), true);
};
/**
* Subscribe to an array of topics (synchronously).
*
 * This operation is pretty fast because it just sets
 * a subscription in librdkafka. This is the recommended
 * way to deal with subscriptions in a situation where you
 * will be reading across multiple topics, or as part of
 * your configure-time initialization.
*
* This is also a good way to do it for streams.
*
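 * A usage sketch; the topic names are assumptions:
 *
 * @example
 * consumer.subscribe(['topic1', 'topic2']);
 *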
* @param {array} topics - An array of topics to listen to
* @throws - Throws when an error code came back from native land
* @return {KafkaConsumer} - Returns itself.
*/
KafkaConsumer.prototype.subscribe = function(topics) {
// Will throw if it is a bad error.
this._errorWrap(this._client.subscribe(topics));
this.emit('subscribed', topics);
return this;
};
/**
* Get the current subscription of the KafkaConsumer
*
 * Get a list of subscribed topics. Should generally match what you
 * passed in via subscribe.
*
* @see KafkaConsumer::subscribe
* @throws - Throws when an error code came back from native land
 * @return {array} - Array of strings representing the current subscription
*/
KafkaConsumer.prototype.subscription = function() {
return this._errorWrap(this._client.subscription(), true);
};
/**
* Get the current offset position of the KafkaConsumer
*
* Returns a list of RdKafka::TopicPartitions on success, or throws
* an error on failure
*
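 * A usage sketch; the returned values shown are illustrative:
 *
 * @example
 * // e.g. [ { topic: 'test', partition: 0, offset: 43 } ]
 * var positions = consumer.position();
 *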
* @param {TopicPartition[]} toppars - List of topic partitions to query
* position for. Defaults to the current assignment
* @throws - Throws when an error code came back from native land
* @return {array} - TopicPartition array. Each item is an object with
* an offset, topic, and partition
*/
KafkaConsumer.prototype.position = function(toppars) {
if (!toppars) {
toppars = this.assignments();
}
return this._errorWrap(this._client.position(toppars), true);
};
/**
* Unsubscribe from all currently subscribed topics
*
 * Before you subscribe to a new set of topics you need to
 * unsubscribe from any active subscription; otherwise the
 * new subscribe call will fail with an error.
*
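 * A sketch of swapping subscriptions; the topic name is an assumption:
 *
 * @example
 * consumer.unsubscribe();
 * consumer.subscribe(['another-topic']);
 *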
* @throws - Throws when an error code comes back from native land
* @return {KafkaConsumer} - Returns itself.
*/
KafkaConsumer.prototype.unsubscribe = function() {
this._errorWrap(this._client.unsubscribe());
this.emit('unsubscribed', []);
// Backwards compatible change
this.emit('unsubscribe', []);
return this;
};
/**
* Read a number of messages from Kafka.
*
 * This method is similar to the flowing mode, except that it reads a
 * number of messages before calling back. This may get better performance
 * than reading a single message each time in stream implementations.
*
 * This will keep going until it gets ERR__PARTITION_EOF or ERR__TIMED_OUT,
 * so the array may not be the same size you ask for. The size is advisory,
* but we will not exceed it.
*
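 * A batch-mode sketch; the batch size of 100 is an arbitrary choice:
 *
 * @example
 * consumer.consume(100, function(err, messages) {
 *   if (err) {
 *     // Handle the error, e.g. by logging it
 *     return;
 *   }
 *   // messages may contain fewer than 100 entries
 *   console.log('Read ' + messages.length + ' messages');
 * });
 *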
* @param {number} size - Number of messages to read
* @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
*//**
* Read messages from Kafka as fast as possible
*
* This method keeps a background thread running to fetch the messages
* as quickly as it can, sleeping only in between EOF and broker timeouts.
*
* Use this to get the maximum read performance if you don't care about the
* stream backpressure.
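 *
 * A flowing-mode sketch (the same messages are also emitted as 'data'
 * events):
 *
 * @example
 * consumer.consume(function(err, message) {
 *   if (err) {
 *     // Handle the error, e.g. by logging it
 *   }
 * });
 *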
* @param {KafkaConsumer~readCallback} cb - Callback to return when a message
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, cb) {
var timeoutMs = this._consumeTimeout || 1000;
if ((number && typeof number === 'number') || (number && cb)) {
if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
}
this._consumeNum(timeoutMs, number, cb);
} else {
// See https://github.com/Blizzard/node-rdkafka/issues/220
    // The docs specify that just a callback can be provided, but we
    // also need to fall back when the number argument is the callback.
// @deprecated
if (cb === undefined) {
if (typeof number === 'function') {
cb = number;
} else {
cb = function() {};
}
}
this._consumeLoop(timeoutMs, cb);
}
};
/**
* Open a background thread and keep getting messages as fast
* as we can. Should not be called directly, and instead should
* be called using consume.
*
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
var self = this;
self._client.consumeLoop(timeoutMs, function readCallback(err, message) {
if (err) {
      // A few different types of errors can land here, but the two we
      // do NOT care about are timeouts and the broker reporting no
      // more messages, at least for now
cb(LibrdKafkaError.create(err));
} else {
/**
* Data event. called whenever a message is received.
*
* @event KafkaConsumer#data
* @type {KafkaConsumer~Message}
*/
self.emit('data', message);
cb(err, message);
}
});
};
/**
* Consume a number of messages and wrap in a try catch with
* proper error reporting. Should not be called directly,
* and instead should be called using consume.
*
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
var self = this;
this._client.consume(timeoutMs, numMessages, function(err, messages) {
if (err) {
err = LibrdKafkaError.create(err);
if (cb) {
cb(err);
}
return;
}
for (var i = 0; i < messages.length; i++) {
self.emit('data', messages[i]);
}
if (cb) {
cb(null, messages);
}
});
};
/**
* This callback returns the message read from Kafka.
*
* @callback KafkaConsumer~readCallback
* @param {LibrdKafkaError} err - An error, if one occurred while reading
* the data.
* @param {KafkaConsumer~Message} message
*/
/**
* Commit a topic partition or all topic partitions that have been read
*
* If you provide a topic partition, it will commit that. Otherwise,
* it will commit all read offsets for all topic partitions.
*
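 * A usage sketch; the topic, partition, and offset are assumptions:
 *
 * @example
 * // Commit a specific offset for one topic partition
 * consumer.commit({ topic: 'test', partition: 0, offset: 43 });
 * // Or commit all read offsets
 * consumer.commit(null);
 *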
 * @param {object|array|null} topicPartition - Topic partition object to
 * commit, list of topic partitions, or null to commit all read offsets.
* @throws When commit returns a non 0 error code
*
* @return {KafkaConsumer} - returns itself.
*/
KafkaConsumer.prototype.commit = function(topicPartition) {
this._errorWrap(this._client.commit(topicPartition), true);
return this;
};
/**
* Commit a message
*
 * This is a convenience method for committing the offset of a consumed
 * message. The committed offset must point to the next message to be
 * read, so we add one to the message's offset here.
*
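 * A usage sketch, inside a 'data' handler:
 *
 * @example
 * consumer.on('data', function(message) {
 *   consumer.commitMessage(message);
 * });
 *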
 * @param {object} msg - Message object to commit
* @throws When commit returns a non 0 error code
*
* @return {KafkaConsumer} - returns itself.
*/
KafkaConsumer.prototype.commitMessage = function(msg) {
var topicPartition = {
topic: msg.topic,
partition: msg.partition,
offset: msg.offset + 1
};
this._errorWrap(this._client.commit(topicPartition), true);
return this;
};
/**
* Commit a topic partition (or all topic partitions) synchronously
*
 * @param {object|array|null} topicPartition - Topic partition object to
 * commit, list of topic partitions, or null to commit all read offsets.
* @throws {LibrdKafkaError} - if the commit fails
*
* @return {KafkaConsumer} - returns itself.
*/
KafkaConsumer.prototype.commitSync = function(topicPartition) {
this._errorWrap(this._client.commitSync(topicPartition), true);
return this;
};
/**
* Commit a message synchronously
*
* @see KafkaConsumer#commitMessageSync
* @param {object} msg - A message object to commit.
*
* @throws {LibrdKafkaError} - if the commit fails
*
* @return {KafkaConsumer} - returns itself.
*/
KafkaConsumer.prototype.commitMessageSync = function(msg) {
var topicPartition = {
topic: msg.topic,
partition: msg.partition,
offset: msg.offset + 1
};
this._errorWrap(this._client.commitSync(topicPartition), true);
return this;
};
/**
* Get last known offsets from the client.
*
* The low offset is updated periodically (if statistics.interval.ms is set)
* while the high offset is updated on each fetched message set from the
* broker.
*
* If there is no cached offset (either low or high, or both), then this will
* throw an error.
*
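 * A usage sketch; the topic and partition are assumptions, and the
 * property names follow the watermarkOffsets shape documented below:
 *
 * @example
 * var offsets = consumer.getWatermarkOffsets('test', 0);
 * console.log('low: ' + offsets.low + ', high: ' + offsets.high);
 *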
 * @param {string} topic - Topic to receive offsets from.
 * @param {number} partition - Partition of the provided topic to receive offsets from
* @return {Client~watermarkOffsets} - Returns an object with a high and low property, specifying
* the high and low offsets for the topic partition
* @throws {LibrdKafkaError} - Throws when there is no offset stored
*/
KafkaConsumer.prototype.getWatermarkOffsets = function(topic, partition) {
if (!this.isConnected()) {
throw new Error('Client is disconnected');
}
return this._errorWrap(this._client.getWatermarkOffsets(topic, partition), true);
};
/**
* Store offset for topic partition.
*
 * The offset will be committed (written) to the offset store according to
 * the auto commit interval, if auto commit is on, or on the next manual
 * offset commit if it is off.
 *
 * enable.auto.offset.store must be set to false to use this API.
*
* @see https://github.com/edenhill/librdkafka/blob/261371dc0edef4cea9e58a076c8e8aa7dc50d452/src-cpp/rdkafkacpp.h#L1702
*
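 * A sketch of manual offset storing; it assumes
 * 'enable.auto.offset.store': false in the global config, and stores
 * the offset of the next message to be read:
 *
 * @example
 * consumer.on('data', function(message) {
 *   consumer.offsetsStore([{
 *     topic: message.topic,
 *     partition: message.partition,
 *     offset: message.offset + 1
 *   }]);
 * });
 *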
* @param {Array.<TopicPartition>} topicPartitions - Topic partitions with offsets to store offsets for.
 * @throws {LibrdKafkaError} - Throws when the offsets could not be stored
*/
KafkaConsumer.prototype.offsetsStore = function(topicPartitions) {
if (!this.isConnected()) {
throw new Error('Client is disconnected');
}
return this._errorWrap(this._client.offsetsStore(topicPartitions), true);
};
/**
* Resume consumption for the provided list of partitions.
*
* @param {Array.<TopicPartition>} topicPartitions - List of topic partitions to resume consumption on.
 * @throws {LibrdKafkaError} - Throws if the partitions could not be resumed
*/
KafkaConsumer.prototype.resume = function(topicPartitions) {
if (!this.isConnected()) {
throw new Error('Client is disconnected');
}
return this._errorWrap(this._client.resume(topicPartitions), true);
};
/**
 * Pause consumption for the provided list of partitions.
*
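 * A pause/resume sketch; the topic and partition are assumptions:
 *
 * @example
 * var toppars = [{ topic: 'test', partition: 0 }];
 * consumer.pause(toppars);
 * // ... later, pick the partitions back up
 * consumer.resume(toppars);
 *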
 * @param {Array.<TopicPartition>} topicPartitions - List of topic partitions to pause consumption on.
 * @throws {LibrdKafkaError} - Throws if the partitions could not be paused
*/
KafkaConsumer.prototype.pause = function(topicPartitions) {
if (!this.isConnected()) {
throw new Error('Client is disconnected');
}
return this._errorWrap(this._client.pause(topicPartitions), true);
};