new KafkaConsumer(conf, topicConf)
KafkaConsumer class for reading messages from Kafka. This is the main entry point for reading data from Kafka. You configure this like any other client, with a global configuration and a default topic configuration. Once you instantiate this object, connecting will open a socket. Data will not be read until you tell the consumer what topics you want to read from.
Parameters:
Name | Type | Description |
---|---|---|
conf | object | Key value pairs to configure the consumer |
topicConf | object | Key value pairs to create a default topic configuration |
Extends:
- Client
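For illustration, a minimal consumption sketch (broker address, group id, and topic name are placeholders):

```js
var Kafka = require('node-rdkafka');

// Global configuration plus an (empty) default topic configuration
var consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092'
}, {});

consumer.connect();

consumer
  .on('ready', function() {
    consumer.subscribe(['example-topic']);
    consumer.consume();
  })
  .on('data', function(message) {
    // message.value is a Buffer
    console.log(message.value.toString());
  });
```

The later snippets on this page assume a `consumer` instance created this way.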
Members:
_metadata :Client~Metadata
Metadata object. Starts out empty but will be filled with information after the initial connect.
Type:
- Client~Metadata
- Inherited From: Client
Methods:
(static) createReadStream(conf, topicConf, streamOptions) → {KafkaConsumerStream}
Get a stream representation of this KafkaConsumer
Parameters:
Name | Type | Description |
---|---|---|
conf | object | Key value pairs to configure the consumer |
topicConf | object | Key value pairs to create a default topic configuration |
streamOptions | object | Stream options |
- See: TopicReadable
Returns:
- Type
- KafkaConsumerStream
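A sketch of stream-based consumption; it assumes the `topics` stream option, and the broker and topic names are placeholders:

```js
var Kafka = require('node-rdkafka');

var stream = Kafka.KafkaConsumer.createReadStream({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092'
}, {}, {
  topics: ['example-topic'] // stream option: topics to read
});

stream.on('data', function(message) {
  console.log('Got message:', message.value.toString());
});
```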
assign(assignments) → {Client}
Assign the consumer specific partitions and topics
Parameters:
Name | Type | Description |
---|---|---|
assignments | array | Assignments array. Should contain objects with topic and partition set. |
Returns:
- Type
- Client
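For example (topic and partitions are placeholders):

```js
// Take two partitions directly, bypassing consumer-group
// rebalancing (contrast with subscribe())
consumer.assign([
  { topic: 'example-topic', partition: 0 },
  { topic: 'example-topic', partition: 1 }
]);
```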
assignments() → {array}
Get the assignments for the consumer
Returns:
- Type
- array
commit(topicPartition) → {KafkaConsumer}
Commit a topic partition or all topic partitions that have been read. If you provide a topic partition, it will commit that. Otherwise, it will commit all read offsets for all topic partitions.
Parameters:
Name | Type | Description |
---|---|---|
topicPartition | object, array, or null | Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets. |
Throws:
- if the commit fails
- Type
- LibrdKafkaError
Returns:
- Type
- KafkaConsumer
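Both forms, sketched with placeholder values:

```js
// Commit all read offsets for all topic partitions
consumer.commit(null);

// Commit a single topic partition at an explicit offset
consumer.commit({ topic: 'example-topic', partition: 0, offset: 42 });
```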
commitMessage(msg) → {KafkaConsumer}
Commit a message. This is basically a convenience method to map commit properly; we need to add one to the offset in this case (the committed offset is the message's offset plus one).
Parameters:
Name | Type | Description |
---|---|---|
msg | object | Message object to commit |
Throws:
- if the commit fails
- Type
- LibrdKafkaError
Returns:
- Type
- KafkaConsumer
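For example, committing each message as it is read:

```js
consumer.on('data', function(message) {
  // Commits message.offset + 1, i.e. the next position to read from
  consumer.commitMessage(message);
});
```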
commitMessageSync(msg) → {KafkaConsumer}
Commit a message synchronously
Parameters:
Name | Type | Description |
---|---|---|
msg | object | A message object to commit. |
Throws:
- if the commit fails
- Type
- LibrdKafkaError
Returns:
- Type
- KafkaConsumer
commitSync(topicPartition) → {KafkaConsumer}
Commit a topic partition (or all topic partitions) synchronously
Parameters:
Name | Type | Description |
---|---|---|
topicPartition | object, array, or null | Topic partition object to commit, list of topic partitions, or null if you want to commit all read offsets. |
Throws:
- if the commit fails
- Type
- LibrdKafkaError
Returns:
- Type
- KafkaConsumer
committed(toppars, timeout, cb) → {Client}
Get a current list of the committed offsets per topic partition. Returns an array of objects in the form of a topic partition list.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Topic partition list to query committed offsets for. Defaults to the current assignment |
timeout | number | Number of ms to block before calling back and erroring |
cb | function | Callback method to execute when finished or timed out |
Returns:
- Type
- Client
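A sketch querying the current assignment (passing null for toppars to use the default):

```js
consumer.committed(null, 1000, function(err, topicPartitions) {
  if (err) {
    return console.error(err);
  }
  // e.g. [ { topic: 'example-topic', partition: 0, offset: 43 } ]
  console.log(topicPartitions);
});
```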
connect(metadataOptions, cb) → {Client}
Connect to the broker and receive its metadata. Establishes the client connection and fetches the broker's metadata.
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Options to be sent with the metadata request. |
cb | Client~connectionCallback | Callback that indicates we are done connecting. |
- Inherited From: Client
Returns:
- Type
- Client
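For example (the metadata fields shown are illustrative):

```js
consumer.connect({}, function(err, metadata) {
  if (err) {
    return console.error('Failed to connect:', err);
  }
  console.log('Connected; cluster reports', metadata.topics.length, 'topics');
});
```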
connectedTime() → {number}
Find out how long we have been connected to Kafka.
- Inherited From: Client
Returns:
- Type
- number
consume(cb)
Read messages from Kafka as fast as possible. This method keeps a background thread running to fetch messages as quickly as it can, sleeping only in between EOF and broker timeouts. Use this to get maximum read performance if you don't care about stream backpressure.
Parameters:
Name | Type | Description |
---|---|---|
cb | KafkaConsumer~readCallback | Callback to return when a message is fetched. |
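A flowing-mode sketch; the consumer must already be connected and subscribed or assigned:

```js
// The callback fires once per fetched message
consumer.consume(function(err, message) {
  if (err) {
    return console.error(err);
  }
  console.log(message.offset, message.value.toString());
});
```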
consume(size, cb)
Read a number of messages from Kafka. This method is similar to the main one, except that it reads a number of messages before calling back. This may get better performance than reading a single message each time in stream implementations. This will keep going until it gets ERR__PARTITION_EOF or ERR__TIMED_OUT so the array may not be the same size you ask for. The size is advisory, but we will not exceed it.
Parameters:
Name | Type | Description |
---|---|---|
size | number | Number of messages to read |
cb | KafkaConsumer~readCallback | Callback to return when work is done. |
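A non-flowing sketch reading batches of up to 100 messages:

```js
consumer.consume(100, function(err, messages) {
  if (err) {
    return console.error(err);
  }
  // messages.length may be anywhere from 0 up to 100
  messages.forEach(function(message) {
    console.log(message.value.toString());
  });
});
```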
disconnect() → {function}
Disconnect the client from Kafka. This method will disconnect us from Kafka unless we are already in a disconnecting state. Use this when you're done reading or producing messages on a given client. It will also emit the disconnected event.
- Inherited From: Client
Fires:
- disconnected
Returns:
- Type
- function
getClient() → {Connection}
Get the native Kafka client. You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper you can do it here.
- Inherited From: Client
- See: connection.cc
Returns:
- Type
- Connection
getLastError() → {LibrdKafkaError}
Get the last error emitted if it exists.
- Inherited From: Client
Returns:
- Type
- LibrdKafkaError
getMetadata(metadataOptions, cb)
Get client metadata.
Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options, and it will be cached by librdkafka. A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored. To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Metadata options to pass to the client. |
cb | Client~metadataCallback | Callback to fire with the metadata. |
- Inherited From: Client
getWatermarkOffsets(topic, partition) → {Client~watermarkOffsets}
Get last known offsets from the client. The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker. If there is no cached offset (either low or high, or both), then this will throw an error.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
Throws:
- Throws when there is no offset stored
- Type
- LibrdKafkaError
Returns:
- Type
- Client~watermarkOffsets
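A sketch, assuming the returned object exposes lowOffset and highOffset fields:

```js
try {
  var offsets = consumer.getWatermarkOffsets('example-topic', 0);
  console.log('low:', offsets.lowOffset, 'high:', offsets.highOffset);
} catch (err) {
  // Thrown when no offsets are cached for this partition yet
  console.error(err);
}
```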
isConnected() → {boolean}
Whether or not we are connected to Kafka.
- Inherited From: Client
Returns:
- Type
- boolean
offsetsForTimes(toppars, timeout, cb)
Query offsets for times from the broker. This function makes a call to the broker to get the offsets for times specified.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for |
timeout | number | Number of ms to wait to receive a response. |
cb | Client~offsetsForTimesCallback | Callback to fire with the filled in offsets. |
- Inherited From: Client
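For example, finding the first offsets at or after a timestamp (placeholder topic):

```js
var oneHourAgo = Date.now() - 60 * 60 * 1000;

consumer.offsetsForTimes(
  // The offset field of each input holds the timestamp, in ms
  [{ topic: 'example-topic', partition: 0, offset: oneHourAgo }],
  5000,
  function(err, offsets) {
    if (err) {
      return console.error(err);
    }
    console.log(offsets); // offset now holds an actual offset
  }
);
```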
offsetsStore(topicPartitions)
Store offsets for topic partitions. The offsets will be committed (written) to the offset store according to the auto commit interval, if auto commit is on, or on the next manual offset commit if not. enable.auto.offset.store must be set to false to use this API.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitions | Array.<TopicPartition> | Topic partitions with offsets to store offsets for. |
Throws:
- if the offsets could not be stored
- Type
- LibrdKafkaError
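A sketch of manual offset storage; handleMessage is a hypothetical processing function, and the config assumption is noted in the comment:

```js
// Assumes 'enable.auto.offset.store': false in the global config
consumer.on('data', function(message) {
  handleMessage(message); // hypothetical application logic
  // Store the position of the next message to be read
  consumer.offsetsStore([{
    topic: message.topic,
    partition: message.partition,
    offset: message.offset + 1
  }]);
});
```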
pause(topicPartitions)
Pause consumption for the provided list of partitions.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitions | Array.<TopicPartition> | List of topic partitions to pause consumption on. |
Throws:
- if the pause fails
- Type
- LibrdKafkaError
position(toppars) → {array}
Get the current offset position of the KafkaConsumer. Returns a list of RdKafka::TopicPartitions on success, or throws an error on failure.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | List of topic partitions to query position for. Defaults to the current assignment |
Throws:
- on failure
- Type
- LibrdKafkaError
Returns:
- Type
- array
queryWatermarkOffsets(topic, partition, timeout, cb)
Query offsets from the broker. This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
timeout | number | Number of ms to wait to receive a response. |
cb | Client~watermarkOffsetsCallback | Callback to fire with the offsets. |
- Inherited From: Client
resume(topicPartitions)
Resume consumption for the provided list of partitions.
Parameters:
Name | Type | Description |
---|---|---|
topicPartitions | Array.<TopicPartition> | List of topic partitions to resume consumption on. |
Throws:
- if the resume fails
- Type
- LibrdKafkaError
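A sketch using pause and resume together (placeholder partition list):

```js
var topicPartitions = [{ topic: 'example-topic', partition: 0 }];

consumer.pause(topicPartitions);  // stop fetching these partitions
// ... drain an internal queue, apply backpressure, etc. ...
consumer.resume(topicPartitions); // fetching picks up where it left off
```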
seek(toppar, timeout, cb) → {Client}
Seek the consumer for a topic+partition to an offset, which may be either absolute or logical. Does not return anything, as it is asynchronous. There are special cases with the timeout parameter. The consumer must have previously been assigned to the topics and partitions it is asked to seek on.
Parameters:
Name | Type | Description |
---|---|---|
toppar | TopicPartition | Topic partition to seek. |
timeout | number | Number of ms to block before calling back and erroring. If the parameter is null or 0, the call will not wait for the seek to be performed. Essentially, it will happen in the background with no notification |
cb | function | Callback method to execute when finished or timed out. If the seek timed out, the internal state of the consumer is unknown. |
Returns:
- Type
- Client
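For example, rewinding a partition to the beginning with a 1000 ms timeout:

```js
consumer.seek({ topic: 'example-topic', partition: 0, offset: 0 }, 1000, function(err) {
  if (err) {
    console.error('Seek failed or timed out:', err);
  }
});
```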
setDefaultConsumeTimeout(timeoutMs)
Set the default consume timeout provided to the C++ layer.
Parameters:
Name | Type | Description |
---|---|---|
timeoutMs | number | number of milliseconds to wait for a message to be fetched |
subscribe(topics) → {KafkaConsumer}
Subscribe to an array of topics (synchronously). This operation is pretty fast because it just sets an assignment in librdkafka. This is the recommended way to deal with subscriptions in a situation where you will be reading across multiple files or as part of your configure-time initialization. This is also a good way to do it for streams.
Parameters:
Name | Type | Description |
---|---|---|
topics | array | An array of topics to listen to |
Throws:
- Type
- LibrdKafkaError
Returns:
- Type
- KafkaConsumer
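For example:

```js
consumer.subscribe(['example-topic', 'another-topic']);
consumer.consume(); // start reading from the subscribed topics
```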
subscription() → {array}
Get the current subscription of the KafkaConsumer: a list of subscribed topics. Should generally match what you passed in via subscribe.
- See: KafkaConsumer::subscribe
Throws:
- Type
- LibrdKafkaError
Returns:
- Type
- array
unassign() → {Client}
Unassign the consumer from its assigned partitions and topics.
Returns:
- Type
- Client
unsubscribe() → {KafkaConsumer}
Unsubscribe from all currently subscribed topics. Before you subscribe to new topics you need to unsubscribe from the old ones, if there is an active subscription. Otherwise, you will get an error because there is an existing subscription.
Throws:
- Type
- LibrdKafkaError
Returns:
- Type
- KafkaConsumer
Type Definitions:
Message
KafkaConsumer message. This is the representation of a message read from Kafka.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
value | buffer | the message buffer from Kafka. |
topic | string | the topic name |
partition | number | the partition on the topic the message was on |
offset | number | the offset of the message |
key | string | the message key |
size | number | message size, in bytes. |
timestamp | number | message timestamp |
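For illustration, a received message might look like this (all values made up):

```js
{
  value: Buffer.from('hi'),  // message contents as a Buffer
  topic: 'example-topic',
  partition: 0,
  offset: 1337,
  key: 'someKey',
  size: 2,
  timestamp: 1510325354780
}
```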
readCallback(err, message)
This callback returns the message read from Kafka.
Parameters:
Name | Type | Description |
---|---|---|
err | LibrdKafkaError | An error, if one occurred while reading the data. |
message | KafkaConsumer~Message | The message read from Kafka. |
Events:
data
Data event. Called whenever a message is received.
Type:
- KafkaConsumer~Message
disconnected
Disconnect event. Called after disconnection is finished.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
connectionOpened | date | When the connection was opened. |
- Inherited From: Client
ready
Ready event. Called when the Client connects successfully.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
name | string | the name of the broker. |
- Inherited From: Client