libcudf  24.02.00
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Public Member Functions | List of all members
cudf::io::external::kafka::kafka_consumer Class Reference

libcudf datasource for Apache Kafka More...

#include <kafka_consumer.hpp>

Inheritance diagram for cudf::io::external::kafka::kafka_consumer:
cudf::io::datasource

Public Member Functions

 kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)
 Creates an instance of the Kafka consumer object that is in a semi-ready state. More...
 
 kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)
 Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. More...
 
std::unique_ptr< cudf::io::datasource::bufferhost_read (size_t offset, size_t size) override
 Returns a buffer with a subset of data from Kafka Topic. More...
 
size_t size () const override
 Returns the size of the data in Kafka buffer. More...
 
size_t host_read (size_t offset, size_t size, uint8_t *dst) override
 Reads a selected range into a preallocated buffer. More...
 
void commit_offset (std::string const &topic, int partition, int64_t offset)
 Commits an offset to a specified Kafka Topic/Partition instance. More...
 
std::map< std::string, int64_t > get_watermark_offset (std::string const &topic, int partition, int timeout, bool cached)
 Retrieve the watermark offset values for a topic/partition. More...
 
std::map< std::string, std::string > current_configs ()
 Retrieve the current Kafka client configurations. More...
 
int64_t get_committed_offset (std::string const &topic, int partition)
 Get the latest offset that was successfully committed to the Kafka broker. More...
 
std::map< std::string, std::vector< int32_t > > list_topics (std::string specific_topic)
 Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker will be retrieved. More...
 
void close (int timeout)
 Close the underlying socket connection to Kafka and clean up system resources. More...
 
void unsubscribe ()
 Stop all active consumption and remove consumer subscriptions to topic/partition instances. More...
 
- Public Member Functions inherited from cudf::io::datasource
virtual ~datasource ()
 Base class destructor.
 
virtual bool supports_device_read () const
 Whether or not this source supports reading directly into device memory. More...
 
virtual bool is_device_read_preferred (size_t size) const
 Estimates whether a direct device read would be more optimal for the given size. More...
 
virtual std::unique_ptr< datasource::bufferdevice_read (size_t offset, size_t size, rmm::cuda_stream_view stream)
 Returns a device buffer with a subset of data from the source. More...
 
virtual size_t device_read (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)
 Reads a selected range into a preallocated device buffer. More...
 
virtual std::future< size_t > device_read_async (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)
 Asynchronously reads a selected range into a preallocated device buffer. More...
 
virtual bool is_empty () const
 Returns whether the source contains any data. More...
 

Additional Inherited Members

- Static Public Member Functions inherited from cudf::io::datasource
static std::unique_ptr< datasourcecreate (std::string const &filepath, size_t offset=0, size_t size=0)
 Creates a source from a file path. More...
 
static std::unique_ptr< datasourcecreate (host_buffer const &buffer)
 Creates a source from a host memory buffer. More...
 
static std::unique_ptr< datasourcecreate (cudf::host_span< std::byte const > buffer)
 Creates a source from a host memory buffer. More...
 
static std::unique_ptr< datasourcecreate (cudf::device_span< std::byte const > buffer)
 Creates a source from a device memory buffer. More...
 
static std::unique_ptr< datasourcecreate (datasource *source)
 Creates a source from an user implemented datasource object. More...
 
template<typename T >
static std::vector< std::unique_ptr< datasource > > create (std::vector< T > const &args)
 Creates a vector of datasources, one per element in the input vector. More...
 

Detailed Description

libcudf datasource for Apache Kafka

Definition at line 40 of file kafka_consumer.hpp.

Constructor & Destructor Documentation

◆ kafka_consumer() [1/2]

cudf::io::external::kafka::kafka_consumer::kafka_consumer ( std::map< std::string, std::string >  configs,
python_callable_type  python_callable,
kafka_oauth_callback_wrapper_type  callable_wrapper 
)

Creates an instance of the Kafka consumer object that is in a semi-ready state.

A consumer in a semi-ready state does not have all required parameters to make successful consumer interactions with the Kafka broker. However in the semi-ready state Kafka metadata operations are still possible. This is useful for clients who plan to only use those metadata operations. This is useful when the need for delayed partition and topic assignment is not known ahead of time and needs to be delayed to as late as possible. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Parameters
configskey/value pairs of librdkafka configurations that will be passed to the librdkafka client
python_callablepython_callable_type pointer to a Python functools.partial object
callable_wrapperkafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable. This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka.

◆ kafka_consumer() [2/2]

