機械学習で Dataproc、BigQuery、Apache Spark ML を使用する


Apache Spark 用の BigQuery コネクタを使用すると、データ サイエンティストは、BigQuery のシームレスでスケーラブルな SQL エンジンの能力と Apache Spark の機械学習機能を融合できます。このチュートリアルでは、Dataproc、BigQuery、Apache Spark ML を使用してデータセットで機械学習を実施する方法を示します。

目標

線形回帰を使用して次の 5 つの要素に基づく出生時体重のモデルを構築します。

次のツールを使用します。

  • BigQuery(Google Cloud プロジェクトに書き込まれる線形回帰入力テーブルを準備する)
  • BigQuery でのデータのクエリと管理に使用する Python
  • 生成された線形回帰テーブルにアクセスする Apache Spark
  • モデルの構築と評価を行う Spark ML
  • Spark ML 関数を呼び出す Dataproc PySpark ジョブ

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Compute Engine
  • Dataproc
  • BigQuery

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

Dataproc クラスタには、Spark ML などの Spark コンポーネントがインストールされています。Dataproc クラスタをセットアップしてこの例で示すコードを実行するには、次のことを行う(またはすでに行っている)必要があります。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Dataproc, BigQuery, Compute Engine API を有効にします。

    API を有効にする

  4. Google Cloud CLI をインストールします。
  5. gcloud CLI を初期化するには:

    gcloud init
  6. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  7. Dataproc, BigQuery, Compute Engine API を有効にします。

    API を有効にする

  8. Google Cloud CLI をインストールします。
  9. gcloud CLI を初期化するには:

    gcloud init
  10. プロジェクト内で Dataproc クラスタを作成します。クラスタでは、Spark 2.0 以降の Dataproc バージョン(機械学習ライブラリが含まれているバージョン)が実行されている必要があります。

BigQuery natality データのサブセットを作成する

このセクションでは、プロジェクトにデータセットを作成し、そのデータセットにテーブルを作成して、公表されている出生率 BigQuery データセットから出生率データのサブセットをコピーします。このチュートリアルの後半では、このテーブルのサブセット データを使用して、母親の年齢、父親の年齢、妊娠週に基づいて出生体重を予測します。

データ サブセットは、Google Cloud Console を使用するか、ローカルマシンで Python スクリプトを実行して作成できます。

Console

  1. プロジェクトにデータセットを作成します。

    1. BigQuery ウェブ UI に移動します
    2. 左側のナビゲーション パネルでプロジェクト名をクリックし、続いて [データセットを作成] をクリックします。
    3. [データセットを作成] ダイアログで、次のように指定します。
      1. [データセット ID] に「natality_regression」と入力します。
      2. [データのロケーション] では、データセットのロケーションを選択できます。ロケーションのデフォルト値は、US multi-region です。データセットの作成後はロケーションを変更できません。
      3. [デフォルトのテーブルの有効期限] では、次のいずれかのオプションを選択します。
        • [無期限](デフォルト): テーブルは手動で削除する必要があります。
        • [日数]: テーブルは、作成日から指定した日数後に削除されます。
      4. [暗号化] では、次のいずれかのオプションを選択します。
      5. [データセットを作成] をクリックします。
  2. 公表されている出生率データセットに対してクエリを実行し、クエリ結果をデータセット内の新しいテーブルに保存します。

    1. 次のクエリをクエリエディタにコピーして貼り付け、[実行] をクリックします。
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. クエリが完了すると(約 1 分)、結果がプロジェクトの natality_regression データセットの「regression_input」BigQuery テーブルとしてプロジェクトに保存されます。

Python

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Python の設定手順を行ってください。 詳細については、Dataproc Python API のリファレンス ドキュメントをご覧ください。

Dataproc への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

  1. Python および Python 用 Google Cloud クライアント ライブラリ(コードの実行に必要)をインストールする方法については、Python 開発環境の設定をご覧ください。Python virtualenv をインストールして使用することをおすすめします。

  2. 下の natality_tutorial.py コードをローカルマシンの python シェルにコピーして貼り付けます。シェルで <return> キーを押してコードを実行し、デフォルトの Google Cloud プロジェクトに「natality_regression」BigQuery データセットを作成して、公表されている natality データのサブセットを「regression_input」テーブルに設定します。

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. natality_regression データセットと regression_input テーブルの作成を確認します。

線形回帰を実行する

このセクションでは、Google Cloud Console を使用して Dataproc サービスにジョブを送信するか、ローカルのターミナルから gcloud コマンドを実行して PySpark 線形回帰を実行します。

Console

  1. ローカルマシンの新しい natality_sparkml.py ファイルに、次のコードをコピーして貼り付けます。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    
    

  2. ローカルの natality_sparkml.py ファイルをプロジェクトの Cloud Storage バケットにコピーします。

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Dataproc の [ジョブを送信] ページから回帰を実行します。

    1. [メインの Python ファイル] フィールドで、natality_sparkml.py ファイルのコピーが置かれている Cloud Storage バケットの gs:// URI を挿入します。

    2. [ジョブタイプ] として PySpark を選択します。

    3. gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jarJAR ファイル フィールドに挿入します。これにより、実行時に spark-bigquery-connector を PySpark アプリケーションで使用できるようになり、BigQuery データを Spark DataFrame に読み込めます。

    4. [ジョブ ID]、[リージョン]、[クラスタ] フィールドに入力します。

    5. [送信] をクリックして、クラスタでジョブを実行します。

ジョブが完了すると、線形回帰出力モデルの概要が Dataproc の [ジョブの詳細] ウィンドウに表示されます。

gcloud

  1. ローカルマシンの新しい natality_sparkml.py ファイルに、次のコードをコピーして貼り付けます。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    
    

  2. ローカルの natality_sparkml.py ファイルをプロジェクトの Cloud Storage バケットにコピーします。

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. 下に示すようにローカルマシンのターミナル ウィンドウから gcloud コマンドを実行することにより、Pyspark ジョブを Dataproc サービスに送信します。

    1. --jars フラグの値を指定することで、実行時に spark-bigquery-connector を PySpark jobv で使用できるようになり、BigQuery データを Spark DataFrame に読み込めます。
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

ジョブが完了すると、線形回帰出力(モデル概要)がターミナル ウィンドウに表示されます。

<<< # モデルサマリーを出力します。
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

  

クリーンアップ

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Dataproc クラスタの削除

クラスタの削除をご覧ください。

次のステップ