Skip to content

Commit

Permalink
feat: Onboard Uniref50 dataset (#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
gkodukula committed Sep 13, 2022
1 parent de7f1fa commit b44d572
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ def load_data_to_bq(
job_config.write_disposition = "WRITE_TRUNCATE"
else:
job_config.write_disposition = "WRITE_APPEND"
print("appending started")
job_config.skip_leading_rows = 1 # ignore the header
job_config.autodetect = False
with open(file_path, "rb") as source_file:
Expand Down Expand Up @@ -331,9 +330,11 @@ def create_table_schema(
def append_batch_file(target_file_batch: str, target_file: str) -> None:

with open(target_file_batch, "r") as data_file:
with open(target_file, "a+") as target_file:
with open(target_file, "a+") as _target_file:
logging.info(f"Appending batch file {target_file_batch} to {target_file}")
target_file.write(data_file.read())
logging.info(f"Size of target file is {os.path.getsize(target_file_batch)}")
logging.info(f"Size of target file is {os.path.getsize(target_file)}")
_target_file.write(data_file.read())
if os.path.exists(target_file_batch):
os.remove(target_file_batch)

Expand Down
18 changes: 9 additions & 9 deletions datasets/uniref50/pipelines/uniref50/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -151,7 +151,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -221,7 +221,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -291,7 +291,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -361,7 +361,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -431,7 +431,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -501,7 +501,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -571,7 +571,7 @@ dag:
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "50G"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -614,4 +614,4 @@ dag:
description: ""
mode: "NULLABLE"
graph_paths:
- "download_zip_file >> uniref50_transform_csv_1 >> load_uniref50_to_bq_1 >> uniref50_transform_csv_2 >> load_uniref50_to_bq_2 >> uniref50_transform_csv_3 >> load_uniref50_to_bq_3 >> uniref50_transform_csv_4 >> load_uniref50_to_bq_4 >> uniref50_transform_csv_5 >> load_uniref50_to_bq_5 >> uniref50_transform_csv_6 >> load_uniref50_to_bq_6 >> uniref50_transform_csv_7 >> load_uniref50_to_bq_7 >> uniref50_transform_csv_8 >> load_uniref50_to_bq_8"
- "download_zip_file >> [ uniref50_transform_csv_1, uniref50_transform_csv_2, uniref50_transform_csv_3, uniref50_transform_csv_4, uniref50_transform_csv_5, uniref50_transform_csv_6, uniref50_transform_csv_7, uniref50_transform_csv_8 ] >> load_uniref50_to_bq_1 >> load_uniref50_to_bq_2 >> load_uniref50_to_bq_3 >> load_uniref50_to_bq_4 >> load_uniref50_to_bq_5 >> load_uniref50_to_bq_6 >> load_uniref50_to_bq_7 >> load_uniref50_to_bq_8 "
36 changes: 19 additions & 17 deletions datasets/uniref50/pipelines/uniref50/uniref50_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
dag_id="uniref50.uniref50",
default_args=default_args,
max_active_runs=1,
schedule_interval="@weekly",
schedule_interval="@once",
catchup=False,
default_view="graph",
) as dag:
Expand Down Expand Up @@ -71,7 +71,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -143,7 +143,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -215,7 +215,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -287,7 +287,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -359,7 +359,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -431,7 +431,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -503,7 +503,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -575,7 +575,7 @@
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "50G",
"request_ephemeral_storage": "10G",
},
)

Expand Down Expand Up @@ -622,20 +622,22 @@

(
download_zip_file
>> uniref50_transform_csv_1
>> [
uniref50_transform_csv_1,
uniref50_transform_csv_2,
uniref50_transform_csv_3,
uniref50_transform_csv_4,
uniref50_transform_csv_5,
uniref50_transform_csv_6,
uniref50_transform_csv_7,
uniref50_transform_csv_8,
]
>> load_uniref50_to_bq_1
>> uniref50_transform_csv_2
>> load_uniref50_to_bq_2
>> uniref50_transform_csv_3
>> load_uniref50_to_bq_3
>> uniref50_transform_csv_4
>> load_uniref50_to_bq_4
>> uniref50_transform_csv_5
>> load_uniref50_to_bq_5
>> uniref50_transform_csv_6
>> load_uniref50_to_bq_6
>> uniref50_transform_csv_7
>> load_uniref50_to_bq_7
>> uniref50_transform_csv_8
>> load_uniref50_to_bq_8
)

0 comments on commit b44d572

Please sign in to comment.