Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate jobs.query and stateless query for faster queries #1337

Merged
merged 13 commits into from
Mar 27, 2024
Merged
Next Next commit
feat: integrate jobs.query and jobless query for faster queries
  • Loading branch information
alvarowolfx committed Feb 16, 2024
commit c2a684172cd77087e836ea6fe54cb80da5bfdbe0
138 changes: 130 additions & 8 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
} from './table';
import {GoogleErrorBody} from '@google-cloud/common/build/src/util';
import bigquery from './types';
import {logger, setLogFunction} from './logger';

// Third-Party Re-exports
export {common};
Expand Down Expand Up @@ -164,6 +165,9 @@
bigquery.IJobList
>;

export type JobsQueryResponse = [Job, bigquery.IQueryResponse];
export type JobsQueryCallback = ResourceCallback<Job, bigquery.IQueryResponse>;

export interface BigQueryTimeOptions {
hours?: number | string;
minutes?: number | string;
Expand Down Expand Up @@ -319,17 +323,17 @@
location?: string;
private _universeDomain: string;

createQueryStream(options?: Query | string): ResourceStream<RowMetadata> {

Check warning on line 326 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<RowMetadata>({}, () => {});
}

getDatasetsStream(options?: GetDatasetsOptions): ResourceStream<Dataset> {

Check warning on line 331 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Dataset>({}, () => {});
}

getJobsStream(options?: GetJobsOptions): ResourceStream<Job> {

Check warning on line 336 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Job>({}, () => {});
}
Expand Down Expand Up @@ -488,6 +492,11 @@
});
}

/**
 * Forwards a debug message to the shared `logger` helper, tagging it with
 * the `[bigquery]` source label.
 *
 * @param msg Message (usually a `[methodName]` label) to log.
 * @param otherArgs Additional values handed to the logger unchanged.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private trace(msg: string, ...otherArgs: any[]) {
  const source = '[bigquery]';
  logger(source, msg, ...otherArgs);
}

get universeDomain() {
return this._universeDomain;
}
Expand Down Expand Up @@ -1439,6 +1448,7 @@
},
options
);
this.trace('[createQueryJob]', query);

if (options.destination) {
if (!(options.destination instanceof Table)) {
Expand Down Expand Up @@ -2106,22 +2116,132 @@
: {};
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
this.createQueryJob(query, (err, job, resp) => {

this.trace('[query]', query, options);
const queryReq = this.probeFastPath_(query, options);
if (!queryReq) {
this.createQueryJob(query, (err, job, resp) => {
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, resp);
return;
}
if (typeof query === 'object' && query.dryRun) {
(callback as SimpleQueryRowsCallback)(null, [], resp);
return;
}
// The Job is important for the `queryAsStream_` method, so a new query
// isn't created each time results are polled for.
options = extend({job}, queryOpts, options);
job!.getQueryResults(options, callback as QueryRowsCallback);
});
return;
}

this.syncQuery(queryReq, options, (err, job, res) => {
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, resp);
(callback as SimpleQueryRowsCallback)(err, null, res);
return;
}
if (typeof query === 'object' && query.dryRun) {
(callback as SimpleQueryRowsCallback)(null, [], resp);
return;

let nextQuery = extend({job}, options);
if (res && res.jobComplete) {
let rows: any = [];

Check warning on line 2148 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
if (res.schema && res.rows) {
rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, {
wrapIntegers: options.wrapIntegers!, // TODO: fix default value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above (re: not sure if this was moved from elsewhere or new code) but I wanted to ping in case the TODO isn't intentional.

parseJSON: options.parseJSON,
});
}
this.trace('[syncQuery] job complete');
if (res.pageToken) {
this.trace('[syncQuery] has more pages');
nextQuery = extend({job}, options, {
pageToken: res.pageToken,
cachedRows: rows,
});
job!.getQueryResults(nextQuery, callback as QueryRowsCallback);
return;
} else {
this.trace('[syncQuery] no more pages');
(callback as SimpleQueryRowsCallback)(err, rows, res);
return;
}
}
// The Job is important for the `queryAsStream_` method, so a new query
// isn't created each time results are polled for.
options = extend({job}, queryOpts, options);
this.trace('[syncQuery] job not complete');
job!.getQueryResults(options, callback as QueryRowsCallback);
});
}

/**
 * Decides whether a query can be sent through the `jobs.query` fast path
 * and, if so, builds the request body for it.
 *
 * The fast path is rejected (returns `undefined`) when:
 *  - the `FAST_QUERY_PATH` environment variable equals `'DISABLED'`,
 *  - `query` is not a plain SQL string, or
 *  - `options.job` is already set.
 *
 * @param query SQL string or query configuration object.
 * @param options Options supplied by the caller of `query()`.
 * @returns The request to send, or `undefined` to fall back to the regular
 *     job-based flow.
 */
