Skip to content

Commit cda6386

Browse files
hashhar authored and losipiuk committed
Implement case insensitive name matching for BigQuery
1 parent 281e7b1 commit cda6386

File tree

13 files changed

+693
-170
lines changed

13 files changed

+693
-170
lines changed

.github/workflows/ci.yml

Lines changed: 7 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -295,6 +295,13 @@ jobs:
295295
if [ "${BIGQUERY_CREDENTIALS_KEY}" != "" ]; then
296296
./mvnw test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests -Dbigquery.credentials-key="${BIGQUERY_CREDENTIALS_KEY}"
297297
fi
298+
- name: Cloud BigQuery Case Insensitive Mapping Tests
299+
env:
300+
BIGQUERY_CREDENTIALS_KEY: ${{ secrets.BIGQUERY_CREDENTIALS_KEY }}
301+
run: |
302+
if [ "${BIGQUERY_CREDENTIALS_KEY}" != "" ]; then
303+
./mvnw test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-case-insensitive-mapping -Dbigquery.credentials-key="${BIGQUERY_CREDENTIALS_KEY}"
304+
fi
298305
299306
pt:
300307
runs-on: ubuntu-latest

plugin/trino-bigquery/pom.xml

Lines changed: 27 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -79,6 +79,11 @@
7979
<artifactId>log-manager</artifactId>
8080
</dependency>
8181

82+
<dependency>
83+
<groupId>io.airlift</groupId>
84+
<artifactId>units</artifactId>
85+
</dependency>
86+
8287
<dependency>
8388
<groupId>com.google.api</groupId>
8489
<artifactId>gax</artifactId>
@@ -287,6 +292,7 @@
287292
<excludes>
288293
<!-- If you are adding entry here also add an entry to cloud-tests profile below -->
289294
<exclude>**/TestBigQueryIntegrationSmokeTest.java</exclude>
295+
<exclude>**/TestBigQueryCaseInsensitiveMapping.java</exclude>
290296
</excludes>
291297
</configuration>
292298
</plugin>
@@ -313,5 +319,26 @@
313319
</plugins>
314320
</build>
315321
</profile>
322+
323+
<!-- Separate profile for TestBigQueryCaseInsensitiveMapping until we can fully isolate it -->
324+
<profile>
325+
<id>cloud-tests-case-insensitive-mapping</id>
326+
<activation>
327+
<activeByDefault>false</activeByDefault>
328+
</activation>
329+
<build>
330+
<plugins>
331+
<plugin>
332+
<groupId>org.apache.maven.plugins</groupId>
333+
<artifactId>maven-surefire-plugin</artifactId>
334+
<configuration>
335+
<includes>
336+
<include>**/TestBigQueryCaseInsensitiveMapping.java</include>
337+
</includes>
338+
</configuration>
339+
</plugin>
340+
</plugins>
341+
</build>
342+
</profile>
316343
</profiles>
317344
</project>

plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java

Lines changed: 186 additions & 41 deletions
Original file line number · Diff line number · Diff line change
@@ -17,64 +17,152 @@
1717
import com.google.cloud.bigquery.BigQueryException;
1818
import com.google.cloud.bigquery.Dataset;
1919
import com.google.cloud.bigquery.DatasetId;
20+
import com.google.cloud.bigquery.DatasetInfo;
2021
import com.google.cloud.bigquery.Job;
2122
import com.google.cloud.bigquery.JobInfo;
2223
import com.google.cloud.bigquery.QueryJobConfiguration;
24+
import com.google.cloud.bigquery.Schema;
2325
import com.google.cloud.bigquery.Table;
2426
import com.google.cloud.bigquery.TableDefinition;
2527
import com.google.cloud.bigquery.TableId;
2628
import com.google.cloud.bigquery.TableInfo;
2729
import com.google.cloud.bigquery.TableResult;
2830
import com.google.cloud.http.BaseHttpServiceException;
31+
import com.google.common.cache.Cache;
32+
import com.google.common.cache.CacheBuilder;
2933
import com.google.common.collect.ImmutableSet;
30-
import com.google.common.collect.Iterators;
34+
import io.airlift.units.Duration;
35+
import io.trino.spi.TrinoException;
36+
import io.trino.spi.connector.TableNotFoundException;
37+
import org.checkerframework.checker.nullness.qual.Nullable;
3138

