Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate jobs.query and stateless query for faster queries #1337

Merged
merged 13 commits into from
Mar 27, 2024
Merged
Prev Previous commit
Next Next commit
feat: fully build QueryRequest object for jobs.query
  • Loading branch information
alvarowolfx committed Mar 11, 2024
commit 970e08f190594669011d49880939fe24b1856a0a
287 changes: 173 additions & 114 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,17 +323,17 @@
location?: string;
private _universeDomain: string;

/**
 * Placeholder declaration of the stream-returning query method.
 *
 * As the inline note states, this body is overwritten in the constructor, so
 * what is written here is only a stand-in. The options argument is not read
 * by the stand-in, which is what the lint warning embedded below reports.
 * NOTE(review): presumably the real implementation is installed by a
 * streamify/paginator helper in the constructor; confirm there.
 *
 * @param options Query object or SQL string; ignored by this placeholder.
 * @returns A ResourceStream constructed from an empty object and a no-op
 *     function.
 */
createQueryStream(options?: Query | string): ResourceStream<RowMetadata> {

Check warning on line 326 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<RowMetadata>({}, () => {});
}

/**
 * Placeholder declaration of the stream-returning dataset listing method.
 *
 * As the inline note states, this body is overwritten in the constructor, so
 * what is written here is only a stand-in. The options argument is not read
 * by the stand-in, which is what the lint warning embedded below reports.
 * NOTE(review): presumably the real implementation is installed by a
 * streamify/paginator helper in the constructor; confirm there.
 *
 * @param options Dataset listing options; ignored by this placeholder.
 * @returns A ResourceStream constructed from an empty object and a no-op
 *     function.
 */
getDatasetsStream(options?: GetDatasetsOptions): ResourceStream<Dataset> {

Check warning on line 331 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Dataset>({}, () => {});
}

/**
 * Placeholder declaration of the stream-returning job listing method.
 *
 * As the inline note states, this body is overwritten in the constructor, so
 * what is written here is only a stand-in. The options argument is not read
 * by the stand-in, which is what the lint warning embedded below reports.
 * NOTE(review): presumably the real implementation is installed by a
 * streamify/paginator helper in the constructor; confirm there.
 *
 * @param options Job listing options; ignored by this placeholder.
 * @returns A ResourceStream constructed from an empty object and a no-op
 *     function.
 */
