17
17
import com .google .cloud .bigquery .BigQueryException ;
18
18
import com .google .cloud .bigquery .Dataset ;
19
19
import com .google .cloud .bigquery .DatasetId ;
20
+ import com .google .cloud .bigquery .DatasetInfo ;
20
21
import com .google .cloud .bigquery .Job ;
21
22
import com .google .cloud .bigquery .JobInfo ;
22
23
import com .google .cloud .bigquery .QueryJobConfiguration ;
24
+ import com .google .cloud .bigquery .Schema ;
23
25
import com .google .cloud .bigquery .Table ;
24
26
import com .google .cloud .bigquery .TableDefinition ;
25
27
import com .google .cloud .bigquery .TableId ;
26
28
import com .google .cloud .bigquery .TableInfo ;
27
29
import com .google .cloud .bigquery .TableResult ;
28
30
import com .google .cloud .http .BaseHttpServiceException ;
31
+ import com .google .common .cache .Cache ;
32
+ import com .google .common .cache .CacheBuilder ;
29
33
import com .google .common .collect .ImmutableSet ;
30
- import com .google .common .collect .Iterators ;
34
+ import io .airlift .units .Duration ;
35
+ import io .trino .spi .TrinoException ;
36
+ import io .trino .spi .connector .TableNotFoundException ;
37
+ import org .checkerframework .checker .nullness .qual .Nullable ;
31
38
32
- import java .util .Iterator ;
39
+ import java .util .Collections ;
40
+ import java .util .HashMap ;
33
41
import java .util .List ;
42
+ import java .util .Map ;
34
43
import java .util .Optional ;
35
44
import java .util .Set ;
36
- import java .util .concurrent .ConcurrentHashMap ;
37
- import java .util .concurrent .ConcurrentMap ;
38
- import java .util .stream .StreamSupport ;
39
45
46
+ import static com .google .cloud .bigquery .TableDefinition .Type .TABLE ;
47
+ import static com .google .cloud .bigquery .TableDefinition .Type .VIEW ;
48
+ import static com .google .common .base .Verify .verify ;
40
49
import static com .google .common .collect .ImmutableList .toImmutableList ;
50
+ import static com .google .common .collect .Iterables .getOnlyElement ;
51
+ import static com .google .common .collect .Streams .stream ;
52
+ import static io .trino .plugin .bigquery .BigQueryErrorCode .BIGQUERY_AMBIGUOUS_OBJECT_NAME ;
41
53
import static java .lang .String .format ;
42
54
import static java .util .Locale .ENGLISH ;
55
+ import static java .util .Objects .requireNonNull ;
43
56
import static java .util .UUID .randomUUID ;
57
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
44
58
import static java .util .stream .Collectors .joining ;
45
59
46
- // holds caches and mappings
47
- // Trino converts the dataset and table names to lower case, while BigQuery is case sensitive
48
- // the mappings here keep the mappings
49
60
class BigQueryClient
50
61
{
51
62
private final BigQuery bigQuery ;
52
63
private final Optional <String > viewMaterializationProject ;
53
64
private final Optional <String > viewMaterializationDataset ;
54
- private final ConcurrentMap <TableId , TableId > tableIds = new ConcurrentHashMap <>();
55
- private final ConcurrentMap <DatasetId , DatasetId > datasetIds = new ConcurrentHashMap <>();
65
+ private final boolean caseInsensitiveNameMatching ;
66
+ private final Cache <String , Optional <RemoteDatabaseObject >> remoteDatasets ;
67
+ private final Cache <TableId , Optional <RemoteDatabaseObject >> remoteTables ;
56
68
57
69
BigQueryClient (BigQuery bigQuery , BigQueryConfig config )
58
70
{
59
71
this .bigQuery = bigQuery ;
60
72
this .viewMaterializationProject = config .getViewMaterializationProject ();
61
73
this .viewMaterializationDataset = config .getViewMaterializationDataset ();
74
+ Duration caseInsensitiveNameMatchingCacheTtl = requireNonNull (config .getCaseInsensitiveNameMatchingCacheTtl (), "caseInsensitiveNameMatchingCacheTtl is null" );
75
+
76
+ this .caseInsensitiveNameMatching = config .isCaseInsensitiveNameMatching ();
77
+ CacheBuilder <Object , Object > remoteNamesCacheBuilder = CacheBuilder .newBuilder ()
78
+ .expireAfterWrite (caseInsensitiveNameMatchingCacheTtl .toMillis (), MILLISECONDS );
79
+ this .remoteDatasets = remoteNamesCacheBuilder .build ();
80
+ this .remoteTables = remoteNamesCacheBuilder .build ();
81
+ }
82
+
83
+ Optional <RemoteDatabaseObject > toRemoteDataset (String projectId , String datasetName )
84
+ {
85
+ requireNonNull (projectId , "projectId is null" );
86
+ requireNonNull (datasetName , "datasetName is null" );
87
+ verify (datasetName .codePoints ().noneMatch (Character ::isUpperCase ), "Expected schema name from internal metadata to be lowercase: %s" , datasetName );
88
+ if (!caseInsensitiveNameMatching ) {
89
+ return Optional .of (RemoteDatabaseObject .of (datasetName ));
90
+ }
91
+
92
+ @ Nullable Optional <RemoteDatabaseObject > remoteDataset = remoteDatasets .getIfPresent (datasetName );
93
+ if (remoteDataset != null ) {
94
+ return remoteDataset ;
95
+ }
96
+
97
+ // cache miss, reload the cache
98
+ Map <String , Optional <RemoteDatabaseObject >> mapping = new HashMap <>();
99
+ for (Dataset dataset : listDatasets (projectId )) {
100
+ mapping .merge (
101
+ dataset .getDatasetId ().getDataset ().toLowerCase (ENGLISH ),
102
+ Optional .of (RemoteDatabaseObject .of (dataset .getDatasetId ().getDataset ())),
103
+ (currentValue , collision ) -> currentValue .map (current -> current .registerCollision (collision .get ().getOnlyRemoteName ())));
104
+ }
105
+
106
+ // explicitly cache the information if the requested dataset doesn't exist
107
+ if (!mapping .containsKey (datasetName )) {
108
+ mapping .put (datasetName , Optional .empty ());
109
+ }
110
+
111
+ verify (mapping .containsKey (datasetName ));
112
+ return mapping .get (datasetName );
62
113
}
63
114
64
- TableInfo getTable ( TableId tableId )
115
+ Optional < RemoteDatabaseObject > toRemoteTable ( String projectId , String remoteDatasetName , String tableName )
65
116
{
66
- TableId bigQueryTableId = tableIds .get (tableId );
67
- Table table = bigQuery .getTable (bigQueryTableId != null ? bigQueryTableId : tableId );
68
- if (table != null ) {
69
- tableIds .putIfAbsent (tableId , table .getTableId ());
70
- datasetIds .putIfAbsent (toDatasetId (tableId ), toDatasetId (table .getTableId ()));
117
+ requireNonNull (projectId , "projectId is null" );
118
+ requireNonNull (remoteDatasetName , "remoteDatasetName is null" );
119
+ requireNonNull (tableName , "tableName is null" );
120
+ verify (tableName .codePoints ().noneMatch (Character ::isUpperCase ), "Expected table name from internal metadata to be lowercase: %s" , tableName );
121
+ if (!caseInsensitiveNameMatching ) {
122
+ return Optional .of (RemoteDatabaseObject .of (tableName ));
123
+ }
124
+
125
+ TableId cacheKey = TableId .of (projectId , remoteDatasetName , tableName );
126
+ @ Nullable Optional <RemoteDatabaseObject > remoteTable = remoteTables .getIfPresent (cacheKey );
127
+ if (remoteTable != null ) {
128
+ return remoteTable ;
71
129
}
72
- return table ;
130
+
131
+ // cache miss, reload the cache
132
+ Map <TableId , Optional <RemoteDatabaseObject >> mapping = new HashMap <>();
133
+ for (Table table : listTables (DatasetId .of (projectId , remoteDatasetName ), TABLE , VIEW )) {
134
+ mapping .merge (
135
+ tableIdToLowerCase (table .getTableId ()),
136
+ Optional .of (RemoteDatabaseObject .of (table .getTableId ().getTable ())),
137
+ (currentValue , collision ) -> currentValue .map (current -> current .registerCollision (collision .get ().getOnlyRemoteName ())));
138
+ }
139
+
140
+ // explicitly cache the information if the requested table doesn't exist
141
+ if (!mapping .containsKey (cacheKey )) {
142
+ mapping .put (cacheKey , Optional .empty ());
143
+ }
144
+
145
+ verify (mapping .containsKey (cacheKey ));
146
+ return mapping .get (cacheKey );
147
+ }
148
+
149
+ private static TableId tableIdToLowerCase (TableId tableId )
150
+ {
151
+ return TableId .of (
152
+ tableId .getProject (),
153
+ tableId .getDataset (),
154
+ tableId .getTable ().toLowerCase (ENGLISH ));
155
+ }
156
+
157
    /**
     * Fetches dataset metadata from BigQuery. No name mapping is applied here,
     * so callers must pass an already-resolved remote dataset id.
     */
    DatasetInfo getDataset(DatasetId datasetId)
    {
        // NOTE(review): presumably returns null when the dataset is absent — confirm against client docs before relying on it
        return bigQuery.getDataset(datasetId);
    }
74
161
75
- DatasetId toDatasetId (TableId tableId )
162
+ TableInfo getTable (TableId remoteTableId )
76
163
{
77
- return DatasetId .of (tableId .getProject (), tableId .getDataset ());
164
+ // TODO: Return Optional and make callers handle missing value
165
+ return bigQuery .getTable (remoteTableId );
78
166
}
79
167
80
168
String getProjectId ()
@@ -84,43 +172,32 @@ String getProjectId()
84
172
85
173
Iterable <Dataset > listDatasets (String projectId )
86
174
{
87
- Iterator <Dataset > datasets = bigQuery .listDatasets (projectId ).iterateAll ().iterator ();
88
- return () -> Iterators .transform (datasets , this ::addDataSetMappingIfNeeded );
175
+ return bigQuery .listDatasets (projectId ).iterateAll ();
89
176
}
90
177
91
- Iterable <Table > listTables (DatasetId datasetId , TableDefinition .Type ... types )
178
+ Iterable <Table > listTables (DatasetId remoteDatasetId , TableDefinition .Type ... types )
92
179
{
93
180
Set <TableDefinition .Type > allowedTypes = ImmutableSet .copyOf (types );
94
- DatasetId bigQueryDatasetId = datasetIds .getOrDefault (datasetId , datasetId );
95
- Iterable <Table > allTables = bigQuery .listTables (bigQueryDatasetId ).iterateAll ();
96
- return StreamSupport .stream (allTables .spliterator (), false )
181
+ Iterable <Table > allTables = bigQuery .listTables (remoteDatasetId ).iterateAll ();
182
+ return stream (allTables )
97
183
.filter (table -> allowedTypes .contains (table .getDefinition ().getType ()))
98
184
.collect (toImmutableList ());
99
185
}
100
186
101
- private Dataset addDataSetMappingIfNeeded (Dataset dataset )
102
- {
103
- DatasetId bigQueryDatasetId = dataset .getDatasetId ();
104
- DatasetId trinoDatasetId = DatasetId .of (bigQueryDatasetId .getProject (), bigQueryDatasetId .getDataset ().toLowerCase (ENGLISH ));
105
- datasetIds .putIfAbsent (trinoDatasetId , bigQueryDatasetId );
106
- return dataset ;
107
- }
108
-
109
187
TableId createDestinationTable (TableId tableId )
110
188
{
111
189
String project = viewMaterializationProject .orElse (tableId .getProject ());
112
190
String dataset = viewMaterializationDataset .orElse (tableId .getDataset ());
113
- DatasetId datasetId = mapIfNeeded (project , dataset );
191
+
192
+ String remoteDatasetName = toRemoteDataset (project , dataset )
193
+ .map (RemoteDatabaseObject ::getOnlyRemoteName )
194
+ .orElse (dataset );
195
+
196
+ DatasetId datasetId = DatasetId .of (project , remoteDatasetName );
114
197
String name = format ("_pbc_%s" , randomUUID ().toString ().toLowerCase (ENGLISH ).replace ("-" , "" ));
115
198
return TableId .of (datasetId .getProject (), datasetId .getDataset (), name );
116
199
}
117
200
118
- private DatasetId mapIfNeeded (String project , String dataset )
119
- {
120
- DatasetId datasetId = DatasetId .of (project , dataset );
121
- return datasetIds .getOrDefault (datasetId , datasetId );
122
- }
123
-
124
201
Table update (TableInfo table )
125
202
{
126
203
return bigQuery .update (table );
@@ -159,7 +236,75 @@ String selectSql(TableId table, String formattedColumns)
159
236
160
237
private String fullTableName (TableId tableId )
161
238
{
162
- tableId = tableIds .getOrDefault (tableId , tableId );
239
+ String remoteSchemaName = toRemoteDataset (tableId .getProject (), tableId .getDataset ())
240
+ .map (RemoteDatabaseObject ::getOnlyRemoteName )
241
+ .orElse (tableId .getDataset ());
242
+ String remoteTableName = toRemoteTable (tableId .getProject (), remoteSchemaName , tableId .getTable ())
243
+ .map (RemoteDatabaseObject ::getOnlyRemoteName )
244
+ .orElse (tableId .getTable ());
245
+ tableId = TableId .of (tableId .getProject (), remoteSchemaName , remoteTableName );
163
246
return format ("%s.%s.%s" , tableId .getProject (), tableId .getDataset (), tableId .getTable ());
164
247
}
248
+
249
+ List <BigQueryColumnHandle > getColumns (BigQueryTableHandle tableHandle )
250
+ {
251
+ TableInfo tableInfo = getTable (tableHandle .getRemoteTableName ().toTableId ());
252
+ if (tableInfo == null ) {
253
+ throw new TableNotFoundException (
254
+ tableHandle .getSchemaTableName (),
255
+ format ("Table '%s' not found" , tableHandle .getSchemaTableName ()));
256
+ }
257
+ @ Nullable Schema schema = tableInfo .getDefinition ().getSchema ();
258
+ if (schema == null ) {
259
+ throw new TableNotFoundException (
260
+ tableHandle .getSchemaTableName (),
261
+ format ("Table '%s' has no schema" , tableHandle .getSchemaTableName ()));
262
+ }
263
+ return schema .getFields ()
264
+ .stream ()
265
+ .map (Conversions ::toColumnHandle )
266
+ .collect (toImmutableList ());
267
+ }
268
+
269
+ static final class RemoteDatabaseObject
270
+ {
271
+ private final Set <String > remoteNames ;
272
+
273
+ private RemoteDatabaseObject (Set <String > remoteNames )
274
+ {
275
+ this .remoteNames = ImmutableSet .copyOf (remoteNames );
276
+ }
277
+
278
+ public static RemoteDatabaseObject of (String remoteName )
279
+ {
280
+ return new RemoteDatabaseObject (ImmutableSet .of (remoteName ));
281
+ }
282
+
283
+ public RemoteDatabaseObject registerCollision (String ambiguousName )
284
+ {
285
+ return new RemoteDatabaseObject (ImmutableSet .<String >builderWithExpectedSize (remoteNames .size () + 1 )
286
+ .addAll (remoteNames )
287
+ .add (ambiguousName )
288
+ .build ());
289
+ }
290
+
291
+ public String getAnyRemoteName ()
292
+ {
293
+ return Collections .min (remoteNames );
294
+ }
295
+
296
+ public String getOnlyRemoteName ()
297
+ {
298
+ if (!isAmbiguous ()) {
299
+ return getOnlyElement (remoteNames );
300
+ }
301
+
302
+ throw new TrinoException (BIGQUERY_AMBIGUOUS_OBJECT_NAME , "Found ambiguous names in BigQuery when looking up '" + getAnyRemoteName ().toLowerCase (ENGLISH ) + "': " + remoteNames );
303
+ }
304
+
305
+ public boolean isAmbiguous ()
306
+ {
307
+ return remoteNames .size () > 1 ;
308
+ }
309
+ }
165
310
}
0 commit comments