Skip to content

Commit 8f07f96

Browse files
ayushbilalaebyhr
authored andcommitted
Move DestinationTableBuilder to BigQueryClient
1 parent 63c06b1 commit 8f07f96

File tree

2 files changed

+67
-69
lines changed

2 files changed

+67
-69
lines changed

plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.plugin.bigquery;
1515

16+
import com.google.cloud.BaseServiceException;
1617
import com.google.cloud.bigquery.BigQuery;
1718
import com.google.cloud.bigquery.BigQueryException;
1819
import com.google.cloud.bigquery.Dataset;
@@ -31,6 +32,7 @@
3132
import com.google.common.cache.Cache;
3233
import com.google.common.cache.CacheBuilder;
3334
import com.google.common.collect.ImmutableSet;
35+
import io.airlift.log.Logger;
3436
import io.airlift.units.Duration;
3537
import io.trino.spi.TrinoException;
3638
import io.trino.spi.connector.TableNotFoundException;
@@ -41,6 +43,8 @@
4143
import java.util.Map;
4244
import java.util.Optional;
4345
import java.util.Set;
46+
import java.util.concurrent.Callable;
47+
import java.util.concurrent.TimeUnit;
4448
import java.util.function.Supplier;
4549

4650
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
@@ -50,6 +54,7 @@
5054
import static com.google.common.collect.Iterables.getOnlyElement;
5155
import static com.google.common.collect.Streams.stream;
5256
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
57+
import static io.trino.plugin.bigquery.BigQueryUtil.convertToBigQueryException;
5358
import static java.lang.String.format;
5459
import static java.util.Locale.ENGLISH;
5560
import static java.util.Objects.requireNonNull;
@@ -59,6 +64,8 @@
5964

6065
class BigQueryClient
6166
{
67+
private static final Logger log = Logger.get(BigQueryClient.class);
68+
6269
private final BigQuery bigQuery;
6370
private final Optional<String> viewMaterializationProject;
6471
private final Optional<String> viewMaterializationDataset;
@@ -337,4 +344,63 @@ public boolean isAmbiguous()
337344
return remoteNames.size() > 1;
338345
}
339346
}
347+
348+
static class DestinationTableBuilder
349+
implements Callable<TableInfo>
350+
{
351+
private final BigQueryClient bigQueryClient;
352+
private final ReadSessionCreatorConfig config;
353+
private final String query;
354+
private final TableId table;
355+
356+
DestinationTableBuilder(BigQueryClient bigQueryClient, ReadSessionCreatorConfig config, String query, TableId table)
357+
{
358+
this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient is null");
359+
this.config = requireNonNull(config, "config is null");
360+
this.query = requireNonNull(query, "query is null");
361+
this.table = requireNonNull(table, "table is null");
362+
}
363+
364+
@Override
365+
public TableInfo call()
366+
{
367+
return createTableFromQuery();
368+
}
369+
370+
TableInfo createTableFromQuery()
371+
{
372+
TableId destinationTable = bigQueryClient.createDestinationTable(table);
373+
log.debug("destinationTable is %s", destinationTable);
374+
JobInfo jobInfo = JobInfo.of(
375+
QueryJobConfiguration
376+
.newBuilder(query)
377+
.setDestinationTable(destinationTable)
378+
.build());
379+
log.debug("running query %s", jobInfo);
380+
Job job = waitForJob(bigQueryClient.create(jobInfo));
381+
log.debug("job has finished. %s", job);
382+
if (job.getStatus().getError() != null) {
383+
throw convertToBigQueryException(job.getStatus().getError());
384+
}
385+
// add expiration time to the table
386+
TableInfo createdTable = bigQueryClient.getTable(destinationTable);
387+
long expirationTime = createdTable.getCreationTime() +
388+
TimeUnit.HOURS.toMillis(config.viewExpirationTimeInHours);
389+
Table updatedTable = bigQueryClient.update(createdTable.toBuilder()
390+
.setExpirationTime(expirationTime)
391+
.build());
392+
return updatedTable;
393+
}
394+
395+
Job waitForJob(Job job)
396+
{
397+
try {
398+
return job.waitFor();
399+
}
400+
catch (InterruptedException e) {
401+
Thread.currentThread().interrupt();
402+
throw new BigQueryException(BaseServiceException.UNKNOWN_CODE, format("Job %s has been interrupted", job.getJobId()), e);
403+
}
404+
}
405+
}
340406
}

plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@
1313
*/
1414
package io.trino.plugin.bigquery;
1515

16-
import com.google.cloud.BaseServiceException;
17-
import com.google.cloud.bigquery.BigQueryException;
18-
import com.google.cloud.bigquery.Job;
19-
import com.google.cloud.bigquery.JobInfo;
20-
import com.google.cloud.bigquery.QueryJobConfiguration;
21-
import com.google.cloud.bigquery.Table;
2216
import com.google.cloud.bigquery.TableDefinition;
2317
import com.google.cloud.bigquery.TableId;
2418
import com.google.cloud.bigquery.TableInfo;
@@ -33,15 +27,12 @@
3327

3428
import java.util.List;
3529
import java.util.Optional;
36-
import java.util.concurrent.Callable;
3730
import java.util.concurrent.ExecutionException;
3831
import java.util.concurrent.TimeUnit;
3932

4033
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED;
41-
import static io.trino.plugin.bigquery.BigQueryUtil.convertToBigQueryException;
4234
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
4335
import static java.lang.String.format;
44-
import static java.util.Objects.requireNonNull;
4536
import static java.util.stream.Collectors.toList;
4637

4738
// A helper class, also handles view materialization
@@ -130,7 +121,7 @@ private TableInfo getActualTable(
130121
String query = bigQueryClient.selectSql(table.getTableId(), requiredColumns);
131122
log.debug("query is %s", query);
132123
try {
133-
return destinationTableCache.get(query, new DestinationTableBuilder(bigQueryClient, config, query, table.getTableId()));
124+
return destinationTableCache.get(query, new BigQueryClient.DestinationTableBuilder(bigQueryClient, config, query, table.getTableId()));
134125
}
135126
catch (ExecutionException e) {
136127
throw new TrinoException(BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED, "Error creating destination table", e);
@@ -142,63 +133,4 @@ private TableInfo getActualTable(
142133
tableType, table.getTableId().getDataset(), table.getTableId().getTable()));
143134
}
144135
}
145-
146-
private static class DestinationTableBuilder
147-
implements Callable<TableInfo>
148-
{
149-
private final BigQueryClient bigQueryClient;
150-
private final ReadSessionCreatorConfig config;
151-
private final String query;
152-
private final TableId table;
153-
154-
DestinationTableBuilder(BigQueryClient bigQueryClient, ReadSessionCreatorConfig config, String query, TableId table)
155-
{
156-
this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient is null");
157-
this.config = requireNonNull(config, "config is null");
158-
this.query = requireNonNull(query, "query is null");
159-
this.table = requireNonNull(table, "table is null");
160-
}
161-
162-
@Override
163-
public TableInfo call()
164-
{
165-
return createTableFromQuery();
166-
}
167-
168-
TableInfo createTableFromQuery()
169-
{
170-
TableId destinationTable = bigQueryClient.createDestinationTable(table);
171-
log.debug("destinationTable is %s", destinationTable);
172-
JobInfo jobInfo = JobInfo.of(
173-
QueryJobConfiguration
174-
.newBuilder(query)
175-
.setDestinationTable(destinationTable)
176-
.build());
177-
log.debug("running query %s", jobInfo);
178-
Job job = waitForJob(bigQueryClient.create(jobInfo));
179-
log.debug("job has finished. %s", job);
180-
if (job.getStatus().getError() != null) {
181-
throw convertToBigQueryException(job.getStatus().getError());
182-
}
183-
// add expiration time to the table
184-
TableInfo createdTable = bigQueryClient.getTable(destinationTable);
185-
long expirationTime = createdTable.getCreationTime() +
186-
TimeUnit.HOURS.toMillis(config.viewExpirationTimeInHours);
187-
Table updatedTable = bigQueryClient.update(createdTable.toBuilder()
188-
.setExpirationTime(expirationTime)
189-
.build());
190-
return updatedTable;
191-
}
192-
193-
Job waitForJob(Job job)
194-
{
195-
try {
196-
return job.waitFor();
197-
}
198-
catch (InterruptedException e) {
199-
Thread.currentThread().interrupt();
200-
throw new BigQueryException(BaseServiceException.UNKNOWN_CODE, format("Job %s has been interrupted", job.getJobId()), e);
201-
}
202-
}
203-
}
204136
}

0 commit comments

Comments
 (0)