new Producer(conf, topicConf)
Producer class for sending messages to Kafka. This is the main entry point for writing data to Kafka. You configure it like any other client, with a global configuration and a default topic configuration. After instantiating the object, you must call connect before using it. Connecting fetches the metadata and confirms that the connection can be made before you depend on it. After that, connection problems are surfaced by poll, which you either call manually or schedule with setPollInterval.
Parameters:
Name | Type | Description |
---|---|---|
conf | object | Key value pairs to configure the producer |
topicConf | object | Key value pairs to create a default topic configuration |
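The lifecycle described above (configure, connect, then wait for the ready event) could be sketched as follows. This is a minimal illustration, not the library's prescribed usage: the helper name `startProducer` is hypothetical, `Kafka` is assumed to be the module object that exports `Producer`, and `metadata.broker.list` is assumed to be the librdkafka setting used for the broker address.

```javascript
// Sketch only: `Kafka` is assumed to be the module that exports Producer.
// `startProducer` is a hypothetical helper, not part of the documented API.
function startProducer(Kafka, brokerList, onReady) {
  const producer = new Kafka.Producer(
    { 'metadata.broker.list': brokerList }, // global configuration (assumed key)
    {}                                      // default topic configuration
  );

  // 'ready' fires once the client has connected successfully.
  producer.on('ready', (info) => onReady(null, producer, info));

  // Connect before producing. An error-first callback is assumed here.
  producer.connect({}, (err) => {
    if (err) onReady(err);
  });

  return producer;
}
```

Passing the module in as an argument keeps the sketch independent of how the package is installed or named in your project.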
- Source:
Extends:
Members:
_disconnect
Saves the base disconnect method so that it can be overridden to add a flush.
- Source:
_metadata :Client~Metadata
Metadata object. Starts out empty but will be filled with information after the initial connect.
Type:
- Inherited From:
- Source:
Methods:
(static) createWriteStream(conf, topicConf, streamOptions) → {ProducerStream}
Create a write stream interface for a producer. This stream does not run in object mode. It only takes buffers of data.
Parameters:
Name | Type | Description |
---|---|---|
conf | object | Key value pairs to configure the producer |
topicConf | object | Key value pairs to create a default topic configuration |
streamOptions | object | Stream options |
- Source:
Returns:
- Type
- ProducerStream
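Because the stream is not in object mode, anything written to it has to be a buffer. A minimal sketch, assuming `Kafka` is the module that exports `Producer` and that the target topic is passed as a `topic` stream option (that option name is an assumption; the table above does not list the stream options). Both helper names are hypothetical.

```javascript
// Sketch only: `Kafka` is assumed to be the module that exports Producer.
// The `topic` stream option is an assumption, not taken from the table above.
function openBufferStream(Kafka, brokerList, topicName) {
  return Kafka.Producer.createWriteStream(
    { 'metadata.broker.list': brokerList }, // producer configuration (assumed key)
    {},                                     // default topic configuration
    { topic: topicName }                    // stream options
  );
}

// The stream only takes buffers, so convert strings before writing.
function writeText(stream, text) {
  return stream.write(Buffer.from(text, 'utf8'));
}
```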
connect(metadataOptions, cb) → {Client}
Connect to the broker and receive its metadata. Establishes the client connection to a broker and then fetches that broker's metadata.
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Options for the metadata request. |
cb | Client~connectionCallback | Callback that indicates we are done connecting. |
- Inherited From:
- Source:
Returns:
- Type
- Client
connectedTime() → {number}
Find out how long we have been connected to Kafka.
- Inherited From:
- Source:
Returns:
- Type
- number
disconnect(timeout, cb)
Disconnect the producer. Flushes everything in the internal librdkafka producer buffer, then disconnects.
Parameters:
Name | Type | Description |
---|---|---|
timeout | number | Number of milliseconds to try to flush before giving up. Defaults to 5 seconds. |
cb | function | The callback to fire when the disconnect is done. |
- Overrides:
- Source:
flush(timeout, callback) → {Producer}
Flush the producer. Flushes everything in the internal librdkafka producer buffer. This is usually done before disconnecting.
Parameters:
Name | Type | Description |
---|---|---|
timeout | number | Number of milliseconds to try to flush before giving up. |
callback | function | Callback to fire when the flush is done. |
- Source:
Returns:
- Type
- Producer
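The flush and disconnect methods above can be combined into a shutdown routine. The following is a sketch under assumptions: `shutdown` is a hypothetical helper, `producer` is a connected Producer instance, and both callbacks are assumed to receive an error as their first argument. Since disconnect already flushes, the explicit flush mainly lets you use a separate timeout and see flush failures on their own.

```javascript
// Sketch only: `producer` is assumed to be a connected Producer instance.
// Error-first callbacks are assumed for both flush and disconnect.
function shutdown(producer, timeoutMs, done) {
  // Try to drain queued messages first, giving up after timeoutMs.
  producer.flush(timeoutMs, (flushErr) => {
    // Disconnect regardless, then report the first error that occurred.
    producer.disconnect(timeoutMs, (disconnectErr) => {
      done(flushErr || disconnectErr || null);
    });
  });
}
```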
getClient() → {Connection}
Get the native Kafka client. You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper, you can do it here.
- Inherited From:
- Source:
- See:
- connection.cc
Returns:
- Type
- Connection
getLastError() → {LibrdKafkaError}
Get the last error emitted if it exists.
- Inherited From:
- Source:
Returns:
- Type
- LibrdKafkaError
getMetadata(metadataOptions, cb)
Get client metadata.
Note: using a metadataOptions.topic parameter has a potential side effect. A Topic object will be created, if it did not exist yet, with default options, and it will be cached by librdkafka. A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance, and the specific options will be silently ignored. To avoid this side effect, create the topic object with the expected options before requesting metadata, or request metadata for all topics (by omitting metadataOptions.topic).
Parameters:
Name | Type | Description |
---|---|---|
metadataOptions | object | Metadata options to pass to the client. |
cb | Client~metadataCallback | Callback to fire with the metadata. |
- Inherited From:
- Source:
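One way to sidestep the topic-caching side effect noted above is to request metadata for all topics. A minimal sketch, assuming an error-first callback and assuming the metadata object exposes a `topics` array whose entries have a `name` field (neither shape is spelled out in this section). `listTopicNames` is a hypothetical helper.

```javascript
// Sketch only: `client` is assumed to be a connected Producer.
// The `metadata.topics[].name` shape is an assumption.
function listTopicNames(client, cb) {
  // No `topic` property is set, so metadata is requested for all topics
  // and no Topic object with default options is created and cached.
  client.getMetadata({}, (err, metadata) => {
    if (err) return cb(err);
    cb(null, metadata.topics.map((t) => t.name));
  });
}
```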
isConnected() → {boolean}
Whether or not we are connected to Kafka.
- Inherited From:
- Source:
Returns:
- Type
- boolean
offsetsForTimes(toppars, timeout, cb)
Query offsets for times from the broker. This function makes a call to the broker to get the offsets for times specified.
Parameters:
Name | Type | Description |
---|---|---|
toppars | Array.<TopicPartition> | Array of topic partitions. The offset field of each entry should hold the timestamp you want offsets for, rather than an offset. |
timeout | number | Number of ms to wait to receive a response. |
cb | Client~offsetsForTimesCallback | Callback to fire with the filled in offsets. |
- Inherited From:
- Source:
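The unusual part of offsetsForTimes is that the `offset` field of each entry carries a timestamp. A sketch of building such a query, assuming plain objects with `topic`, `partition`, and `offset` fields are accepted in place of TopicPartition instances, and that the timestamp is in milliseconds. Both helper names are hypothetical.

```javascript
// Build one query entry per partition. The `offset` field holds the
// timestamp to look up (assumed to be in milliseconds), not an offset.
function timestampQueries(topic, partitions, timestampMs) {
  return partitions.map((partition) => ({ topic, partition, offset: timestampMs }));
}

// Sketch only: `client` is assumed to be a connected Producer.
function offsetsSince(client, topic, partitions, timestampMs, timeoutMs, cb) {
  client.offsetsForTimes(timestampQueries(topic, partitions, timestampMs), timeoutMs, cb);
}
```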
poll() → {Producer}
Poll for events. We need to run poll in order to learn about new events that have occurred. This is no longer done automatically when we produce, so we need to run it manually or set the producer to poll automatically.
- Source:
Returns:
- Type
- Producer
produce(topic, partition, message, key, timestamp, opaque) → {boolean}
Produce a message to Kafka synchronously. This is the main method of this class. The message and its destination are passed as the individual parameters listed below. Once the message is sent off, there is no guarantee that it is delivered. If you need guaranteed delivery, change your *acks* settings or use delivery reports.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | The topic name to produce to. |
partition | number \| null | The partition number to produce to. |
message | Buffer \| null | The message to produce. |
key | string | The key associated with the message. |
timestamp | number \| null | Timestamp to send with the message. |
opaque | object | An object you want passed along with this message, if provided. |
- Source:
- See:
Throws:
- Throws a librdkafka error if it failed.
- Type
- LibrdKafkaError
Returns:
- Type
- boolean
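Since produce throws on failure rather than reporting through a callback, a caller will typically wrap it. The following sketch assumes `producer` is a connected Producer and uses a hypothetical helper name, `sendText`. A null partition is passed so that the partition is left to the partitioner, and the current time is used as the timestamp purely for illustration.

```javascript
// Sketch only: `producer` is assumed to be a connected Producer instance.
function sendText(producer, topic, text, key) {
  try {
    // Arguments follow the documented order:
    // topic, partition, message, key, timestamp.
    producer.produce(topic, null, Buffer.from(text, 'utf8'), key, Date.now());
    return { queued: true, error: null };
  } catch (err) {
    // produce throws (a LibrdKafkaError, per the section above) when it fails.
    return { queued: false, error: err };
  }
}
```

A `queued: true` result only means the call did not throw; as noted above, delivery itself is not guaranteed at this point.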
queryWatermarkOffsets(topic, partition, timeout, cb)
Query offsets from the broker. This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.
Parameters:
Name | Type | Description |
---|---|---|
topic | string | Topic to receive offsets from. |
partition | number | Partition of the provided topic to receive offsets from. |
timeout | number | Number of ms to wait to receive a response. |
cb | Client~watermarkOffsetsCallback | Callback to fire with the offsets. |
- Inherited From:
- Source:
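As an illustration of what the low and high offsets can be used for, the sketch below computes the offset range currently held by a partition. It assumes an error-first callback and assumes the result object has `lowOffset` and `highOffset` fields; those field names are not stated in this section. `offsetRange` is a hypothetical helper.

```javascript
// Sketch only: `client` is assumed to be a connected Producer.
// The `lowOffset` / `highOffset` field names are an assumption.
function offsetRange(client, topic, partition, timeoutMs, cb) {
  client.queryWatermarkOffsets(topic, partition, timeoutMs, (err, offsets) => {
    if (err) return cb(err);
    // Distance between the oldest and newest offsets of the partition.
    cb(null, offsets.highOffset - offsets.lowOffset);
  });
}
```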
setPollInterval(interval) → {Producer}
Set automatic polling for events. We need to run poll in order to learn about new events that have occurred. If you would like this done on an interval, with disconnects and reconnections managed for you, you can set that up here.
Parameters:
Name | Type | Description |
---|---|---|
interval | number | Interval, in milliseconds, to poll |
- Source:
Returns:
- Type
- Producer
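The two polling styles described above, automatic via setPollInterval and manual via poll, might look like this in practice. This is a sketch with hypothetical helper names; `producer` is assumed to be a connected Producer instance.

```javascript
// Option 1: let the producer poll on its own schedule.
// setPollInterval returns the producer, so the call can be chained.
function pollAutomatically(producer, intervalMs) {
  return producer.setPollInterval(intervalMs);
}

// Option 2: poll by hand, for example right after a batch of produce calls.
function produceBatch(producer, topic, payloads) {
  for (const payload of payloads) {
    // A null partition leaves the choice to the partitioner.
    producer.produce(topic, null, Buffer.from(payload, 'utf8'));
  }
  producer.poll(); // pick up events that occurred since the last poll
  return payloads.length;
}
```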
Type Definitions:
Message
Producer message. This is sent to the wrapper, not received from it.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
message | string \| Buffer | The buffer to send to Kafka. |
topic | Topic | The Kafka topic to produce to. |
partition | number | The partition to produce to. Defaults to the partition chosen by the partitioner. |
key | string | The key string to use for the message. |
- Source:
- See:
- Consumer~Message
Events
disconnected
Disconnect event. Called after disconnection is finished.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
connectionOpened | date | When the connection was opened. |
- Inherited From:
- Source:
ready
Ready event. Called when the Client connects successfully.
Type:
- object
Properties:
Name | Type | Description |
---|---|---|
name | string | The name of the broker. |
- Inherited From:
- Source: