
chore(table): remove createLoadJobStream, createLoadJob test refactor for promises #640
zamnuts committed Mar 23, 2020
1 parent 3c055b4 commit 28e3ef0
Showing 2 changed files with 146 additions and 220 deletions.
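The removed `createLoadJobStream` exposed a stream-based path for loading local files; after this commit that logic lives inside the promise-returning `createLoadJob`, which callers can simply await. A rough usage sketch of the surviving API (dataset, table, and file names below are placeholders, not taken from the commit):

```ts
import {BigQuery} from '@google-cloud/bigquery';

const bigquery = new BigQuery();
const table = bigquery.dataset('my-dataset').table('my-table');

async function loadLocalFile(): Promise<void> {
  // Removed helper: the stream-based path, resolved via events.
  // table.createLoadJobStream('./institutions.csv')
  //   .once('error', console.error)
  //   .once('job', job => console.log(job.id));

  // Surviving API: createLoadJob handles local paths itself and resolves
  // with [job, apiResponse] once the load job has been created.
  const [job, apiResponse] = await table.createLoadJob('./institutions.csv', {
    sourceFormat: 'CSV',
  });
  console.log(job.id, apiResponse.status);
}

loadLocalFile().catch(console.error);
```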
171 changes: 46 additions & 125 deletions src/table.ts
@@ -1103,114 +1103,10 @@ class Table extends common.ServiceObject {
this.bigQuery.createJob(body, callback!);
}

/**
* Add `location` if available. Replaces `format` with `sourceFormat` from
* lookup table if format is present. Does not mutate.
* @param {JobLoadMetadata} metadata
* @returns {JobLoadMetadata}
* @private
*/
_prepareLoadJobMetadata(metadata?: JobLoadMetadata): JobLoadMetadata {
if (!metadata) {
return {};
}

const {format, ...restMetadata} = metadata || {};
const {location} = this;
const sourceFormat = format ? FORMATS[format.toLowerCase()] : undefined;

return {
...restMetadata,
...(location ? {location} : {}),
...(format ? {sourceFormat} : {format}),
};
}

/**
* Load data from a local file, receiving a writable stream that emits a `job` event upon completion.
*
* By loading data this way, you create a load job that will run your data
* load asynchronously. If you would like instantaneous access to your data,
* insert it using {@link Table#insert}.
*
* Note: The file type will be inferred by the given file's extension. If you
* wish to override this, you must provide `metadata.format`.
*
* @see [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
*
* @param {string} source The path to the local source file to load.
* @param {object} [metadata] Metadata to set with the load operation. The
* metadata object should be in the format of the
* [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs
* resource.
* @param {string} [metadata.format] The format the data being loaded is in.
* Allowed options are "AVRO", "CSV", "JSON", "ORC", or "PARQUET".
* @param {string} [metadata.jobId] Custom job id.
* @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
* @returns {Writable} Emits a "job" event upon completion.
*
* @throws {Error} If the source isn't a string file name.
*
* @example
* const {BigQuery} = require('@google-cloud/bigquery');
* const bigquery = new BigQuery();
* const dataset = bigquery.dataset('my-dataset');
* const table = dataset.table('my-table');
*
* const onJob = (job, apiResponse) => {
* // `job` is a Job object that can be used to check the status of the
* // request.
* };
*
* const onError = err => {
* // got an error
* }
*
* //-
* // Load data from a local file.
* //-
* table.createLoadJob('./institutions.csv')
* .once('error', onError)
* .once('job', onJob);
*
* //-
* // You may also pass in metadata in the format of a Jobs resource. See
* // (http://goo.gl/BVcXk4) for a full list of supported values.
* //-
* const metadata = {
* encoding: 'ISO-8859-1',
* sourceFormat: 'NEWLINE_DELIMITED_JSON'
* };
*
* table.createLoadJob('./my-data.csv', metadata)
* .once('error', onError)
* .once('job', onJob);
*/
createLoadJobStream(
source: string,
metadata?: JobLoadMetadata,
): Writable {
const jobLoadMetadata = this._prepareLoadJobMetadata(metadata);

// If a sourceFormat wasn't specified, try to find a match from the
// file's extension.
const detectedFormat =
FORMATS[
path
.extname(source)
.substr(1)
.toLowerCase()
];
if (!jobLoadMetadata.sourceFormat && detectedFormat) {
jobLoadMetadata.sourceFormat = detectedFormat;
}

// Read the file into a new write stream.
return fs.createReadStream(source)
.pipe(this.createWriteStream_(jobLoadMetadata));
}

createLoadJob(source: string | File, metadata?: JobLoadMetadata): Promise<JobResponse>;
createLoadJob(
source: string | File,
metadata?: JobLoadMetadata
): Promise<JobResponse>;
createLoadJob(
source: string | File,
metadata: JobLoadMetadata,
@@ -1327,16 +1223,41 @@ class Table extends common.ServiceObject {
* @returns {Promise<JobResponse>}
* @private
*/
async _createLoadJob(source: string | File | File[], metadata: JobLoadMetadata): Promise<JobResponse> {
async _createLoadJob(
source: string | File | File[],
metadata: JobLoadMetadata
): Promise<JobResponse> {
if (metadata.format) {
metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
delete metadata.format;
}

if (this.location) {
metadata.location = this.location;
}

if (typeof source === 'string') {
// A path to a file was given.
const jobWritable = this.createLoadJobStream(source, metadata);
const jobResponse = await pEvent(jobWritable, 'job') as Job;
// A path to a file was given. If a sourceFormat wasn't specified, try to
// find a match from the file's extension.
const detectedFormat =
FORMATS[
path
.extname(source)
.substr(1)
.toLowerCase()
];
if (!metadata.sourceFormat && detectedFormat) {
metadata.sourceFormat = detectedFormat;
}

// Read the file into a new write stream.
const jobWritable = fs
.createReadStream(source)
.pipe(this.createWriteStream_(metadata));
const jobResponse = (await pEvent(jobWritable, 'job')) as Job;
return [jobResponse, jobResponse.metadata];
}

const jobLoadMetadata = this._prepareLoadJobMetadata(metadata);

// tslint:disable-next-line no-any
const body: any = {
configuration: {
@@ -1350,22 +1271,22 @@
},
};

if (jobLoadMetadata.jobPrefix) {
body.jobPrefix = jobLoadMetadata.jobPrefix;
delete jobLoadMetadata.jobPrefix;
if (metadata.jobPrefix) {
body.jobPrefix = metadata.jobPrefix;
delete metadata.jobPrefix;
}

if (jobLoadMetadata.location) {
body.location = jobLoadMetadata.location;
delete jobLoadMetadata.location;
if (metadata.location) {
body.location = metadata.location;
delete metadata.location;
}

if (jobLoadMetadata.jobId) {
body.jobId = jobLoadMetadata.jobId;
delete jobLoadMetadata.jobId;
if (metadata.jobId) {
body.jobId = metadata.jobId;
delete metadata.jobId;
}

extend(true, body.configuration.load, jobLoadMetadata, {
extend(true, body.configuration.load, metadata, {
sourceUris: arrify(source).map(src => {
if (!common.util.isCustomType(src, 'storage/file')) {
throw new Error('Source must be a File object.');
@@ -1381,7 +1302,7 @@
.substr(1)
.toLowerCase()
];
if (!jobLoadMetadata.sourceFormat && format) {
if (!metadata.sourceFormat && format) {
body.configuration.load.sourceFormat = format;
}
return 'gs://' + src.bucket.name + '/' + src.name;
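For reference, the extension-based source-format detection that `createLoadJobStream` used to perform is now inlined in `_createLoadJob` (see the added hunk above). A standalone sketch of that lookup follows; the exact contents of `FORMATS` here are assumed, not copied from the repository:

```ts
import * as path from 'path';

// Assumed shape of the FORMATS lookup table used by table.ts.
const FORMATS: {[ext: string]: string} = {
  avro: 'AVRO',
  csv: 'CSV',
  json: 'NEWLINE_DELIMITED_JSON',
  orc: 'ORC',
  parquet: 'PARQUET',
};

// Mirrors the inlined step: fill in sourceFormat only when the caller has
// not set one and the file extension maps to a known format.
function detectSourceFormat(
  source: string,
  metadata: {sourceFormat?: string}
): {sourceFormat?: string} {
  const detected = FORMATS[path.extname(source).substr(1).toLowerCase()];
  if (!metadata.sourceFormat && detected) {
    metadata.sourceFormat = detected;
  }
  return metadata;
}

// detectSourceFormat('./institutions.csv', {}) -> {sourceFormat: 'CSV'}
```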
