/*
* Copyright 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
(function () {
/**
 * Resolve the path of the JVM's default trust store
 * (<java.home>/lib/security/cacerts), joined with the platform file separator.
 * The resolved path is also printed to stdout via Nashorn's print().
 *
 * @returns {string} absolute path to the JVM cacerts file
 */
function sslTrustStore() {
    var separator = java.io.File.separator;
    // String() mirrors the string concatenation of the original form
    // (a missing java.home property renders as "null").
    var pathSegments = [String(java.lang.System.getProperty("java.home")), "lib", "security", "cacerts"];
    var trustStorePath = pathSegments.join(separator);
    print("default location of ssl Trust store is: " + trustStorePath);
    return trustStorePath;
}
// Custom (non-standard Kafka) property keys that this module places into the
// java.util.HashMap handed to EclairJsKafkaUtils.createJavaStream.
// NOTE(review): how EclairJsKafkaUtils interprets these keys is not visible here.
var KAFKA_TOPIC = "kafka.topic"; // Key under which the Kafka topic name is stored in the property map
var KAFKA_USER_NAME = "kafka.user.name"; // Key for the Message Hub user name
var KAFKA_USER_PASSWORD = "kafka.user.password"; // Key for the Message Hub password
var MESSAGEHUB_API_KEY = "api_key"; // Key for the Message Hub API key
var MESSAGEHUB_REST_URL = "kafka_rest_url"; // Key for the Message Hub REST endpoint URL
// EclairJS helper module; used here for Utils.javaToJs (wrapping Java results as EclairJS objects).
var Utils = require(EclairJS_Globals.NAMESPACE + '/Utils');
// Java-side helper whose createJavaStream() builds the underlying Java stream.
var EclairJsKafkaUtils = Java.type("org.eclairjs.nashorn.EclairJsKafkaUtils");
/**
* Factory for Kafka / IBM Message Hub input streams.
* All functionality in this module is attached directly to this function object
* (e.g. KafkaUtils.createStream, KafkaUtils.createMessageHubStream) and the
* function itself is what module.exports exposes; the constructor holds no state.
* @memberof module:eclairjs/streaming/kafka
* @constructor
*/
var KafkaUtils = function () {
};
// Java classes resolved through Nashorn's Java.type:
//  - StorageLevel: Spark storage level passed to the stream (createKafkaStream uses MEMORY_AND_DISK).
//  - CommonClientConfigs / SslConfigs: Kafka client config-key constants used by
//    createMessageHubStream for its SASL_SSL / TLS settings.
var StorageLevel = Java.type("org.apache.spark.storage.StorageLevel");
var CommonClientConfigs = Java.type("org.apache.kafka.clients.CommonClientConfigs");
var SslConfigs = Java.type("org.apache.kafka.common.config.SslConfigs");
/**
 * Shared implementation behind the public stream factories.
 *
 * Writes a fixed set of Kafka client defaults, the consumer group, the topic and
 * the broker list into kafkaProps (the map is mutated in place), then asks
 * EclairJsKafkaUtils to build the Java stream with StorageLevel.MEMORY_AND_DISK
 * and wraps the result with Utils.javaToJs.
 *
 * NOTE(review): several keys below (acks, retries, batch.size, linger.ms,
 * buffer.memory, key.serializer) are Kafka producer settings; whether
 * EclairJsKafkaUtils uses them is not visible from this file.
 *
 * @param {module:eclairjs/streaming.StreamingContext} ssc EclairJS streaming context
 * @param {string} group consumer group id
 * @param {string} brokers "host:port,host:port" bootstrap server list
 * @param {string} topic Kafka topic to read
 * @param {java.util.HashMap} kafkaProps property map; entries written here overwrite same-named keys
 * @returns {object} the Java stream wrapped via Utils.javaToJs
 */
