스트리밍 파이프라인에서 LLM 실행


이 튜토리얼에서는 Apache Beam RunInference API를 사용하여 스트리밍 Dataflow 파이프라인에서 대규모 언어 모델(LLM)을 실행하는 방법을 보여줍니다.

RunInference API에 대한 자세한 내용은 Apache Beam 문서에서 Beam ML 정보를 참조하세요.

예시 코드는 GitHub에서 제공됩니다.

목표

  • 모델 입력 및 응답에 대한 Pub/Sub 주제 및 구독을 만듭니다.
  • Vertex AI 커스텀 작업을 사용하여 Cloud Storage에 모델을 로드합니다.
  • 파이프라인을 실행합니다.
  • 모델에 질문을 하고 응답을 얻습니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

종속 항목 설치를 위해 여유 디스크 공간이 5GB 이상인 머신에서 이 튜토리얼을 실행합니다.

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI API를 사용 설정합니다.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  7. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  8. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  9. Google Cloud CLI를 설치합니다.
  10. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  13. Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI API를 사용 설정합니다.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  14. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  15. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  16. Compute Engine 기본 서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER[email protected]" --role=SERVICE_ACCOUNT_ROLE

    다음을 바꿉니다.

    • PROJECT_ID: 프로젝트 ID입니다.
    • PROJECT_NUMBER: 프로젝트 번호입니다. 프로젝트 번호를 찾으려면 gcloud projects describe 명령어를 사용합니다.
    • SERVICE_ACCOUNT_ROLE: 개별 역할입니다.
  17. Google Cloud 프로젝트 ID를 복사합니다. 이 값은 이 튜토리얼의 뒷부분에서 사용됩니다.

Google Cloud 리소스 만들기

이 섹션에서는 다음을 만드는 방법을 설명합니다.

  • 임시 스토리지 위치로 사용할 Cloud Storage 버킷
  • 모델 프롬프트의 Pub/Sub 주제
  • 모델 응답의 Pub/Sub 주제 및 구독

Cloud Storage 버킷 만들기

gcloud CLI를 사용하여 Cloud Storage 버킷을 만듭니다. 이 버킷은 Dataflow 파이프라인에서 임시 스토리지 위치로 사용됩니다.

버킷을 만들기 위해 gcloud storage buckets create 명령어를 사용합니다.

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

다음을 바꿉니다.

  • BUCKET_NAME: 버킷 이름 지정 요구사항을 충족하는 Cloud Storage 버킷의 이름입니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.
  • LOCATION: 버킷의 위치입니다.

버킷 이름을 복사합니다. 이 값은 이 튜토리얼의 뒷부분에서 사용됩니다.

Pub/Sub 주제 및 구독 만들기

Pub/Sub 주제 2개와 하나의 구독을 만듭니다. 하나의 주제는 모델에 전송하는 입력 프롬프트에 사용됩니다. 다른 주제 및 연결된 구독은 모델의 응답에 사용됩니다.

  1. 주제를 만들기 위해 각 주제에 한 번씩 gcloud pubsub topics create 명령어를 두 번 실행합니다.

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    다음을 바꿉니다.

    • PROMPTS_TOPIC_ID: 모델에 전송할 입력 프롬프트의 주제 ID입니다(예: prompts).
    • RESPONSES_TOPIC_ID: 모델 응답에 대한 주제 ID입니다(예: responses).
  2. 구독을 만들고 응답 주제에 연결하려면 gcloud pubsub subscriptions create 명령어를 사용합니다.

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    RESPONSES_SUBSCRIPTION_IDresponses-subscription과 같은 모델 응답의 구독 ID로 바꿉니다.

주제 ID 및 구독 ID를 복사합니다. 이러한 값은 이 튜토리얼의 뒷부분에서 필요합니다.

개발 환경 준비

코드 샘플을 다운로드한 후 튜토리얼을 실행하도록 환경을 설정합니다.

python-docs-samples GitHub 저장소의 코드 샘플은 이 파이프라인을 실행하는 데 필요한 코드를 제공합니다. 자체 파이프라인을 빌드할 준비가 되었으면 이 샘플 코드를 템플릿으로 사용할 수 있습니다.

venv를 사용해서 파이프라인 프로젝트를 실행하도록 격리된 Python 가상 환경을 만듭니다. 가상 환경을 사용하면 다른 프로젝트의 종속 항목으로부터 한 프로젝트의 종속 항목을 분리할 수 있습니다. Python을 설치하고 가상 환경을 만드는 방법은 Python 개발 환경 설정을 참조하세요.

  1. git clone 명령어를 사용하여 GitHub 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. run-inference 디렉터리로 이동합니다.

    cd python-docs-samples/dataflow/run-inference
    
  3. 명령어 프롬프트를 사용하는 경우 Python 3 및 pip가 시스템에서 실행되는지 확인합니다.

    python --version
    python -m pip --version
    

    필요한 경우 Python 3를 설치합니다.

    Cloud Shell을 사용하는 경우 Cloud Shell에 Python이 이미 설치되어 있으므로 이 단계를 건너뛸 수 있습니다.

  4. Python 가상 환경을 만듭니다.

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. 종속 항목을 설치합니다.

    pip install -r requirements.txt --no-cache-dir
    

