Skip to content

Commit

Permalink
fix!(table): createLoadJobStream sync returns a stream, createLoadJob…
Browse files Browse the repository at this point in the history
… always returns a job #640
  • Loading branch information
zamnuts committed Mar 17, 2020
1 parent bf612bd commit 9a9e97d
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 109 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -57,6 +57,7 @@
"duplexify": "^4.0.0",
"extend": "^3.0.2",
"is": "^3.3.0",
"p-event": "^4.1.0",
"stream-events": "^1.0.5",
"string-format-obj": "^1.1.1",
"uuid": "^7.0.0"
Expand Down
200 changes: 145 additions & 55 deletions src/table.ts
Expand Up @@ -20,6 +20,7 @@ import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import Big from 'big.js';
import * as extend from 'extend';
import pEvent from 'p-event';

const format = require('string-format-obj');
import * as fs from 'fs';
Expand Down Expand Up @@ -1102,20 +1103,120 @@ class Table extends common.ServiceObject {
this.bigQuery.createJob(body, callback!);
}

createLoadJob(source: string, metadata?: JobLoadMetadata): Writable;
createLoadJob(source: File, metadata?: JobLoadMetadata): Promise<JobResponse>;
createLoadJob(
/**
* Add `location` if available. Replaces `format` with `sourceFormat` from
* lookup table if format is present. Does not mutate.
* @param {JobLoadMetadata} metadata
* @returns {JobLoadMetadata}
* @private
*/
_prepareLoadJobMetadata(metadata?: JobLoadMetadata): JobLoadMetadata {
if (!metadata) {
return {};
}

const {format, ...restMetadata} = metadata || {};
const {location} = this;
const sourceFormat = format ? FORMATS[format.toLowerCase()] : undefined;

return {
...restMetadata,
...(location ? {location} : {}),
...(format ? {sourceFormat} : {format}),
};
}

/**
* Load data from a local file, receiving
*
* By loading data this way, you create a load job that will run your data
* load asynchronously. If you would like instantaneous access to your data,
* insert it using {@liink Table#insert}.
*
* Note: The file type will be inferred by the given file's extension. If you
* wish to override this, you must provide `metadata.format`.
*
* @see [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
*
* @param {string} source The path to the local source file to load.
* @param {object} [metadata] Metadata to set with the load operation. The
* metadata object should be in the format of the
* [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs
* resource.
* @param {string} [metadata.format] The format the data being loaded is in.
* Allowed options are "AVRO", "CSV", "JSON", "ORC", or "PARQUET".
* @param {string} [metadata.jobId] Custom job id.
* @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
* @returns {Writable} Emits a "job" event upon completion.
*
* @throws {Error} If the source isn't a string file name.
*
* @example
* const {BigQuery} = require('@google-cloud/bigquery');
* const bigquery = new BigQuery();
* const dataset = bigquery.dataset('my-dataset');
* const table = dataset.table('my-table');
*
* const onJob = (job, apiResponse) => {
* // `job` is a Job object that can be used to check the status of the
* // request.
* };
*
* const onError = err => {
* // got an error
* }
*
* //-
* // Load data from a local file.
* //-
* table.createLoadJob('./institutions.csv')
* .once('error', onError)
* .once('job', onJob);
*
* //-
* // You may also pass in metadata in the format of a Jobs resource. See
* // (http://goo.gl/BVcXk4) for a full list of supported values.
* //-
* const metadata = {
* encoding: 'ISO-8859-1',
* sourceFormat: 'NEWLINE_DELIMITED_JSON'
* };
*
* table.createLoadJob('./my-data.csv', metadata)
* .once('error', onError)
* .once('job', onJob);
*/
createLoadJobStream(
source: string,
metadata: JobLoadMetadata,
callback: JobCallback
): Writable;
metadata?: JobLoadMetadata,
): Writable {
const jobLoadMetadata = this._prepareLoadJobMetadata(metadata);

// If a sourceFormat wasn't specified, try to find a match from the
// file's extension.
const detectedFormat =
FORMATS[
path
.extname(source)
.substr(1)
.toLowerCase()
];
if (!jobLoadMetadata.sourceFormat && detectedFormat) {
jobLoadMetadata.sourceFormat = detectedFormat;
}

// Read the file into a new write stream.
return fs.createReadStream(source)
.pipe(this.createWriteStream_(jobLoadMetadata));
}