32-
import java.util.Iterator;
39+
import java.util.Collections;
40+
import java.util.HashMap;
3341
import java.util.List;
42+
import java.util.Map;
3443
import java.util.Optional;
3544
import java.util.Set;
36-
import java.util.concurrent.ConcurrentHashMap;
37-
import java.util.concurrent.ConcurrentMap;
38-
import java.util.stream.StreamSupport;
3945

46+
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
47+
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
48+
import static com.google.common.base.Verify.verify;
4049
import static com.google.common.collect.ImmutableList.toImmutableList;
50+
import static com.google.common.collect.Iterables.getOnlyElement;
51+
import static com.google.common.collect.Streams.stream;
52+
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
4153
import static java.lang.String.format;
4254
import static java.util.Locale.ENGLISH;
55+
import static java.util.Objects.requireNonNull;
4356
import static java.util.UUID.randomUUID;
57+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4458
import static java.util.stream.Collectors.joining;
4559

46-
// holds caches and mappings
47-
// Trino converts the dataset and table names to lower case, while BigQuery is case sensitive
48-
// the mappings here keep the mappings
4960
class BigQueryClient
5061
{
5162
private final BigQuery bigQuery;
5263
private final Optional<String> viewMaterializationProject;
5364
private final Optional<String> viewMaterializationDataset;
54-
private final ConcurrentMap<TableId, TableId> tableIds = new ConcurrentHashMap<>();
55-
private final ConcurrentMap<DatasetId, DatasetId> datasetIds = new ConcurrentHashMap<>();
65+
private final boolean caseInsensitiveNameMatching;
66+
private final Cache<String, Optional<RemoteDatabaseObject>> remoteDatasets;
67+
private final Cache<TableId, Optional<RemoteDatabaseObject>> remoteTables;
5668

5769
BigQueryClient(BigQuery bigQuery, BigQueryConfig config)
5870
{
5971
this.bigQuery = bigQuery;
6072
this.viewMaterializationProject = config.getViewMaterializationProject();
6173
this.viewMaterializationDataset = config.getViewMaterializationDataset();
74+
Duration caseInsensitiveNameMatchingCacheTtl = requireNonNull(config.getCaseInsensitiveNameMatchingCacheTtl(), "caseInsensitiveNameMatchingCacheTtl is null");
75+
76+
this.caseInsensitiveNameMatching = config.isCaseInsensitiveNameMatching();
77+
CacheBuilder<Object, Object> remoteNamesCacheBuilder = CacheBuilder.newBuilder()
78+
.expireAfterWrite(caseInsensitiveNameMatchingCacheTtl.toMillis(), MILLISECONDS);
79+
this.remoteDatasets = remoteNamesCacheBuilder.build();
80+
this.remoteTables = remoteNamesCacheBuilder.build();
81+
}
82+
83+
Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String datasetName)
84+
{
85+
requireNonNull(projectId, "projectId is null");
86+
requireNonNull(datasetName, "datasetName is null");
87+
verify(datasetName.codePoints().noneMatch(Character::isUpperCase), "Expected schema name from internal metadata to be lowercase: %s", datasetName);
88+
if (!caseInsensitiveNameMatching) {
89+
return Optional.of(RemoteDatabaseObject.of(datasetName));
90+
}
91+
92+
@Nullable Optional<RemoteDatabaseObject> remoteDataset = remoteDatasets.getIfPresent(datasetName);
93+
if (remoteDataset != null) {
94+
return remoteDataset;
95+
}
96+
97+
// cache miss, reload the cache
98+
Map<String, Optional<RemoteDatabaseObject>> mapping = new HashMap<>();
99+
for (Dataset dataset : listDatasets(projectId)) {
100+
mapping.merge(
101+
dataset.getDatasetId().getDataset().toLowerCase(ENGLISH),
102+
Optional.of(RemoteDatabaseObject.of(dataset.getDatasetId().getDataset())),
103+
(currentValue, collision) -> currentValue.map(current -> current.registerCollision(collision.get().getOnlyRemoteName())));
104+
}
105+
106+
// explicitly cache the information if the requested dataset doesn't exist
107+
if (!mapping.containsKey(datasetName)) {
108+
mapping.put(datasetName, Optional.empty());
109+
}
110+
111+
verify(mapping.containsKey(datasetName));
112+
return mapping.get(datasetName);
62113
}
63114

