/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
'use strict';
module.exports = {
create: createAdminClient,
};
var Client = require('./client');
var util = require('util');
var Kafka = require('../librdkafka');
var LibrdKafkaError = require('./error');
var shallowCopy = require('./util').shallowCopy;
/**
* Create a new AdminClient for making topics, partitions, and more.
*
* This is a factory method because it immediately starts an
* active handle with the brokers.
*
*/
function createAdminClient(conf) {
var client = new AdminClient(conf);
// Wrap the error so we throw if it failed with some context
LibrdKafkaError.wrap(client.connect(), true);
// Return the client if we succeeded
return client;
}
/**
* AdminClient class for administering Kafka
*
* This client is the way you can interface with the Kafka Admin APIs.
* This class should not be made using the constructor, but instead
* should be made using the factory method.
*
* <code>
* var client = AdminClient.create({ ... });
* </code>
*
* Once you instantiate this object, it will have a handle to the kafka broker.
* Unlike the other node-rdkafka classes, this class does not ensure that
* it is connected to the upstream broker. Instead, making an action will
* validate that.
*
* @param {object} conf - Key value pairs to configure the admin client
* topic configuration
* @constructor
*/
function AdminClient(conf) {
if (!(this instanceof AdminClient)) {
return new AdminClient(conf);
}
conf = shallowCopy(conf);
/**
* NewTopic model.
*
* This is the representation of a new message that is requested to be made
* using the Admin client.
*
* @typedef {object} AdminClient~NewTopic
* @property {string} topic - the topic name to create
* @property {number} num_partitions - the number of partitions to give the topic
* @property {number} replication_factor - the replication factor of the topic
* @property {object} config - a list of key values to be passed as configuration
* for the topic.
*/
this._client = new Kafka.AdminClient(conf);
this._isConnected = false;
this.globalConfig = conf;
}
/**
* Connect using the admin client.
*
* Should be run using the factory method, so should never
* need to be called outside.
*
* Unlike the other connect methods, this one is synchronous.
*/
AdminClient.prototype.connect = function() {
LibrdKafkaError.wrap(this._client.connect(), true);
this._isConnected = true;
};
/**
* Disconnect the admin client.
*
* This is a synchronous method, but all it does is clean up
* some memory and shut some threads down
*/
AdminClient.prototype.disconnect = function() {
LibrdKafkaError.wrap(this._client.disconnect(), true);
this._isConnected = false;
};
/**
* Create a topic with a given config.
*
* @param {NewTopic} topic - Topic to create.
* @param {number} timeout - Number of milliseconds to wait while trying to create the topic.
* @param {function} cb - The callback to be executed when finished
*/
AdminClient.prototype.createTopic = function(topic, timeout, cb) {
if (!this._isConnected) {
throw new Error('Client is disconnected');
}
if (typeof timeout === 'function') {
cb = timeout;
timeout = 1000;
}
if (!timeout) {
timeout = 1000;
}
this._client.createTopic(topic, timeout, function(err) {
if (err) {
if (cb) {
cb(LibrdKafkaError.create(err));
}
return;
}
if (cb) {
cb();
}
});
};
/**
* Delete a topic.
*
* @param {string} topic - The topic to delete, by name.
* @param {number} timeout - Number of milliseconds to wait while trying to delete the topic.
* @param {function} cb - The callback to be executed when finished
*/
AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
if (!this._isConnected) {
throw new Error('Client is disconnected');
}
if (typeof timeout === 'function') {
cb = timeout;
timeout = 1000;
}
if (!timeout) {
timeout = 1000;
}
this._client.deleteTopic(topic, timeout, function(err) {
if (err) {
if (cb) {
cb(LibrdKafkaError.create(err));
}
return;
}
if (cb) {
cb();
}
});
};
/**
* Create new partitions for a topic.
*
* @param {string} topic - The topic to add partitions to, by name.
* @param {number} totalPartitions - The total number of partitions the topic should have
* after the request
* @param {number} timeout - Number of milliseconds to wait while trying to delete the topic.
* @param {function} cb - The callback to be executed when finished
*/
AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeout, cb) {
if (!this._isConnected) {
throw new Error('Client is disconnected');
}
if (typeof timeout === 'function') {
cb = timeout;
timeout = 1000;
}
if (!timeout) {
timeout = 1000;
}
this._client.createPartitions(topic, totalPartitions, timeout, function(err) {
if (err) {
if (cb) {
cb(LibrdKafkaError.create(err));
}
return;
}
if (cb) {
cb();
}
});
};