Source: sql/streaming/StreamingQueryManager.js

/*                                                                         
* Copyright 2016 IBM Corp.                                                 
*                                                                          
* Licensed under the Apache License, Version 2.0 (the "License");          
* you may not use this file except in compliance with the License.         
* You may obtain a copy of the License at                                  
*                                                                          
*      http://www.apache.org/licenses/LICENSE-2.0                          
*                                                                          
* Unless required by applicable law or agreed to in writing, software      
* distributed under the License is distributed on an "AS IS" BASIS,        
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and      
* limitations under the License.                                           
*/ 


var Utils = require('../../utils.js');


/**
 * @classdesc
 * :: Experimental ::
 * A class to manage all the [[StreamingQuery]] active on a {@link SparkSession}.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @class
 * @memberof module:eclairjs/sql/streaming
 */


function StreamingQueryManager(kernelP, refIdP) {
	 this.kernelP = kernelP;
	 this.refIdP = refIdP;
	 
};



/**
 * Returns a list of active queries associated with this SQLContext
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @returns {StreamingQuery[]} 
 */
StreamingQueryManager.prototype.active = function() {
   var args ={
     target: this, 
     method: 'active', 
     returnType: [StreamingQuery]
 
   };
 
   return Utils.generate(args);
};


/**
 * Returns the query if there is an active query with the given id, or null.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {number} id
 * @returns {StreamingQuery} 
 */
StreamingQueryManager.prototype.get = function(id) {
	 var StreamingQuery = require('../../sql/streaming/StreamingQuery.js');
	   var args ={
	     target: this, 
	     method: 'get', 
	     args: Utils.wrapArguments(arguments),
	     returnType: StreamingQuery
	 
	   };
	 
	   return Utils.generate(args);
};


/**
 * Wait until any of the queries on the associated SQLContext has terminated since the
 * creation of the context, or since `resetTerminated()` was called. Returns whether any query
 * has terminated or not (multiple may have terminated). If any query has terminated with an
 * exception, then the exception will be thrown.
 *
 * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
 * return `true` immediately (if the query was terminated by `query.stop()`),
 * or throw the exception immediately (if the query was terminated with exception). Use
 * `resetTerminated()` to clear past terminations and wait for new terminations.
 *
 * In the case where multiple queries have terminated since `resetTermination()` was called,
 * if any query has terminated with exception, then `awaitAnyTermination()` will
 * throw any of the exception. For correctly documenting exceptions across multiple queries,
 * users need to stop all of them after any of them terminates with exception, and then check the
 * `query.exception()` for each query.
 *
 * @throws StreamingQueryException, if any query has terminated with an exception
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {number} [timeoutMs]
 * @returns {Promise.<boolean>} 
 * @ignore
 */
//FIXME This blocks the Toree kernel so no query is processed
//StreamingQueryManager.prototype.awaitAnyTermination = function(timeoutMs) {
// 
// function _resolve(result, resolve, reject) {
// 	try {
// 		var returnValue=result === 'true';
// 		resolve(returnValue);
// 	} catch (e) {
// 		var err = new Error("Parse Error: "+ e.message);
// 		reject(err);
// 	}
// };
//   var args ={
//     target: this, 
//     method: 'awaitAnyTermination', 
//     args: Utils.wrapArguments(arguments),
//     resolver: _resolve,
//     returnType: boolean
// 
//   };
// 
//   return Utils.generate(args);
//};


/**
 * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to
 * wait for new terminations.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @returns {Promise.<Void>} A Promise that resolves to nothing.
 */
StreamingQueryManager.prototype.resetTerminated = function() {
   var args ={
     target: this, 
     method: 'resetTerminated', 
     returnType: null
 
   };
 
   return Utils.generate(args);
};


/**
 * Register a {@link StreamingQueryListener} to receive up-calls for life cycle events of
 * {@link StreamingQuery}.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {module:eclairjs/sql/streaming.StreamingQueryListener} listener
 * @returns {Promise.<Void>} A Promise that resolves to nothing.
 */
