Add Dataproc SparkR Example (#8240)
* GCP SparkR Example

Allows you to schedule R and SparkR jobs on a Dataproc cluster.
The functionality to run this kind of job already exists in Dataproc,
but it was not clear how to do so from Airflow; a minimal usage sketch
follows the commit details below.

* Update airflow/providers/google/cloud/example_dags/example_dataproc.py

Co-Authored-By: Tomek Urbaszek <[email protected]>

* Make sure R file finds correct library

Co-authored-by: Tomek Urbaszek <[email protected]>
roelhogervorst and turbaszek committed Apr 16, 2020
1 parent befff3e commit efcffa3
Showing 2 changed files with 33 additions and 3 deletions.
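For context, the diff below boils down to the following usage pattern: define a Dataproc job config whose "spark_r_job" field points at an R script staged in GCS, then submit it with DataprocSubmitJobOperator. This is only a minimal sketch; the project, cluster, region, bucket, and DAG id used here are placeholders rather than values from the example DAG itself.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Placeholder values -- substitute your own project, cluster, region, and bucket.
PROJECT_ID = "my-project"
CLUSTER_NAME = "my-cluster"
REGION = "europe-west1"
SPARKR_URI = "gs://my-bucket/hello_world.R"  # the R script must already be uploaded to GCS

# Job definition mirroring the SPARKR_JOB dict added in the example DAG.
SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": SPARKR_URI},
}

with DAG("example_sparkr_dataproc", start_date=datetime(2020, 4, 16), schedule_interval=None) as dag:
    sparkr_task = DataprocSubmitJobOperator(
        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
    )

The operator call matches the one added in example_dataproc.py; in the example DAG the task additionally runs between the cluster scaling and cluster deletion tasks.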
14 changes: 13 additions & 1 deletion airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -38,7 +38,8 @@
OUTPUT_PATH = "gs://{}/{}/".format(BUCKET, OUTPUT_FOLDER)
PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)

SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)

# Cluster definition
CLUSTER = {
@@ -104,6 +105,12 @@
"pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}

SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_r_job": {"main_r_file_uri": SPARKR_URI},
}

HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
@@ -157,6 +164,10 @@
task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
)

sparkr_task = DataprocSubmitJobOperator(
task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
)

hive_task = DataprocSubmitJobOperator(
task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
)
@@ -178,4 +189,5 @@
scale_cluster >> spark_sql_task >> delete_cluster
scale_cluster >> spark_task >> delete_cluster
scale_cluster >> pyspark_task >> delete_cluster
scale_cluster >> sparkr_task >> delete_cluster
scale_cluster >> hadoop_task >> delete_cluster
22 changes: 20 additions & 2 deletions tests/providers/google/cloud/operators/test_dataproc_system.py
@@ -25,6 +25,8 @@
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)
SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)

pyspark_file = """
#!/usr/bin/python
@@ -35,16 +37,32 @@
print(words)
"""

sparkr_file = """
#!/usr/bin/r
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session()
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
"""


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.credential_file(GCP_DATAPROC_KEY)
class DataprocExampleDagsTest(GoogleSystemTest):

@provide_gcp_context(GCP_DATAPROC_KEY)
def setUp(self):
super().setUp()
self.create_gcs_bucket(BUCKET)
self.upload_content_to_gcs(lines=pyspark_file, bucket=PYSPARK_URI, filename=PYSPARK_MAIN)
self.upload_content_to_gcs(
lines=pyspark_file, bucket=PYSPARK_URI, filename=PYSPARK_MAIN
)
self.upload_content_to_gcs(
lines=sparkr_file, bucket=SPARKR_URI, filename=SPARKR_MAIN
)

@provide_gcp_context(GCP_DATAPROC_KEY)
def tearDown(self):
