Skip to content

Commit

Permalink
Add BigQuerySqlExecutor and refactor tests to use it
Browse files Browse the repository at this point in the history
  • Loading branch information
hashhar authored and losipiuk committed Feb 15, 2021
1 parent cc90d10 commit 281e7b1
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.trino.Session;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.sql.SqlExecutor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -33,7 +35,7 @@
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.testing.TestingSession.testSessionBuilder;

public class BigQueryQueryRunner
public final class BigQueryQueryRunner
{
private static final String TPCH_SCHEMA = "tpch";

Expand Down Expand Up @@ -65,20 +67,6 @@ public static DistributedQueryRunner createQueryRunner(Map<String, String> extra
}
}

public static BigQuery createBigQueryClient()
{
try {
InputStream jsonKey = new ByteArrayInputStream(Base64.getDecoder().decode(System.getProperty("bigquery.credentials-key")));
return BigQueryOptions.newBuilder()
.setCredentials(ServiceAccountCredentials.fromStream(jsonKey))
.build()
.getService();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static Session createSession()
{
return testSessionBuilder()
Expand All @@ -87,6 +75,43 @@ public static Session createSession()
.build();
}

public static class BigQuerySqlExecutor
implements SqlExecutor
{
private final BigQuery bigQuery;

public BigQuerySqlExecutor()
{
this.bigQuery = createBigQueryClient();
}

@Override
public void execute(String sql)
{
try {
bigQuery.query(QueryJobConfiguration.of(sql));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

private static BigQuery createBigQueryClient()
{
try {
InputStream jsonKey = new ByteArrayInputStream(Base64.getDecoder().decode(System.getProperty("bigquery.credentials-key")));
return BigQueryOptions.newBuilder()
.setCredentials(ServiceAccountCredentials.fromStream(jsonKey))
.build()
.getService();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

public static void main(String[] args)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,13 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.common.collect.ImmutableMap;
import io.trino.testing.AbstractTestIntegrationSmokeTest;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import org.testng.annotations.Test;

import static io.trino.plugin.bigquery.BigQueryQueryRunner.createBigQueryClient;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.assertions.Assert.assertEquals;
Expand All @@ -36,10 +31,13 @@
public class TestBigQueryIntegrationSmokeTest
extends AbstractTestIntegrationSmokeTest
{
private BigQuerySqlExecutor bigQuerySqlExecutor;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
this.bigQuerySqlExecutor = new BigQuerySqlExecutor();
return BigQueryQueryRunner.createQueryRunner(ImmutableMap.of());
}

Expand All @@ -64,11 +62,9 @@ public void testDescribeTable()
@Test(enabled = false)
public void testSelectFromHourlyPartitionedTable()
{
BigQuery client = createBigQueryClient();

executeBigQuerySql(client, "DROP TABLE IF EXISTS test.hourly_partitioned");
executeBigQuerySql(client, "CREATE TABLE test.hourly_partitioned (value INT64, ts TIMESTAMP) PARTITION BY TIMESTAMP_TRUNC(ts, HOUR)");
executeBigQuerySql(client, "INSERT INTO test.hourly_partitioned (value, ts) VALUES (1000, '2018-01-01 10:00:00')");
onBigQuery("DROP TABLE IF EXISTS test.hourly_partitioned");
onBigQuery("CREATE TABLE test.hourly_partitioned (value INT64, ts TIMESTAMP) PARTITION BY TIMESTAMP_TRUNC(ts, HOUR)");
onBigQuery("INSERT INTO test.hourly_partitioned (value, ts) VALUES (1000, '2018-01-01 10:00:00')");

MaterializedResult actualValues = computeActual("SELECT COUNT(1) FROM test.hourly_partitioned");

Expand All @@ -78,11 +74,9 @@ public void testSelectFromHourlyPartitionedTable()
@Test(enabled = false)
public void testSelectFromYearlyPartitionedTable()
{
BigQuery client = createBigQueryClient();

executeBigQuerySql(client, "DROP TABLE IF EXISTS test.yearly_partitioned");
executeBigQuerySql(client, "CREATE TABLE test.yearly_partitioned (value INT64, ts TIMESTAMP) PARTITION BY TIMESTAMP_TRUNC(ts, YEAR)");
executeBigQuerySql(client, "INSERT INTO test.yearly_partitioned (value, ts) VALUES (1000, '2018-01-01 10:00:00')");
onBigQuery("DROP TABLE IF EXISTS test.yearly_partitioned");
onBigQuery("CREATE TABLE test.yearly_partitioned (value INT64, ts TIMESTAMP) PARTITION BY TIMESTAMP_TRUNC(ts, YEAR)");
onBigQuery("INSERT INTO test.yearly_partitioned (value, ts) VALUES (1000, '2018-01-01 10:00:00')");

MaterializedResult actualValues = computeActual("SELECT COUNT(1) FROM test.yearly_partitioned");

Expand All @@ -92,13 +86,11 @@ public void testSelectFromYearlyPartitionedTable()
@Test(description = "regression test for https://github.com/trinodb/trino/issues/5618")
public void testPredicatePushdownPrunnedColumns()
{
BigQuery client = createBigQueryClient();

String tableName = "test.predicate_pushdown_prunned_columns";

executeBigQuerySql(client, "DROP TABLE IF EXISTS " + tableName);
executeBigQuerySql(client, "CREATE TABLE " + tableName + " (a INT64, b INT64, c INT64)");
executeBigQuerySql(client, "INSERT INTO " + tableName + " VALUES (1,2,3)");
onBigQuery("DROP TABLE IF EXISTS " + tableName);
onBigQuery("CREATE TABLE " + tableName + " (a INT64, b INT64, c INT64)");
onBigQuery("INSERT INTO " + tableName + " VALUES (1,2,3)");

assertQuery(
"SELECT 1 FROM " + tableName + " WHERE " +
Expand All @@ -110,16 +102,14 @@ public void testPredicatePushdownPrunnedColumns()
@Test(description = "regression test for https://github.com/trinodb/trino/issues/5635")
public void testCountAggregationView()
{
BigQuery client = createBigQueryClient();

String tableName = "test.count_aggregation_table";
String viewName = "test.count_aggregation_view";

executeBigQuerySql(client, "DROP TABLE IF EXISTS " + tableName);
executeBigQuerySql(client, "DROP VIEW IF EXISTS " + viewName);
executeBigQuerySql(client, "CREATE TABLE " + tableName + " (a INT64, b INT64, c INT64)");
executeBigQuerySql(client, "INSERT INTO " + tableName + " VALUES (1, 2, 3), (4, 5, 6)");
executeBigQuerySql(client, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
onBigQuery("DROP TABLE IF EXISTS " + tableName);
onBigQuery("DROP VIEW IF EXISTS " + viewName);
onBigQuery("CREATE TABLE " + tableName + " (a INT64, b INT64, c INT64)");
onBigQuery("INSERT INTO " + tableName + " VALUES (1, 2, 3), (4, 5, 6)");
onBigQuery("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);

assertQuery(
"SELECT count(*) FROM " + viewName,
Expand All @@ -140,32 +130,28 @@ public void testCountAggregationView()
@Test
public void testRepeatCountAggregationView()
{
BigQuery client = createBigQueryClient();

String viewName = "test.repeat_count_aggregation_view_" + randomTableSuffix();

executeBigQuerySql(client, "DROP VIEW IF EXISTS " + viewName);
executeBigQuerySql(client, "CREATE VIEW " + viewName + " AS SELECT 1 AS col1");
onBigQuery("DROP VIEW IF EXISTS " + viewName);
onBigQuery("CREATE VIEW " + viewName + " AS SELECT 1 AS col1");

assertQuery("SELECT count(*) FROM " + viewName, "VALUES (1)");
assertQuery("SELECT count(*) FROM " + viewName, "VALUES (1)");

executeBigQuerySql(client, "DROP VIEW " + viewName);
onBigQuery("DROP VIEW " + viewName);
}

@Test
public void testViewDefinitionSystemTable()
{
BigQuery client = createBigQueryClient();

String schemaName = "test";
String tableName = "views_system_table_base_" + randomTableSuffix();
String viewName = "views_system_table_view_" + randomTableSuffix();

executeBigQuerySql(client, format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
executeBigQuerySql(client, format("DROP VIEW IF EXISTS %s.%s", schemaName, viewName));
executeBigQuerySql(client, format("CREATE TABLE %s.%s (a INT64, b INT64, c INT64)", schemaName, tableName));
executeBigQuerySql(client, format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s", schemaName, viewName, schemaName, tableName));
onBigQuery(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
onBigQuery(format("DROP VIEW IF EXISTS %s.%s", schemaName, viewName));
onBigQuery(format("CREATE TABLE %s.%s (a INT64, b INT64, c INT64)", schemaName, tableName));
onBigQuery(format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s", schemaName, viewName, schemaName, tableName));

assertEquals(
computeScalar(format("SELECT * FROM %s.\"%s$view_definition\"", schemaName, viewName)),
Expand All @@ -175,34 +161,8 @@ public void testViewDefinitionSystemTable()
format("SELECT * FROM %s.\"%s$view_definition\"", schemaName, tableName),
format("Table '%s.%s\\$view_definition' not found", schemaName, tableName));

executeBigQuerySql(client, format("DROP TABLE %s.%s", schemaName, tableName));
executeBigQuerySql(client, format("DROP VIEW %s.%s", schemaName, viewName));
}

private static void executeBigQuerySql(BigQuery bigquery, String query)
{
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
.setUseLegacySql(false)
.build();

JobId jobId = JobId.of();
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

try {
queryJob = queryJob.waitFor();

if (queryJob == null) {
throw new RuntimeException(format("Job with uuid %s does not longer exists", jobId.getJob()));
}

if (queryJob.getStatus().getError() != null) {
throw new RuntimeException(format("Query '%s' failed: %s", query, queryJob.getStatus().getError()));
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
onBigQuery(format("DROP TABLE %s.%s", schemaName, tableName));
onBigQuery(format("DROP VIEW %s.%s", schemaName, viewName));
}

@Override
Expand All @@ -221,4 +181,9 @@ public void testShowCreateTable()
" comment varchar NOT NULL\n" +
")");
}

private void onBigQuery(String sql)
{
bigQuerySqlExecutor.execute(sql);
}
}

0 comments on commit 281e7b1

Please sign in to comment.