private probeFastPath_(
  query: string | Query,
  options: QueryOptions
): bigquery.IQueryRequest | undefined {
  this.trace('[probeFastPath_]', query, options);

  const fastPathDisabled = process.env.FAST_QUERY_PATH === 'DISABLED';
  if (fastPathDisabled) {
    return undefined;
  }

  // TODO: non string query and convert to QueryRequest
  if (typeof query !== 'string') {
    return undefined;
  }

  if (options.job) {
    return undefined;
  }

  // NOTE(review): `...options` copies every caller option into the request
  // body; presumably some of them are client-side only — confirm that the
  // service tolerates the extra fields.
  return {
    ...options,
    useQueryCache: false,
    jobCreationMode: 'JOB_CREATION_OPTIONAL',
    useLegacySql: false,
    // A fresh request id is generated for every probe.
    requestId: uuid.v4(),
    query,
  };
}

/**
 * Sends a query request to the `/queries` endpoint with a `POST` and reports
 * the raw response together with a `Job` handle when the response carries a
 * job reference.
 *
 * NOTE(review): this implementation always uses the callback and never
 * returns a promise itself; the promise-returning overload presumably relies
 * on the class being promisified elsewhere — confirm.
 *
 * @param req Request body to post.
 * @param optionsOrCallback Query result options, or the callback when no
 *     options are given. The options are only included in the trace output.
 * @param cb Callback invoked as `(err, null, res)` on failure and as
 *     `(null, job, res)` on success, where `job` is `null` if the response
 *     has no `jobReference`.
 */
syncQuery(
  req: bigquery.IQueryRequest,
  options?: QueryResultsOptions
): Promise<JobsQueryResponse>;
syncQuery(
  req: bigquery.IQueryRequest,
  options: QueryResultsOptions,
  callback: JobsQueryCallback
): void;
syncQuery(
  req: bigquery.IQueryRequest,
  optionsOrCallback?: QueryResultsOptions | JobsQueryCallback,
  cb?: JobsQueryCallback
): void | Promise<JobsQueryResponse> {
  const options =
    typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
  const callback =
    typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;

  this.trace('[syncQuery]', req, options, callback);
  this.request(
    {
      method: 'POST',
      uri: '/queries',
      json: req,
    },
    // Deliberately NOT `async`: nothing is awaited here, and an async
    // handler would turn an exception thrown by `callback` into an
    // unhandled promise rejection instead of letting it propagate.
    (err, res: bigquery.IQueryResponse) => {
      if (err) {
        callback!(err, null, res);
        return;
      }
      this.trace('jobs.query res:', res);
      let job: Job | null = null;
      if (res.jobReference) {
        // Build a handle so callers can keep polling or paging the job.
        const jobRef = res.jobReference;
        job = this.job(jobRef.jobId!, {
          location: jobRef.location,
        });
      }
      callback!(null, job, res);
    }
  );
}