function createKafkaStream(ssc, group, brokers, topic, kafkaProps) {
    // Ordered [key, value] pairs; applied in exactly this order.
    var settings = [
        ["value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"],
        ["group.id", group],
        ["auto.offset.reset", "latest"],
        ["acks", "-1"],
        ["retries", "0"],
        ["batch.size", "16384"],
        ["linger.ms", "1"],
        ["buffer.memory", "33554432"],
        ["key.serializer", "org.apache.kafka.common.serialization.StringSerializer"],
        ["key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"],
        [KAFKA_TOPIC, topic],
        ["bootstrap.servers", brokers]
    ];
    for (var idx = 0; idx < settings.length; idx++) {
        kafkaProps.put(settings[idx][0], settings[idx][1]);
    }
    var javaStreamingContext = ssc.getJavaObject();
    var javaStream = EclairJsKafkaUtils.createJavaStream(
        javaStreamingContext, kafkaProps, topic, StorageLevel.MEMORY_AND_DISK()
    );
    return Utils.javaToJs(javaStream);
}
/**
 * Create an input stream that pulls messages from an IBM Message Hub (Kafka)
 * service over SASL_SSL with the PLAIN SASL mechanism and TLSv1.2.
 * The TLS trust store is the JVM's default cacerts file (see sslTrustStore()).
 * Stream defaults (deserializers, "auto.offset.reset" = "latest", storage level)
 * come from the shared createKafkaStream helper.
 *
 * @param {module:eclairjs/streaming.StreamingContext} ssc StreamingContext object
 * @param {string} group The group id for this consumer
 * @param {string} brokers Kafka brokers "hostname:port,hostname:port,.."
 * @param {string} topic Kafka topic to consume
 * @param {string} username Message Hub user name
 * @param {string} password Message Hub password
 * @param {string} api_key Message Hub API key
 * @param {string} [rest_url] Message Hub REST endpoint. Defaults to
 *        "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443"
 *        (the US-South endpoint previously hard-coded here); pass a different URL
 *        for services in other regions.
 * @returns {object} the stream produced by createKafkaStream (wrapped via Utils.javaToJs)
 */
KafkaUtils.createMessageHubStream = function (ssc, group, brokers, topic, username, password, api_key, rest_url) {
    var DEFAULT_MESSAGEHUB_REST_URL = "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443";
    var kafkaProps = new java.util.HashMap();

    // TLS: trust the JVM's default cacerts store. "changeit" is the stock JDK
    // password for that store; it must match the file returned by sslTrustStore().
    kafkaProps.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
    kafkaProps.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");
    kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
    kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTrustStore());
    kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
    kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
    kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");

    // Message Hub credentials and endpoint, stored under this module's custom keys.
    kafkaProps.put(KAFKA_USER_NAME, username);
    kafkaProps.put(KAFKA_USER_PASSWORD, password);
    kafkaProps.put(MESSAGEHUB_API_KEY, api_key);
    kafkaProps.put(MESSAGEHUB_REST_URL,
        (rest_url === undefined || rest_url === null) ? DEFAULT_MESSAGEHUB_REST_URL : rest_url);
    kafkaProps.put("sasl.mechanism", "PLAIN");

    return createKafkaStream(ssc, group, brokers, topic, kafkaProps);
};
/**
 * Create an input stream that pulls messages from Kafka brokers.
 * Data is stored with StorageLevel.MEMORY_AND_DISK, keys and values are
 * deserialized as Strings, and "auto.offset.reset" is set to "latest"
 * (these defaults are applied by the shared createKafkaStream helper).
 * @param {module:eclairjs/streaming.StreamingContext} ssc StreamingContext object
 * @param {string} group The group id for this consumer
 * @param {string} brokers Kafka Brokers "hostname:port,hostname:port,.."
 * @param {string} topic Kafka topic to consume
 * @returns {module:eclairjs/streaming/dstream.PairDStream} PairDStream of (Kafka message key, Kafka message value)
 */
