Source: streaming/dstream/PairDStream.js

/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

var Utils = require('../../utils.js');
var PairRDD = require('../../rdd/PairRDD.js');
var DStream = require('./DStream')();

// Kernel promise recorded by the module factory at the bottom of this file; it is only
// overwritten when that factory is called with a truthy kP.
var gKernelP;

/**
 * @constructor
 * @classdesc Represents a Discretized Stream (DStream) whose elements are key-value pairs.
 * A DStream, the basic abstraction in Spark Streaming, is a continuous sequence of RDDs
 * (of the same type) representing a continuous stream of data; PairDStream adds the
 * key-based operations such as `reduceByKey` and `reduceByKeyAndWindow`.
 * @extends module:eclairjs/streaming/dstream.DStream
 * @param {*} kernelP  kernel handle stored on the instance (presumably a promise for the
 *                     kernel connection — confirm against callers)
 * @param {*} refIdP   reference-id handle stored on the instance (presumably a promise for
 *                     the remote object's id — confirm against callers)
 * @memberof module:eclairjs/streaming/dstream
 */
function PairDStream(kernelP, refIdP) {
  this.kernelP = kernelP;
  this.refIdP = refIdP;
}

// Inherit every DStream method; PairDStream only layers pair-specific operations on top.
PairDStream.prototype = Object.create(DStream.prototype);

// Object.create replaced the default prototype, so restore the constructor back-reference.
PairDStream.prototype.constructor = PairDStream;

/**
 * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
 * with Spark's default number of partitions.
 * @param {func} func  associative function used to merge the values of each key
 * @param {Object[]} [bindArgs]  optional extra arguments to bind to `func` (presumably made
 *                               available to it when it runs — see `Utils.wrapBindArgs`)
 * @returns {module:eclairjs/streaming/dstream.PairDStream}
 */
PairDStream.prototype.reduceByKey = function (func, bindArgs) {
  var args = {
    target: this,
    method: 'reduceByKey',
    args: [
      {value: func, type: 'lambda'},
      // bindArgs is always wrapped but flagged optional, so callers may omit it.
      {value: Utils.wrapBindArgs(bindArgs), optional: true}
    ],
    returnType: PairDStream
  };

  return Utils.generate(args);
};


/**
 * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
 * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
 * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
 * the RDDs with Spark's default number of partitions.
 * @param {func} reduceFunc  associative reduce function
 * @param {module:eclairjs/streaming.Duration} windowDuration  width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param {Object[]} [bindArgs]  optional extra arguments to bind to `reduceFunc` (presumably
 *                               made available to it when it runs — see `Utils.wrapBindArgs`)
 * @returns {module:eclairjs/streaming/dstream.PairDStream}
 */
PairDStream.prototype.reduceByKeyAndWindow = function (reduceFunc, windowDuration, bindArgs) {
  var args = {
    target: this,
    method: 'reduceByKeyAndWindow',
    args: [
      {value: reduceFunc, type: 'lambda'},
      // Forwarded as-is; no client-side check that it is a multiple of the batch interval.
      {value: windowDuration},
      // bindArgs is always wrapped but flagged optional, so callers may omit it.
      {value: Utils.wrapBindArgs(bindArgs), optional: true}
    ],
    returnType: PairDStream
  };

  return Utils.generate(args);
};

PairDStream.moduleLocation = '/streaming/dstream/PairDStream';

module.exports = function(kP) {
  if (kP) gKernelP = kP;

  return PairDStream;
};