64-
TableInfo getTable(TableId tableId)
115+
Optional<RemoteDatabaseObject> toRemoteTable(String projectId, String remoteDatasetName, String tableName)
65116
{
66-
TableId bigQueryTableId = tableIds.get(tableId);
67-
Table table = bigQuery.getTable(bigQueryTableId != null ? bigQueryTableId : tableId);
68-
if (table != null) {
69-
tableIds.putIfAbsent(tableId, table.getTableId());
70-
datasetIds.putIfAbsent(toDatasetId(tableId), toDatasetId(table.getTableId()));
117+
requireNonNull(projectId, "projectId is null");
118+
requireNonNull(remoteDatasetName, "remoteDatasetName is null");
119+
requireNonNull(tableName, "tableName is null");
120+
verify(tableName.codePoints().noneMatch(Character::isUpperCase), "Expected table name from internal metadata to be lowercase: %s", tableName);
121+
if (!caseInsensitiveNameMatching) {
122+
return Optional.of(RemoteDatabaseObject.of(tableName));
123+
}
124+
125+
TableId cacheKey = TableId.of(projectId, remoteDatasetName, tableName);
126+
@Nullable Optional<RemoteDatabaseObject> remoteTable = remoteTables.getIfPresent(cacheKey);
127+
if (remoteTable != null) {
128+
return remoteTable;
71129
}
72-
return table;
130+
131+
// cache miss, reload the cache
132+
Map<TableId, Optional<RemoteDatabaseObject>> mapping = new HashMap<>();
133+
for (Table table : listTables(DatasetId.of(projectId, remoteDatasetName), TABLE, VIEW)) {
134+
mapping.merge(
135+
tableIdToLowerCase(table.getTableId()),
136+
Optional.of(RemoteDatabaseObject.of(table.getTableId().getTable())),
137+
(currentValue, collision) -> currentValue.map(current -> current.registerCollision(collision.get().getOnlyRemoteName())));
138+
}
139+
140+
// explicitly cache the information if the requested table doesn't exist
141+
if (!mapping.containsKey(cacheKey)) {
142+
mapping.put(cacheKey, Optional.empty());
143+
}
144+
145+
verify(mapping.containsKey(cacheKey));
146+
return mapping.get(cacheKey);
147+
}
148+
149+
private static TableId tableIdToLowerCase(TableId tableId)
150+
{
151+
return TableId.of(
152+
tableId.getProject(),
153+
tableId.getDataset(),
154+
tableId.getTable().toLowerCase(ENGLISH));
155+
}
156+
157+
DatasetInfo getDataset(DatasetId datasetId)
158+
{
159+
return bigQuery.getDataset(datasetId);
73160
}
74161

75-
DatasetId toDatasetId(TableId tableId)
162+
TableInfo getTable(TableId remoteTableId)
76163
{
77-
return DatasetId.of(tableId.getProject(), tableId.getDataset());
164+
// TODO: Return Optional and make callers handle missing value
165+
return bigQuery.getTable(remoteTableId);
78166
}
79167

80168
String getProjectId()
@@ -84,43 +172,32 @@ String getProjectId()
84172

85173
Iterable<Dataset> listDatasets(String projectId)
86174
{
87-
Iterator<Dataset> datasets = bigQuery.listDatasets(projectId).iterateAll().iterator();
88-
return () -> Iterators.transform(datasets, this::addDataSetMappingIfNeeded);
175+
return bigQuery.listDatasets(projectId).iterateAll();
89176
}
90177

91-
Iterable<Table> listTables(DatasetId datasetId, TableDefinition.Type... types)
178+
Iterable<Table> listTables(DatasetId remoteDatasetId, TableDefinition.Type... types)
92179
{
93180
Set<TableDefinition.Type> allowedTypes = ImmutableSet.copyOf(types);
94-
DatasetId bigQueryDatasetId = datasetIds.getOrDefault(datasetId, datasetId);
95-
Iterable<Table> allTables = bigQuery.listTables(bigQueryDatasetId).iterateAll();
96-
return StreamSupport.stream(allTables.spliterator(), false)
181+
Iterable<Table> allTables = bigQuery.listTables(remoteDatasetId).iterateAll();
182+
return stream(allTables)
97183
.filter(table -> allowedTypes.contains(table.getDefinition().getType()))
98184
.collect(toImmutableList());
99185
}
100186

101-
private Dataset addDataSetMappingIfNeeded(Dataset dataset)
102-
{
103-
DatasetId bigQueryDatasetId = dataset.getDatasetId();
104-
DatasetId trinoDatasetId = DatasetId.of(bigQueryDatasetId.getProject(), bigQueryDatasetId.getDataset().toLowerCase(ENGLISH));
105-
datasetIds.putIfAbsent(trinoDatasetId, bigQueryDatasetId);
106-
return dataset;
107-
}
108-
109187
TableId createDestinationTable(TableId tableId)
110188
{
111189
String project = viewMaterializationProject.orElse(tableId.getProject());
112190
String dataset = viewMaterializationDataset.orElse(tableId.getDataset());
113-
DatasetId datasetId = mapIfNeeded(project, dataset);
191+
192+
String remoteDatasetName = toRemoteDataset(project, dataset)
193+
.map(RemoteDatabaseObject::getOnlyRemoteName)
194+
.orElse(dataset);
195+
196+
DatasetId datasetId = DatasetId.of(project, remoteDatasetName);
114197
String name = format("_pbc_%s", randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
115198
return TableId.of(datasetId.getProject(), datasetId.getDataset(), name);
116199
}
117200

118-
private DatasetId mapIfNeeded(String project, String dataset)
119-
{
120-
DatasetId datasetId = DatasetId.of(project, dataset);
121-
return datasetIds.getOrDefault(datasetId, datasetId);
122-
}
123-
124201
Table update(TableInfo table)
125202
{
126203
return bigQuery.update(table);
@@ -159,7 +236,75 @@ String selectSql(TableId table, String formattedColumns)
159236

160237
private String fullTableName(TableId tableId)
161238
{
162-
tableId = tableIds.getOrDefault(tableId, tableId);
239+
String remoteSchemaName = toRemoteDataset(tableId.getProject(), tableId.getDataset())
240+
.map(RemoteDatabaseObject::getOnlyRemoteName)
241+
.orElse(tableId.getDataset());
242+
String remoteTableName = toRemoteTable(tableId.getProject(), remoteSchemaName, tableId.getTable())
243+
.map(RemoteDatabaseObject::getOnlyRemoteName)
244+
.orElse(tableId.getTable());
245+
tableId = TableId.of(tableId.getProject(), remoteSchemaName, remoteTableName);
163246
return format("%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
164247
}
248+
249+
List<BigQueryColumnHandle> getColumns(BigQueryTableHandle tableHandle)
250+
{
251+
TableInfo tableInfo = getTable(tableHandle.getRemoteTableName().toTableId());
252+
if (tableInfo == null) {
253+
throw new TableNotFoundException(
254+
tableHandle.getSchemaTableName(),
255+
format("Table '%s' not found", tableHandle.getSchemaTableName()));
256+
}
257+
@Nullable Schema schema = tableInfo.getDefinition().getSchema();
258+
if (schema == null) {
259+
throw new TableNotFoundException(
260+
tableHandle.getSchemaTableName(),
261+
format("Table '%s' has no schema", tableHandle.getSchemaTableName()));
262+
}
263+
return schema.getFields()
264+
.stream()
265+
.map(Conversions::toColumnHandle)
266+
.collect(toImmutableList());
267+
}
268+
269+
static final class RemoteDatabaseObject
270+
{
271+
private final Set<String> remoteNames;
272+
273+
private RemoteDatabaseObject(Set<String> remoteNames)
274+
{
275+
this.remoteNames = ImmutableSet.copyOf(remoteNames);
276+
}
277+
278+
public static RemoteDatabaseObject of(String remoteName)
279+
{
280+
return new RemoteDatabaseObject(ImmutableSet.of(remoteName));
281+
}
282+
283+
public RemoteDatabaseObject registerCollision(String ambiguousName)
284+
{
285+
return new RemoteDatabaseObject(ImmutableSet.<String>builderWithExpectedSize(remoteNames.size() + 1)
286+
.addAll(remoteNames)
287+
.add(ambiguousName)
288+
.build());
289+
}
290+
291+
public String getAnyRemoteName()
292+
{
293+
return Collections.min(remoteNames);
294+
}
295+
296+
public String getOnlyRemoteName()
297+
{
298+
if (!isAmbiguous()) {
299+
return getOnlyElement(remoteNames);
300+
}
301+
302+
throw new TrinoException(BIGQUERY_AMBIGUOUS_OBJECT_NAME, "Found ambiguous names in BigQuery when looking up '" + getAnyRemoteName().toLowerCase(ENGLISH) + "': " + remoteNames);
303+
}
304+
305+
public boolean isAmbiguous()
306+
{
307+
return remoteNames.size() > 1;
308+
}
309+
}
165310
}

0 commit comments

Comments (0)