KafkaUtils.createStream = function (ssc, group, brokers, topic) {
    // Start from an empty property map; createKafkaStream fills in every setting.
    var kafkaProps = new java.util.HashMap();
    return createKafkaStream(ssc, group, brokers, topic, kafkaProps);
};
/**
 * NOTE: Not currently implemented — calling this function always throws
 * Error("not implemented by EclairJS"). A Kafka 0.10 (spark-streaming-kafka-0-10)
 * based implementation is kept inside the function as a commented-out reference;
 * per its TODO it worked standalone but not under Toree with Spark 2.0 / Kafka 0.10.
 * When enabled, it only works on Kafka key/values which are Strings.
 *
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once (see points below).
 *
 * Points to note:
 * - No receivers: This stream does not use any receiver. It directly queries Kafka
 * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
 * by the stream itself. For interoperability with Kafka monitoring tools that depend on
 * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
 * You can access the offsets used in each batch from the generated RDDs (see
 * {@link HasOffsetRanges}).
 * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
 * in the {@link StreamingContext}. The information on consumed offset can be
 * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
 * - End-to-end semantics: This stream ensures that every record is effectively received and
 * transformed exactly once, but gives no guarantees on whether the transformed data are
 * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
 * that the output operation is idempotent, or use transactions to output records atomically.
 * See the programming guide for more details.
 *
 * @param {module:eclairjs/streaming.StreamingContext} ssc StreamingContext object
 * @param {object} kafkaParams map of Kafka options (key, value). Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>. Requires "bootstrap.servers"
 * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
 * host1:port1,host2:port2 form.
 * With the Kafka 0.10 consumer used by the reference implementation, "auto.offset.reset"
 * accepts "earliest" or "latest" (the older 0.8 consumer used "smallest"/"largest");
 * note that the reference implementation currently forces "earliest".
 * @param {string[]} topics Names of the topics to consume
 * @returns {module:eclairjs/streaming/dstream.DStream} DStream of (Kafka message key, Kafka message value)
 * @throws {Error} always, until the implementation below is re-enabled
 */
KafkaUtils.createDirectStream = function (ssc, kafkaParams, topics) {
    // Throw a real Error (not a bare string) so callers get a message and a stack trace.
    throw new Error("not implemented by EclairJS");
    // TODO: Not working with toree spark 2.0, kafka 0.10. Commenting Out.
    // Works fine standalone.
    /*
    //var JavaKafkaUtils = Java.type("org.apache.spark.streaming.kafka010.KafkaUtils");
    //var JavaLocationStrategies = Java.type("org.apache.spark.streaming.kafka010.LocationStrategies");
    //var JavaConsumerStrategies = Java.type("org.apache.spark.streaming.kafka010.ConsumerStrategies");
    var ssc_uw = Utils.unwrapObject(ssc);
    var kafkaParams_uw = Utils.createJavaHashMap(kafkaParams);
    // TODO: Make sure all streaming examples to use new prop name of "bootstrap.servers"
    // instead of "metadata.broker.list".
    kafkaParams_uw.put("bootstrap.servers", kafkaParams["bootstrap.servers"] || "unused");
    // Remove the reference to "metadata.broker.list" so we don't get Config warning
    // TODO: We won't need to do this any more once examples are updated.
    kafkaParams_uw.remove("metadata.broker.list");
    kafkaParams_uw.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaParams_uw.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams_uw.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams_uw.put("enable.auto.commit", new java.lang.Boolean(false));
    //kafkaParams_uw.put("auto.offset.reset", "latest");
    kafkaParams_uw.put("auto.offset.reset", "earliest");
    kafkaParams_uw.put("group.id", kafkaParams["group.id"] || "example");
    var topics_uw = Utils.createJavaSet(topics);
    // Use PreferConsistent in most cases as it consistently distributes partitions across all executors.
    // Use PreferBrokers if your executors are on the same hosts as your Kafka brokers.
    //var preferredHosts = ssc_uw.sparkContext().isLocal() ?
    //JavaLocationStrategies.PreferBrokers() : JavaLocationStrategies.PreferConsistent();
    var preferredHosts = JavaLocationStrategies.PreferBrokers();
    var javaObject = JavaKafkaUtils.createDirectStream(ssc_uw,
        preferredHosts,
        JavaConsumerStrategies.Subscribe(topics_uw, kafkaParams_uw));
    return Utils.javaToJs(javaObject, ssc);
    */
};
// Public API of this module: the KafkaUtils factory (createStream,
// createMessageHubStream, createDirectStream).
module.exports = KafkaUtils;
})();