Source: eclairjs/streaming/dstream/DStream.js

/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

(function () {

    var JavaWrapper = require(EclairJS_Globals.NAMESPACE + '/JavaWrapper');
    var Logger = require(EclairJS_Globals.NAMESPACE + '/Logger');
    var Utils = require(EclairJS_Globals.NAMESPACE + '/Utils');
    var logger = Logger.getLogger("streaming.dtream.DStream_js");

    /**
     * @constructor
     * @memberof module:eclairjs/streaming/dstream
     * @classdesc Represents a Discretized Stream (DStream), the basic abstraction in Spark Streaming:
     * a continuous sequence of RDDs (of the same type) representing a continuous stream of data.
     * @param {object} jDStream the JVM-side stream object handed to the JavaWrapper constructor
     */
    var DStream = function (jDStream) {
        // Run the JavaWrapper constructor against this instance with the supplied JVM object.
        JavaWrapper.call(this, jDStream);
    };

    // Inherit from JavaWrapper: the DStream prototype is a fresh object whose prototype is JavaWrapper.prototype.
    DStream.prototype = Object.create(JavaWrapper.prototype);

    // The prototype object was just replaced, so point its `constructor` property back at DStream.
    DStream.prototype.constructor = DStream;

    // Handles to Java classes, obtained through the Java.type() bridge and stored on the constructor.
    DStream.HashMap = Java.type("java.util.HashMap");
    DStream.UUID = Java.type("java.util.UUID");
    // A single java.util.HashMap stored on the constructor; DStream.unrefRDD() reads entries from it by id.
    DStream.foreachMap = new DStream.HashMap();
    /**
     * Look up the entry stored in DStream.foreachMap under the given id and
     * return it after conversion with Utils.javaToJs.
     * @param {*} refId key to look up in DStream.foreachMap
     * @returns {object} result of Utils.javaToJs applied to the map entry
     */
    DStream.unrefRDD = function (refId) {
        return Utils.javaToJs(DStream.foreachMap.get(refId));
    };

    /**
     * Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER)
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.cache = function () {
        var cachedStream = this.getJavaObject().cache();
        return Utils.javaToJs(cachedStream);
    };
    /**
     * Generate an RDD for the given duration
     * @param {module:eclairjs/streaming.Time} validTime
     * @returns {module:eclairjs.RDD}
     */
    DStream.prototype.compute = function (validTime) {
        var jStream = this.getJavaObject();
        var jTime = validTime.getJavaObject();
        return Utils.javaToJs(jStream.compute(jTime));
    };
    /**
     * Return a new DStream containing only the elements that satisfy a predicate.
     * @param {function} func predicate applied to each element
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.filter = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var predicate = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSFunction, sparkContext, bindArgs);
        var filtered = this.getJavaObject().filter(predicate);
        return Utils.javaToJs(filtered);
    };
    /**
     * Persist RDDs of this DStream with the given storage level.
     * When no level is supplied the Java no-argument overload is used.
     * @param {module:eclairjs/storage.StorageLevel} [level] (MEMORY_ONLY_SER) by default
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.persist = function (level) {
        var jStream = this.getJavaObject();
        // No level supplied: use the Java no-argument overload.
        if (level === undefined || level === null) {
            return Utils.javaToJs(jStream.persist());
        }
        return Utils.javaToJs(jStream.persist(level.getJavaObject()));
    };
    /**
     * Return a new DStream with an increased or decreased level of parallelism.
     * @param {int} numPartitions
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.repartition = function (numPartitions) {
        var repartitioned = this.getJavaObject().repartition(numPartitions);
        return Utils.javaToJs(repartitioned);
    };
    /**
     * Return a new DStream by unifying data of another DStream with this DStream.
     * @param {module:eclairjs/streaming/dstream.DStream} that
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.union = function (that) {
        var jStream = this.getJavaObject();
        var jOther = Utils.unwrapObject(that);
        return Utils.javaToJs(jStream.union(jOther));
    };
    /**
     * Return a new DStream in which each RDD contains all the elements seen in a sliding window of time over this DStream.
     * When slideInterval is omitted, the new DStream generates RDDs with the same interval as this DStream.
     * @param {module:eclairjs/streaming.Duration} duration - width of the window; must be a multiple of this DStream's interval.
     * @param {module:eclairjs/streaming.Duration} [slideInterval] - when given, forwarded as the second argument of the Java window() call.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.window = function (duration, slideInterval) {
        var jStream = this.getJavaObject();
        var jDuration = Utils.unwrapObject(duration);
        // Dispatch on the value rather than on arguments.length so that an explicit
        // undefined/null slide interval still selects the single-argument overload.
        if (slideInterval === undefined || slideInterval === null) {
            return Utils.javaToJs(jStream.window(jDuration));
        }
        return Utils.javaToJs(jStream.window(jDuration, Utils.unwrapObject(slideInterval)));
    };
    /**
     * Enable periodic checkpointing of RDDs of this DStream.
     * @param {module:eclairjs/streaming.Duration} interval
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.checkpoint = function (interval) {
        var jStream = this.getJavaObject();
        var jInterval = interval.getJavaObject();
        return Utils.javaToJs(jStream.checkpoint(jInterval));
    };
    /**
     * Return the StreamingContext associated with this DStream
     * @returns {module:eclairjs/streaming.StreamingContext}
     */
    DStream.prototype.context = function () {
        var jContext = this.getJavaObject().context();
        return Utils.javaToJs(jContext);
    };
    /**
     * Return a new DStream in which each RDD has a single element generated by
     * counting each RDD of this DStream.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.count = function () {
        var counted = this.getJavaObject().count();
        return Utils.javaToJs(counted);
    };
    /**
     * Return a new DStream in which each RDD contains the counts of each distinct
     * value in each RDD of this DStream. Hash partitioning is used to generate the
     * RDDs with Spark's default number of partitions if numPartitions is not
     * specified.
     * @param {int} [numPartitions] forwarded to the Java call when at least one argument is passed
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.countByValue = function () {
        var jStream = this.getJavaObject();
        // Choose the Java overload by whether the caller passed any argument at all.
        var counted = arguments.length > 0
            ? jStream.countByValue(arguments[0])
            : jStream.countByValue();
        return Utils.javaToJs(counted);
    };
    /**
     * Return a new DStream in which each RDD contains the count of distinct
     * elements in RDDs in a sliding window over this DStream. Hash partitioning is
     * used to generate the RDDs with Spark's default number of partitions if
     * numPartitions is not specified.
     *
     * NOTE: this property is assigned again further down in this file; the later
     * assignment is the one in effect once the module has finished loading.
     * @param {module:eclairjs/streaming.Duration} windowDuration
     * @param {module:eclairjs/streaming.Duration} slideDuration
     * @param {int} [numPartitions]
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.countByValueAndWindow = function (windowDuration, slideDuration, numPartitions) {
        var jStream = this.getJavaObject();
        var jWindow = Utils.unwrapObject(windowDuration);
        var jSlide = Utils.unwrapObject(slideDuration);
        // Select the overload from the value of numPartitions, so an omitted or
        // explicitly undefined/null count uses the two-argument Java call.
        if (numPartitions === undefined || numPartitions === null) {
            return Utils.javaToJs(jStream.countByValueAndWindow(jWindow, jSlide));
        }
        return Utils.javaToJs(jStream.countByValueAndWindow(jWindow, jSlide, numPartitions));
    };
    /**
     * Return a new DStream in which each RDD has a single element generated by
     * counting the number of elements in a window over this DStream. windowDuration
     * and slideDuration are as defined in the window() operation. This is
     * equivalent to window(windowDuration, slideDuration).count()
     * @param {module:eclairjs/streaming.Duration} windowDuration
     * @param {module:eclairjs/streaming.Duration} slideDuration
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.countByWindow = function (windowDuration, slideDuration) {
        var jStream = this.getJavaObject();
        var jWindow = windowDuration.getJavaObject();
        var jSlide = slideDuration.getJavaObject();
        return Utils.javaToJs(jStream.countByWindow(jWindow, jSlide));
    };

    /**
     * Return a new DStream in which each RDD contains the count of distinct elements in
     * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
     * with `numPartitions` partitions.
     * @param {module:eclairjs/streaming.Duration} windowDuration  width of the window; must be a multiple of this DStream's
     *                       batching interval
     * @param {module:eclairjs/streaming.Duration} slideDuration   sliding interval of the window (i.e., the interval after which
     *                       the new DStream will generate RDDs); must be a multiple of this
     *                       DStream's batching interval
     * @param {number} [numPartitions]   number of partitions of each RDD in the new DStream.
     * @returns {module:eclairjs/streaming/dstream.PairDStream}
     */
    DStream.prototype.countByValueAndWindow = function (windowDuration, slideDuration, numPartitions) {
        var jStream = this.getJavaObject();
        var jWindow = Utils.unwrapObject(windowDuration);
        var jSlide = Utils.unwrapObject(slideDuration);

        // Test for absence explicitly: a truthiness check would silently discard an
        // explicit numPartitions of 0 instead of forwarding what the caller asked for.
        if (numPartitions === undefined || numPartitions === null) {
            return Utils.javaToJs(jStream.countByValueAndWindow(jWindow, jSlide));
        }
        return Utils.javaToJs(jStream.countByValueAndWindow(jWindow, jSlide, numPartitions));
    };

    /**
     * Return a new DStream by first applying a function to all elements of this
     * DStream, and then flattening the results.
     * @param {function} func
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.flatMap = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var mapper = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSFlatMapFunction, sparkContext, bindArgs);
        var flattened = this.getJavaObject().flatMap(mapper);
        return Utils.javaToJs(flattened);
    };
    /**
     * Return a new DStream by applying a function to all elements of this DStream,
     * and then flattening the results
     * @param {function} func
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.PairDStream}
     */
    DStream.prototype.flatMapToPair = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var pairMapper = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSPairFlatMapFunction, sparkContext, bindArgs);
        var pairStream = this.getJavaObject().flatMapToPair(pairMapper);
        return Utils.javaToJs(pairStream);
    };
    /**
     * Return a new DStream in which each RDD is generated by applying glom() to
     * each RDD of this DStream. Applying glom() to an RDD coalesces all elements
     * within each partition into an array.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.glom = function () {
        var glommed = this.getJavaObject().glom();
        return Utils.javaToJs(glommed);
    };

    /**
     * Return a new DStream by applying a function to all elements of this DStream.
     * @param {function} func
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.map = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var mapper = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSFunction, sparkContext, bindArgs);
        var mapped = this.getJavaObject().map(mapper);
        return Utils.javaToJs(mapped);
    };

    /**
     * Return a new DStream by applying a function to all elements of this DStream.
     * @param {function} func
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.PairDStream}
     */
    DStream.prototype.mapToPair = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var pairMapper = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSPairFunction, sparkContext, bindArgs);
        var pairStream = this.getJavaObject().mapToPair(pairMapper);
        return Utils.javaToJs(pairStream);
    };

    /**
     * Return a new DStream in which each RDD is generated by applying
     * mapPartitions() to each RDDs of this DStream. Applying mapPartitions() to an
     * RDD applies a function to each partition of the RDD.
     * @param {function} func
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.mapPartitions = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var partitionMapper = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSFlatMapFunction, sparkContext, bindArgs);
        var mapped = this.getJavaObject().mapPartitions(partitionMapper);
        return Utils.javaToJs(mapped);
    };

    /**
     * Return a new DStream in which each RDD has a single element generated by reducing each RDD
     * of this DStream.
     * @param {function} f reduce function, wrapped as a JSFunction2
     * @param {Object[]} [bindArgs] array whose values will be added to f's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.reduce = function (f, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var reducer = Utils.createLambdaFunction(f, org.eclairjs.nashorn.JSFunction2, sparkContext, bindArgs);
        return Utils.javaToJs(this.getJavaObject().reduce(reducer));
    };


    /**
     * Return a new DStream in which each RDD has a single element generated by reducing all
     * elements in a sliding window over this DStream.
     * @param {function} reduceFunc  associative reduce function
     * @param {module:eclairjs/streaming.Duration} windowDuration  width of the window; must be a multiple of this DStream's
     *                       batching interval
     * @param {module:eclairjs/streaming.Duration} slideDuration   sliding interval of the window (i.e., the interval after which
     *                       the new DStream will generate RDDs); must be a multiple of this
     *                       DStream's batching interval
     * @param {Object[]} [bindArgs] array whose values will be added to reduceFunc's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.reduceByWindow = function (reduceFunc, windowDuration, slideDuration, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var reducer = Utils.createLambdaFunction(reduceFunc, org.eclairjs.nashorn.JSFunction2, sparkContext, bindArgs);
        var jWindow = Utils.unwrapObject(windowDuration);
        var jSlide = Utils.unwrapObject(slideDuration);
        return Utils.javaToJs(this.getJavaObject().reduceByWindow(reducer, jWindow, jSlide));
    };


    /**
     * Return a new DStream in which each RDD has a single element generated by reducing all
     * elements in a sliding window over this DStream. However, the reduction is done incrementally
     * using the old window's reduced value :
     *  1. reduce the new values that entered the window (e.g., adding new counts)
     *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
     *  This is more efficient than reduceByWindow without "inverse reduce" function.
     *  However, it is applicable to only "invertible reduce functions".
     * @param {function} reduceFunc  associative reduce function
     * @param {function} invReduceFunc  inverse reduce function
     * @param {module:eclairjs/streaming.Duration} windowDuration  width of the window; must be a multiple of this DStream's
     *                       batching interval
     * @param {module:eclairjs/streaming.Duration} slideDuration   sliding interval of the window (i.e., the interval after which
     *                       the new DStream will generate RDDs); must be a multiple of this
     *                       DStream's batching interval
     * @param {Object[]} [bindArgs] array passed along when wrapping both reduceFunc and invReduceFunc.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.reduceByWindowwithSlideDuration = function (reduceFunc, invReduceFunc, windowDuration, slideDuration, bindArgs) {
        var self = this;
        // Both callbacks are wrapped the same way: as JSFunction2 with the same bindArgs.
        var toLambda = function (jsFunc) {
            return Utils.createLambdaFunction(jsFunc, org.eclairjs.nashorn.JSFunction2, self.context().sparkContext(), bindArgs);
        };
        var reducer = toLambda(reduceFunc);
        var inverseReducer = toLambda(invReduceFunc);
        var jWindow = Utils.unwrapObject(windowDuration);
        var jSlide = Utils.unwrapObject(slideDuration);
        return Utils.javaToJs(this.getJavaObject().reduceByWindow(reducer, inverseReducer, jWindow, jSlide));
    };


    /**
     * Return a new DStream in which each RDD is generated by applying a function
     * on each RDD of 'this' DStream.
     * @param {function} transformFunc wrapped as a JSFunction2 before being handed to the Java transform() call
     * @param {Object[]} [bindArgs] array whose values will be added to transformFunc's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.transform = function (transformFunc, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var transformer = Utils.createLambdaFunction(transformFunc, org.eclairjs.nashorn.JSFunction2, sparkContext, bindArgs);
        return Utils.javaToJs(this.getJavaObject().transform(transformer));
    };


    /**
     * Return a new DStream in which each RDD is generated by applying a function
     * on each RDD of 'this' DStream.
     * @param {function} transformFunc wrapped as a JSFunction before being handed to the Java transformToPair() call
     * @param {Object[]} [bindArgs] array whose values will be added to transformFunc's argument list.
     * @returns {module:eclairjs/streaming/dstream.PairDStream}
     */
    DStream.prototype.transformToPair = function (transformFunc, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var transformer = Utils.createLambdaFunction(transformFunc, org.eclairjs.nashorn.JSFunction, sparkContext, bindArgs);
        return Utils.javaToJs(this.getJavaObject().transformToPair(transformer));
    };


    /**
     * Apply a function to each RDD in this DStream. This is an output operator, so 'this' DStream will be registered as an output
     * stream and therefore materialized.
     * @param {function} func
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {void}
     */
    DStream.prototype.foreachRDD = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var action = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSVoidFunction, sparkContext, bindArgs);
        this.getJavaObject().foreachRDD(action);
    };

    /**
     * Apply a function to each RDD in this DStream. This is an output operator, so 'this' DStream will be registered as an output
     * stream and therefore materialized.
     *  The time is passed in to the function.
     * @param {function} func wrapped as a JSFunction2 before being handed to the Java foreachRDD() call
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {void}
     */
    DStream.prototype.foreachRDDWithTime = function (func, bindArgs) {
        var sparkContext = this.context().sparkContext();
        var action = Utils.createLambdaFunction(func, org.eclairjs.nashorn.JSFunction2, sparkContext, bindArgs);
        this.getJavaObject().foreachRDD(action);
    };

    /**
     * Print the first ten elements of each RDD generated in this DStream. This is an output operator, so this DStream will be
     * registered as an output stream and therefore materialized.
     * @returns {void}
     */
    DStream.prototype.print = function () {
        var jStream = this.getJavaObject();
        jStream.print();
    };

    module.exports = DStream;

})();