모델 로드 코드 샘플

이 튜토리얼의 모델 로드 코드는 모델의 state_dict 객체를 Cloud Storage에 로드하는 Vertex AI 커스텀 작업을 실행합니다.

시작 파일은 다음과 같습니다.

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM

def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")

def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

파이프라인 코드 샘플

이 튜토리얼의 파이프라인 코드는 다음을 수행하는 Dataflow 파이프라인을 배포합니다.

  • Pub/Sub에서 프롬프트를 읽고 텍스트를 토큰 텐서로 인코딩합니다.
  • RunInference 변환을 실행합니다.
  • 출력 토큰 텐서를 텍스트로 디코딩하고 응답을 Pub/Sub에 씁니다.

시작 파일은 다음과 같습니다.

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256

def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]

def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)

class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

모델 로드

LLM은 매우 큰 모델일 수 있습니다. 더 많은 매개변수로 학습된 더 큰 모델일수록 일반적으로 더 나은 결과를 제공합니다. 그러나 모델이 클수록 실행하는 데 더 큰 머신과 메모리가 필요합니다. 또한 모델이 클수록 CPU에서 실행 속도가 느려질 수 있습니다.

Dataflow에서 PyTorch 모델을 실행하기 전에 모델의 state_dict 객체를 로드해야 합니다. 모델의 state_dict 객체는 모델의 가중치를 저장합니다.

Apache Beam RunInference 변환을 사용하는 Dataflow 파이프라인에서 모델의 state_dict 객체를 Cloud Storage에 로드해야 합니다. Cloud Storage에 state_dict 객체를 로드하기 위해 사용하는 머신에 모델을 로드하는 데 충분한 메모리가 있어야 합니다. 또한 가중치를 다운로드하고 이를 Cloud Storage에 업로드하기 위해 머신에 빠른 인터넷 연결이 필요합니다.

다음 표에서는 각 모델의 매개변수 수와 각 모델을 로드하는 데 필요한 최소 메모리를 보여줍니다.

모델 매개변수 필요한 메모리
google/flan-t5-small 8천만 320MB 초과
google/flan-t5-base 2억 5천만 1GB 초과
google/flan-t5-large 7억 8천만 3.2GB 초과
google/flan-t5-xl 30억 12GB 초과
google/flan-t5-xxl 110억 44GB 초과
google/flan-ul2 200억 80GB 초과

더 작은 모델을 로컬로 로드할 수 있지만 이 튜토리얼에서는 적절한 크기의 VM으로 모델을 로드하는 Vertex AI 커스텀 작업을 실행하는 방법을 보여줍니다.

LLM이 매우 클 수 있기 때문에 이 튜토리얼의 예시에서는 기본 float32 형식 대신 float16 형식으로 state_dict 객체를 저장합니다. 이 구성에서는 각 매개변수에 32비트 대신 16비트가 사용되어 state_dict 객체 크기가 절반이 됩니다. 크기가 작을수록 모델을 로드하는 데 필요한 시간이 최소화됩니다. 그러나 형식 변환에 따라 VM이 모델 및 state_dict 객체를 메모리에 맞춰야 합니다.

다음 표에서는 state_dict 객체가 float16 형식으로 저장된 후 모델을 로드하기 위한 최소 요구사항을 보여줍니다. 또한 Vertex AI를 사용하여 모델을 로드하기 위한 권장 머신 유형을 보여줍니다. Vertex AI의 최소(및 기본) 디스크 크기는 100GB이지만 일부 모델에 따라 더 큰 디스크가 필요할 수 있습니다.

모델 이름 필요한 메모리 머신 유형 VM 메모리 VM 디스크
google/flan-t5-small 480MB 초과 e2-standard-4 16GB 100GB
google/flan-t5-base 1.5GB 초과 e2-standard-4 16GB 100GB
google/flan-t5-large 4.8GB 초과 e2-standard-4 16GB 100GB
google/flan-t5-xl 18GB 초과 e2-highmem-4 32GB 100GB
google/flan-t5-xxl 66GB 초과 e2-highmem-16 128GB 100GB
google/flan-ul2 120GB 초과 e2-highmem-16 128GB 150GB

Vertex AI 커스텀 작업을 사용하여 모델의 state_dict 객체를 Cloud Storage에 로드합니다.

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

