Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate jobs.query and stateless query for faster queries #1337

Merged
merged 13 commits into from
Mar 27, 2024
Merged
Prev Previous commit
Next Next commit
feat: support stateless queries
  • Loading branch information
alvarowolfx committed Feb 28, 2024
commit 0717724a81fc89c58fd273e1160d19b5d28f4ad0
21 changes: 10 additions & 11 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,17 +323,17 @@
location?: string;
private _universeDomain: string;

createQueryStream(options?: Query | string): ResourceStream<RowMetadata> {

Check warning on line 326 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<RowMetadata>({}, () => {});
}

getDatasetsStream(options?: GetDatasetsOptions): ResourceStream<Dataset> {

Check warning on line 331 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Dataset>({}, () => {});
}

getJobsStream(options?: GetJobsOptions): ResourceStream<Job> {

Check warning on line 336 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Job>({}, () => {});
}
Expand Down Expand Up @@ -1436,6 +1436,7 @@
callback?: JobCallback
): void | Promise<JobResponse> {
const options = typeof opts === 'object' ? opts : {query: opts};
this.trace('[createQueryJob]', options, callback);
if ((!options || !options.query) && !options.pageToken) {
throw new Error('A SQL query string is required.');
}
Expand Down Expand Up @@ -2143,29 +2144,25 @@
return;
}

let nextQuery = extend({job}, options);
options = extend({job}, queryOpts, options);
if (res && res.jobComplete) {
let rows: any = [];

Check warning on line 2149 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
if (res.schema && res.rows) {
rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, {
wrapIntegers: options.wrapIntegers!, // TODO: fix default value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above (re: not sure if this was moved from elsewhere or new code) but I wanted to ping in case the TODO isn't intentional.

parseJSON: options.parseJSON,
});
}
options.cachedRows = rows;
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
this.trace('[runJobsQuery] job complete');
if (res.pageToken) {
this.trace('[runJobsQuery] has more pages');
nextQuery = extend({job}, options, {
pageToken: res.pageToken,
cachedRows: rows,
});
job!.getQueryResults(nextQuery, callback as QueryRowsCallback);
return;
options.pageToken = res.pageToken;
} else {
this.trace('[runJobsQuery] no more pages');
(callback as SimpleQueryRowsCallback)(err, rows, res);
return;
}
job!.getQueryResults(options, callback as QueryRowsCallback);
return;
}
this.trace('[runJobsQuery] job not complete');
job!.getQueryResults(options, callback as QueryRowsCallback);
Expand All @@ -2176,8 +2173,8 @@
* Check if the given Query can run using the `jobs.query` endpoint.
* Returns a bigquery.IQueryRequest that can be used to call `jobs.query`.
* Return undefined if is not possible to convert to a bigquery.IQueryRequest.
*
* @param query string | Query
*
* @param query string | Query
* @param options QueryOptions
* @returns bigquery.IQueryRequest | undefined
*/
Expand Down Expand Up @@ -2245,6 +2242,8 @@
job = this.job(jobRef.jobId!, {
location: jobRef.location,
});
} else {
job = this.job(res.queryId!); // stateless query
}
callback!(null, job, res);
}
Expand Down
11 changes: 7 additions & 4 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
job?: Job;
wrapIntegers?: boolean | IntegerTypeCastOptions;
parseJSON?: boolean;
cachedRows?: any[];

Check warning on line 54 in src/job.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
} & PagedRequest<bigquery.jobs.IGetQueryResultsParams>;

/**
Expand Down Expand Up @@ -129,7 +129,7 @@
location?: string;
projectId?: string;
getQueryResultsStream(
options?: QueryResultsOptions

Check warning on line 132 in src/job.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
): ResourceStream<RowMetadata> {
// placeholder body, overwritten in constructor
return new ResourceStream<RowMetadata>({}, () => {});
Expand Down Expand Up @@ -561,10 +561,13 @@
typeof qs.timeoutMs === 'number' ? qs.timeoutMs : false;

if (options.cachedRows) {
const nextQuery = Object.assign({}, options, {
pageToken: options.pageToken,
});
delete nextQuery.cachedRows;
let nextQuery: QueryResultsOptions | null = null;
if (options.pageToken) {
nextQuery = Object.assign({}, options, {
pageToken: options.pageToken,
});
delete nextQuery.cachedRows;
}
callback!(null, options.cachedRows, nextQuery);
return;
}
Expand Down
16 changes: 16 additions & 0 deletions test/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2812,6 +2812,10 @@ describe('BigQuery', () => {
callback(error, null, FAKE_RESPONSE);
};

bq.buildQueryRequest_ = (query: {}, options: {}) => {
return undefined;
};

bq.query(QUERY_STRING, (err: Error, rows: {}, resp: {}) => {
assert.strictEqual(err, error);
assert.strictEqual(rows, null);
Expand Down Expand Up @@ -2850,6 +2854,10 @@ describe('BigQuery', () => {
callback(null, fakeJob, FAKE_RESPONSE);
};

bq.buildQueryRequest_ = (query: {}, options: {}) => {
return undefined;
};

bq.query(QUERY_STRING, (err: Error, rows: {}, resp: {}) => {
assert.ifError(err);
assert.strictEqual(rows, FAKE_ROWS);
Expand Down Expand Up @@ -2901,6 +2909,10 @@ describe('BigQuery', () => {
callback(null, fakeJob, FAKE_RESPONSE);
};

bq.buildQueryRequest_ = (query: {}, opts: {}) => {
return undefined;
};

bq.query(QUERY_STRING, assert.ifError);
});

Expand All @@ -2918,6 +2930,10 @@ describe('BigQuery', () => {
callback(null, fakeJob, FAKE_RESPONSE);
};

bq.buildQueryRequest_ = (query: {}, opts: {}) => {
return undefined;
};

bq.query(QUERY_STRING, fakeOptions, assert.ifError);
});
});
Expand Down