Source: streaming/dstream/DStream.js

/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module.exports = function(kernelP) {
  return (function() {
    var Utils = require('../../utils.js');
    var RDD = require('../../rdd/RDD.js');

    var gKernelP = kernelP;

    /**
     * @constructor
     * @memberof module:eclairjs/streaming/dstream
     * @classdesc Represents a Discretized Stream (DStream), the basic abstraction in Spark Streaming:
     * a continuous sequence of RDDs (of the same type) representing a continuous stream of data.
     *
     * Construction is delegated entirely to Utils.handleConstructor, which receives the new instance,
     * the raw constructor arguments and the kernel promise captured by this module.
     */
    function DStream() {
      var ctorArguments = arguments;

      Utils.handleConstructor(this, ctorArguments, gKernelP);
    }

    /**
     * Return a new DStream obtained by applying a function to every element of this DStream and then
     * flattening the results.
     * @param func - lambda applied to each element.
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.flatMap = function(func, bindArgs) {
      var lambdaArg = {value: func, type: 'lambda'};
      var boundArg = {value: Utils.wrapBindArgs(bindArgs), optional: true};

      return Utils.generate({
        target: this,
        method: 'flatMap',
        args: [lambdaArg, boundArg],
        returnType: DStream
      });
    };

    /**
     * Return a new DStream whose elements are the result of applying a function to each element of
     * this DStream.
     * @param func - lambda applied to each element.
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.map = function(func, bindArgs) {
      var callArgs = [];
      callArgs.push({value: func, type: 'lambda'});
      callArgs.push({value: Utils.wrapBindArgs(bindArgs), optional: true});

      var request = {
        target: this,
        method: 'map',
        args: callArgs,
        returnType: DStream
      };

      return Utils.generate(request);
    };

    /**
     * Return a new DStream in which each RDD contains all the elements seen in a sliding window of time
     * over this DStream. The new DStream generates RDDs with the same interval as this DStream.
     *
     * All arguments passed by the caller are wrapped and forwarded, not just the named one.
     * @param duration - width of the window; must be a multiple of this DStream's interval.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.window = function(duration) {
      var forwarded = Utils.wrapArguments(arguments);

      return Utils.generate({
        target: this,
        method: 'window',
        args: forwarded,
        returnType: DStream
      });
    };

    /**
     * Print the first ten elements of each RDD generated in this DStream. This is an output operator,
     * so this DStream will be registered as an output stream and therefore materialized.
     * @returns {void}
     */
    DStream.prototype.print = function() {
      // No arguments and no returnType: only the target and the method name are sent.
      return Utils.generate({target: this, method: 'print'});
    };

    var foreachRDDCounter = 0;

    /**
     * Builds the source text of the wrapper function used by foreachRDD.  The wrapper calls the user's
     * lambda with the arguments it was given, looks up the comm registered under 'foreachrdd:<id>' in
     * commMap and sends the lambda's result back as a JSON string of the form {response: <result>}.
     *
     * @private
     * @param {function} lambda - user function whose source (lambda.toString()) is embedded verbatim.
     * @param {string} id - comm identifier substituted into the commMap lookup key.
     * @returns {string} source text of the generated wrapper function.
     */
    function generateForeachRDDFunc(lambda, id) {
      // TODO: consider using template literals
      var template = "function() {\
        var res = {{lambda}}.apply(this, arguments);\
        var comm = commMap.get('foreachrdd:{{id}}');\
        comm.send('foreachrdd', JSON.stringify({response: res}));\
      }";

      var lambdaSource = lambda.toString();

      // Replacer functions are used instead of replacement strings on purpose: with a string,
      // String.prototype.replace expands sequences such as $&, $$, $` and $' found in the
      // replacement, which would silently corrupt any lambda whose source contains them.
      // {{id}} is substituted first so that a '{{id}}' appearing inside the lambda is left alone.
      return template
        .replace('{{id}}', function() { return id; })
        .replace('{{lambda}}', function() { return lambdaSource; });
    }

    /**
     * Apply a function to each RDD in this DStream.
     *
     * foreachRDD works slightly differently in EclairJS-node compared to regular Spark due to the fact that it executes
     * the code remotely in EclairJS-nashorn running in Apache Toree.  Instead of just one lambda function, EclairJS-node's
     * foreachRDD takes three arguments: a remote function, its bindArgs, and a local function.
     *
     * The first argument is a function which is run remotely on EclairJS-nashorn in Apache Toree and has several
     * restrictions:
     *   - Need to use the EclairJS-nashorn API (https://github.com/EclairJS/eclairjs-nashorn/wiki/API-Documentation).  The
     *     main difference is that method calls are always synchronous - so for example count() will return the number
     *     directly (while in EclairJS-node it would return a Promise).
     *   - The code in the remote function must return a JSON serializable value.  This means no asynchronous calls can
     *     happen.
     *
     * The second argument in foreachRDD is bindArgs - an array of values that will be added to the remote function's
     * argument list.  Set it to null if none are needed.
     *
     * The third argument is a function that runs on the Node side.  The remote function's return value (which has
     * already been JSON parsed) will be passed into the local function as an argument.
     *
     * You will need to run all Spark computations in the remote function and use the local function to send the result to
     * its final destination (your datastore of choice for example).
     *
     * Example:
     *
     * var dStream = ...;
     *
     * dStream.foreachRDD(
     *   function(rdd) {
     *     // runs remotely
     *     return rdd.collect();
     *   },
     *   null,
     *   function(res) {
     *     // runs locally in Node
     *     console.log('Results: ', res)
     *   }
     * )
     *
     * The remote function collects the contents of the RDD and returns the array, which then gets passed as an argument
     * into the local function.
     *
     * @param {function} remoteFunc - lambda function that runs on the Spark side.  The returned result from the lambda
     *    will be passed into localFunc as an argument
     * @param {Object[]} bindArgs array whose values will be added to remoteFunc's argument list; null when unused.
     * @param {function} [localFunc] - lambda function that runs on the Node side.
     * @returns {Promise.<void>}
     */
    DStream.prototype.foreachRDD = function(remoteFunc, bindArgs, localFunc) {
      // Each call gets its own comm id so responses from different foreachRDD registrations stay separate.
      var id = 'foreachrdd-' + ++foreachRDDCounter;

      var args = {
        target: this,
        method: 'foreachRDD',
        args: [
          {value: generateForeachRDDFunc(remoteFunc, id), type: 'lambda'},
          // Flagged optional like every other bindArgs descriptor in this file, because callers are
          // told to pass null when they have nothing to bind.
          {value: Utils.wrapBindArgs(bindArgs), optional: true}
        ]
      };

      this.kernelP.then(function(kernel) {
        var comm = kernel.connectToComm('foreachrdd', id);

        // Hand the 'response' field of every incoming message to the local callback, if one was given.
        comm.onMsg = (msg) => {
          var response = msg.content.data.response;

          if (localFunc) {
            localFunc(response);
          }
        };

        comm.open('');
      });

      return Utils.generate(args);
    };

    /**
     * Return a new DStream that keeps only the elements for which the predicate holds.
     * @param {function} func - predicate evaluated for each element.
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.filter = function(func, bindArgs) {
      var predicateArg = {value: func, type: 'lambda'};
      var extraArgs = {value: Utils.wrapBindArgs(bindArgs), optional: true};

      var request = {
        target: this,
        method: 'filter',
        args: [predicateArg, extraArgs],
        returnType: DStream
      };

      return Utils.generate(request);
    };

    /**
     * Return a new PairDStream by applying a function to all elements of this DStream.
     * @param func - lambda applied to each element.
     * @param {Object[]} [bindArgs] array whose values will be added to func's argument list.
     * @returns {module:eclairjs/streaming/dstream.PairDStream}
     */
    DStream.prototype.mapToPair = function(func, bindArgs) {
      // Required here rather than at module load; the factory is given this instance's kernel promise.
      var pairFactory = require('./PairDStream');
      var PairDStream = pairFactory(this.kernelP);

      var lambdaArg = {value: func, type: 'lambda'};
      var boundArg = {value: Utils.wrapBindArgs(bindArgs), optional: true};

      return Utils.generate({
        target: this,
        method: 'mapToPair',
        args: [lambdaArg, boundArg],
        returnType: PairDStream
      });
    };


    /**
     * Persist RDDs of this DStream with the storage level.
     *
     * The storage level is forwarded only when exactly one argument is supplied; any other argument
     * count results in a call without arguments.
     * @param {module:eclairjs/storage.StorageLevel} [level] (MEMORY_ONLY_SER) by default
     * @return {module:eclairjs/streaming/dstream.DStream}
     */
    DStream.prototype.persist = function() {
      var request = {
        target: this,
        method: 'persist',
        returnType: DStream
      };
      var hasSingleArgument = arguments.length === 1;

      if (hasSingleArgument) {
        request.args = Utils.wrapArguments(arguments);
      }

      return Utils.generate(request);
    };


    /**
     * Return a new PairDStream in which each RDD is generated by applying a function
     * on each RDD of 'this' DStream.
     * @param {func} transformFunc - lambda applied to each RDD.
     * @param {Object[]} [bindArgs] array whose values will be added to transformFunc's argument list.
     * @returns {module:eclairjs/streaming/dstream.PairDStream}
     */
    DStream.prototype.transformToPair = function (transformFunc, bindArgs) {
      // Required here rather than at module load; the factory is given this instance's kernel promise.
      var PairDStream = require('./PairDStream')(this.kernelP);

      var callArgs = [
        {value: transformFunc, type: 'lambda'},
        {value: Utils.wrapBindArgs(bindArgs), optional: true}
      ];

      var request = {
        target: this,
        method: 'transformToPair',
        args: callArgs,
        returnType: PairDStream
      };

      return Utils.generate(request);
    };

    DStream.moduleLocation = '/streaming/dstream/DStream';

    return DStream;
  })();
};