Feat: Onboard Human Variant Annotation dataset (#438)
vijay-google committed Aug 25, 2022
1 parent cacd9f1 commit ebfe4de
Showing 11 changed files with 630 additions and 0 deletions.
@@ -0,0 +1,32 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


resource "google_storage_bucket" "human-variant-annotation" {
name = "${var.bucket_name_prefix}-human-variant-annotation"
force_destroy = true
location = "US"
uniform_bucket_level_access = true
lifecycle {
ignore_changes = [
logging,
]
}
}

output "storage_bucket-human-variant-annotation-name" {
value = google_storage_bucket.human-variant-annotation.name
}
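For reference, a minimal sketch of how a downstream consumer might list the VCFs once the pipeline has landed them in this bucket. The resolved bucket name shown is a hypothetical stand-in for "${var.bucket_name_prefix}-human-variant-annotation", and it assumes application-default credentials with list access; the prefix matches the destination_object used in the DAG below.

from google.cloud import storage

client = storage.Client()
# Hypothetical resolved bucket name, i.e. "${var.bucket_name_prefix}-human-variant-annotation"
bucket_name = "my-prefix-human-variant-annotation"
for blob in client.list_blobs(bucket_name, prefix="human-variant-annotation/clinVar-vcf_GRCh37/"):
    print(blob.name, blob.size)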
28 changes: 28 additions & 0 deletions datasets/human_variant_annotation/infra/provider.tf
@@ -0,0 +1,28 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


provider "google" {
project = var.project_id
impersonate_service_account = var.impersonating_acct
region = var.region
}

data "google_client_openid_userinfo" "me" {}

output "impersonating-account" {
value = data.google_client_openid_userinfo.me.email
}
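The provider block above authenticates by impersonating a service account. A rough Python equivalent of the same pattern, assuming the caller holds roles/iam.serviceAccountTokenCreator on the target account; the principal shown is hypothetical, not one defined in this commit:

import google.auth
from google.auth import impersonated_credentials
from google.cloud import storage

# Source credentials come from the ambient environment (ADC).
source_credentials, _ = google.auth.default()
target_credentials = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal="terraform@my-project.iam.gserviceaccount.com",  # hypothetical
    target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = storage.Client(credentials=target_credentials)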
26 changes: 26 additions & 0 deletions datasets/human_variant_annotation/infra/variables.tf
@@ -0,0 +1,26 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


variable "project_id" {}
variable "bucket_name_prefix" {}
variable "impersonating_acct" {}
variable "region" {}
variable "env" {}
variable "iam_policies" {
default = {}
}

@@ -0,0 +1,37 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# The base image for this build
FROM python:3.8

# Allow statements and log messages to appear in Cloud logs
ENV PYTHONUNBUFFERED True

# Copy the requirements file into the image
COPY requirements.txt ./

# Install the packages specified in the requirements file
RUN python3 -m pip install --no-cache-dir -r requirements.txt

# The WORKDIR instruction sets the working directory for any RUN, CMD,
# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile.
# If the WORKDIR doesn’t exist, it will be created even if it’s not used in
# any subsequent Dockerfile instruction
WORKDIR /custom

# Copy the specific data processing script(s) into the image under /custom/*
COPY ./csv_transform.py .

# Command to run the data processing script when the container is run
CMD ["python3", "csv_transform.py"]
@@ -0,0 +1,114 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging
import os
import pathlib
import typing

import requests
from google.cloud import storage


def main(
    base_url: str,
    folder: pathlib.Path,
    version: str,
    gcs_bucket: str,
    target_gcs_folder: str,
    pipeline: str,
) -> None:
    logging.info(
        f"Human Variant Annotation Dataset {pipeline} pipeline process started at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )
    logging.info(f"Creating './files/{folder}'")
    pathlib.Path(f"./files/{folder}").mkdir(parents=True, exist_ok=True)
    dates = get_dates()
    for date in dates:
        date_time = datetime.datetime.strptime(date, "%Y%m%d")
        get_files(date_time, base_url, version, folder, gcs_bucket, target_gcs_folder)

    logging.info(
        f"Human Variant Annotation Dataset {pipeline} pipeline process completed at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )


def get_dates() -> typing.List[str]:
    today = datetime.datetime.now()
    # Start from the first day of the previous month; in January, roll back
    # to December of the previous year instead of passing month 0.
    if today.month == 1:
        start_date = datetime.datetime(today.year - 1, 12, 1)
    else:
        start_date = datetime.datetime(today.year, today.month - 1, 1)
    end_date = today
    delta = datetime.timedelta(days=1)
    dates = []
    while start_date <= end_date:
        dates.append(start_date.strftime("%Y%m%d"))
        start_date += delta
    return dates


def get_files(
    date_time: datetime.datetime,
    base_url: str,
    version: str,
    folder: pathlib.Path,
    gcs_bucket: str,
    target_gcs_folder: str,
) -> None:
    file_name = f"clinvar_{date_time.strftime('%Y%m%d')}.vcf.gz"
    source_url = base_url + f"archive_{version}/{date_time.strftime('%Y')}/{file_name}"
    source_file = f"./files/{folder}/{file_name}"
    status_code = download_gzfile(source_url, source_file)
    # Upload only files that were actually published for this date.
    if status_code == 200:
        target_gcs_path = f"{target_gcs_folder}{file_name}"
        upload_file_to_gcs(source_file, gcs_bucket, target_gcs_path)


def download_gzfile(source_url: str, source_file: str) -> int:
    logging.info(f"Downloading data from {source_url} to {source_file}.")
    res = requests.get(source_url, stream=True)
    if res.status_code == 200:
        with open(source_file, "wb") as fb:
            for chunk in res:
                fb.write(chunk)
        logging.info(f"\tDownloaded data from {source_url} into {source_file}")
    else:
        logging.info(f"Couldn't download {source_url}: Error {res.status_code}")
    return res.status_code


def upload_file_to_gcs(
    source_file: str, target_gcs_bucket: str, target_gcs_path: str
) -> None:
    logging.info(f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}")
    storage_client = storage.Client()
    bucket = storage_client.bucket(target_gcs_bucket)
    blob = bucket.blob(target_gcs_path)
    blob.upload_from_filename(source_file)
    logging.info("Successfully uploaded file to GCS bucket.")


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main(
        base_url=os.environ.get("BASE_URL", ""),
        folder=pathlib.Path(os.environ.get("FOLDER", "")).expanduser(),
        version=os.environ.get("VERSION", "2.0"),
        gcs_bucket=os.environ.get("GCS_BUCKET", ""),
        target_gcs_folder=os.environ.get("TARGET_GCS_FOLDER", ""),
        pipeline=os.environ.get("PIPELINE", ""),
    )
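As a quick way to exercise the script outside the container, here is a minimal local sketch that downloads a single dated ClinVar VCF and skips the GCS upload entirely. It assumes csv_transform.py is importable from the working directory and that outbound HTTPS to ftp.ncbi.nlm.nih.gov is allowed; the archive path mirrors the BASE_URL and VERSION values used by the DAG below.

import datetime
import pathlib

import csv_transform

folder = "vcf_GRCh37"
pathlib.Path(f"./files/{folder}").mkdir(parents=True, exist_ok=True)

# Take the first date get_dates() would enumerate (the 1st of last month).
date_time = datetime.datetime.strptime(csv_transform.get_dates()[0], "%Y%m%d")
file_name = f"clinvar_{date_time.strftime('%Y%m%d')}.vcf.gz"
source_url = (
    "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh37/"
    f"archive_2.0/{date_time.strftime('%Y')}/{file_name}"
)
status = csv_transform.download_gzfile(source_url, f"./files/{folder}/{file_name}")
print(status)  # 200 on success; 404 simply means no VCF was published that day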
@@ -0,0 +1,2 @@
google-cloud-storage
requests
99 changes: 99 additions & 0 deletions datasets/human_variant_annotation/pipelines/clinvar/clinvar_dag.py
@@ -0,0 +1,99 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_gcs

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}


with DAG(
    dag_id="human_variant_annotation.clinvar",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    clinvar_vcf_grch37 = kubernetes_pod.KubernetesPodOperator(
        task_id="clinvar_vcf_grch37",
        startup_timeout_seconds=600,
        name="clinvar_vcf_grch37",
        namespace="composer",
        service_account_name="datasets",
        image_pull_policy="Always",
        image="{{ var.json.human_variant_annotation.container_registry.run_csv_transform_kub }}",
        env_vars={
            "BASE_URL": "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh37/",
            "FOLDER": "vcf_GRCh37",
            "VERSION": "2.0",
            "GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_FOLDER": "data/human_variant_annotation/clinVar-vcf_GRCh37/",
            "PIPELINE": "clinvar",
        },
        resources={"limit_memory": "1G", "limit_cpu": "1"},
    )

    # Task to run a GoogleCloudStorageToGoogleCloudStorageOperator
    copy_clinvar_v1_to_gcs_destination_bucket = gcs_to_gcs.GCSToGCSOperator(
        task_id="copy_clinvar_v1_to_gcs_destination_bucket",
        source_bucket="{{ var.value.composer_bucket }}",
        source_object="data/human_variant_annotation/clinVar-vcf_GRCh37/*",
        destination_bucket="{{ var.json.human_variant_annotation.destination_bucket }}",
        destination_object="human-variant-annotation/clinVar-vcf_GRCh37/",
        move_object=False,
        replace=False,
    )

    # Run CSV transform within kubernetes pod
    clinvar_vcf_grch38 = kubernetes_pod.KubernetesPodOperator(
        task_id="clinvar_vcf_grch38",
        startup_timeout_seconds=600,
        name="clinvar_vcf_grch38",
        namespace="composer",
        service_account_name="datasets",
        image_pull_policy="Always",
        image="{{ var.json.human_variant_annotation.container_registry.run_csv_transform_kub }}",
        env_vars={
            "BASE_URL": "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/",
            "FOLDER": "vcf_GRCh38",
            "VERSION": "2.0",
            "GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_FOLDER": "data/human_variant_annotation/clinVar-vcf_GRCh38/",
            "PIPELINE": "clinvar",
        },
        resources={"limit_memory": "1G", "limit_cpu": "1"},
    )

    # Task to run a GoogleCloudStorageToGoogleCloudStorageOperator
    copy_clinvar_v2_to_gcs_destination_bucket = gcs_to_gcs.GCSToGCSOperator(
        task_id="copy_clinvar_v2_to_gcs_destination_bucket",
        source_bucket="{{ var.value.composer_bucket }}",
        source_object="data/human_variant_annotation/clinVar-vcf_GRCh38/*",
        destination_bucket="{{ var.json.human_variant_annotation.destination_bucket }}",
        destination_object="human-variant-annotation/clinVar-vcf_GRCh38/",
        move_object=False,
        replace=False,
    )

    clinvar_vcf_grch37 >> copy_clinvar_v1_to_gcs_destination_bucket
    clinvar_vcf_grch38 >> copy_clinvar_v2_to_gcs_destination_bucket
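A quick sketch for confirming the DAG file parses and the two branches are wired as intended, assuming an Airflow 2.x environment with the cncf.kubernetes and Google providers installed at versions contemporary with this commit:

from airflow.models import DagBag

bag = DagBag(
    dag_folder="datasets/human_variant_annotation/pipelines/clinvar",
    include_examples=False,
)
assert not bag.import_errors
dag = bag.get_dag("human_variant_annotation.clinvar")
# Each copy task should depend only on its corresponding transform task.
assert dag.get_task("copy_clinvar_v1_to_gcs_destination_bucket").upstream_task_ids == {"clinvar_vcf_grch37"}
assert dag.get_task("copy_clinvar_v2_to_gcs_destination_bucket").upstream_task_ids == {"clinvar_vcf_grch38"}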
