/*
* Copyright 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
(function () {
var JavaWrapper = require(EclairJS_Globals.NAMESPACE + '/JavaWrapper');
var Logger = require(EclairJS_Globals.NAMESPACE + '/Logger');
var Utils = require(EclairJS_Globals.NAMESPACE + '/Utils');
var JavaStreamingContext =
Java.type('org.apache.spark.streaming.api.java.JavaStreamingContext');
var JLinkedList = Java.type('java.util.LinkedList');
var logger = Logger.getLogger("streaming.StreamingContext_js");
/**
* @constructor
* @memberof module:eclairjs/streaming
* @classdesc Main entry point for Spark Streaming functionality. It provides methods used to create DStreams from various input sources.
* It can be created either by providing a Spark master URL and an appName, or from an org.apache.spark.SparkConf configuration
* (see core Spark documentation), or from an existing org.apache.spark.SparkContext. The associated SparkContext can be accessed
* using context.sparkContext(). After creating and transforming DStreams, the streaming computation can be started and stopped using
* context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination
* of the context by stop() or by an exception.
* @param {module:eclairjs.SparkContext} sparkContext
* @param {module:eclairjs/streaming.Duration} duration
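*
* A minimal usage sketch; the require paths and the `sc` SparkContext variable are
* illustrative assumptions, so adjust them to your environment:
* @example
* var StreamingContext = require(EclairJS_Globals.NAMESPACE + '/streaming/StreamingContext');
* var Duration = require(EclairJS_Globals.NAMESPACE + '/streaming/Duration');
* var ssc = new StreamingContext(sc, new Duration(2000)); // 2 second batch interval
* var lines = ssc.socketTextStream("localhost", 9999);
* lines.print(); // assumes the DStream wrapper exposes print() like the Spark API
* ssc.start();
* ssc.awaitTermination();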
*/
var StreamingContext = function (sparkContext, duration) {
var jvmObj;
if (arguments.length == 1 && (arguments[0] instanceof org.apache.spark.streaming.api.java.JavaStreamingContext))
jvmObj = arguments[0];
else
jvmObj =
new JavaStreamingContext(Utils.unwrapObject(sparkContext),
Utils.unwrapObject(duration)
);
JavaWrapper.call(this, jvmObj);
};
StreamingContext.prototype = Object.create(JavaWrapper.prototype);
StreamingContext.prototype.constructor = StreamingContext;
/**
* The underlying SparkContext
* @returns {module:eclairjs.SparkContext}
*/
StreamingContext.prototype.sparkContext = function () {
var javaObject = this.getJavaObject().sparkContext();
return Utils.javaToJs(javaObject);
};
/**
* Wait for the execution to stop
*/
StreamingContext.prototype.awaitTermination = function () {
this.getJavaObject().awaitTermination();
};
/**
* Wait for the execution to stop, or timeout
* @param {long} millis
* @returns {boolean}
*/
StreamingContext.prototype.awaitTerminationOrTimeout = function (millis) {
return this.getJavaObject().awaitTerminationOrTimeout(millis);
};
/**
* Start the execution of the streams.
*/
StreamingContext.prototype.start = function () {
this.getJavaObject().start();
};
/**
* Stops the execution of the streams.
* @param {boolean} [stopSparkContext] If true, also stops the associated SparkContext.
*/
StreamingContext.prototype.stop = function () {
if (arguments.length == 1) {
this.getJavaObject().stop(arguments[0]);
} else {
this.getJavaObject().stop();
}
};
/**
* Closes the StreamingContext, stopping the execution of the streams.
*/
StreamingContext.prototype.close = function () {
this.getJavaObject().close();
};
/**
* Create an input stream from a TCP source hostname:port.
* @param {string} host
* @param {number} port
* @returns {module:eclairjs/streaming/dstream.DStream}
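*
* A minimal sketch, assuming an existing StreamingContext `ssc` and that the returned
* DStream exposes flatMap() and print() like the Spark streaming API:
* @example
* var lines = ssc.socketTextStream("localhost", 9999);
* var words = lines.flatMap(function (line) {
*     return line.split(" ");
* });
* words.print();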
*/
StreamingContext.prototype.socketTextStream = function (host, port) {
var jDStream = this.getJavaObject().socketTextStream(host, port);
return Utils.javaToJs(jDStream, this);
};
/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE:
* 1. Changes to the queue after the stream is created will not be recognized.
* 2. Since arbitrary RDDs can be added to `queueStream`, there is no way to recover the data of
* those RDDs, so `queueStream` does not support checkpointing.
*
* @param {module:eclairjs.RDD[] } queue Queue of RDDs
* @param {boolean} [oneAtATime=true] Whether only one RDD should be consumed from the queue in every interval
* @param {module:eclairjs.RDD} [defaultRDD] Default RDD is returned by the DStream when the queue is empty
* @returns {module:eclairjs/streaming/dstream.DStream}
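*
* A minimal sketch, assuming a StreamingContext `ssc` and a SparkContext `sc`:
* @example
* var rdd1 = sc.parallelize([1, 2, 3]);
* var rdd2 = sc.parallelize([4, 5, 6]);
* // With oneAtATime left at its default of true, one queued RDD is consumed per batch interval.
* var stream = ssc.queueStream([rdd1, rdd2]);
* stream.print();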
*/
StreamingContext.prototype.queueStream = function (queue) {
var jQueue = new JLinkedList();
for (var index in queue)
jQueue.add(Utils.unwrapObject(queue[index]));
var oneAtATime = (arguments.length > 1) ? arguments[1] : true;
var javaObject;
if (arguments.length > 2) {
var defaultRDD_uw = Utils.unwrapObject(arguments[2]);
javaObject = this.getJavaObject().queueStream(jQueue, oneAtATime, defaultRDD_uw);
}
else
javaObject = this.getJavaObject().queueStream(jQueue, oneAtATime);
return Utils.javaToJs(javaObject);
};
/**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.
* @param {string} directory HDFS-compatible directory where the checkpoint data will be reliably stored
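* @example
* // A sketch, assuming an existing StreamingContext `ssc`; the directory is illustrative.
* ssc.checkpoint("hdfs://namenode:8020/checkpoints/myApp");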
*/
StreamingContext.prototype.checkpoint = function (directory) {
this.getJavaObject().checkpoint(directory);
};
//
// Static Functions
//
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the provided factory
* will be used to create a new StreamingContext.
*
* @param {string} checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param {function} creatingFunc Function to create a new StreamingContext
* @returns {module:eclairjs/streaming.StreamingContext}
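*
* A sketch of the typical recovery pattern; the checkpoint path, the `sc` SparkContext
* and the body of the factory function are illustrative assumptions:
* @example
* var ssc = StreamingContext.getOrCreate("/checkpoints/myApp", function () {
*     var context = new StreamingContext(sc, new Duration(2000));
*     context.checkpoint("/checkpoints/myApp");
*     return context;
* });
* ssc.start();
* ssc.awaitTermination();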
*/
StreamingContext.getOrCreate = function (checkpointPath, creatingFunc) {
var bindArgs;
var fn = Utils.createLambdaFunction(creatingFunc, org.eclairjs.nashorn.JSFunction0, this.sparkContext(), bindArgs);
var javaObject = org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(checkpointPath, fn);
return Utils.javaToJs(javaObject);
};
module.exports = StreamingContext;
})();