Skip to content

Commit

Permalink
Move DestinationTableBuilder to BigQueryClient
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushbilala authored and ebyhr committed Jun 29, 2021
1 parent 63c06b1 commit 8f07f96
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.BaseServiceException;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
Expand All @@ -31,6 +32,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.TableNotFoundException;
Expand All @@ -41,6 +43,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
Expand All @@ -50,6 +54,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
import static io.trino.plugin.bigquery.BigQueryUtil.convertToBigQueryException;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand All @@ -59,6 +64,8 @@

class BigQueryClient
{
private static final Logger log = Logger.get(BigQueryClient.class);

private final BigQuery bigQuery;
private final Optional<String> viewMaterializationProject;
private final Optional<String> viewMaterializationDataset;
Expand Down Expand Up @@ -337,4 +344,63 @@ public boolean isAmbiguous()
return remoteNames.size() > 1;
}
}

static class DestinationTableBuilder
implements Callable<TableInfo>
{
private final BigQueryClient bigQueryClient;
private final ReadSessionCreatorConfig config;
private final String query;
private final TableId table;

DestinationTableBuilder(BigQueryClient bigQueryClient, ReadSessionCreatorConfig config, String query, TableId table)
{
this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient is null");
this.config = requireNonNull(config, "config is null");
this.query = requireNonNull(query, "query is null");
this.table = requireNonNull(table, "table is null");
}

@Override
public TableInfo call()
{
return createTableFromQuery();
}

TableInfo createTableFromQuery()
{
TableId destinationTable = bigQueryClient.createDestinationTable(table);
log.debug("destinationTable is %s", destinationTable);
JobInfo jobInfo = JobInfo.of(
QueryJobConfiguration
.newBuilder(query)
.setDestinationTable(destinationTable)
.build());
log.debug("running query %s", jobInfo);
Job job = waitForJob(bigQueryClient.create(jobInfo));
log.debug("job has finished. %s", job);
if (job.getStatus().getError() != null) {
throw convertToBigQueryException(job.getStatus().getError());
}
// add expiration time to the table
TableInfo createdTable = bigQueryClient.getTable(destinationTable);
long expirationTime = createdTable.getCreationTime() +
TimeUnit.HOURS.toMillis(config.viewExpirationTimeInHours);
Table updatedTable = bigQueryClient.update(createdTable.toBuilder()
.setExpirationTime(expirationTime)
.build());
return updatedTable;
}

Job waitForJob(Job job)
{
try {
return job.waitFor();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseServiceException.UNKNOWN_CODE, format("Job %s has been interrupted", job.getJobId()), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.BaseServiceException;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
Expand All @@ -33,15 +27,12 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED;
import static io.trino.plugin.bigquery.BigQueryUtil.convertToBigQueryException;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

// A helper class, also handles view materialization
Expand Down Expand Up @@ -130,7 +121,7 @@ private TableInfo getActualTable(
String query = bigQueryClient.selectSql(table.getTableId(), requiredColumns);
log.debug("query is %s", query);
try {
return destinationTableCache.get(query, new DestinationTableBuilder(bigQueryClient, config, query, table.getTableId()));
return destinationTableCache.get(query, new BigQueryClient.DestinationTableBuilder(bigQueryClient, config, query, table.getTableId()));
}
catch (ExecutionException e) {
throw new TrinoException(BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED, "Error creating destination table", e);
Expand All @@ -142,63 +133,4 @@ private TableInfo getActualTable(
tableType, table.getTableId().getDataset(), table.getTableId().getTable()));
}
}

private static class DestinationTableBuilder
implements Callable<TableInfo>
{
private final BigQueryClient bigQueryClient;
private final ReadSessionCreatorConfig config;
private final String query;
private final TableId table;

DestinationTableBuilder(BigQueryClient bigQueryClient, ReadSessionCreatorConfig config, String query, TableId table)
{
this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient is null");
this.config = requireNonNull(config, "config is null");
this.query = requireNonNull(query, "query is null");
this.table = requireNonNull(table, "table is null");
}

@Override
public TableInfo call()
{
return createTableFromQuery();
}

TableInfo createTableFromQuery()
{
TableId destinationTable = bigQueryClient.createDestinationTable(table);
log.debug("destinationTable is %s", destinationTable);
JobInfo jobInfo = JobInfo.of(
QueryJobConfiguration
.newBuilder(query)
.setDestinationTable(destinationTable)
.build());
log.debug("running query %s", jobInfo);
Job job = waitForJob(bigQueryClient.create(jobInfo));
log.debug("job has finished. %s", job);
if (job.getStatus().getError() != null) {
throw convertToBigQueryException(job.getStatus().getError());
}
// add expiration time to the table
TableInfo createdTable = bigQueryClient.getTable(destinationTable);
long expirationTime = createdTable.getCreationTime() +
TimeUnit.HOURS.toMillis(config.viewExpirationTimeInHours);
Table updatedTable = bigQueryClient.update(createdTable.toBuilder()
.setExpirationTime(expirationTime)
.build());
return updatedTable;
}

Job waitForJob(Job job)
{
try {
return job.waitFor();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseServiceException.UNKNOWN_CODE, format("Job %s has been interrupted", job.getJobId()), e);
}
}
}
}

0 comments on commit 8f07f96

Please sign in to comment.