quarks.connectors.kafka

Class KafkaConsumer

  • java.lang.Object
    • quarks.connectors.kafka.KafkaConsumer


  • public class KafkaConsumer
    extends java.lang.Object
    KafkaConsumer is a connector for creating a stream of tuples by subscribing to Apache Kafka messaging system topics.

    The connector uses and includes components from the Kafka 0.8.2.2 release. It has been successfully tested against a kafka_2.11-0.9.0.0 server as well. For more information about Kafka see http://kafka.apache.org

    Sample use:

    
     String zookeeperConnect = "localhost:2181";
     String groupId = "myGroupId";
     String topic = "mySensorReadingsTopic";
     
     Map<String,Object> config = new HashMap<>();
     config.put("zookeeper.connect", zookeeperConnect);
     config.put("groupId", groupId);
     
     Topology t = ...
     KafkaConsumer kafka = new KafkaConsumer(t, () -> config);
                  
     // subscribe to a topic where sensor readings are published as JSON,
     // creating a stream of JSON tuples  
     TStream<String> sensorReadingsJson =
                  kafka.subscribe(rec -> rec.value(), topic);
     
     // print the received messages
     sensorReadingsJson.print();
     
    • Constructor Detail

      • KafkaConsumer

        public KafkaConsumer(Topology t,
                             Supplier<java.util.Map<java.lang.String,java.lang.Object>> config)
        Create a consumer connector for subscribing to Kafka topics and creating tuples from the received messages.

        See the Apache Kafka documentation for "Old Consumer Configs" configuration properties at http://kafka.apache.org. Configuration property values are strings.

        The Kafka "Old Consumer" configs are used. Minimal configuration typically includes:

        • zookeeper.connect
        • group.id
        Parameters:
        config - KafkaConsumer configuration information.
    • Method Detail

      • subscribeBytes

        public <T> TStream<T> subscribeBytes(Function<KafkaConsumer.ByteConsumerRecord,T> toTupleFn,
                                             java.lang.String... topics)
        Subscribe to the specified topics and yield a stream of tuples from the published Kafka records.

        Kafka's consumer group management functionality is used to automatically allocate, and dynamically reallocate, the topic's partitions to this connector.

        In line with Kafka's evolving new KafkaConsumer interface, subscribing to a topic advertises a single thread to the server for partition allocation.

        Currently, subscribe*() can only be called once for a KafkaConsumer instance. This restriction will be removed once we migrate to Kafka 0.9.0.0.

        Type Parameters:
        T - tuple type
        Parameters:
        toTupleFn - A function that yields a tuple from a ByteConsumerRecord
        topics - the topics to subscribe to.
        Returns:
        stream of tuples
        Throws:
        java.lang.IllegalArgumentException - for a duplicate or conflicting subscription
      • subscribe

        public <T> TStream<T> subscribe(Function<KafkaConsumer.StringConsumerRecord,T> toTupleFn,
                                        java.lang.String... topics)
        Subscribe to the specified topics and yield a stream of tuples from the published Kafka records.

        Kafka's consumer group management functionality is used to automatically allocate, and dynamically reallocate, the topic's partitions to this connector.

        In line with Kafka's evolving new KafkaConsumer interface, subscribing to a topic advertises a single thread to the server for partition allocation.

        Currently, subscribe*() can only be called once for a KafkaConsumer instance. This restriction will be removed once we migrate to Kafka 0.9.0.0.

        Type Parameters:
        T - tuple type
        Parameters:
        toTupleFn - A function that yields a tuple from a StringConsumerRecord
        topics - the topics to subscribe to.
        Returns:
        stream of tuples
        Throws:
        java.lang.IllegalArgumentException - for a duplicate or conflicting subscription

Copyright IBM 2015,2016 - 2f6ad0e-20160307-0902