/*
* Copyright 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
(function () {
var JavaWrapper = require(EclairJS_Globals.NAMESPACE + '/JavaWrapper');
var Logger = require(EclairJS_Globals.NAMESPACE + '/Logger');
var Utils = require(EclairJS_Globals.NAMESPACE + '/Utils');
var SparkConf = require(EclairJS_Globals.NAMESPACE + '/SparkConf');
var logger = Logger.getLogger("SparkContext_js");
var initSparkContext = function (conf) {
/*
if (typeof kernel !== 'undefined') {
if (kernel.javaSparkContext() != null) {
return kernel.javaSparkContext();
} else {
kernel.createSparkContext(Utils.unwrapObject(conf));
return kernel.javaSparkContext();
}
}
*/
/*
* Create a new JavaSparkContext from a conf
*
*/
var jvmSC = new org.apache.spark.api.java.JavaSparkContext(Utils.unwrapObject(conf));
return jvmSC;
};
/**
*
* @constructor
* @memberof module:eclairjs
* @classdesc A JavaScript-friendly version of SparkContext that returns RDDs
* Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.
* This limitation may eventually be removed; see SPARK-2243 for more details.
* @param {module:eclairjs.SparkConf} conf - an object specifying Spark parameters
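* @example
* // Minimal usage sketch (assumes the EclairJS modules are on the require path):
* var conf = new SparkConf();
* conf.setMaster("local[*]");
* conf.setAppName("myApp");
* var sc = new SparkContext(conf);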
*/
var SparkContext = function () {
var jvmObj;
this.customModsAdded = false; // only add the big zip of all non-core required mods once per SparkContext
if (arguments.length == 2) {
var conf = new SparkConf();
conf.setMaster(arguments[0]);
conf.setAppName(arguments[1]);
jvmObj = initSparkContext(conf);
} else if (arguments.length == 1 &&
(arguments[0] instanceof org.apache.spark.api.java.JavaSparkContext)) {
jvmObj = arguments[0];
} else {
jvmObj = initSparkContext(arguments[0]);
}
JavaWrapper.call(this, jvmObj);
logger.debug(this.version());
};
SparkContext.prototype = Object.create(JavaWrapper.prototype);
//Set the "constructor" property to refer to SparkContext
SparkContext.prototype.constructor = SparkContext;
/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
* changed at runtime.
* @returns {module:eclairjs.SparkConf}
*/
SparkContext.prototype.getConf = function () {
var javaObject = this.getJavaObject().getConf();
return new SparkConf(javaObject);
};
/**
* @returns {string[]}
*/
SparkContext.prototype.jars = function () {
return this.getJavaObject().jars();
};
/**
* @returns {string[]}
*/
SparkContext.prototype.files = function () {
return this.getJavaObject().files();
};
/**
* @returns {string}
*/
SparkContext.prototype.master = function () {
return this.getJavaObject().master();
};
/**
* @returns {string}
*/
SparkContext.prototype.appName = function () {
return this.getJavaObject().appName();
};
/**
* @returns {boolean}
*/
SparkContext.prototype.isLocal = function () {
return this.getJavaObject().isLocal();
};
/**
* @returns {boolean} true if context is stopped or in the midst of stopping.
*/
SparkContext.prototype.isStopped = function () {
return this.getJavaObject().isStopped();
};
/**
* @returns {SparkStatusTracker}
*/
SparkContext.prototype.statusTracker = function () {
var javaObject = this.getJavaObject().statusTracker();
return Utils.javaToJs(javaObject);
};
/**
* @returns {string}
* @function
* @name module:eclairjs.SparkContext#uiWebUrl
*/
SparkContext.prototype.uiWebUrl = function() {
return this.getJavaObject().uiWebUrl();
};
/**
* A unique identifier for the Spark application.
* Its format depends on the scheduler implementation
* (e.g. 'local-1433865536131' for a local Spark app,
* 'application_1433865536131_34483' on YARN).
* @returns {string}
*/
SparkContext.prototype.applicationId = function () {
return this.getJavaObject().applicationId();
};
/**
* @returns {string}
*/
SparkContext.prototype.applicationAttemptId = function () {
return this.getJavaObject().applicationAttemptId();
};
/**
* @param {string} logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
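* @example
* // Illustrative sketch (assumes an existing SparkContext named sc):
* sc.setLogLevel("WARN");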
*/
SparkContext.prototype.setLogLevel = function (logLevel) {
this.getJavaObject().setLogLevel(logLevel);
};
/**
* Set a local property that affects jobs submitted from this thread, such as the Spark fair
* scheduler pool. User-defined properties may also be set here. These properties are propagated
* through to worker tasks and can be accessed there via
* [[org.apache.spark.TaskContext#getLocalProperty]].
*
* These properties are inherited by child threads spawned from this thread. This
* may have unexpected consequences when working with thread pools. The standard Java
* implementation of thread pools has worker threads spawn other worker threads.
* As a result, local properties may propagate unpredictably.
* @param {string} key
* @param {string} value
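* @example
* // Illustrative sketch: route jobs submitted from this thread to a fair-scheduler pool
* // (assumes an existing SparkContext named sc and that the pool is defined):
* sc.setLocalProperty("spark.scheduler.pool", "pool1");
* // ... submit jobs ...
* sc.setLocalProperty("spark.scheduler.pool", null); // clear the property for this thread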
*/
SparkContext.prototype.setLocalProperty = function (key, value) {
this.getJavaObject().setLocalProperty(key, value);
};
/**
* Get a local property set in this thread, or null if it is missing. See
* {@link setLocalProperty}.
* @param {string} key
* @returns {string}
*/
SparkContext.prototype.getLocalProperty = function (key) {
return this.getJavaObject().getLocalProperty(key);
};
/**
* @param {string} value
*/
SparkContext.prototype.setJobDescription = function (value) {
this.getJavaObject().setJobDescription(value);
};
/**
* Assigns a group ID to all the jobs started by this thread until the group ID is set to a
* different value or cleared.
*
* Often, a unit of execution in an application consists of multiple Spark actions or jobs.
* Application programmers can use this method to group all those jobs together and give a
* group description. Once set, the Spark web UI will associate such jobs with this group.
*
* The application can also use {@link cancelJobGroup} to cancel all
* running jobs in this group. For example,
* @param {string} groupId
* @param {string} description
* @param {boolean} interruptOnCancel
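* @example
* // Illustrative sketch (assumes an existing SparkContext named sc and an RDD named rdd):
* // in this thread:
* sc.setJobGroup("myGroup", "long-running job", true);
* rdd.count();
* // from another thread, all jobs in the group can be cancelled via cancelJobGroup("myGroup")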
*/
SparkContext.prototype.setJobGroup = function (groupId, description, interruptOnCancel) {
this.getJavaObject().setJobGroup(groupId, description, interruptOnCancel);
};
/**
* clearJobGroup
*/
SparkContext.prototype.clearJobGroup = function () {
this.getJavaObject().clearJobGroup();
};
/**
* Create an {@link Accumulable} shared variable of the given type, to which tasks can "add" values with add.
* Only the master can access the accumulable's value.
*
* @param {object} initialValue
* @param {module:eclairjs.AccumulableParam} param
* @param {string} name of the accumulator for display in Spark's web UI.
* @returns {module:eclairjs.Accumulable}
*/
SparkContext.prototype.accumulable = function (initialValue, param, name) {
var Accumulable = require(EclairJS_Globals.NAMESPACE + '/Accumulable');
return new Accumulable(initialValue, param, name);
};
/**
* Create an {@link Accumulator} variable, which tasks can "add" values to using the add method.
* Only the master can access the accumulator's value.
*
* @param {int | float} initialValue
* @param {string | module:eclairjs.AccumulableParam} [name] name of the accumulator for display in Spark's web UI, or an AccumulableParam if no name is given
* @param {module:eclairjs.AccumulableParam} [param] defaults to FloatAccumulatorParam; use only when a name is also specified
* @returns {module:eclairjs.Accumulator}
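* @example
* // Minimal sketch (assumes an existing SparkContext named sc):
* var sum = sc.accumulator(0, "my sum"); // uses FloatAccumulatorParam by default
* sc.parallelize([1, 2, 3, 4]).foreach(function (x, sum) {
*     sum.add(x);
* }, [sum]);
* print(sum.value()); // 10, readable only on the driver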
*/
SparkContext.prototype.accumulator = function () {
var Accumulator = require(EclairJS_Globals.NAMESPACE + '/Accumulator');
var FloatAccumulatorParam = require(EclairJS_Globals.NAMESPACE + '/FloatAccumulatorParam');
var initialValue = arguments[0];
var name;
var param = new FloatAccumulatorParam();
logger.debug("accumulator " , initialValue);
if (arguments[1]) {
if (typeof arguments[1] === "string") {
name = arguments[1];
if (arguments[2]) {
param = arguments[2];
}
} else {
param = arguments[1];
}
}
return new Accumulator(initialValue, param, name);
};
/**
* Register the given accumulator. Note that accumulators must be registered before use, or an
* exception will be thrown.
* @param {module:eclairjs/util.AccumulatorV2} acc
* @param {string} [name]
* @function
* @name module:eclairjs.SparkContext#register
*/
SparkContext.prototype.register = function(acc,name) {
var acc_uw = Utils.unwrapObject(acc);
if (name)
this.getJavaObject().register(acc_uw,name);
else
this.getJavaObject().register(acc_uw);
};
/**
* Create an Accumulator integer variable, which tasks can "add" values to using the add method.
* Only the master can access the accumulator's value.
* @param {int} initialValue
* @param {string} name of the accumulator for display in Spark's web UI.
* @returns {module:eclairjs.Accumulator}
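* @example
* // Minimal sketch (assumes an existing SparkContext named sc):
* var counter = sc.intAccumulator(0, "counter");
* counter.add(5);
* print(counter.value()); // 5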
*/
SparkContext.prototype.intAccumulator = function (initialValue, name) {
var Accumulator = require(EclairJS_Globals.NAMESPACE + '/Accumulator');
var IntAccumulatorParam = require(EclairJS_Globals.NAMESPACE + '/IntAccumulatorParam');
return new Accumulator(initialValue, new IntAccumulatorParam(), name);
};
/**
* Create an Accumulator float variable, which tasks can "add" values to using the add method.
* Only the master can access the accumulator's value.
* @param {float} initialValue
* @param {string} name of the accumulator for display in Spark's web UI.
* @returns {module:eclairjs.Accumulator}
*/
SparkContext.prototype.floatAccumulator = function (initialValue, name) {
var Accumulator = require(EclairJS_Globals.NAMESPACE + '/Accumulator');
var FloatAccumulatorParam = require(EclairJS_Globals.NAMESPACE + '/FloatAccumulatorParam');
return new Accumulator(initialValue, new FloatAccumulatorParam(), name);
};
/**
* Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file,
* a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
* To access the file in Spark jobs, use SparkFiles.get(fileName) to find its download location.
* @param {string} path - Path to the file
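* @example
* // Illustrative sketch (assumes an existing SparkContext named sc and a readable path):
* sc.addFile("hdfs://some-path/lookup.txt");
* // inside a task, SparkFiles.get("lookup.txt") returns the local download location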
*/
SparkContext.prototype.addFile = function (path) {
this.getJavaObject().addFile(path);
};
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
* @param {string} path - Path to the jar
*/
SparkContext.prototype.addJar = function (path) {
//public void addJar(java.lang.String path)
this.getJavaObject().addJar(path);
};
/**
* Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions.
* The variable will be sent to each node in the cluster only once.
* @param {object} value JSON object.
* @returns {module:eclairjs/broadcast.Broadcast}
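* @example
* // Minimal sketch (assumes an existing SparkContext named sc and that the returned Broadcast exposes value()):
* var factor = sc.broadcast({"multiplier": 2});
* var rdd = sc.parallelize([1, 2, 3]).map(function (x, factor) {
*     return x * factor.value().multiplier;
* }, [factor]);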
*/
SparkContext.prototype.broadcast = function (value) {
//var v = Utils.unwrapObject(value);
var v = JSON.stringify(value);
return Utils.javaToJs(this.getJavaObject().broadcast(v));
};
/**
* Returns a list of file paths that are added to resources.
* @returns {string[]}
* @function
* @name module:eclairjs.SparkContext#listFiles
*/
SparkContext.prototype.listFiles = function() {
return this.getJavaObject().listFiles();
};
/**
* Distribute a local collection to form an RDD.
* @param {array} list
* @param {integer} [numSlices]
* @returns {module:eclairjs.RDD}
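* @example
* // Minimal sketch (assumes an existing SparkContext named sc):
* var rdd = sc.parallelize([1, 2, 3, 4, 5], 2);
* print(rdd.count()); // 5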
*/
SparkContext.prototype.parallelize = function (list, numSlices) {
//public <T> JavaRDD<T> parallelize(java.util.List<T> list, int numSlices)
var list_uw = [];
list.forEach(function (item) {
list_uw.push(Utils.unwrapObject(item));
//list_uw.push(Serialize.jsToJava(item));
});
if (numSlices) {
return Utils.javaToJs(this.getJavaObject().parallelize(list_uw, numSlices));
} else {
return Utils.javaToJs(this.getJavaObject().parallelize(list_uw));
}
};
/**
* Distribute a local collection to form an RDD.
* @param {array} list array of Tuple2
* @param {integer} [numSlices]
* @returns {module:eclairjs.PairRDD}
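* @example
* // Minimal sketch (assumes an existing SparkContext named sc and a Tuple2 module on the require path):
* var Tuple2 = require(EclairJS_Globals.NAMESPACE + '/Tuple2');
* var pairRdd = sc.parallelizePairs([new Tuple2("a", 1), new Tuple2("b", 2)], 2);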
*/
SparkContext.prototype.parallelizePairs = function (list, numSlices) {
//public <T> JavaRDD<T> parallelize(java.util.List<T> list, int numSlices)
var list_uw = [];
list.forEach(function (item) {
list_uw.push(Utils.unwrapObject(item));
});
if (numSlices) {
return Utils.javaToJs(this.getJavaObject().parallelizePairs(list_uw, numSlices));
} else {
return Utils.javaToJs(this.getJavaObject().parallelizePairs(list_uw));
}
};
/**
* Creates a new RDD[Long] containing elements from `start` to `end` (exclusive), increased by
* `step` for each element.
*
* @note if this RDD is to be cached, make sure each partition does not exceed the limit.
*
* @param {number} start the start value.
* @param {number} end the end value.
* @param {number} step the incremental step
* @param {number} numSlices the partition number of the new RDD.
* @returns {module:eclairjs.RDD}
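* @example
* // Minimal sketch (assumes an existing SparkContext named sc):
* var rdd = sc.range(0, 10, 2, 2); // elements 0, 2, 4, 6, 8 spread over 2 partitions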
*/
SparkContext.prototype.range = function (start, end, step, numSlices) {
var javaObject = this.getJavaObject().range(start, end, step, numSlices);
return Utils.javaToJs(javaObject);
};
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI,
* and return it as an RDD of Strings.
* @param {string} path - path to file
* @param {int} [minPartitions]
* @returns {module:eclairjs.RDD}
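* @example
* // Minimal sketch (assumes an existing SparkContext named sc and a readable path):
* var lines = sc.textFile("hdfs://some-path/data.txt");
* print(lines.count());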
*/
SparkContext.prototype.textFile = function (path, minPartitions) {
if (minPartitions) {
return Utils.javaToJs(this.getJavaObject().textFile(path, minPartitions));
} else {
return Utils.javaToJs(this.getJavaObject().textFile(path));
}
};
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* <p> For example, if you have the following files:
* @example
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
*
*
* Do `var rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* @example
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
*
*
* @note Small files are preferred; large files are also allowed, but may cause poor performance.
* @note On some filesystems, `.../path/*` can be a more efficient way to read all files
* in a directory rather than `.../path/` or `.../path`
*
* @param {string} path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param {number} minPartitions A suggested value for the minimum number of splits for the input data.
* @returns {module:eclairjs.RDD}
*/
SparkContext.prototype.wholeTextFiles = function (path, minPartitions) {
var javaObject = this.getJavaObject().wholeTextFiles(path, minPartitions);
return Utils.javaToJs(javaObject);
};
/**
* Set the directory under which RDDs are going to be checkpointed.
* The directory must be an HDFS path if running on a cluster.
* @param {string} dir
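* @example
* // Illustrative sketch (assumes an existing SparkContext named sc; the directory must exist or be creatable):
* sc.setCheckpointDir("hdfs://some-path/checkpoints");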
*/
SparkContext.prototype.setCheckpointDir = function (dir) {
this.getJavaObject().setCheckpointDir(dir);
};
/**
* Shut down the SparkContext.
*/
SparkContext.prototype.stop = function () {
this.getJavaObject().stop();
};
/**
* The version of EclairJS and Spark on which this application is running.
* @returns {string}
*/
SparkContext.prototype.version = function () {
var javaVersion = java.lang.System.getProperty("java.version");
var jv = javaVersion.split(".");
var wrongJavaVersionString = "Java 1.8.0_60 or greater required for EclairJS";
var wrongSparkVersionString = "Spark 1.5.1 or greater required for EclairJS";
if (jv[0] < 2) {
if (jv[0] == 1) {
if (jv[1] < 8) {
throw wrongJavaVersionString;
} else {
if (jv[1] == 8) {
// we are at 1.8
var f = jv[2];
var fix = f.split("_");
if ((fix[0] < 1) && (fix[1] < 60)) {
// less than 1.8.0_60
throw wrongJavaVersionString;
}
} else {
// 1.9 or greater
}
}
} else {
throw wrongJavaVersionString;
}
} else {
// version is 2.0 or greater
}
var sparkVersion = this.getJavaObject().version();
var sv = sparkVersion.split(".");
if (sv[0] < 2) {
if (sv[0] == 1) {
if (sv[1] < 5) {
throw wrongSparkVersionString;
} else {
if (sv[1] == 5) {
// we are at 1.5
if (sv[2] < 1) {
// less than 1.5.1
throw wrongSparkVersionString;
}
} else {
// 1.6 or greater
}
}
} else {
throw wrongSparkVersionString;
}
} else {
// version is 2.0 or greater
}
return "EclairJS-nashorn 0.9 Spark " + sparkVersion;
};
/**
* Zip up a file in a directory to preserve its path and add it to the worker nodes for download via addFile.
* @param {object} module the module to zip and distribute
*/
SparkContext.prototype.addModule = function (module) {
//print("SparkContext.addModule: "+module.toString());
//var folder = module.modname.slice(0, module.modname.lastIndexOf("\/")),
var parent = ModuleUtils.getParent(module);
//print("SparkContext.addModule parent: "+parent);
var folder = (parent && parent.subdir && module.subdir.indexOf(parent.subdir) > -1) ? parent.subdir : (module.subdir || "."),
zipfile = parent && parent.zipfile ? parent.zipfile.replace(".zip", "_child_" + Date.now() + ".zip") : "module_" + Date.now() + ".zip",
filename = module.id.slice(module.id.lastIndexOf("\/") + 1, module.id.length);
//print("SparkContext.addModule folder: "+folder);
//print("SparkContext.addModule filename: "+filename);
try {
org.eclairjs.nashorn.Utils.zipFile(folder, zipfile, [filename]);
//print("SparkContext.addModule zipped file now going to try and addFile: "+zipfile);
this.getJavaObject().addFile(zipfile);
module.zipfile = zipfile;
} catch (exc) {
print("Cannot add module: " + module.modname);
print(exc);
}
};
/**
* Zip up all required files not in the JAR to preserve their paths and add them to the worker nodes for download via addFile.
*/
SparkContext.prototype.addCustomModules = function () {
// The big zip of all non-core required mods will be loaded only once per SparkContext.
// Note: If the temp file already exists for it, an exception is thrown and can be ignored.
var mods = ModuleUtils.getModulesByType({type: "core", value: false});
logger.debug("addingCustomModules: ",mods.toString());
var folder = ".", zipfile = ModuleUtils.defaultZipFile, filenames = [];
mods.forEach(function (mod) {
filenames.push(mod.id.slice(mod.id.lastIndexOf("\/") + 1, mod.id.length));
});
logger.debug("SparkContext.addCustomModules folder: ",folder);
logger.debug("SparkContext.addCustomModules filenames: ",filenames.toString());
try {
org.eclairjs.nashorn.Utils.zipFile(folder, zipfile, filenames);
this.getJavaObject().addFile(zipfile);
logger.debug("Custom modules have been added");
} catch (exc) {
logger.debug("Custom modules have already been added");
}
};
/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and BytesWritable
* values that contain a serialized partition. This is still an experimental storage format and may not be supported
* exactly as is in future releases.
* @param {string} path
* @param {integer} [minPartitions]
* @returns {module:eclairjs.RDD}
*/
SparkContext.prototype.objectFile = function (path, minPartitions) {
var javaRdd;
if(minPartitions) {
javaRdd = this.getJavaObject().objectFile(path, minPartitions);
} else {
javaRdd = this.getJavaObject().objectFile(path);
}
return Utils.javaToJs(javaRdd);
};
/**
* A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you plan to set some global configurations for all Hadoop RDDs.
* @example
* var hadoopConf = sparkContext.hadoopConfiguration();
* hadoopConf.set("fs.swift.service.softlayer.auth.url", "https://identity.open.softlayer.com/v3/auth/tokens");
* hadoopConf.set("fs.swift.service.softlayer.auth.endpoint.prefix", "endpoints");
* hadoopConf.set("fs.swift.service.softlayer.tenant", "productid"); // IBM BlueMix Object Store product id
* hadoopConf.set("fs.swift.service.softlayer.username", "userid"); // IBM BlueMix Object Store user id
* hadoopConf.set("fs.swift.service.softlayer.password", "secret"); // IBM BlueMix Object Store password
* hadoopConf.set("fs.swift.service.softlayer.apikey", "secret"); // IBM BlueMix Object Store password
*
* var rdd = sparkContext.textFile("swift://wordcount.softlayer/dream.txt").cache();
*
* @returns {org.apache.hadoop.conf.Configuration}
* @private
*/
SparkContext.prototype.hadoopConfiguration = function () {
var javaRdd = this.getJavaObject().hadoopConfiguration();
return Utils.javaToJs(javaRdd);
};
/**
* Set the value of a property in the Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you plan to set some global configurations for all Hadoop RDDs.
* @example
* sparkContext.setHadoopConfiguration("fs.swift.service.softlayer.auth.url", "https://identity.open.softlayer.com/v3/auth/tokens");
* sparkContext.setHadoopConfiguration("fs.swift.service.softlayer.auth.endpoint.prefix", "endpoints");
* sparkContext.setHadoopConfiguration("fs.swift.service.softlayer.tenant", "productid"); // IBM BlueMix Object Store product id
* sparkContext.setHadoopConfiguration("fs.swift.service.softlayer.username", "userid"); // IBM BlueMix Object Store user id
* sparkContext.setHadoopConfiguration("fs.swift.service.softlayer.password", "secret"); // IBM BlueMix Object Store password
* sparkContext.setHadoopConfiguration("fs.swift.service.softlayer.apikey", "secret"); // IBM BlueMix Object Store password
*
* var rdd = sparkContext.textFile("swift://wordcount.softlayer/dream.txt").cache();
*
* @param {string} key
* @param {string} value
* @returns {void}
*/
SparkContext.prototype.setHadoopConfiguration = function (key, value) {
this.getJavaObject().hadoopConfiguration().set(key, value);
};
/**
* Get the value of a property in the Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you plan to set some global configurations for all Hadoop RDDs.
* @param {string} key
* @returns {string} the property value, or null if no such property exists
*/
SparkContext.prototype.getHadoopConfiguration = function (key) {
return this.getJavaObject().hadoopConfiguration().get(key);
};
SparkContext.prototype.toJSON = function () {
var jsonObj = {
"version": this.version(),
"appName": this.appName(),
"master": this.master()
};
return jsonObj;
};
module.exports = SparkContext;
})();