StreamingQueryManager.prototype.addListenerx = function(listener) {
   var args ={
     target: this, 
     method: 'addListener', 
     args: Utils.wrapArguments(arguments),
     returnType: null
 
   };
 
   return Utils.generate(args);
};

var streamingQueryManagerListenerCounter=0;
function generateStreamingQueryManagerListenerFunc(id) {
  var func = "function(queryStartedInfo) {\
    var comm = commMap.get('streamingQueryManagerListener:{{id}}');\
    comm.send('streamingQueryManagerListener', JSON.stringify({type:\"start\", streamingQueryInfo: JSON.stringify(queryStartedInfo)}));\
  },\
  function(queryProcessInfo) {\
    var comm = commMap.get('streamingQueryManagerListener:{{id}}');\
    comm.send('streamingQueryManagerListener', JSON.stringify({type:\"process\", streamingQueryInfo: JSON.stringify(queryProcessInfo)}));\
  },\
  function(queryTerminatedInfo) {\
	  var comm = commMap.get('streamingQueryManagerListener:{{id}}');\
	  comm.send('streamingQueryManagerListener', JSON.stringify({type:\"terminated\", streamingQueryInfo: JSON.stringify(queryTerminatedInfo)}));\
  }\
  ";

  return func.replace(/{{id}}/g, id);
}
/**
 * Register to receive callbacks for life cycle events of {@link module:eclairjs/sql/streaming.StreamingQuery}.
 *
 *
 * Example:
 *
 *	var listener = queryManger.addListener(
 *	    function(event){
 *	        console.log("queryEvent " + JSON.stringify(event));
 *	        if (event.type == 'start') {
 *	        	// event started
 *	        } else if (event.type == 'process') {
 *	        	queryManger.get(event.streamingQueryInfo.id).stop().then(function(){
 *	        		//sparkSession.stop();
 *	        	});
 *	       	
 *	        } else if (event.type == 'terminated') {
 *	        	sparkSession.stop();
 *	        }
 *	    }
 *	);
 *
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {module:eclairjs/sql/streaming.StreamingQueryManager~queryEventCallback} queryEventCallback
 * @returns {Promise.<Void>} A Promise that resolves to nothing.
 */

StreamingQueryManager.prototype.addListener = function(localFunc) {
  var id = 'streamingQueryManagerListener-' + ++streamingQueryManagerListenerCounter;
  var StreamingQueryListener = require('../../sql/streaming/StreamingQueryListener.js');
  var args = {
    target: this,
    method: 'addListener',
    args: [
      {value: generateStreamingQueryManagerListenerFunc(id), type: 'lambda'}
    ],
    returnType: StreamingQueryListener
  };

  this.kernelP.then(function(kernel) {
    var comm = kernel.connectToComm('streamingQueryManagerListener', id);

    comm.onMsg = (msg) => {
      var StreamingQueryInfo = require('../../sql/streaming/StreamingQueryInfo.js');
      var response = msg.content.data;

      if (localFunc) {
    	var event = {};
    	event.type = response.type;
    	event.streamingQueryInfo = new StreamingQueryInfo(JSON.parse(response.streamingQueryInfo));
        localFunc(event);
      }
    };

    comm.open('');
  });

  return Utils.generate(args);
};

/**
 * This callback called for query starts, process or termination events.
 * @callback module:eclairjs/sql/streaming.StreamingQueryManager~queryEventCallback
 * @param {module:eclairjs/sql/streaming.StreamingQueryInfo} event
 */

/**
 * Deregister a {@link StreamingQueryListener}.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {module:eclairjs/sql/streaming.StreamingQueryListener} listener
 * @returns {Promise.<Void>} A Promise that resolves to nothing.
 */
StreamingQueryManager.prototype.removeListener = function(listener) {
   var args ={
     target: this, 
     method: 'removeListener', 
     args: Utils.wrapArguments(arguments),
     returnType: null
 
   };
 
   return Utils.generate(args);
};

module.exports = StreamingQueryManager;