createLoadJob(source: string | File, metadata?: JobLoadMetadata): Promise<JobResponse>;
createLoadJob(
source: File,
source: string | File,
metadata: JobLoadMetadata,
callback: JobCallback
): void;
createLoadJob(source: string, callback: JobCallback): Writable;
createLoadJob(source: File, callback: JobCallback): void;
createLoadJob(source: string | File, callback: JobCallback): void;
/**
* Load data from a local file or Storage {@link
* https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}.
Expand All @@ -1129,10 +1230,10 @@ class Table extends common.ServiceObject {
*
* @see [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
*
* @param {string|File} source The source file to load. A string or a
* {@link
* @param {string|File|File[]} source The source file to load. A string (path)
* to a local file, or one or more {@link
* https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}
* object.
* objects.
* @param {object} [metadata] Metadata to set with the load operation. The
* metadata object should be in the format of the
* [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs
Expand Down Expand Up @@ -1203,50 +1304,39 @@ class Table extends common.ServiceObject {
* });
*/
createLoadJob(
source: string | File,
source: string | File | File[],
metadataOrCallback?: JobLoadMetadata | JobCallback,
cb?: JobCallback
): void | Promise<JobResponse> | Writable {
): void | Promise<JobResponse> {
const metadata =
typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
const callback =
typeof metadataOrCallback === 'function'
? metadataOrCallback
: cb || common.util.noop;

if (metadata.format) {
metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
delete metadata.format;
}

if (this.location) {
metadata.location = this.location;
}
this._createLoadJob(source, metadata).then(
([resp]) => callback(null, resp, resp.metadata),
err => callback(err)
);
}

/**
* @param {string | File | File[]} source
* @param {JobLoadMetadata} metadata
* @returns {Promise<JobResponse>}
* @private
*/
async _createLoadJob(source: string | File | File[], metadata: JobLoadMetadata): Promise<JobResponse> {
if (typeof source === 'string') {
// A path to a file was given. If a sourceFormat wasn't specified, try to
// find a match from the file's extension.
const detectedFormat =
FORMATS[
path
.extname(source)
.substr(1)
.toLowerCase()
];
if (!metadata.sourceFormat && detectedFormat) {
metadata.sourceFormat = detectedFormat;
}

// Read the file into a new write stream.
return fs
.createReadStream(source)
.pipe(this.createWriteStream_(metadata))
.on('error', callback)
.on('job', job => {
callback(null, job, job.metadata);
});
// A path to a file was given.
const jobWritable = this.createLoadJobStream(source, metadata);
const jobResponse = await pEvent(jobWritable, 'job') as Job;
return [jobResponse, jobResponse.metadata];
}

const jobLoadMetadata = this._prepareLoadJobMetadata(metadata);

// tslint:disable-next-line no-any
const body: any = {
configuration: {
Expand All @@ -1260,22 +1350,22 @@ class Table extends common.ServiceObject {
},
};

if (metadata.jobPrefix) {
body.jobPrefix = metadata.jobPrefix;
delete metadata.jobPrefix;
if (jobLoadMetadata.jobPrefix) {
body.jobPrefix = jobLoadMetadata.jobPrefix;
delete jobLoadMetadata.jobPrefix;
}

if (metadata.location) {
body.location = metadata.location;
delete metadata.location;
if (jobLoadMetadata.location) {
body.location = jobLoadMetadata.location;
delete jobLoadMetadata.location;
}

if (metadata.jobId) {
body.jobId = metadata.jobId;
delete metadata.jobId;
if (jobLoadMetadata.jobId) {
body.jobId = jobLoadMetadata.jobId;
delete jobLoadMetadata.jobId;
}

extend(true, body.configuration.load, metadata, {
extend(true, body.configuration.load, jobLoadMetadata, {
sourceUris: arrify(source).map(src => {
if (!common.util.isCustomType(src, 'storage/file')) {
throw new Error('Source must be a File object.');
Expand All @@ -1291,14 +1381,14 @@ class Table extends common.ServiceObject {
.substr(1)
.toLowerCase()
];
if (!metadata.sourceFormat && format) {
if (!jobLoadMetadata.sourceFormat && format) {
body.configuration.load.sourceFormat = format;
}
return 'gs://' + src.bucket.name + '/' + src.name;
}),
});

this.bigQuery.createJob(body, callback);
return this.bigQuery.createJob(body);
}

createQueryJob(options: Query): Promise<JobResponse>;
Expand Down

0 comments on commit 9a9e97d

Please sign in to comment.