Skip to content

Commit

Permalink
Add view_definition system table for BigQuery view
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 7, 2021
1 parent 2b5f7ff commit 66904a6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 0 deletions.
7 changes: 7 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ BigQuery Trino Notes
``TIMESTAMP`` ``TIMESTAMP_WITH_TIME_ZONE`` Time zone is UTC
============= ============================ =============================================================================================================

System tables
-------------

For each Trino table which maps to BigQuery view there exists a system table which exposes BigQuery view definition.
Given a BigQuery view ``customer_view`` you can send query
``SELECT * customer_view$view_definition`` to see the SQL which defines view in BigQuery.

FAQ
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.ViewDefinition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -33,23 +34,30 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.NotFoundException;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
Expand All @@ -66,6 +74,7 @@ public class BigQueryMetadata
static final int NUMERIC_DATA_TYPE_PRECISION = 38;
static final int NUMERIC_DATA_TYPE_SCALE = 9;
static final String INFORMATION_SCHEMA = "information_schema";
private static final String VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX = "$view_definition";

private final BigQueryClient bigQueryClient;
private final String projectId;
Expand Down Expand Up @@ -160,6 +169,32 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
return new ConnectorTableMetadata(schemaTableName, columns);
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
if (isViewDefinitionSystemTable(tableName)) {
return getViewDefinitionSystemTable(tableName, getViewDefinitionSourceTableName(tableName));
}
return Optional.empty();
}

private Optional<SystemTable> getViewDefinitionSystemTable(SchemaTableName viewDefinitionTableName, SchemaTableName sourceTableName)
{
TableInfo tableInfo = getBigQueryTable(sourceTableName);
if (tableInfo == null || !(tableInfo.getDefinition() instanceof ViewDefinition)) {
throw new TableNotFoundException(viewDefinitionTableName);
}

List<ColumnMetadata> columns = ImmutableList.of(new ColumnMetadata("query", VarcharType.VARCHAR));
List<Type> types = columns.stream()
.map(ColumnMetadata::getType)
.collect(toImmutableList());
Optional<String> query = Optional.ofNullable(((ViewDefinition) tableInfo.getDefinition()).getQuery());
Iterable<List<Object>> propertyValues = ImmutableList.of(ImmutableList.of(query.orElse("NULL")));

return Optional.of(createSystemTable(new ConnectorTableMetadata(sourceTableName, columns), constraint -> new InMemoryRecordSet(types, propertyValues).cursor()));
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down Expand Up @@ -311,4 +346,41 @@ private static boolean containSameElements(Iterable<? extends ColumnHandle> firs
{
return ImmutableSet.copyOf(first).equals(ImmutableSet.copyOf(second));
}

private static boolean isViewDefinitionSystemTable(SchemaTableName table)
{
return table.getTableName().endsWith(VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX) &&
(table.getTableName().length() > VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX.length());
}

private static SchemaTableName getViewDefinitionSourceTableName(SchemaTableName table)
{
return new SchemaTableName(
table.getSchemaName(),
table.getTableName().substring(0, table.getTableName().length() - VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX.length()));
}

private static SystemTable createSystemTable(ConnectorTableMetadata metadata, Function<TupleDomain<Integer>, RecordCursor> cursor)
{
return new SystemTable()
{
@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return metadata;
}

@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return cursor.apply(constraint);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,32 @@ public void testRepeatCountAggregationView()
executeBigQuerySql(client, "DROP VIEW " + viewName);
}

@Test
public void testViewDefinitionSystemTable()
{
BigQuery client = createBigQueryClient();

String schemaName = "test";
String tableName = "views_system_table_base_" + randomTableSuffix();
String viewName = "views_system_table_view_" + randomTableSuffix();

executeBigQuerySql(client, format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
executeBigQuerySql(client, format("DROP VIEW IF EXISTS %s.%s", schemaName, viewName));
executeBigQuerySql(client, format("CREATE TABLE %s.%s (a INT64, b INT64, c INT64)", schemaName, tableName));
executeBigQuerySql(client, format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s", schemaName, viewName, schemaName, tableName));

assertEquals(
computeScalar(format("SELECT * FROM %s.\"%s$view_definition\"", schemaName, viewName)),
format("SELECT * FROM %s.%s", schemaName, tableName));

assertQueryFails(
format("SELECT * FROM %s.\"%s$view_definition\"", schemaName, tableName),
format("Table '%s.%s\\$view_definition' not found", schemaName, tableName));

executeBigQuerySql(client, format("DROP TABLE %s.%s", schemaName, tableName));
executeBigQuerySql(client, format("DROP VIEW %s.%s", schemaName, viewName));
}

private static void executeBigQuerySql(BigQuery bigquery, String query)
{
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
Expand Down

0 comments on commit 66904a6

Please sign in to comment.