Source: eclairjs/streaming/kafka/KafkaUtils.js

/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

(function () {

    var Utils = require(EclairJS_Globals.NAMESPACE + '/Utils');

    var JavaKakfaUtils = Java.type("org.apache.spark.streaming.kafka.KafkaUtils");

    /**
     * Holder for the static Kafka stream factory functions defined in this
     * module (createStream and createDirectStream). The constructor body is
     * empty; the factories are attached directly to the function object.
     * @memberof module:eclairjs/streaming/kafka
     * @constructor
     */
    var KafkaUtils = function () {
    };

    // java.lang.Class is used to look up, by name, the Class objects below.
    var Class = Java.type("java.lang.Class");
    // Class objects handed to the Java createDirectStream call, which is
    // currently limited to String keys and String values.
    var StringClass = Class.forName("java.lang.String");
    var StringDecoderClass = Class.forName("kafka.serializer.StringDecoder");


    /**
     * Creates a receiver-based input stream that pulls messages from Kafka brokers.
     * The received data is stored with the default storage level,
     * StorageLevel.MEMORY_AND_DISK_SER_2.
     * @param {module:eclairjs/streaming.StreamingContext} ssc  StreamingContext object
     * @param {string} zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
     * @param {string} group  The group id for this consumer
     * @param {object} topics  Map of (topic_name -> numPartitions) to consume. Each partition
     *                  is consumed in its own thread
     * @returns {module:eclairjs/streaming/dstream.DStream}  DStream of (Kafka message key, Kafka message value)
     */
    KafkaUtils.createStream = function (ssc, zkQuorum, group, topics) {
        // Callback given to Utils.createJavaHashMap so that each partition
        // count is stored as a java.lang.Integer in the resulting Java map.
        var toJavaInteger = function (key, value) {
            return new java.lang.Integer(value);
        };
        var topicPartitions = Utils.createJavaHashMap(topics, undefined, toJavaInteger);

        var javaStream = JavaKakfaUtils.createStream(
            ssc.getJavaObject(),
            zkQuorum,
            group,
            topicPartitions
        );

        // Wrap the Java stream in its JavaScript counterpart before returning it.
        return Utils.javaToJs(javaStream, ssc);
    };


    /**
     * NOTE: Currently this only works for Kafka keys/values that are Strings.
     *
     * Creates an input stream that pulls messages directly from Kafka brokers,
     * without using any receiver. Such a stream can guarantee that each Kafka
     * message is included in transformations exactly once (see the points below).
     *
     * Points to note:
     *  - No receivers: the stream does not use any receiver; it queries Kafka directly.
     *  - Offsets: Zookeeper is not used to store offsets. The consumed offsets are tracked
     *    by the stream itself. For interoperability with Kafka monitoring tools that depend
     *    on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming
     *    application. The offsets used in each batch can be accessed from the generated
     *    RDDs (see {@link HasOffsetRanges}).
     *  - Failure recovery: to recover from driver failures, checkpointing has to be enabled
     *    in the {@link StreamingContext}. The information on consumed offsets can then be
     *    recovered from the checkpoint. See the programming guide for details
     *    (constraints, etc.).
     *  - End-to-end semantics: the stream ensures that every record is effectively received
     *    and transformed exactly once, but gives no guarantee on whether the transformed data
     *    are output exactly once. For end-to-end exactly-once semantics, either make the
     *    output operation idempotent or use transactions to output records atomically.
     *    See the programming guide for more details.
     *
     * @param {module:eclairjs/streaming.StreamingContext} ssc  StreamingContext object
     * @param {object} kafkaParams  Map of Kafka options (key, value). See the Kafka
     *   <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
     *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with the Kafka
     *   broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
     *   When not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or
     *   "smallest" to determine where the stream starts (defaults to "largest").
     * @param {string[]} topics  Names of the topics to consume
     * @returns {module:eclairjs/streaming/dstream.DStream}  DStream of (Kafka message key, Kafka message value)
     */
    KafkaUtils.createDirectStream = function (ssc, kafkaParams, topics) {
        // The JavaScript arguments are converted to their Java forms inline;
        // String classes and String decoders are used for both key and value.
        var directStream = JavaKakfaUtils.createDirectStream(
            Utils.unwrapObject(ssc),
            StringClass,
            StringClass,
            StringDecoderClass,
            StringDecoderClass,
            Utils.createJavaHashMap(kafkaParams),
            Utils.createJavaSet(topics)
        );

        // Wrap the Java stream in its JavaScript counterpart before returning it.
        return Utils.javaToJs(directStream, ssc);
    };

    // Expose KafkaUtils (and the static factories attached to it) as this module's export.
    module.exports = KafkaUtils;

})();