다음을 바꿉니다.

  • MODEL_NAME: 모델의 이름입니다(예: google/flan-t5-xl).
  • VERTEX_AI_MACHINE_TYPE: Vertex AI 커스텀 작업을 실행할 머신의 유형입니다(예: e2-highmem-4).
  • DISK_SIZE_GB: VM의 디스크 크기(GB)입니다. 최소 크기는 100GB입니다.

모델 크기에 따라 모델을 로드하는 데 몇 분 정도 걸릴 수 있습니다. 상태를 보려면 Vertex AI 커스텀 작업 페이지로 이동합니다.

커스텀 작업으로 이동

파이프라인 실행

모델을 로드한 후 Dataflow 파이프라인을 실행합니다. 파이프라인을 실행하려면 각 작업자에 사용되는 모델 및 메모리를 모두 메모리에 맞게 조정해야 합니다.

다음 표에서는 추론 파이프라인을 실행하기 위해 권장되는 머신 유형을 보여줍니다.

모델 이름 머신 유형 VM 메모리
google/flan-t5-small n2-highmem-2 16GB
google/flan-t5-base n2-highmem-2 16GB
google/flan-t5-large n2-highmem-4 32GB
google/flan-t5-xl n2-highmem-4 32GB
google/flan-t5-xxl n2-highmem-8 64GB
google/flan-ul2 n2-highmem-16 128GB

파이프라인을 실행합니다.

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID입니다.
  • PROMPTS_TOPIC_ID: 모델에 전송할 입력 프롬프트의 주제 ID입니다.
  • RESPONSES_TOPIC_ID: 모델 응답의 주제 ID입니다.
  • MODEL_NAME: 모델의 이름입니다(예: google/flan-t5-xl).
  • BUCKET_NAME: 버킷의 이름입니다.
  • REGION: 작업을 배포할 리전(예: us-central1)
  • DATAFLOW_MACHINE_TYPE: 파이프라인을 실행할 VM입니다(예: n2-highmem-4).

모델이 작업자당 한 번만 로드되고 메모리가 부족하지 않도록 보장하려면 파이프라인 옵션 --experiments=no_use_multiple_sdk_containers를 설정하여 단일 프로세스를 사용하도록 작업자를 구성합니다. RunInference 변환이 여러 스레드와 동일한 모델을 공유하므로 스레드 수를 제한할 필요가 없습니다.

이 예시의 파이프라인은 CPU로 실행됩니다. 더 큰 모델의 경우 각 요청을 처리하는 데 더 많은 시간이 필요합니다. 더 빠른 응답이 필요하면 GPU를 사용 설정할 수 있습니다.

파이프라인 상태를 보려면 Dataflow 작업 페이지로 이동합니다.

작업으로 이동

모델에 질문하기

파이프라인 실행이 시작된 후 모델에 프롬프트를 제공하고 응답을 수신합니다.

  1. Pub/Sub에 메시지를 게시하여 프롬프트를 전송합니다. gcloud pubsub topics publish 명령어를 사용합니다.

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    PROMPT_TEXT를 제공하려는 프롬프트가 포함된 문자열로 바꿉니다. 프롬프트를 따옴표로 묶습니다.

    자체 프롬프트를 사용하거나 다음 예시 중 하나를 시도합니다.

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. 응답을 가져오려면 gcloud pubsub subscriptions pull 명령어를 사용합니다.

    모델 크기에 따라 모델이 응답을 생성하는 데 몇 분 정도 걸릴 수 있습니다. 모델이 클수록 배포 및 응답 생성에 더 오래 걸립니다.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    RESPONSES_SUBSCRIPTION_ID를 모델 응답의 구독 ID로 바꿉니다.

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

프로젝트 삭제

    Google Cloud 프로젝트를 삭제합니다.

    gcloud projects delete PROJECT_ID

개별 리소스 삭제

  1. Python 가상 환경을 종료합니다.

    deactivate
  2. 파이프라인을 중지합니다.

    1. 실행 중인 Dataflow 작업의 작업 ID를 나열한 후 튜토리얼 작업의 작업 ID를 기록해 둡니다.

      gcloud dataflow jobs list --region=REGION --status=active
    2. 작업을 취소합니다.

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. 버킷과 그 안의 항목을 삭제합니다.

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. 주제 및 구독을 삭제합니다.

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Compute Engine 기본 서비스 계정에 부여한 역할을 취소합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER[email protected] --role=SERVICE_ACCOUNT_ROLE
  6. 선택사항: Google 계정에서 역할을 취소합니다.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. 선택사항: 만든 사용자 인증 정보를 취소하고 로컬 사용자 인증 정보 파일을 삭제합니다.

    gcloud auth application-default revoke
  8. 선택사항: gcloud CLI에서 사용자 인증 정보를 취소합니다.

    gcloud auth revoke

다음 단계