Source: sql/streaming/DataStreamWriter.js

/*                                                                         
* Copyright 2016 IBM Corp.                                                 
*                                                                          
* Licensed under the Apache License, Version 2.0 (the "License");          
* you may not use this file except in compliance with the License.         
* You may obtain a copy of the License at                                  
*                                                                          
*      http://www.apache.org/licenses/LICENSE-2.0                          
*                                                                          
* Unless required by applicable law or agreed to in writing, software      
* distributed under the License is distributed on an "AS IS" BASIS,        
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and      
* limitations under the License.                                           
*/ 


var Utils = require('../../utils.js');


/**
 * @classdesc
 * 
 * Interface used to write a streaming {@link Dataset} to external storage systems (e.g. file systems,
 * key-value stores, etc). Use {@link writeStream} to access this.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @class
 * @memberof module:eclairjs/sql/streaming
 */


function DataStreamWriter(kernelP, refIdP) {
  // Promise resolving to the Jupyter kernel connection used to execute
  // remote calls, and promise resolving to the remote object's reference id.
  this.kernelP = kernelP;
  this.refIdP = refIdP;
}


/**
 * 
 * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
 *   - `append`:   only the new rows in the streaming DataFrame/Dataset will be written to
 *                 the sink
 *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
 *                 every time there are some updates
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} outputMode
 * @returns {DataStreamWriter} 
 */
DataStreamWriter.prototype.outputMode = function(outputMode) {
  // Forward the call to the kernel-side DataStreamWriter and wrap the
  // resulting remote reference in a new DataStreamWriter.
  // Note: DataStreamWriter is this module's own constructor; the previous
  // self-require of this file returned the identical function and was dead code.
  var args = {
    target: this,
    method: 'outputMode',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamWriter
  };

  return Utils.generate(args);
};


/**
 * 
 * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
 * the query as fast as possible.
 *
 * @example 
 *   df.writeStream().trigger(ProcessingTime.create("10 seconds"))
 *  
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {module:eclairjs/sql/streaming.Trigger} trigger
 * @returns {DataStreamWriter} 
 */
DataStreamWriter.prototype.trigger = function(trigger) {
  // Forward the call to the kernel-side DataStreamWriter and wrap the
  // resulting remote reference in a new DataStreamWriter.
  // Note: DataStreamWriter is this module's own constructor; the previous
  // self-require of this file returned the identical function and was dead code.
  var args = {
    target: this,
    method: 'trigger',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamWriter
  };

  return Utils.generate(args);
};


/**
 * 
 * Specifies the name of the {@link StreamingQuery} that can be started with `start()`.
 * This name must be unique among all the currently active queries in the associated SQLContext.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} queryName
 * @returns {DataStreamWriter} 
 */
DataStreamWriter.prototype.queryName = function(queryName) {
  // Forward the call to the kernel-side DataStreamWriter and wrap the
  // resulting remote reference in a new DataStreamWriter.
  // Note: DataStreamWriter is this module's own constructor; the previous
  // self-require of this file returned the identical function and was dead code.
  var args = {
    target: this,
    method: 'queryName',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamWriter
  };

  return Utils.generate(args);
};

// Monotonically increasing id used to give each 'console' stream its own comm channel.
var dataStreamWriterForeachCounter = 0;
/**
 * 
 * Specifies the underlying output data source. Built-in options include "parquet"
 *
 * Note: the special source 'console' is not sent to the kernel as a format name.
 * Instead it is emulated locally via {@link foreach}: each streamed value is sent
 * back over a Jupyter comm channel and printed with console.log.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} source
 * @returns {DataStreamWriter} 
 */
DataStreamWriter.prototype.format = function(source) {
	if (source == 'console') {
		// Unique channel id so concurrent console streams do not collide.
		var id = 'dataStreamWriterForeach-' + ++dataStreamWriterForeachCounter;

		// Open a comm channel on the local side; every message received carries
		// one streamed value, which is echoed to the console.
		 this.kernelP.then(function(kernel) {
		    var comm = kernel.connectToComm('dataStreamWriterForeach', id);
	
		    comm.onMsg = (msg) => {
			    var response = msg.content.data;
			    console.log(JSON.stringify(response));
		
		    };
	
		    comm.open('');
		 });
		// Source of the process callback that runs kernel-side: look up the comm
		// registered above (presumably in a kernel-side commMap — TODO confirm)
		// and forward each value back to this process.
		var funcBody = "var comm = commMap.get('dataStreamWriterForeach:" + id + "'); " +
                        "comm.send('dataStreamWriterForeach', JSON.stringify({type:\"process\", v: value}));"
		// new Function is used (rather than a closure) so the body is a plain,
		// serializable string that can be shipped to the kernel.
		var processCallbackFunction = new Function("connection", "value", funcBody); 
		return this.foreach(
								function (partitionId, version) {
								    return {"connection": "connection object", "partitionId": partitionId, "version": version};
								},
								processCallbackFunction,
								function (connection) {
								}
							);
	} else {
		// Normal case: pass the format name through to the kernel-side writer.
		 var DataStreamWriter = require('../../sql/streaming/DataStreamWriter.js');
		   var args ={
		     target: this, 
		     method: 'format', 
		     args: Utils.wrapArguments(arguments),
		     returnType: DataStreamWriter
		 
		   };
		 
		   return Utils.generate(args);
	}
	
};