getJobsStream(options?: GetJobsOptions): ResourceStream<Job> {

Check warning on line 336 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

'options' is defined but never used
// placeholder body, overwritten in constructor
return new ResourceStream<Job>({}, () => {});
}
Expand Down Expand Up @@ -493,7 +493,7 @@
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
private trace(msg: string, ...otherArgs: any[]) {
private trace_(msg: string, ...otherArgs: any[]) {
logger('[bigquery]', msg, ...otherArgs);
}

Expand Down Expand Up @@ -1436,20 +1436,19 @@
callback?: JobCallback
): void | Promise<JobResponse> {
const options = typeof opts === 'object' ? opts : {query: opts};
this.trace('[createQueryJob]', options, callback);
this.trace_('[createQueryJob]', options, callback);
if ((!options || !options.query) && !options.pageToken) {
throw new Error('A SQL query string is required.');
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const query: any = extend(
const query: Query = extend(
true,
{
useLegacySql: false,
},
options
);
this.trace('[createQueryJob]', query);
this.trace_('[createQueryJob]', query);

if (options.destination) {
if (!(options.destination instanceof Table)) {
Expand All @@ -1465,78 +1464,21 @@
delete query.destination;
}

if (query.params) {
query.parameterMode = is.array(query.params) ? 'positional' : 'named';

if (query.parameterMode === 'named') {
query.queryParameters = [];

// tslint:disable-next-line forin
for (const namedParameter in query.params) {
const value = query.params[namedParameter];
let queryParameter;

if (query.types) {
if (!is.object(query.types)) {
throw new Error(
'Provided types must match the value type passed to `params`'
);
}

if (query.types[namedParameter]) {
queryParameter = BigQuery.valueToQueryParameter_(
value,
query.types[namedParameter]
);
} else {
queryParameter = BigQuery.valueToQueryParameter_(value);
}
} else {
queryParameter = BigQuery.valueToQueryParameter_(value);
}

queryParameter.name = namedParameter;
query.queryParameters.push(queryParameter);
}
} else {
query.queryParameters = [];

if (query.types) {
if (!is.array(query.types)) {
throw new Error(
'Provided types must match the value type passed to `params`'
);
}

if (query.params.length !== query.types.length) {
throw new Error('Incorrect number of parameter types provided.');
}
query.params.forEach((value: {}, i: number) => {
const queryParameter = BigQuery.valueToQueryParameter_(
value,
query.types[i]
);
query.queryParameters.push(queryParameter);
});
} else {
query.params.forEach((value: {}) => {
const queryParameter = BigQuery.valueToQueryParameter_(value);
query.queryParameters.push(queryParameter);
});
}
}
delete query.params;
}
const {parameterMode, params} = this.buildQueryParams_(
query.params,
query.types
);
query.parameterMode = parameterMode;
query.queryParameters = params;
delete query.params;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const reqOpts: any = {
configuration: {
query,
},
const reqOpts: JobOptions = {};
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
reqOpts.configuration = {
query,
};

if (typeof query.jobTimeoutMs === 'number') {
reqOpts.configuration.jobTimeoutMs = query.jobTimeoutMs;
reqOpts.configuration.jobTimeoutMs = query.jobTimeoutMs.toString();
delete query.jobTimeoutMs;
}

Expand Down Expand Up @@ -1568,6 +1510,85 @@
this.createJob(reqOpts, callback!);
}

/**
 * Converts the user-facing params/types pair of a Query into the list of
 * query parameters plus the matching parameter mode.
 *
 * Behavior, as implemented below:
 * - When params is falsy, both returned fields are undefined.
 * - When params is an array the mode is positional; otherwise it is named.
 * - Named mode: each enumerable key of params becomes one parameter whose
 *   name is that key. If types is given it must be an object; a truthy entry
 *   under the same key is forwarded to BigQuery.valueToQueryParameter_ as
 *   the provided type, otherwise the value is converted without a type.
 * - Positional mode: if types is given it must be an array with the same
 *   length as params, and types[i] is forwarded alongside params[i].
 *
 * @param params Positional (array) or named (object) parameter values.
 * @param types Optional type hints shaped like params.
 * @returns The detected parameterMode and the converted parameter list.
 * @throws Error when the shape of types does not match the shape of params,
 *     or when positional params and types differ in length.
 */
private buildQueryParams_(
params: Query['params'],
types: Query['types']
): {
parameterMode: 'positional' | 'named' | undefined;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to pull this type out into an interface, but YMMV.

params: bigquery.IQueryParameter[] | undefined;
} {
// No parameters supplied: report that neither a mode nor a list applies.
if (!params) {
return {
parameterMode: undefined,
params: undefined,
};
}
// An array means positional parameters; any other truthy value is treated as
// a map of named parameters.
const parameterMode = is.array(params) ? 'positional' : 'named';
const queryParameters: bigquery.IQueryParameter[] = [];
if (parameterMode === 'named') {
const namedParams = params as {[param: string]: any};

Check warning on line 1529 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
// NOTE(review): for..in also visits inherited enumerable keys; the review
// remark embedded below suggests restricting this to own properties.
for (const namedParameter in namedParams) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to use for (const namedParameter of Object.getOwnPropertyNames(namedParams)) or check each namedParameter against namedParams.hasOwnProperty().

const value = namedParams[namedParameter];
let queryParameter;

// The shape check on types sits inside the loop, so it only runs when there
// is at least one named parameter to convert.
if (types) {
if (!is.object(types)) {
throw new Error(
'Provided types must match the value type passed to `params`'
);
}

const namedTypes = types as QueryParamTypeStruct;

// Use the explicit type for this key when one was provided; otherwise
// convert the value without a provided type.
if (namedTypes[namedParameter]) {
queryParameter = BigQuery.valueToQueryParameter_(
value,
namedTypes[namedParameter]
);
} else {
queryParameter = BigQuery.valueToQueryParameter_(value);
}
} else {
queryParameter = BigQuery.valueToQueryParameter_(value);
}

// Named parameters carry their key as the parameter name.
queryParameter.name = namedParameter;
queryParameters.push(queryParameter);
}
} else {
if (types) {
if (!is.array(types)) {
throw new Error(
'Provided types must match the value type passed to `params`'
);
}

const positionalTypes = types as QueryParamTypeStruct[];

// Positional types are matched by index, so the two arrays must line up
// one to one.
if (params.length !== types.length) {
throw new Error('Incorrect number of parameter types provided.');
}
params.forEach((value: {}, i: number) => {
const queryParameter = BigQuery.valueToQueryParameter_(
value,
positionalTypes[i]
);
queryParameters.push(queryParameter);
});
} else {
// No type hints: convert every positional value without a provided type.
params.forEach((value: {}) => {
const queryParameter = BigQuery.valueToQueryParameter_(value);
queryParameters.push(queryParameter);
});
}
}

return {
parameterMode,
params: queryParameters,
};
}

/**
* Creates a job. Typically when creating a job you'll have a very specific
* task in mind. For this we recommend one of the following methods:
Expand Down Expand Up @@ -2118,8 +2139,9 @@
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;

this.trace('[query]', query, options);
this.trace_('[query]', query, options);
const queryReq = this.buildQueryRequest_(query, options);
this.trace_('[query] queryReq', queryReq);
if (!queryReq) {
this.createQueryJob(query, (err, job, resp) => {
if (err) {
Expand All @@ -2138,7 +2160,8 @@
return;
}

this.runJobsQuery(queryReq, options, (err, job, res) => {
this.runJobsQuery(queryReq, (err, job, res) => {
this.trace_('[runJobsQuery callback]: ', query, err, job, res);
if (err) {
(callback as SimpleQueryRowsCallback)(err, null, res);
return;
Expand All @@ -2146,7 +2169,7 @@

options = extend({job}, queryOpts, options);
if (res && res.jobComplete) {
let rows: any = [];

Check warning on line 2172 in src/bigquery.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
if (res.schema && res.rows) {
rows = BigQuery.mergeSchemaWithRows_(res.schema, res.rows, {
wrapIntegers: options.wrapIntegers!, // TODO: fix default value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above (re: not sure if this was moved from elsewhere or new code) but I wanted to ping in case the TODO isn't intentional.

Expand All @@ -2154,17 +2177,17 @@
});
}
options.cachedRows = rows;
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
this.trace('[runJobsQuery] job complete');
this.trace_('[runJobsQuery] job complete');
if (res.pageToken) {
this.trace('[runJobsQuery] has more pages');
this.trace_('[runJobsQuery] has more pages');
options.pageToken = res.pageToken;
} else {
this.trace('[runJobsQuery] no more pages');
this.trace_('[runJobsQuery] no more pages');
}
job!.getQueryResults(options, callback as QueryRowsCallback);
return;
}
this.trace('[runJobsQuery] job not complete');
this.trace_('[runJobsQuery] job not complete');
job!.getQueryResults(options, callback as QueryRowsCallback);
});
}
Expand All @@ -2182,68 +2205,104 @@
query: string | Query,
options: QueryOptions
): bigquery.IQueryRequest | undefined {
this.trace('[buildQueryRequest]', query, options);
if (process.env.FAST_QUERY_PATH === 'DISABLED') {
return undefined;
}
if (typeof query === 'string') {
if (!options.job) {
const req: bigquery.IQueryRequest = {
...options,
useQueryCache: false,
jobCreationMode: 'JOB_CREATION_OPTIONAL',
useLegacySql: false,
requestId: uuid.v4(),
query: query,
};
return req;
}
const queryObj: Query =
typeof query === 'string'
? {
query: query,
}
: query;
this.trace_('[buildQueryRequest]', query, options, queryObj);
// This is a denylist of settings which prevent us from composing an equivalent
// bq.QueryRequest due to differences between configuration parameters accepted
// by jobs.insert vs jobs.query.
if (
!!queryObj.destination ||
!!queryObj.tableDefinitions ||
!!queryObj.createDisposition ||
!!queryObj.writeDisposition ||
(!!queryObj.priority && queryObj.priority !== 'INTERACTIVE') ||
queryObj.useLegacySql ||
!!queryObj.maximumBillingTier ||
!!queryObj.timePartitioning ||
!!queryObj.rangePartitioning ||
!!queryObj.clustering ||
!!queryObj.destinationEncryptionConfiguration ||
!!queryObj.schemaUpdateOptions ||
!!queryObj.jobTimeoutMs ||
// User has defined the jobID generation behavior
!!queryObj.jobId
) {
return undefined;
}
// TODO: non string query and convert to QueryRequest
return undefined;

if (queryObj.dryRun) {
return undefined;
}

if (options.job) {
return undefined;
}
const req: bigquery.IQueryRequest = {
useQueryCache: queryObj.useQueryCache,
labels: queryObj.labels,
defaultDataset: queryObj.defaultDataset,
createSession: queryObj.createSession,
maximumBytesBilled: queryObj.maximumBytesBilled,
timeoutMs: options.timeoutMs,
location: queryObj.location || options.location,
formatOptions: {
useInt64Timestamp: true,
},
maxResults: queryObj.maxResults || options.maxResults,
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
query: queryObj.query,
useLegacySql: false,
requestId: uuid.v4(),
jobCreationMode: 'JOB_CREATION_OPTIONAL',
};
if (req.maxResults) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary workaround?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gonna remove that

req.jobCreationMode = 'JOB_CREATION_REQUIRED';
}
const {parameterMode, params} = this.buildQueryParams_(
queryObj.params,
queryObj.types
);
if (params) {
req.queryParameters = params;
}
if (parameterMode) {
req.parameterMode = parameterMode;
}
return req;
}

private runJobsQuery(
req: bigquery.IQueryRequest,
options?: QueryResultsOptions
): Promise<JobsQueryResponse>;
private runJobsQuery(
req: bigquery.IQueryRequest,
options: QueryResultsOptions,
callback: JobsQueryCallback
): void;
private runJobsQuery(
req: bigquery.IQueryRequest,
optionsOrCallback?: QueryResultsOptions | JobsQueryCallback,
cb?: JobsQueryCallback
callback?: JobsQueryCallback
): void | Promise<JobsQueryResponse> {
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;

this.trace('[runJobsQuery]', req, options, callback);
this.trace_('[runJobsQuery]', req, callback);
this.request(
{
method: 'POST',
uri: '/queries',
json: req,
},
async (err, res: bigquery.IQueryResponse) => {
this.trace_('jobs.query res:', res, err);
if (err) {
callback!(err, null, res);
return;
}
this.trace('jobs.query res:', res);
let job: Job | null = null;
if (res.jobReference) {
const jobRef = res.jobReference!;
job = this.job(jobRef.jobId!, {
location: jobRef.location,
});
} else {
job = this.job(res.queryId!); // stateless query
} else if (res.queryId) {
job = this.job(res.queryId); // stateless query
}
callback!(null, job, res);
}
Expand Down