/**
* This method will be called by `createQueryStream()`. It is required to
* properly set the `autoPaginate` option value.
Expand Down Expand Up @@ -2153,6 +2273,8 @@

this.query(query, opts, callback);
}

static setLogFunction = setLogFunction;
}

/*! Developer Documentation
Expand Down
23 changes: 23 additions & 0 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
} from './bigquery';
import {RowMetadata} from './table';
import bigquery from './types';
import {logger} from './logger';

export type JobMetadata = bigquery.IJob;
export type JobOptions = JobRequest<JobMetadata>;
Expand All @@ -50,6 +51,7 @@
job?: Job;
wrapIntegers?: boolean | IntegerTypeCastOptions;
parseJSON?: boolean;
cachedRows?: any[];

Check warning on line 54 in src/job.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
} & PagedRequest<bigquery.jobs.IGetQueryResultsParams>;

/**
Expand Down Expand Up @@ -127,7 +129,7 @@
location?: string;
projectId?: string;
getQueryResultsStream(
options?: QueryResultsOptions

Check warning on line 132 in src/job.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
): ResourceStream<RowMetadata> {
// placeholder body, overwritten in constructor
return new ResourceStream<RowMetadata>({}, () => {});
Expand Down Expand Up @@ -379,6 +381,11 @@
);
}

/**
 * Forwards a debug message to the shared `logger` helper, tagging it with a
 * `[job][<id>]` source label built from this job's id.
 *
 * @param msg Message (usually a `[methodName]` label) to log.
 * @param otherArgs Additional values handed to the logger unchanged.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private trace(msg: string, ...otherArgs: any[]) {
  const source = `[job][${this.id}]`;
  logger(source, msg, ...otherArgs);
}

/**
* @callback CancelCallback
* @param {?Error} err Request error, if any.
Expand Down Expand Up @@ -536,6 +543,12 @@
},
options
);
this.trace(
'[getQueryResults]',
this.id,
options.pageToken,
options.startIndex
);

const wrapIntegers = qs.wrapIntegers ? qs.wrapIntegers : false;
delete qs.wrapIntegers;
Expand All @@ -547,6 +560,15 @@
const timeoutOverride =
typeof qs.timeoutMs === 'number' ? qs.timeoutMs : false;

if (options.cachedRows) {
const nextQuery = Object.assign({}, options, {
pageToken: options.pageToken,
});
delete nextQuery.cachedRows;
callback!(null, options.cachedRows, nextQuery);
return;
}

this.bigQuery.request(
{
uri: '/queries/' + this.id,
Expand Down Expand Up @@ -582,6 +604,7 @@
return;
}
} else if (resp.pageToken) {
this.trace('[getQueryResults] has more pages', resp.pageToken);
// More results exist.
nextQuery = Object.assign({}, options, {
pageToken: resp.pageToken,
Expand Down
47 changes: 47 additions & 0 deletions src/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as util from 'util';

/*! The external function used to emit logs. */
let logFunction: ((msg: string) => void) | null = null;

/**
 * Log function to use for debug output. By default, we don't perform any
 * logging.
 *
 * Each emitted line has the shape `D <ISO time> | <source> | <msg> |`
 * followed by `otherArgs`. The timestamp, `source` and `msg` are passed to
 * `util.format` as arguments instead of being spliced into the format
 * string, so a `%` inside them is printed literally and never consumes one
 * of `otherArgs`.
 *
 * @param source Label identifying the component that emits the message.
 * @param msg Message text.
 * @param otherArgs Extra values appended to the line by `util.format`.
 *
 * @private
 * @internal
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function logger(source: string, msg: string, ...otherArgs: any[]) {
  if (!logFunction) {
    // Logging is disabled: skip the formatting work entirely.
    return;
  }
  const time = new Date().toISOString();
  const formattedMsg = util.format(
    'D %s | %s | %s |',
    time,
    source,
    msg,
    ...otherArgs
  );
  logFunction(formattedMsg);
}

/**
 * Sets or disables the log function for all active BigQuery instances.
 *
 * @param logger A log function that takes a message (such as `console.log`) or
 * `null` to turn off logging.
 */
export function setLogFunction(logger: ((msg: string) => void) | null): void {
  logFunction = logger;
}
7 changes: 7 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2984,6 +2984,13 @@ declare namespace bigquery {
* [Optional] If set to true, BigQuery doesn't run the job. Instead, if the query is valid, BigQuery returns statistics about the job such as how many bytes would be processed. If the query is invalid, an error returns. The default value is false.
*/
dryRun?: boolean;
/**
* Optional. If not set, jobs are always required. If set, the query request will follow the behavior described by JobCreationMode. This feature is not yet available. Jobs will always be created.
*/
jobCreationMode?:
| 'JOB_CREATION_MODE_UNSPECIFIED'
| 'JOB_CREATION_REQUIRED'
| 'JOB_CREATION_OPTIONAL';
/**
* The resource type of the request.
*/
Expand Down