Class: KafkaConsumer

new KafkaConsumer(conf, topicConf)

KafkaConsumer class for reading messages from Kafka. This is the main entry point for reading data from Kafka. You configure it like any other client, with a global configuration and a default topic configuration. After you instantiate the object, calling connect opens a socket. No data is read until you tell the consumer which topics to read from.

Parameters:

Name Type Description
conf object Key value pairs to configure the consumer
topicConf object Key value pairs to create a default topic configuration
Source:
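
A minimal sketch of constructing a consumer with a global configuration and a default topic configuration. It assumes the class is exported by a module object, called `Kafka` here (for example the result of loading a package such as node-rdkafka; the package name is an assumption, not stated in this reference). `buildConsumer` is a hypothetical helper, and the broker address and group id are placeholders supplied by the caller. The configuration keys are standard librdkafka properties.

```javascript
// `Kafka` is assumed to be the module object that exports KafkaConsumer.
// Passing it in keeps this sketch independent of how the module is loaded.
function buildConsumer(Kafka, brokerList, groupId) {
  // Global configuration: where to connect and which consumer group to join.
  const conf = {
    'metadata.broker.list': brokerList,
    'group.id': groupId,
  };
  // Default topic configuration: start from the earliest offset when the
  // group has no committed offset yet.
  const topicConf = { 'auto.offset.reset': 'earliest' };
  return new Kafka.KafkaConsumer(conf, topicConf);
}
```

The returned object is not connected yet; nothing is read until connect has been called and topics have been subscribed or assigned.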

_metadata :Client~Metadata

Metadata object. Starts out empty but will be filled with information after the initial connect.

Type:
  • Client~Metadata
Inherited From:
Source:

(static) createReadStream(conf, topicConf, streamOptions) → {KafkaConsumerStream}

Get a stream representation of this KafkaConsumer

Parameters:

Name Type Description
conf object Key value pairs to configure the consumer
topicConf object Key value pairs to create a default topic configuration
streamOptions object Stream options
  Properties
  Name Type Description
  topics array Array of topics to subscribe to.
Source:
See:
  • TopicReadable
Returns:
- Readable stream that receives messages when new ones become available.
Type
KafkaConsumerStream
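
A short sketch of reading through the stream interface. `streamTopics` is a hypothetical helper, and `KafkaConsumer` is assumed to be the class documented here, passed in by the caller. It also assumes that the returned stream behaves like a standard readable stream whose `data` event carries one message object at a time.

```javascript
// Creates a read stream for the given topics and forwards each message.
function streamTopics(KafkaConsumer, conf, topics, onMessage) {
  // Empty default topic configuration; only streamOptions.topics is set.
  const stream = KafkaConsumer.createReadStream(conf, {}, { topics: topics });
  // Assumption: each 'data' event delivers a single message object.
  stream.on('data', onMessage);
  // Surface stream errors instead of leaving them unhandled.
  stream.on('error', function (err) {
    console.error('stream error:', err);
  });
  return stream;
}
```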

assign(assignments) → {Client}

Assign the consumer specific partitions and topics

Parameters:

Name Type Description
assignments array Assignments array. Should contain objects with topic and partition set.
Source:
Returns:
- Returns itself
Type
Client
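
As an illustration, the assignments array can be built from a topic name and a list of partition numbers. `assignPartitions` is a hypothetical helper; each entry carries only the topic and partition fields described above.

```javascript
// Builds one { topic, partition } object per partition and assigns them.
function assignPartitions(consumer, topic, partitions) {
  const assignments = partitions.map(function (partition) {
    return { topic: topic, partition: partition };
  });
  // assign returns the client itself, so the call can be chained.
  return consumer.assign(assignments);
}
```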

assignments() → {array}

Get the assignments for the consumer

Source:
Returns:
assignments - Array of topic partitions
Type
array

commit(topicPartition) → {KafkaConsumer}

Commit a topic partition, or all topic partitions that have been read. If you provide a topic partition (or a list of them), only those are committed. Otherwise, all read offsets for all topic partitions are committed.

Parameters:

Name Type Description
topicPartition object | array | null Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets.
Source:

Throws:

- When commit returns a nonzero error code
Returns:
- returns itself.
Type
KafkaConsumer

commitMessage(msg) → {KafkaConsumer}

Commit a message. This is a convenience method that maps a message to the corresponding commit call. The committed offset is the message's offset plus one, because a committed offset marks the next message to read.

Parameters:

Name Type Description
msg object Message object to commit
Source:

Throws:

- When commit returns a nonzero error code
Returns:
- returns itself.
Type
KafkaConsumer
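
The offset mapping described for commitMessage can be sketched as follows. `toCommitPosition` and `commitProcessed` are hypothetical helpers, not part of this class; they only illustrate the "add one to the offset" rule, on the assumption that commit accepts a topic partition object with an offset field.

```javascript
// Maps a consumed message to the position that should be committed:
// the offset of the next message to read, i.e. the message offset + 1.
function toCommitPosition(msg) {
  return { topic: msg.topic, partition: msg.partition, offset: msg.offset + 1 };
}

// Commits the position that follows an already processed message.
function commitProcessed(consumer, msg) {
  return consumer.commit(toCommitPosition(msg));
}
```

Under that assumption, `commitProcessed(consumer, msg)` is expected to have the same effect as `consumer.commitMessage(msg)`.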

commitMessageSync(msg) → {KafkaConsumer}

Commit a message synchronously

Parameters:

Name Type Description
msg object A message object to commit.
Source:

Throws:

- if the commit fails
Type
LibrdKafkaError
Returns:
- returns itself.
Type
KafkaConsumer

commitSync(topicPartition) → {KafkaConsumer}

Commit a topic partition (or all topic partitions) synchronously

Parameters:

Name Type Description
topicPartition object | array | null Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets.
Source:

Throws:

- if the commit fails
Type
LibrdKafkaError
Returns:
- returns itself.
Type
KafkaConsumer
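
Because the synchronous variants throw on failure, callers usually wrap them. A minimal sketch, with `commitAllSync` as a hypothetical helper that turns the exception into a return value:

```javascript
// Commits all read offsets synchronously.
// Returns null on success, or the thrown error on failure.
function commitAllSync(consumer) {
  try {
    consumer.commitSync(null); // null: commit all read offsets
    return null;
  } catch (err) {
    return err;
  }
}
```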

committed(toppars, timeout, cb) → {Client}

Get the current list of committed offsets per topic partition. The result is an array of objects in the form of a topic partition list, delivered through the callback.

Parameters:

Name Type Description
toppars Array.<TopicPartition> Topic partition list to query committed offsets for. Defaults to the current assignment
timeout number Number of ms to block before calling back and erroring
cb function Callback method to execute when finished or timed out
Source:
Returns:
- Returns itself
Type
Client

connect(metadataOptions, cb) → {Client}

Connect to the broker and fetch its metadata. The call establishes the client connection and then requests metadata from the broker.

Parameters:

Name Type Description
metadataOptions object Options to be sent to the metadata.
  Properties
  Name Type Description
  topic string Topic to fetch metadata for. Empty string is treated as empty.
  allTopics boolean Fetch metadata for all topics, not just the ones we know about.
  timeout int The timeout, in ms, to allow for fetching metadata. Defaults to 30000ms
cb Client~connectionCallback Callback that indicates we are done connecting.
Inherited From:
Source:
Returns:
- Returns itself.
Type
Client
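
A sketch of a typical startup sequence: connect, wait for the ready event, subscribe, then start consuming. `connectAndConsume` is a hypothetical helper. It assumes that the connection callback and the read callback both receive an error as their first argument, and the 10000 ms metadata timeout is an arbitrary example value.

```javascript
// Wires up connect -> ready -> subscribe -> consume on an existing consumer.
function connectAndConsume(consumer, topics, onMessage, onError) {
  consumer.on('ready', function () {
    // Nothing is read until the consumer knows which topics to read from.
    consumer.subscribe(topics);
    consumer.consume(function (err, message) {
      if (err) {
        onError(err);
        return;
      }
      onMessage(message);
    });
  });
  // Assumption: the callback is invoked with an error if connecting fails.
  consumer.connect({ timeout: 10000 }, function (err) {
    if (err) onError(err);
  });
  return consumer;
}
```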

connectedTime() → {number}

Find out how long we have been connected to Kafka.

Inherited From:
Source:
Returns:
- Milliseconds since the connection has been established.
Type
number

consume(cb)

Read messages from Kafka as fast as possible. This method keeps a background thread running to fetch messages as quickly as it can, sleeping only between EOF and broker timeouts. Use it to get maximum read performance when you do not need stream backpressure.

Parameters:

Name Type Description
cb KafkaConsumer~readCallback Callback to return when a message is fetched.
Source:

consume(size, cb)

Read a number of messages from Kafka. This method is similar to the main one, except that it reads a number of messages before calling back. This may get better performance than reading a single message each time in stream implementations. This will keep going until it gets ERR__PARTITION_EOF or ERR__TIMED_OUT so the array may not be the same size you ask for. The size is advisory, but we will not exceed it.

Parameters:

Name Type Description
size number Number of messages to read
cb KafkaConsumer~readCallback Callback to return when work is done.
Source:
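
A sketch of one batch read. `pollBatch` is a hypothetical helper; it assumes the callback receives an error first and an array of messages second. Since the size is advisory, the handler is written to accept a batch of any length up to `size`, and empty batches are skipped.

```javascript
// Requests up to `size` messages and hands non-empty batches to the caller.
function pollBatch(consumer, size, handleBatch, onError) {
  consumer.consume(size, function (err, messages) {
    if (err) {
      onError(err);
      return;
    }
    // The batch may be shorter than `size`, or empty, when the read stops
    // early on a partition EOF or a timeout.
    if (messages.length > 0) {
      handleBatch(messages);
    }
  });
}
```

Calling such a helper repeatedly, for example from a timer, gives the caller control over how fast messages are pulled.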

disconnect() → {function}

Disconnect from the Kafka client. This method will disconnect us from Kafka unless we are already in a disconnecting state. Use this when you're done reading or producing messages on a given client. It will also emit the disconnected event.

Inherited From:
Source:

Fires:

Returns:
- Callback to call when disconnection is complete.
Type
function

getClient() → {Connection}

Get the native Kafka client. You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper, you can do it here.

Inherited From:
Source:
See:
  • connection.cc
Returns:
- The native Kafka client.
Type
Connection

getLastError() → {LibrdKafkaError}

Get the last error emitted if it exists.

Inherited From:
Source:
Returns:
- Returns the LibrdKafkaError or null if one hasn't been thrown.
Type
LibrdKafkaError

getMetadata(metadataOptions, cb)

Get client metadata. Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka. A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored. To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).

Parameters:

Name Type Description
metadataOptions object Metadata options to pass to the client.
  Properties
  Name Type Description
  topic string Topic string for which to fetch metadata
  timeout number Max time, in ms, to try to fetch metadata before timing out. Defaults to 3000.
cb Client~metadataCallback Callback to fire with the metadata.
Inherited From:
Source:
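
One way to avoid the topic-object side effect described above is to leave `topic` out of the options unless a specific topic is really needed. A minimal sketch, where `metadataOptionsFor` and `fetchMetadata` are hypothetical helpers:

```javascript
// Builds metadataOptions; `topic` is only set when the caller supplies one.
function metadataOptionsFor(topic, timeoutMs) {
  const options = { timeout: timeoutMs };
  // Omitting `topic` requests metadata for all topics, which avoids
  // creating and caching a topic object with default options.
  if (topic) {
    options.topic = topic;
  }
  return options;
}

// Requests metadata and passes the result to the given callback unchanged.
function fetchMetadata(consumer, topic, timeoutMs, cb) {
  consumer.getMetadata(metadataOptionsFor(topic, timeoutMs), cb);
}
```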

getWatermarkOffsets(topic, partition) → {Client~watermarkOffsets}

Get last known offsets from the client. The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker. If there is no cached offset (either low or high, or both), then this will throw an error.

Parameters:

Name Type Description
topic string Topic to receive offsets from.
partition number Partition of the provided topic to receive offsets from.
Source:

Throws:

- Throws when there is no offset stored
Type
LibrdKafkaError
Returns:
- Returns an object with a high and low property, specifying the high and low offsets for the topic partition
Type
Client~watermarkOffsets
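
Since getWatermarkOffsets throws when nothing is cached, a caller may want to fall back to a broker query. The sketch below uses queryWatermarkOffsets, documented further down, as the fallback. `watermarks` is a hypothetical helper, and it assumes the query callback receives an error first and the offsets object second.

```javascript
// Returns cached watermark offsets when available, otherwise asks the broker.
function watermarks(consumer, topic, partition, timeoutMs, cb) {
  let cached;
  try {
    cached = consumer.getWatermarkOffsets(topic, partition);
  } catch (err) {
    // No cached offsets yet: query the broker instead.
    consumer.queryWatermarkOffsets(topic, partition, timeoutMs, cb);
    return;
  }
  cb(null, cached);
}
```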

isConnected() → {boolean}

Whether or not we are connected to Kafka.

Inherited From:
Source:
Returns:
- Whether we are connected.
Type
boolean

offsetsForTimes(toppars, timeout, cb)

Query offsets for times from the broker. This function makes a call to the broker to get the offsets for times specified.

Parameters:

Name Type Description
toppars Array.<TopicPartition> Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for
timeout number Number of ms to wait to receive a response.
cb Client~offsetsForTimesCallback Callback to fire with the filled in offsets.
Inherited From:
Source:
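
The request format can be sketched like this: each entry in `toppars` carries the timestamp to look up in its `offset` field. `offsetsAt` is a hypothetical helper, and the timestamp is assumed to be in milliseconds since the Unix epoch, as Kafka timestamps are.

```javascript
// Asks the broker which offsets correspond to `timestampMs` on each partition.
function offsetsAt(consumer, topic, partitions, timestampMs, timeoutMs, cb) {
  const toppars = partitions.map(function (partition) {
    // `offset` holds the timestamp to resolve, not a real offset.
    return { topic: topic, partition: partition, offset: timestampMs };
  });
  consumer.offsetsForTimes(toppars, timeoutMs, cb);
}
```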

offsetsStore(topicPartitions)

Store offsets for topic partitions. The stored offsets are committed (written) to the offset store according to the auto commit interval if auto commit is on, or on the next manual commit if not. enable.auto.offset.store must be set to false to use this API.

Parameters:

Name Type Description
topicPartitions Array.<TopicPartition> Topic partitions with offsets to store offsets for.
Source:

Throws:

- Throws when there is no offset stored
Type
LibrdKafkaError

pause(topicPartitions)

Pause producing or consumption for the provided list of partitions.

Parameters:

Name Type Description
topicPartitions Array.<TopicPartition> List of topic partitions to pause consumption on.
Source:

Throws:

- Throws when there is no offset stored
Type
LibrdKafkaError

position(toppars) → {array}

Get the current offset position of the KafkaConsumer. Returns a list of RdKafka::TopicPartitions on success, or throws an error on failure.

Parameters:

Name Type Description
toppars Array.<TopicPartition> List of topic partitions to query position for. Defaults to the current assignment
Source:

Throws:

- Throws when an error code came back from native land
Returns:
- TopicPartition array. Each item is an object with an offset, topic, and partition
Type
array

queryWatermarkOffsets(topic, partition, timeout, cb)

Query offsets from the broker. This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.

Parameters:

Name Type Description
topic string Topic to receive offsets from.
partition number Partition of the provided topic to receive offsets from.
timeout number Number of ms to wait to receive a response.
cb Client~watermarkOffsetsCallback Callback to fire with the offsets.
Inherited From:
Source:

resume(topicPartitions)

Resume consumption for the provided list of partitions.

Parameters:

Name Type Description
topicPartitions Array.<TopicPartition> List of topic partitions to resume consumption on.
Source:

Throws:

- Throws when there is no offset stored
Type
LibrdKafkaError
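
pause and resume are typically used as a pair around work that should not be interrupted by new messages. A minimal sketch for synchronous work, where `withConsumptionPaused` is a hypothetical helper that pauses the current assignment and always resumes it afterwards:

```javascript
// Pauses all currently assigned partitions while `work` runs.
function withConsumptionPaused(consumer, work) {
  const partitions = consumer.assignments();
  consumer.pause(partitions);
  try {
    return work();
  } finally {
    // Resume even if `work` throws, so the consumer is not left paused.
    consumer.resume(partitions);
  }
}
```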

seek(toppar, timeout, cb) → {Client}

Seek the consumer for a topic and partition to an offset, which is either an absolute or a logical offset. The operation is asynchronous, so its outcome is reported through the callback rather than through a return value. The timeout parameter has special cases, described below. The consumer must already be assigned the topic and partition it seeks on.

Parameters:

Name Type Description
toppar TopicPartition Topic partition to seek.
timeout number Number of ms to block before calling back and erroring. If the parameter is null or 0, the call will not wait for the seek to be performed. Essentially, it will happen in the background with no notification
cb function Callback method to execute when finished or timed out. If the seek timed out, the internal state of the consumer is unknown.
Source:
Returns:
- Returns itself
Type
Client
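
A sketch of seeking a single partition to an absolute offset. `seekTo` is a hypothetical helper; it assumes the callback receives an error as its first argument when the seek fails or times out. A non-zero timeout is passed so that the callback is actually notified.

```javascript
// Seeks one topic partition to `offset` and reports the outcome via `cb`.
function seekTo(consumer, topic, partition, offset, timeoutMs, cb) {
  const toppar = { topic: topic, partition: partition, offset: offset };
  return consumer.seek(toppar, timeoutMs, function (err) {
    // Normalise a missing error to null for the caller.
    cb(err || null);
  });
}
```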

setDefaultConsumeTimeout(timeoutMs)

Set the default consume timeout passed to the C++ layer.

Parameters:

Name Type Description
timeoutMs number number of milliseconds to wait for a message to be fetched
Source:

subscribe(topics) → {KafkaConsumer}

Subscribe to an array of topics (synchronously). This operation is pretty fast because it just sets an assignment in librdkafka. This is the recommended way to deal with subscriptions in a situation where you will be reading across multiple files or as part of your configure-time initialization. This is also a good way to do it for streams.

Parameters:

Name Type Description
topics array An array of topics to listen to
Source:

Throws:

- Throws when an error code came back from native land
Returns:
- Returns itself.
Type
KafkaConsumer

subscription() → {array}

Get the current subscription of the KafkaConsumer. Returns the list of subscribed topics, which should generally match what you passed to subscribe.

Source:
See:
  • KafkaConsumer::subscribe

Throws:

- Throws when an error code came back from native land
Returns:
- Array of strings naming the currently subscribed topics
Type
array

unassign() → {Client}

Unassign the consumer from its assigned partitions and topics.

Source:
Returns:
- Returns itself
Type
Client

unsubscribe() → {KafkaConsumer}

Unsubscribe from all currently subscribed topics. If there is an active subscription, you must unsubscribe from it before subscribing to new topics. Otherwise, subscribing fails with an error because a subscription already exists.

Source:

Throws:

- Throws when an error code comes back from native land
Returns:
- Returns itself.
Type
KafkaConsumer
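
The unsubscribe-before-subscribe rule can be wrapped in a small helper. `resubscribe` is a hypothetical name; the sketch relies only on subscription, unsubscribe, and subscribe as documented in this class.

```javascript
// Replaces the current subscription, if any, with a new list of topics.
function resubscribe(consumer, topics) {
  // Only unsubscribe when there is an active subscription.
  if (consumer.subscription().length > 0) {
    consumer.unsubscribe();
  }
  return consumer.subscribe(topics);
}
```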

Type Definitions:

Message

KafkaConsumer message. This is the representation of a message read from Kafka.

Type:
  • object
Properties:
Name Type Description
value buffer the message buffer from Kafka.
topic string the topic name
partition number the partition on the topic the message was on
offset number the offset of the message
key string the message key
size number message size, in bytes.
timestamp number message timestamp
Source:

readCallback(err, message)

This callback returns the message read from Kafka.

Parameters:

Name Type Description
err LibrdKafkaError An error, if one occurred while reading the data.
message KafkaConsumer~Message The message read from Kafka.
Source:
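
A sketch of turning a Message into a printable line inside a read callback. `describeMessage` is a hypothetical helper. It assumes the value buffer holds UTF-8 text and that it may be null or undefined for messages without a payload; both are assumptions, not guarantees made by this reference.

```javascript
// Formats a message as "topic[partition]@offset: value".
function describeMessage(message) {
  // Assumption: the payload is UTF-8 text; a missing value is shown as such.
  const text = message.value == null ? '(no value)' : message.value.toString('utf8');
  return message.topic + '[' + message.partition + ']@' + message.offset + ': ' + text;
}
```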

Events

data

Data event. Called whenever a message is received.

Type:

Source:

disconnected

Disconnect event. Called after disconnection is finished.

Type:

  • object
Properties:
Name Type Description
connectionOpened date when the connection was opened.
Inherited From:
Source:

ready

Ready event. Called when the Client connects successfully.

Type:

  • object
Properties:
Name Type Description
name string the name of the broker.
Inherited From:
Source: