ストリーミング パイプラインで LLM を実行する


このチュートリアルでは、Apache Beam RunInference API を使用して、ストリーミング Dataflow パイプラインで大規模言語モデル(LLM)を実行する方法について説明します。

RunInference API の詳細については、Apache Beam ドキュメントの Beam ML についてをご覧ください。

サンプルコードは GitHub で入手できます。

目標

  • モデルの入力とレスポンスの Pub/Sub トピックとサブスクリプションを作成します。
  • Vertex AI カスタムジョブを使用して、Cloud Storage にモデルを読み込みます。
  • パイプラインを実行します。
  • モデルに質問し、回答を得ます。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

このチュートリアルでは、依存関係をインストールするため 5 GB 以上の空き容量のあるマシンを使用します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI API を有効にします。

    gcloud services enable dataflow.googleapis.comcompute.googleapis.comstorage.googleapis.compubsub.googleapis.comaiplatform.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI API を有効にします。

    gcloud services enable dataflow.googleapis.comcompute.googleapis.comstorage.googleapis.compubsub.googleapis.comaiplatform.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER[email protected]" --role=SERVICE_ACCOUNT_ROLE

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • PROJECT_NUMBER: プロジェクトの番号。プロジェクト番号を確認するには、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE: 個々のロール。
  17. Google Cloud プロジェクト ID をコピーします。この値は、このチュートリアルの後半で必要になります。

Google Cloud リソースを作成する

このセクションでは、次のリソースを作成します。

  • 一時的な格納場所として使用する Cloud Storage バケット
  • モデルのプロンプトに使用する Pub/Sub トピック
  • モデルのレスポンスに使用する Pub/Sub トピックとサブスクリプション

Cloud Storage バケットを作成する

gcloud CLI を使用して Cloud Storage バケットを作成します。このバケットは、Dataflow パイプラインによって一時ストレージの場所として使用されます。

バケットを作成するには、gcloud storage buckets create コマンドを使用します。

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

次のように置き換えます。

  • BUCKET_NAME: バケットの命名要件を満たす Cloud Storage バケットの名前。Cloud Storage のバケット名は、グローバルに一意である必要があります。
  • LOCATION: バケットのロケーション

バケット名をコピーします。この値は、このチュートリアルの後半で必要になります。

Pub/Sub トピックとサブスクリプションを作成する