/**
 * Partitions the output by the given columns on the file system. If specified, the output is
 * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
 * partition a dataset by year and then month, the directory layout would look like:
 *
 *   - year=2016/month=01/
 *   - year=2016/month=02/
 *
 * Partitioning is one of the most widely used techniques to optimize physical data layout.
 * It provides a coarse-grained index for skipping unnecessary data reads when queries have
 * predicates on the partitioned columns. In order for partitioning to work well, the number
 * of distinct values in each column should typically be less than tens of thousands.
 *
 * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
 *
 * @since EclairJS 0.7 Spark  1.4.0
 * @param {...string} colNames
 * @returns {DataStreamWriter} 
 */
DataStreamWriter.prototype.partitionBy = function(colNames) {
  // Forward the call to the kernel-side DataStreamWriter and wrap the
  // resulting remote reference in a new DataStreamWriter.
  // Note: DataStreamWriter is this module's own constructor; the previous
  // self-require of this file returned the identical function and was dead code.
  // Repeated 'colNames' args are captured via `arguments` and wrapped as a group.
  var args = {
    target: this,
    method: 'partitionBy',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamWriter
  };

  return Utils.generate(args);
};


/**
 * 
 * Adds an output option for the underlying data source.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} key
 * @param {string} value
 * @returns {DataStreamWriter} 
 */
DataStreamWriter.prototype.option = function(key,value) {
  // Forward the call to the kernel-side DataStreamWriter and wrap the
  // resulting remote reference in a new DataStreamWriter.
  // Note: DataStreamWriter is this module's own constructor; the previous
  // self-require of this file returned the identical function and was dead code.
  var args = {
    target: this,
    method: 'option',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamWriter
  };

  return Utils.generate(args);
};


/**
 * 
 * Starts the execution of the streaming query, which will continually output results to the given
 * path as new data arrives. The returned {@link StreamingQuery} object can be used to interact with
 * the stream.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} [path]
 * @returns {StreamingQuery} 
 */
DataStreamWriter.prototype.start = function(path) {
  // Dispatch 'start' to the kernel-side writer; the remote result is wrapped
  // in a StreamingQuery so callers can interact with the running query.
  var StreamingQuery = require('../../sql/streaming/StreamingQuery.js');

  // TODO: handle optional parms 'path'
  return Utils.generate({
    target: this,
    method: 'start',
    args: Utils.wrapArguments(arguments),
    returnType: StreamingQuery
  });
};


/**
 * 
 * Starts the execution of the streaming query, which will continually send results to the given
 * writer as new data arrives. The {@link ForeachWriter} can be used to send the data
 * generated by the [[DataFrame]]/{@link Dataset} to an external system.
 *  
 * @example
 * var query = counts.writeStream().foreach(function(partitionId, version, webSocket) {
 *       // open connection
 *      var connection = webSocket.open(....);
 *      return connection;
 *  },[webSocket],
 *  function(connection, value) {
 *     connection.send(value);
 *   },
 *  function(connection) {
 *     connection.close();
 *  }).start();
 *  
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {module:eclairjs/sql/streaming.DataStreamWriter~openCallback} openCallback Used to open connection to external system.
 * @param {module:eclairjs/sql/streaming.DataStreamWriter~processCallback} processCallback use to send the data external system.
 * @param {module:eclairjs/sql/streaming.DataStreamWriter~closeCallback} closeFunction Used to close connection to external system.
 * @param {object[]} [openFunctionBindArgs]
 * @param {object[]} [processFunctionBindArgs]
 * @param {object[]} [closeFunctionBindArgs]
 * @returns {DataStreamWriter} 
 */

DataStreamWriter.prototype.foreach = function(openCallback, processCallback, closeCallback, openFunctionBindArgs, processFunctionBindArgs, closeFunctionBindArgs) {
  // Package the three user callbacks as serializable lambdas, followed by
  // their (optional) bind-argument arrays, then ship the 'foreach' call to
  // the kernel-side DataStreamWriter.
  var lambdaArgs = [openCallback, processCallback, closeCallback].map(function (cb) {
    return { value: cb, type: 'lambda' };
  });
  var bindArgs = [openFunctionBindArgs, processFunctionBindArgs, closeFunctionBindArgs].map(function (ba) {
    return { value: Utils.wrapBindArgs(ba), optional: true };
  });

  return Utils.generate({
    target: this,
    method: 'foreach',
    args: lambdaArgs.concat(bindArgs),
    returnType: DataStreamWriter
  });
};

/**
 * This callback Used to open connection to external system.
 * @callback module:eclairjs/sql/streaming.DataStreamWriter~openCallback
 * @param {number} partitionId
 * @param {number} version
 * @param {object[]} [bindArgs]
 * @returns {object} connection that is passed to {@link module:eclairjs/sql/streaming.DataStreamWriter~processCallback}
 * and {@link module:eclairjs/sql/streaming.DataStreamWriter~closeCallback}
 */

/**
 * This callback consume data generated by a StreamingQuery. Typically this is used to send the generated data
 * to external systems from each partition so you usually should do all the initialization (e.g. opening a connection
 * or initiating a transaction) in the {@link module:eclairjs/sql/streaming.DataStreamWriter~openCallback}.
 * @callback module:eclairjs/sql/streaming.DataStreamWriter~processCallback
 * @param {object} connection
 * @param {number} value
 * @param {object[]} [bindArgs]
 */

/**
 * Used to close connection to external system.
 * @callback module:eclairjs/sql/streaming.DataStreamWriter~closeCallback
 * @param {object} connection
 * @param {object[]} [bindArgs]
 */

module.exports = DataStreamWriter;