Source: sql/streaming/DataStreamReader.js

/*                                                                         
* Copyright 2016 IBM Corp.                                                 
*                                                                          
* Licensed under the Apache License, Version 2.0 (the "License");          
* you may not use this file except in compliance with the License.         
* You may obtain a copy of the License at                                  
*                                                                          
*      http://www.apache.org/licenses/LICENSE-2.0                          
*                                                                          
* Unless required by applicable law or agreed to in writing, software      
* distributed under the License is distributed on an "AS IS" BASIS,        
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and      
* limitations under the License.                                           
*/ 


var Utils = require('../../utils.js');


/**
 * @classdesc
 * Interface used to load a streaming {@link Dataset} from external storage systems (e.g. file systems,
 * key-value stores, etc). Use {@link readStream} to access this.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @class
 * @memberof module:eclairjs/sql/streaming
 */


function DataStreamReader(kernelP, refIdP) {
  this.kernelP = kernelP;
  this.refIdP = refIdP;
}
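
// Usage sketch (illustrative, not defined in this file): a DataStreamReader is
// normally obtained from a SparkSession rather than constructed directly.
//
//   var reader = sparkSession.readStream();                 // -> DataStreamReader
//   var input = reader.format("json").load("/path/to/input/");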



/**
 * :: Experimental ::
 * Specifies the input data source format.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} source
 * @returns {DataStreamReader} 
 */
DataStreamReader.prototype.format = function(source) {
  var DataStreamReader = require('../../sql/streaming/DataStreamReader.js');

  var args = {
    target: this,
    method: 'format',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamReader
  };

  return Utils.generate(args);
};
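
// Example sketch (illustrative; assumes `sparkSession` as above): select the
// source format before calling load().
//
//   sparkSession.readStream().format("parquet").load("/path/to/parquet/");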


/**
 * :: Experimental ::
 * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
 * automatically from data. By specifying the schema here, the underlying data source can
 * skip the schema inference step, and thus speed up data loading.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {module:eclairjs/sql/types.StructType} schema
 * @returns {DataStreamReader} 
 */
DataStreamReader.prototype.schema = function(schema) {
  var DataStreamReader = require('../../sql/streaming/DataStreamReader.js');

  var args = {
    target: this,
    method: 'schema',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamReader
  };

  return Utils.generate(args);
};
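
// Example sketch (illustrative; assumes eclairjs exposes a DataTypes helper
// mirroring Spark's Java DataTypes API, which is not part of this file):
//
//   var schema = DataTypes.createStructType([
//     DataTypes.createStructField("name", DataTypes.StringType, true),
//     DataTypes.createStructField("age", DataTypes.IntegerType, true)
//   ]);
//   sparkSession.readStream().schema(schema).json("/path/to/json/");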


/**
 * :: Experimental ::
 * Adds an input option for the underlying data source.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} key
 * @param {string} value
 * @returns {DataStreamReader} 
 */
DataStreamReader.prototype.option = function(key,value) {
  var DataStreamReader = require('../../sql/streaming/DataStreamReader.js');

  var args = {
    target: this,
    method: 'option',
    args: Utils.wrapArguments(arguments),
    returnType: DataStreamReader
  };

  return Utils.generate(args);
};
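
// Example sketch (illustrative): options are plain string key/value pairs that
// are handed straight to the underlying source.
//
//   sparkSession.readStream()
//     .format("csv")
//     .option("header", "true")
//     .option("maxFilesPerTrigger", "1")
//     .load("/path/to/csv/");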


/**
 * :: Experimental ::
 * Loads input in as a {@link Dataset}, for data streams that read from some path.
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} [path]
 * @returns {Dataset} 
 */
DataStreamReader.prototype.load = function(path) {
  // TODO: handle optional param 'path'
  var Dataset = require('../../sql/Dataset.js');

  var args = {
    target: this,
    method: 'load',
    args: Utils.wrapArguments(arguments),
    returnType: Dataset
  };

  return Utils.generate(args);
};
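
// Example sketch (illustrative): load() is the generic entry point; the
// json()/csv()/parquet()/text() methods below are effectively shorthands for
// format(...).load(path).
//
//   var stream = sparkSession.readStream().format("json").load("/path/to/json/");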


/**
 * :: Experimental ::
 * Loads a JSON file stream (one object per line) and returns the result as a {@link Dataset}.
 *
 * This function goes through the input once to determine the input schema. If you know the
 * schema in advance, use the version that specifies the schema to avoid the extra scan.
 *
 * You can set the following JSON-specific options to deal with non-standard JSON files:
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
 * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
 * type. If the values do not fit in decimal, then it infers them as doubles.</li>
 * <li>`allowComments` (default `false`): ignores Java/C++ style comments in JSON records</li>
 * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
 * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
 * </li>
 * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
 * (e.g. 00012)</li>
 * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows quoting of all
 * characters using the backslash quoting mechanism</li>
 * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
 * during parsing.</li>
 * <ul>
 *  <li>`PERMISSIVE` : sets other fields to `null` when it encounters a corrupted record, and puts
 *  the malformed string into a new field configured by `columnNameOfCorruptRecord`. When
 *  a schema is set by the user, it sets `null` for extra fields.</li>
 *  <li>`DROPMALFORMED` : ignores corrupted records entirely.</li>
 *  <li>`FAILFAST` : throws an exception when it encounters corrupted records.</li>
 * </ul>
 * <li>`columnNameOfCorruptRecord` (default is the value specified in
 * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
 * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} path
 * @returns {Dataset} 
 */
DataStreamReader.prototype.json = function(path) {
  var Dataset = require('../../sql/Dataset.js');

  var args = {
    target: this,
    method: 'json',
    args: Utils.wrapArguments(arguments),
    returnType: Dataset
  };

  return Utils.generate(args);
};
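
// Example sketch (illustrative): stream newline-delimited JSON, tolerating
// comments in the records.
//
//   sparkSession.readStream()
//     .option("allowComments", "true")
//     .json("/path/to/json/");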


/**
 * :: Experimental ::
 * Loads a CSV file stream and returns the result as a {@link Dataset}.
 *
 * This function will go through the input once to determine the input schema if `inferSchema`
 * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
 * specify the schema explicitly using {@link schema}.
 *
 * You can set the following CSV-specific options to deal with CSV files:
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 * <li>`sep` (default `,`): sets the single character as a separator for each
 * field and value.</li>
 * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding
 * type.</li>
 * <li>`quote` (default `"`): sets the single character used for escaping quoted values where
 * the separator can be part of the value. To turn off quoting, set an empty string rather
 * than `null`. This behaviour differs from `com.databricks.spark.csv`.</li>
 * <li>`escape` (default `\`): sets the single character used for escaping quotes inside
 * an already quoted value.</li>
 * <li>`comment` (default empty string): sets the single character used for skipping lines
 * beginning with this character. By default, it is disabled.</li>
 * <li>`header` (default `false`): uses the first line as names of columns.</li>
 * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
 * requires one extra pass over the data.</li>
 * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
 * from values being read should be skipped.</li>
 * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
 * whitespaces from values being read should be skipped.</li>
 * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
 * <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
 * <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
 * value.</li>
 * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
 * value.</li>
 * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
 * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
 * and timestamp type. By default, it is `null` which means trying to parse times and date by
 * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
 * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
 * a record can have.</li>
 * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
 * for any given value being read.</li>
 * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
 *    during parsing.</li>
 * <ul>
 *   <li>`PERMISSIVE` : sets other fields to `null` when it encounters a corrupted record. When
 *     a schema is set by the user, it sets `null` for extra fields.</li>
 *   <li>`DROPMALFORMED` : ignores corrupted records entirely.</li>
 *   <li>`FAILFAST` : throws an exception when it encounters corrupted records.</li>
 * </ul>
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} path
 * @returns {Dataset} 
 */
DataStreamReader.prototype.csv = function(path) {
  var Dataset = require('../../sql/Dataset.js');

  var args = {
    target: this,
    method: 'csv',
    args: Utils.wrapArguments(arguments),
    returnType: Dataset
  };

  return Utils.generate(args);
};
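
// Example sketch (illustrative; `mySchema` is a hypothetical StructType built as
// in the schema() sketch above): supply the schema explicitly to skip inference.
//
//   sparkSession.readStream()
//     .schema(mySchema)
//     .option("header", "true")
//     .option("sep", ";")
//     .csv("/path/to/csv/");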


/**
 * :: Experimental ::
 * Loads a Parquet file stream, returning the result as a {@link Dataset}.
 *
 * You can set the following Parquet-specific option(s) for reading Parquet files:
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
 * whether we should merge schemas collected from all Parquet part-files. This will override
 * `spark.sql.parquet.mergeSchema`.</li>
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} path
 * @returns {Dataset} 
 */
DataStreamReader.prototype.parquet = function(path) {
  var Dataset = require('../../sql/Dataset.js');

  var args = {
    target: this,
    method: 'parquet',
    args: Utils.wrapArguments(arguments),
    returnType: Dataset
  };

  return Utils.generate(args);
};
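
// Example sketch (illustrative): merge schemas across Parquet part-files while
// streaming newly arriving files.
//
//   sparkSession.readStream()
//     .option("mergeSchema", "true")
//     .parquet("/path/to/parquet/");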


/**
 * :: Experimental ::
 * Loads text files and returns a {@link Dataset} whose schema starts with a string column named
 * "value", followed by partitioned columns, if there are any.
 *
 * Each line in the text files is a new row in the resulting Dataset. For example:
 * @example
 *   sparkSession.readStream().text("/path/to/directory/")
 *
 * You can set the following text-specific options to deal with text files:
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 *
 * @since EclairJS 0.7 Spark  2.0.0
 * @param {string} path
 * @returns {Dataset} 
 */
DataStreamReader.prototype.text = function(path) {
  var Dataset = require('../../sql/Dataset.js');

  var args = {
    target: this,
    method: 'text',
    args: Utils.wrapArguments(arguments),
    returnType: Dataset
  };

  return Utils.generate(args);
};

module.exports = DataStreamReader;