Source: streaming/StreamingContext.js

/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

var Utils = require('../utils.js');

/**
 * @constructor
 * @memberof module:eclairjs/streaming
 * @classdesc Main entry point for Spark Streaming functionality. It provides methods used to create DStreams from various input sources.
 *  It can be either created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf configuration
 *  (see core Spark documentation), or from an existing org.apache.spark.SparkContext. The associated SparkContext can be accessed
 *  using context.sparkContext. After creating and transforming DStreams, the streaming computation can be started and stopped using
 *  context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination
 *  of the context by stop() or by an exception.
 *  @param {SparkContex} sparkContext
 *  @param {module:eclairjs/streaming.Duration} duration
 */
function StreamingContext(sparkContext, duration) {
  this.kernelP = sparkContext.kernelP;

  var args = {
    target: StreamingContext,
    args: Utils.wrapArguments(arguments),
    kernelP: this.kernelP
  };

  this.refIdP = Utils.generateConstructor(args);
}

/**
 * The underlying SparkContext
 * @returns {module:eclairjs.SparkContext}
 */
StreamingContext.prototype.sparkContext = function () {
  var SparkContext = require('../SparkContext.js')()[1];

  var args = {
    target: this,
    method: 'sparkContext',
    args: Utils.wrapArguments(arguments),
    returnType: SparkContext
  };

  return Utils.generate(args);
};

/**
 * Wait for the execution to stop
 */
StreamingContext.prototype.awaitTermination = function() {
  var args = {
    target: this,
    method: 'awaitTermination'
  };

  return Utils.generate(args);
};

/**
 * Wait for the execution to stop, or timeout
 * @param {long} millis
 * @returns {boolean}
 */
StreamingContext.prototype.awaitTerminationOrTimeout = function(millis) {
  var args = {
    target: this,
    method: 'awaitTerminationOrTimeout',
    args: Utils.wrapArguments(arguments),
    returnType: Boolean
  };

  return Utils.generate(args);
};

/**
 * Start the execution of the streams.
 */
StreamingContext.prototype.start = function() {
  var args = {
    target: this,
    method: 'start'
  };

  return Utils.generate(args);
};

/**
 * Stops the execution of the streams.
 */
StreamingContext.prototype.stop = function() {
  var args = {
    target: this,
    method: 'stop',
    args: Utils.wrapArguments(arguments)
  };

  return Utils.generate(args);
};

StreamingContext.prototype.close = function () {
  var args = {
    target: this,
    method: 'close'
  };

  return Utils.generate(args);
};

/**
 * Create a input stream from TCP source hostname:port.
 * @param {string} host
 * @param {string} port
 * @returns {module:eclairjs/streaming/dstream.DStream}
 */
StreamingContext.prototype.socketTextStream = function(host, port) {
  var DStream = require('./dstream/DStream.js')(this.kernelP);

  var args = {
    target: this,
    method: 'socketTextStream',
    args: Utils.wrapArguments(arguments),
    returnType: DStream
  };

  return Utils.generate(args);
};

/**
 * Create an input stream from an queue of RDDs. In each batch,
 * it will process either one or all of the RDDs returned by the queue.
 *
 * NOTE:
 * 1. Changes to the queue after the stream is created will not be recognized.
 * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
 * those RDDs, so `queueStream` doesn't support checkpointing.
 *
 * @param {module:eclairjs.RDD[]} queue       Queue of RDDs
 * @param {boolean}  [oneAtATime=true]   Whether only one RDD should be consumed from the queue in every interval
 * @param {module:eclairjs.RDD} [defaultRDD]  Default RDD is returned by the DStream when the queue is empty
 * @returns {module:eclairjs/streaming/dstream.DStream}
 */
StreamingContext.prototype.queueStream = function (queue) {
  var DStream = require('./dstream/DStream.js')(this.kernelP);
  
  var args = {
    target: this,
    method: 'queueStream',
    args: Utils.wrapArguments(arguments),
    returnType: DStream
  };

  return Utils.generate(args);
};

StreamingContext.moduleLocation = '/streaming/StreamingContext';

module.exports = StreamingContext;