GCSToBigQueryOperator allow for schema_object in alternate GCS Bucket (
patricker committed Sep 7, 2022
1 parent 63562d7 commit 8cac969
Showing 1 changed file with 9 additions and 1 deletion.
airflow/providers/google/cloud/transfers/gcs_to_bigquery.py: 9 additions & 1 deletion
@@ -56,6 +56,8 @@ class GCSToBigQueryOperator(BaseOperator):
:param schema_object: If set, a GCS object path pointing to a .json file that
contains the schema for the table. (templated)
Parameter must be defined if 'schema_fields' is null and autodetect is False.
+ :param schema_object_bucket: [Optional] If set, the GCS bucket where the schema object
+     template is stored. (templated) (Default: the value of ``bucket``)
:param source_format: File format to export.
:param compression: [Optional] The compression type of the data source.
Possible values include GZIP and NONE.
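
For context, a minimal usage sketch of the new parameter (not part of the commit). The bucket, object, and table names are placeholders, and the DAG boilerplate is only there to keep the snippet self-contained:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    dag_id="example_gcs_to_bq_alternate_schema_bucket",
    start_date=datetime(2022, 9, 1),
    schedule_interval=None,
) as dag:
    # Source CSV files live in one bucket; the table schema .json lives in another.
    load_sales = GCSToBigQueryOperator(
        task_id="gcs_to_bq_with_schema_object_bucket",
        bucket="data-bucket",  # bucket holding the source objects
        source_objects=["sales/2022-09-07/*.csv"],
        destination_project_dataset_table="my_project.my_dataset.sales",
        schema_object="schemas/sales_schema.json",
        schema_object_bucket="schema-bucket",  # new: schema read from a different bucket
        source_format="CSV",
        write_disposition="WRITE_TRUNCATE",
    )

If schema_object_bucket is omitted, it defaults to bucket, so existing DAGs keep their previous behavior.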
@@ -133,6 +135,7 @@ class GCSToBigQueryOperator(BaseOperator):
'bucket',
'source_objects',
'schema_object',
+ 'schema_object_bucket',
'destination_project_dataset_table',
'impersonation_chain',
)
@@ -147,6 +150,7 @@ def __init__(
destination_project_dataset_table,
schema_fields=None,
schema_object=None,
+ schema_object_bucket=None,
source_format='CSV',
compression='NONE',
create_disposition='CREATE_IF_NEEDED',
@@ -187,6 +191,10 @@ def __init__(
self.source_objects = source_objects
self.schema_object = schema_object

+ if schema_object_bucket is None:
+     schema_object_bucket = bucket
+ self.schema_object_bucket = schema_object_bucket

# BQ config
self.destination_project_dataset_table = destination_project_dataset_table
self.schema_fields = schema_fields
@@ -236,7 +244,7 @@ def execute(self, context: 'Context'):
impersonation_chain=self.impersonation_chain,
)
blob = gcs_hook.download(
- bucket_name=self.bucket,
+ bucket_name=self.schema_object_bucket,
object_name=self.schema_object,
)
schema_fields = json.loads(blob.decode("utf-8"))
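
For reference, the schema object downloaded above is expected to contain a BigQuery schema as a JSON list of field definitions, which json.loads turns into the schema_fields list passed to the load job. A hypothetical example of such a file's contents, written as the equivalent Python value:

# Hypothetical fields; the real contents are whatever the .json object
# stored in schema_object_bucket holds.
schema_fields = [
    {"name": "sale_id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "amount", "type": "NUMERIC", "mode": "NULLABLE"},
    {"name": "sold_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
]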