2 つの Pub/Sub トピックと 1 つのサブスクリプションを作成します。1 つのトピックはモデルに送信する入力プロンプト用です。もう一つのトピックとそれに関連付けられたサブスクリプションはモデルのレスポンス用です。

  1. トピックを作成するには、gcloud pubsub topics create コマンドを 2 回(トピックごとに 1 回ずつ)実行します。

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    次のように置き換えます。

    • PROMPTS_TOPIC_ID: モデルに送信する入力プロンプトのトピック ID(例: prompts
    • RESPONSES_TOPIC_ID: モデルのレスポンスのトピック ID(responses など)
  2. サブスクリプションを作成してレスポンス トピックに関連付けるには、gcloud pubsub subscriptions create コマンドを使用します。

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    RESPONSES_SUBSCRIPTION_ID は、モデルのレスポンスのサブスクリプション ID(responses-subscription など)に置き換えます。

トピック ID とサブスクリプション ID をコピーします。これらの値は、このチュートリアルの後の部分で必要になります。

環境を準備する

コードサンプルをダウンロードして、チュートリアルを実行するための環境を設定します。

python-docs-samples GitHub リポジトリのコードサンプルには、このパイプラインを実行するために必要なコードが含まれています。独自のパイプラインを構築する準備ができたら、このサンプルコードをテンプレートとして使用できます。

venv を使用して、パイプライン プロジェクトを実行するための隔離された Python 仮想環境を作成します。仮想環境を使用すると、1 つのプロジェクトの依存関係を他のプロジェクトの依存関係から分離できます。Python をインストールして仮想環境を作成する方法については、Python 開発環境の設定をご覧ください。

  1. git clone コマンドを使用して GitHub リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. run-inference ディレクトリに移動します。

    cd python-docs-samples/dataflow/run-inference
    
  3. コマンド プロンプトを使用している場合は、システムで Python 3 と pip が実行されていることを確認します。

    python --version
    python -m pip --version
    

    必要に応じて、Python 3 をインストールします。

    Cloud Shell を使用している場合は、Cloud Shell に Python がすでにインストールされているため、この手順をスキップできます。

  4. Python 仮想環境を作成します。

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. 依存関係をインストールします。

    pip install -r requirements.txt --no-cache-dir
    

モデル読み込みのコードサンプル

このチュートリアルのモデル読み込みコードは、モデルの state_dict オブジェクトを Cloud Storage に読み込む Vertex AI カスタムジョブを起動します。

スターター ファイルは、次のようになります。

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

パイプラインのコードサンプル

このチュートリアルのパイプライン コードは、次の処理を行う Dataflow パイプラインをデプロイします。

  • Pub/Sub からプロンプトを読み取り、テキストをトークン テンソルにエンコードします。
  • RunInference 変換を実行します。
  • 出力トークンのテンソルをテキストにデコードし、レスポンスを Pub/Sub に書き込みます。

スターター ファイルは、次のようになります。

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

モデルを読み込む

LLM は非常に大規模なモデルになる可能性があります。一般に、より多くのパラメータでトレーニングされた大規模なモデルほど、より良い結果が得られます。ただし、モデルが大きいほど、実行により大きなマシンとより多くのメモリが必要になります。大規模なモデルは CPU での実行速度も遅くなる可能性があります。

Dataflow で PyTorch モデルを実行する前に、モデルの state_dict オブジェクトを読み込む必要があります。モデルの state_dict オブジェクトにはモデルの重みが保存されます。

Apache Beam RunInference 変換を使用する Dataflow パイプラインでは、モデルの state_dict オブジェクトを Cloud Storage に読み込む必要があります。state_dict オブジェクトを Cloud Storage に読み込むために使用するマシンには、モデルの読み込みに十分なメモリが必要です。重みをダウンロードして Cloud Storage にアップロードするため、マシンには高速のインターネット接続も必要です。

次の表に、各モデルのパラメータの数と、各モデルの読み込みに必要な最小メモリを示します。

モデル パラメータ 必要なメモリ
google/flan-t5-small 8,000 万 320 MB 超
google/flan-t5-base 25,000 万 1 GB 超
google/flan-t5-large 78,000 万 3.2 GB 超
google/flan-t5-xl 30 億 12 GB 超
google/flan-t5-xxl 110 億 44 GB 超
google/flan-ul2 200 億 80 GB 超

小さなモデルをローカルに読み込むこともできますが、このチュートリアルでは、適切なサイズの VM でモデルを読み込む Vertex AI カスタムジョブを起動する方法について説明します。

LLM は非常に大きくなる可能性があるため、このチュートリアルの例では、state_dict オブジェクトをデフォルトの float32 形式ではなく float16 形式で保存します。この構成では、各パラメータが 32 ビットではなく 16 ビットを使用するため、state_dict オブジェクトのサイズが半分になります。サイズが小さいほど、モデルの読み込みに必要な時間が短くなります。ただし、形式の変換は、VM がモデルと state_dict オブジェクトの両方をメモリに適合させる必要があることを意味します。

次の表に、state_dict オブジェクトが float16 形式で保存された後にモデルを読み込むための最小要件を示します。この表には、Vertex AI を使用してモデルを読み込む際の推奨マシンタイプも示します。Vertex AI の最小(デフォルト)のディスクサイズは 100 GB ですが、一部のモデルではより大きなディスクが必要になる場合があります。

モデル名 必要なメモリ マシンタイプ VM メモリ VM ディスク
google/flan-t5-small 480 MB 超 e2-standard-4 16 GB 100 GB
google/flan-t5-base 1.5 GB 超 e2-standard-4 16 GB 100 GB
google/flan-t5-large 4.8 GB 超 e2-standard-4 16 GB 100 GB
google/flan-t5-xl 18 GB 超 e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl 66 GB 超 e2-highmem-16 128 GB 100 GB
google/flan-ul2 120 GB 超 e2-highmem-16 128 GB 150 GB

Vertex AI カスタムジョブを使用して、モデルの state_dict オブジェクトを Cloud Storage に読み込みます。

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

次のように置き換えます。

  • MODEL_NAME: モデルの名前(google/flan-t5-xl など)。
  • VERTEX_AI_MACHINE_TYPE: Vertex AI カスタムジョブを実行するマシンのタイプ(e2-highmem-4 など)。
  • DISK_SIZE_GB: VM のディスクサイズ(GB)。最小サイズは 100 GB です。

モデルのサイズによっては、モデルの読み込みに数分かかることがあります。ステータスを表示するには、Vertex AI の [カスタムジョブ] ページに移動します。

[カスタムジョブ] に移動

パイプラインを実行する

モデルを読み込んだ後、Dataflow パイプラインを実行します。パイプラインを実行するには、各ワーカーが使用するモデルとメモリの両方がメモリに収まる必要があります。

次の表に、推論パイプラインの実行に推奨されるマシンタイプを示します。

モデル名 マシンタイプ VM メモリ
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

パイプラインを実行します。

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • PROMPTS_TOPIC_ID: モデルに送信する入力プロンプトのトピック ID
  • RESPONSES_TOPIC_ID: モデルのレスポンスのトピック ID
  • MODEL_NAME: モデルの名前(例: google/flan-t5-xl
  • BUCKET_NAME: バケットの名前
  • REGION: ジョブをデプロイするリージョン(us-central1 など)
  • DATAFLOW_MACHINE_TYPE: パイプラインを実行する VM(n2-highmem-4 など)

モデルがワーカーごとに 1 回だけ読み込まれ、メモリが不足しないようにするには、パイプライン オプション --experiments=no_use_multiple_sdk_containers を設定して、単一プロセスを使用するようにワーカーを構成します。RunInference 変換は同じモデルを複数のスレッドと共有するため、スレッド数を制限する必要はありません。

この例のパイプラインは CPU で実行されます。モデルが大きいほど、各リクエストの処理に時間がかかります。レスポンスを高速化する必要がある場合は、GPU を有効にします

パイプラインのステータスを表示するには、Dataflow の [ジョブ] ページに移動します。

[ジョブ] に移動

モデルに質問する

パイプラインの実行が開始されたら、モデルにプロンプトを提供し、レスポンスを取得します。

  1. Pub/Sub にメッセージをパブリッシュして、プロンプトを送信します。gcloud pubsub topics publish コマンドを実行します。

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    PROMPT_TEXT は、表示するプロンプトを含む文字列に置き換えます。プロンプトは引用符で囲みます。

    独自のプロンプトを使用するか、次のいずれかの例を試してください。

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. レスポンスを取得するには、gcloud pubsub subscriptions pull コマンドを使用します。

    モデルのサイズによっては、モデルがレスポンスを生成するまでに数分かかることがあります。モデルが大きいほど、デプロイとレスポンスの生成に時間がかかります。

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    RESPONSES_SUBSCRIPTION_ID は、モデルのレスポンスのサブスクリプション ID に置き換えます。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

リソースを個別に削除する

  1. Python 仮想環境を終了します。

    deactivate
  2. パイプラインを停止します。

    1. 実行中の Dataflow ジョブのジョブ ID を一覧表示し、チュートリアルのジョブのジョブ ID をメモします。

      gcloud dataflow jobs list --region=REGION --status=active
    2. ジョブをキャンセルします。

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. バケットとその内容を削除します。

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. トピックとサブスクリプションを削除します。

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER[email protected] --role=SERVICE_ACCOUNT_ROLE
  6. 省略可: Google アカウントのロールを取り消します。

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  8. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