cudf::io::external::kafka::kafka_consumer::kafka_consumer ( std::map< std::string, std::string >  configs,
python_callable_type  python_callable,
kafka_oauth_callback_wrapper_type  callable_wrapper,
std::string const &  topic_name,
int  partition,
int64_t  start_offset,
int64_t  end_offset,
int  batch_timeout,
std::string const &  delimiter 
)

Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Parameters
configskey/value pairs of librdkafka configurations that will be passed to the librdkafka client
python_callablepython_callable_type pointer to a Python functools.partial object
callable_wrapperkafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable. This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka.
topic_namename of the Kafka topic to consume from
partitionpartition index to consume from between 0 and TOPIC_NUM_PARTITIONS - 1 inclusive
start_offsetseek position for the specified TOPPAR (Topic/Partition combo)
end_offsetposition in the specified TOPPAR to read to
batch_timeoutmaximum (millisecond) read time allowed. If end_offset is not reached before batch_timeout, a smaller subset will be returned
delimiteroptional delimiter to insert into the output between kafka messages, Ex: "\n"

Member Function Documentation

◆ close()

void cudf::io::external::kafka::kafka_consumer::close ( int  timeout)

Close the underlying socket connection to Kafka and clean up system resources.

Exceptions
cudf::logic_erroron failure to close the connection
Parameters
timeoutMax milliseconds to wait on a response

◆ commit_offset()

void cudf::io::external::kafka::kafka_consumer::commit_offset ( std::string const &  topic,
int  partition,
int64_t  offset 
)

Commits an offset to a specified Kafka Topic/Partition instance.

Exceptions
cudf::logic_erroron failure to commit the partition offset
Parameters
[in]topicName of the Kafka topic that the offset should be set for
[in]partitionPartition on the specified topic that should be used
[in]offsetOffset that should be set for the topic/partition pair

◆ current_configs()

std::map<std::string, std::string> cudf::io::external::kafka::kafka_consumer::current_configs ( )

Retrieve the current Kafka client configurations.

Returns
Map<string, string> of key/value pairs of the current client configurations

◆ get_committed_offset()

int64_t cudf::io::external::kafka::kafka_consumer::get_committed_offset ( std::string const &  topic,
int  partition 
)

Get the latest offset that was successfully committed to the Kafka broker.

Parameters
[in]topicTopic name for the topic/partition pair
[in]partitionPartition number of the topic/partition pair
Returns
Latest offset for the specified topic/partition pair

◆ get_watermark_offset()

std::map<std::string, int64_t> cudf::io::external::kafka::kafka_consumer::get_watermark_offset ( std::string const &  topic,
int  partition,
int  timeout,
bool  cached 
)

Retrieve the watermark offset values for a topic/partition.

Parameters
[in]topicName of the Kafka topic that the watermark should be retrieved for
[in]partitionPartition on the specified topic which should be used
[in]timeoutMax milliseconds to wait on a response from the Kafka broker
[in]cachedIf True uses the last retrieved value from the Kafka broker, if False the latest value will be retrieved from the Kafka broker by making a network request.
Returns
The watermark offset value for the specified topic/partition

◆ host_read() [1/2]

std::unique_ptr<cudf::io::datasource::buffer> cudf::io::external::kafka::kafka_consumer::host_read ( size_t  offset,
size_t  size 
)
overridevirtual

Returns a buffer with a subset of data from Kafka Topic.

Parameters
[in]offsetBytes from the start
[in]sizeBytes to read
Returns
The data buffer

Implements cudf::io::datasource.

◆ host_read() [2/2]

size_t cudf::io::external::kafka::kafka_consumer::host_read ( size_t  offset,
size_t  size,
uint8_t *  dst 
)
overridevirtual

Reads a selected range into a preallocated buffer.

Parameters
[in]offsetBytes from the start
[in]sizeBytes to read
[in]dstAddress of the existing host memory
Returns
The number of bytes read (can be smaller than size)

Implements cudf::io::datasource.

◆ list_topics()

std::map<std::string, std::vector<int32_t> > cudf::io::external::kafka::kafka_consumer::list_topics ( std::string  specific_topic)

Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker will be retrieved.

Parameters
[in]specific_topicThe name of the topic for which to retrieve partitions. If empty then the partitions for all topics will be retrieved.
Returns
Map of Kafka topic names with their corresponding list of topic partition values.

◆ size()

size_t cudf::io::external::kafka::kafka_consumer::size ( ) const
overridevirtual

Returns the size of the data in Kafka buffer.

Returns
size_t The size of the source data in bytes

Implements cudf::io::datasource.

◆ unsubscribe()

void cudf::io::external::kafka::kafka_consumer::unsubscribe ( )

Stop all active consumption and remove consumer subscriptions to topic/partition instances.

Exceptions
cudf::logic_erroron failure to unsubscribe from the active partition assignments.

The documentation for this class was generated from the following file: