public class KafkaConsumer
extends java.lang.Object
KafkaConsumer is a connector for creating a stream of tuples by subscribing to Apache Kafka messaging system topics.
The connector uses and includes components from the Kafka 0.8.2.2 release. It has also been tested successfully against a kafka_2.11-0.9.0.0 server. For more information about Kafka, see http://kafka.apache.org
Sample use:
String zookeeperConnect = "localhost:2181";
String groupId = "myGroupId";
String topic = "mySensorReadingsTopic";
Map<String,Object> config = new HashMap<>();
config.put("zookeeper.connect", zookeeperConnect);
config.put("group.id", groupId);
Topology t = ...
KafkaConsumer kafka = new KafkaConsumer(t, () -> config);
// subscribe to a topic where sensor readings are published as JSON,
// creating a stream of JSON tuples
TStream<String> sensorReadingsJson =
kafka.subscribe(rec -> rec.value(), topic);
// print the received messages
sensorReadingsJson.print();
Modifier and Type | Class and Description |
---|---|
static interface | KafkaConsumer.ByteConsumerRecord: A Kafka record with byte[] typed key and value members |
static interface | KafkaConsumer.ConsumerRecord<K,V>: A received Kafka record |
static interface | KafkaConsumer.StringConsumerRecord: A Kafka record with String typed key and value members |
Constructor and Description |
---|
KafkaConsumer(Topology t, Supplier<java.util.Map<java.lang.String,java.lang.Object>> config): Create a consumer connector for subscribing to Kafka topics and creating tuples from the received messages. |
Modifier and Type | Method and Description |
---|---|
<T> TStream<T> | subscribe(Function<KafkaConsumer.StringConsumerRecord,T> toTupleFn, java.lang.String... topics): Subscribe to the specified topics and yield a stream of tuples from the published Kafka records. |
<T> TStream<T> | subscribeBytes(Function<KafkaConsumer.ByteConsumerRecord,T> toTupleFn, java.lang.String... topics): Subscribe to the specified topics and yield a stream of tuples from the published Kafka records. |
public KafkaConsumer(Topology t, Supplier<java.util.Map<java.lang.String,java.lang.Object>> config)
The Kafka "Old Consumer" configs are used. See the "Old Consumer Configs" section of the Apache Kafka documentation at http://kafka.apache.org for the available configuration properties. Configuration property values are strings.
Minimal configuration typically includes:
zookeeper.connect
group.id
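The minimal configuration above can be sketched as a plain map handed to the constructor through a supplier. This is an illustration only: `java.util.function.Supplier` stands in for the connector's own `Supplier` type, the class and method names are hypothetical, and `auto.commit.interval.ms` is included merely to show that numeric settings are also passed as strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class KafkaConsumerConfigSketch {

    // Builds the minimal "Old Consumer" configuration: zookeeper.connect and group.id.
    // All values are stored as strings.
    static Map<String, Object> minimalConfig(String zookeeperConnect, String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("zookeeper.connect", zookeeperConnect);
        config.put("group.id", groupId);
        // Optional old-consumer property, given as a string rather than a number.
        config.put("auto.commit.interval.ms", "1000");
        return config;
    }

    public static void main(String[] args) {
        // Assumption: java.util.function.Supplier stands in for the connector's Supplier type.
        Supplier<Map<String, Object>> supplier =
            () -> minimalConfig("localhost:2181", "myGroupId");
        supplier.get().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Supplying the map through a supplier means the configuration is produced when the supplier is invoked, not when the lambda is written.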
Parameters:
config - KafkaConsumer configuration information.
public <T> TStream<T> subscribeBytes(Function<KafkaConsumer.ByteConsumerRecord,T> toTupleFn, java.lang.String... topics)
Kafka's consumer group management functionality is used to automatically allocate, and dynamically reallocate, the topic's partitions to this connector.
In line with Kafka's evolving new KafkaConsumer interface, subscribing to a topic advertises a single thread to the server for partition allocation.
Currently, subscribe*() can only be called once for a KafkaConsumer instance. This restriction will be removed once we migrate to Kafka 0.9.0.0.
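A tuple function for byte[] records might look like the following sketch. The `ByteRecord` interface is a hypothetical stand-in for `KafkaConsumer.ByteConsumerRecord`, reduced to assumed `key()` and `value()` accessors, and `java.util.function.Function` stands in for the connector's `Function` type. Decoding as UTF-8 is an assumption about the payload, not something the connector prescribes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class ByteRecordSketch {

    // Hypothetical stand-in for KafkaConsumer.ByteConsumerRecord.
    interface ByteRecord {
        byte[] key();
        byte[] value();
    }

    // Candidate toTupleFn: decode the value as UTF-8 text; a null value becomes "".
    static final Function<ByteRecord, String> TO_TEXT =
        rec -> rec.value() == null ? "" : new String(rec.value(), StandardCharsets.UTF_8);

    // Test helper that wraps raw bytes in a record.
    static ByteRecord record(byte[] key, byte[] value) {
        return new ByteRecord() {
            @Override public byte[] key() { return key; }
            @Override public byte[] value() { return value; }
        };
    }

    public static void main(String[] args) {
        ByteRecord rec = record(null, "{\"temp\":21.5}".getBytes(StandardCharsets.UTF_8));
        System.out.println(TO_TEXT.apply(rec));
    }
}
```

With the real connector, a function of this shape would be passed as the `toTupleFn` argument of `subscribeBytes`.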
Type Parameters:
T - tuple type
Parameters:
toTupleFn - A function that yields a tuple from a ByteConsumerRecord
topics - the topics to subscribe to.
Throws:
java.lang.IllegalArgumentException - for a duplicate or conflicting subscription
public <T> TStream<T> subscribe(Function<KafkaConsumer.StringConsumerRecord,T> toTupleFn, java.lang.String... topics)
Kafka's consumer group management functionality is used to automatically allocate, and dynamically reallocate, the topic's partitions to this connector.
In line with Kafka's evolving new KafkaConsumer interface, subscribing to a topic advertises a single thread to the server for partition allocation.
Currently, subscribe*() can only be called once for a KafkaConsumer instance. This restriction will be removed once we migrate to Kafka 0.9.0.0.
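Because subscribe*() can currently be called only once per instance, several topics would be passed in a single call, and the tuple function may need to record where each message came from. The sketch below assumes the record exposes a `topic()` accessor alongside `key()` and `value()`; `StringRecord` and `Reading` are hypothetical names, and `java.util.function.Function` stands in for the connector's `Function` type.

```java
import java.util.function.Function;

class StringRecordSketch {

    // Hypothetical stand-in for KafkaConsumer.StringConsumerRecord.
    interface StringRecord {
        String topic();   // assumed accessor
        String key();
        String value();
    }

    // Hypothetical tuple type pairing a message payload with its source topic.
    static final class Reading {
        final String topic;
        final String payload;

        Reading(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }

        @Override public String toString() {
            return topic + ": " + payload;
        }
    }

    // Candidate toTupleFn: keep the value and tag it with the topic it arrived on.
    static final Function<StringRecord, Reading> TO_READING =
        rec -> new Reading(rec.topic(), rec.value());

    // Test helper that builds a record from plain strings.
    static StringRecord record(String topic, String key, String value) {
        return new StringRecord() {
            @Override public String topic() { return topic; }
            @Override public String key() { return key; }
            @Override public String value() { return value; }
        };
    }

    public static void main(String[] args) {
        Reading r = TO_READING.apply(record("mySensorReadingsTopic", null, "{\"temp\":21.5}"));
        System.out.println(r);
    }
}
```

Tagging tuples this way lets downstream stream operations separate the topics again without a second subscription.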
Type Parameters:
T - tuple type
Parameters:
toTupleFn - A function that yields a tuple from a StringConsumerRecord
topics - the topics to subscribe to.
Throws:
java.lang.IllegalArgumentException - for a duplicate or conflicting subscription
Copyright IBM 2015,2016 - 2f6ad0e-20160307-0902