/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
module.exports = LibrdKafkaError;
var util = require('util');
var librdkafka = require('../librdkafka');
util.inherits(LibrdKafkaError, Error);
LibrdKafkaError.create = createLibrdkafkaError;
LibrdKafkaError.wrap = errorWrap;
/**
* Enum for identifying errors reported by the library
*
* You can find this list in the C++ code at
* https://github.com/edenhill/librdkafka/blob/master/src-cpp/rdkafkacpp.h#L148
*
* @readonly
* @enum {number}
* @constant
*/
LibrdKafkaError.codes = {
/** Begin internal error codes */
ERR__BEGIN: -200,
/** Received message is incorrect */
ERR__BAD_MSG: -199,
/** Bad/unknown compression */
ERR__BAD_COMPRESSION: -198,
/** Broker is going away */
ERR__DESTROY: -197,
/** Generic failure */
ERR__FAIL: -196,
/** Broker transport failure */
ERR__TRANSPORT: -195,
/** Critical system resource */
ERR__CRIT_SYS_RESOURCE: -194,
/** Failed to resolve broker */
ERR__RESOLVE: -193,
/** Produced message timed out */
ERR__MSG_TIMED_OUT: -192,
/** Reached the end of the topic+partition queue on the broker. Not really an error. */
ERR__PARTITION_EOF: -191,
/** Permanent: Partition does not exist in cluster. */
ERR__UNKNOWN_PARTITION: -190,
/** File or filesystem error */
ERR__FS: -189,
/** Permanent: Topic does not exist in cluster. */
ERR__UNKNOWN_TOPIC: -188,
/** All broker connections are down. */
ERR__ALL_BROKERS_DOWN: -187,
/** Invalid argument, or invalid configuration */
ERR__INVALID_ARG: -186,
/** Operation timed out */
ERR__TIMED_OUT: -185,
/** Queue is full */
ERR__QUEUE_FULL: -184,
/** ISR count < required.acks */
ERR__ISR_INSUFF: -183,
/** Broker node update */
ERR__NODE_UPDATE: -182,
/** SSL error */
ERR__SSL: -181,
/** Waiting for coordinator to become available. */
ERR__WAIT_COORD: -180,
/** Unknown client group */
ERR__UNKNOWN_GROUP: -179,
/** Operation in progress */
ERR__IN_PROGRESS: -178,
/** Previous operation in progress, wait for it to finish. */
ERR__PREV_IN_PROGRESS: -177,
/** This operation would interfere with an existing subscription */
ERR__EXISTING_SUBSCRIPTION: -176,
/** Assigned partitions (rebalance_cb) */
ERR__ASSIGN_PARTITIONS: -175,
/** Revoked partitions (rebalance_cb) */
ERR__REVOKE_PARTITIONS: -174,
/** Conflicting use */
ERR__CONFLICT: -173,
/** Wrong state */
ERR__STATE: -172,
/** Unknown protocol */
ERR__UNKNOWN_PROTOCOL: -171,
/** Not implemented */
ERR__NOT_IMPLEMENTED: -170,
/** Authentication failure */
ERR__AUTHENTICATION: -169,
/** No stored offset */
ERR__NO_OFFSET: -168,
/** Outdated */
ERR__OUTDATED: -167,
/** Timed out in queue */
ERR__TIMED_OUT_QUEUE: -166,
/** Feature not supported by broker */
ERR__UNSUPPORTED_FEATURE: -165,
/** Awaiting cache update */
ERR__WAIT_CACHE: -164,
/** End internal error codes */
ERR__END: -100,
/** Unknown broker error */
ERR_UNKNOWN: -1,
/** Success */
ERR_NO_ERROR: 0,
/** Offset out of range */
ERR_OFFSET_OUT_OF_RANGE: 1,
/** Invalid message */
ERR_INVALID_MSG: 2,
/** Unknown topic or partition */
ERR_UNKNOWN_TOPIC_OR_PART: 3,
/** Invalid message size */
ERR_INVALID_MSG_SIZE: 4,
/** Leader not available */
ERR_LEADER_NOT_AVAILABLE: 5,
/** Not leader for partition */
ERR_NOT_LEADER_FOR_PARTITION: 6,
/** Request timed out */
ERR_REQUEST_TIMED_OUT: 7,
/** Broker not available */
ERR_BROKER_NOT_AVAILABLE: 8,
/** Replica not available */
ERR_REPLICA_NOT_AVAILABLE: 9,
/** Message size too large */
ERR_MSG_SIZE_TOO_LARGE: 10,
/** StaleControllerEpochCode */
ERR_STALE_CTRL_EPOCH: 11,
/** Offset metadata string too large */
ERR_OFFSET_METADATA_TOO_LARGE: 12,
/** Broker disconnected before response received */
ERR_NETWORK_EXCEPTION: 13,
/** Group coordinator load in progress */
ERR_GROUP_LOAD_IN_PROGRESS: 14,
/** Group coordinator not available */
ERR_GROUP_COORDINATOR_NOT_AVAILABLE: 15,
/** Not coordinator for group */
ERR_NOT_COORDINATOR_FOR_GROUP: 16,
/** Invalid topic */
ERR_TOPIC_EXCEPTION: 17,
/** Message batch larger than configured server segment size */
ERR_RECORD_LIST_TOO_LARGE: 18,
/** Not enough in-sync replicas */
ERR_NOT_ENOUGH_REPLICAS: 19,
/** Message(s) written to insufficient number of in-sync replicas */
ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND: 20,
/** Invalid required acks value */
ERR_INVALID_REQUIRED_ACKS: 21,
/** Specified group generation id is not valid */
ERR_ILLEGAL_GENERATION: 22,
/** Inconsistent group protocol */
ERR_INCONSISTENT_GROUP_PROTOCOL: 23,
/** Invalid group.id */
ERR_INVALID_GROUP_ID: 24,
/** Unknown member */
ERR_UNKNOWN_MEMBER_ID: 25,
/** Invalid session timeout */
ERR_INVALID_SESSION_TIMEOUT: 26,
/** Group rebalance in progress */
ERR_REBALANCE_IN_PROGRESS: 27,
/** Commit offset data size is not valid */
ERR_INVALID_COMMIT_OFFSET_SIZE: 28,
/** Topic authorization failed */
ERR_TOPIC_AUTHORIZATION_FAILED: 29,
/** Group authorization failed */
ERR_GROUP_AUTHORIZATION_FAILED: 30,
/** Cluster authorization failed */
ERR_CLUSTER_AUTHORIZATION_FAILED: 31
};
for (var key in librdkafka.errorCodes) {
// Skip it if it doesn't start with ErrorCode
if (key.indexOf('ErrorCode::') !== 0) {
continue;
}
// Replace/add it if there are any discrepancies
var newKey = key.replace('ErrorCode::', '');
LibrdKafkaError.codes[newKey] = librdkafka.errorCodes[key];
}
/**
* Representation of a librdkafka error
*
* This can be created by giving either another error
* to piggy-back on. In this situation it tries to parse
* the error string to figure out the intent. However, more usually,
* it is constructed by an error object created by a C++ Baton.
*
* @param {object|error} e - An object or error to wrap
* @property {string} message - The error message
* @property {number} code - The error code.
* @property {string} origin - The origin, whether it is local or remote
* @constructor
*/
function LibrdKafkaError(e) {
if (!(this instanceof LibrdKafkaError)) {
return new LibrdKafkaError(e);
}
if (typeof e === 'number') {
this.message = librdkafka.err2str(e);
this.code = e;
this.errno = e;
if (e >= LibrdKafkaError.codes.ERR__END) {
this.origin = 'local';
} else {
this.origin = 'kafka';
}
Error.captureStackTrace(this, this.constructor);
} else if (!util.isError(e)) {
// This is the better way
this.message = e.message;
this.code = e.code;
this.errno = e.code;
if (e.code >= LibrdKafkaError.codes.ERR__END) {
this.origin = 'local';
} else {
this.origin = 'kafka';
}
Error.captureStackTrace(this, this.constructor);
} else {
var message = e.message;
var parsedMessage = message.split(': ');
var origin, msg;
if (parsedMessage.length > 1) {
origin = parsedMessage[0].toLowerCase();
msg = parsedMessage[1].toLowerCase();
} else {
origin = 'unknown';
msg = message.toLowerCase();
}
// special cases
if (msg === 'consumer is disconnected' || msg === 'producer is disconnected') {
this.origin = 'local';
this.code = LibrdKafkaError.codes.ERR__STATE;
this.errno = this.code;
this.message = msg;
} else {
this.origin = origin;
this.message = msg;
this.code = typeof e.code === 'number' ? e.code : -1;
this.errno = this.code;
this.stack = e.stack;
}
}
}
function createLibrdkafkaError(e) {
return new LibrdKafkaError(e);
}
function errorWrap(errorCode, intIsError) {
var returnValue = true;
if (intIsError) {
returnValue = errorCode;
errorCode = typeof errorCode === 'number' ? errorCode : 0;
}
if (errorCode !== LibrdKafkaError.codes.ERR_NO_ERROR) {
var e = LibrdKafkaError.create(errorCode);
throw e;
}
return returnValue;
}