13
13
*/
14
14
package io .trino .plugin .bigquery ;
15
15
16
- import com .google .cloud .BaseServiceException ;
17
- import com .google .cloud .bigquery .BigQueryException ;
18
- import com .google .cloud .bigquery .Job ;
19
- import com .google .cloud .bigquery .JobInfo ;
20
- import com .google .cloud .bigquery .QueryJobConfiguration ;
21
- import com .google .cloud .bigquery .Table ;
22
16
import com .google .cloud .bigquery .TableDefinition ;
23
17
import com .google .cloud .bigquery .TableId ;
24
18
import com .google .cloud .bigquery .TableInfo ;
33
27
34
28
import java .util .List ;
35
29
import java .util .Optional ;
36
- import java .util .concurrent .Callable ;
37
30
import java .util .concurrent .ExecutionException ;
38
31
import java .util .concurrent .TimeUnit ;
39
32
40
33
import static io .trino .plugin .bigquery .BigQueryErrorCode .BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED ;
41
- import static io .trino .plugin .bigquery .BigQueryUtil .convertToBigQueryException ;
42
34
import static io .trino .spi .StandardErrorCode .NOT_SUPPORTED ;
43
35
import static java .lang .String .format ;
44
- import static java .util .Objects .requireNonNull ;
45
36
import static java .util .stream .Collectors .toList ;
46
37
47
38
// A helper class, also handles view materialization
@@ -130,7 +121,7 @@ private TableInfo getActualTable(
130
121
String query = bigQueryClient .selectSql (table .getTableId (), requiredColumns );
131
122
log .debug ("query is %s" , query );
132
123
try {
133
- return destinationTableCache .get (query , new DestinationTableBuilder (bigQueryClient , config , query , table .getTableId ()));
124
+ return destinationTableCache .get (query , new BigQueryClient . DestinationTableBuilder (bigQueryClient , config , query , table .getTableId ()));
134
125
}
135
126
catch (ExecutionException e ) {
136
127
throw new TrinoException (BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED , "Error creating destination table" , e );
@@ -142,63 +133,4 @@ private TableInfo getActualTable(
142
133
tableType , table .getTableId ().getDataset (), table .getTableId ().getTable ()));
143
134
}
144
135
}
145
-
146
- private static class DestinationTableBuilder
147
- implements Callable <TableInfo >
148
- {
149
- private final BigQueryClient bigQueryClient ;
150
- private final ReadSessionCreatorConfig config ;
151
- private final String query ;
152
- private final TableId table ;
153
-
154
- DestinationTableBuilder (BigQueryClient bigQueryClient , ReadSessionCreatorConfig config , String query , TableId table )
155
- {
156
- this .bigQueryClient = requireNonNull (bigQueryClient , "bigQueryClient is null" );
157
- this .config = requireNonNull (config , "config is null" );
158
- this .query = requireNonNull (query , "query is null" );
159
- this .table = requireNonNull (table , "table is null" );
160
- }
161
-
162
- @ Override
163
- public TableInfo call ()
164
- {
165
- return createTableFromQuery ();
166
- }
167
-
168
- TableInfo createTableFromQuery ()
169
- {
170
- TableId destinationTable = bigQueryClient .createDestinationTable (table );
171
- log .debug ("destinationTable is %s" , destinationTable );
172
- JobInfo jobInfo = JobInfo .of (
173
- QueryJobConfiguration
174
- .newBuilder (query )
175
- .setDestinationTable (destinationTable )
176
- .build ());
177
- log .debug ("running query %s" , jobInfo );
178
- Job job = waitForJob (bigQueryClient .create (jobInfo ));
179
- log .debug ("job has finished. %s" , job );
180
- if (job .getStatus ().getError () != null ) {
181
- throw convertToBigQueryException (job .getStatus ().getError ());
182
- }
183
- // add expiration time to the table
184
- TableInfo createdTable = bigQueryClient .getTable (destinationTable );
185
- long expirationTime = createdTable .getCreationTime () +
186
- TimeUnit .HOURS .toMillis (config .viewExpirationTimeInHours );
187
- Table updatedTable = bigQueryClient .update (createdTable .toBuilder ()
188
- .setExpirationTime (expirationTime )
189
- .build ());
190
- return updatedTable ;
191
- }
192
-
193
- Job waitForJob (Job job )
194
- {
195
- try {
196
- return job .waitFor ();
197
- }
198
- catch (InterruptedException e ) {
199
- Thread .currentThread ().interrupt ();
200
- throw new BigQueryException (BaseServiceException .UNKNOWN_CODE , format ("Job %s has been interrupted" , job .getJobId ()), e );
201
- }
202
- }
203
- }
204
136
}
0 commit comments