개인 정보 차등 보호 확장

이 문서에서는 BigQuery의 개인 정보 차등 보호를 위한 개인 정보 차등 보호를 확장하는 방법의 예시를 제공합니다.

개요

BigQuery를 사용하면 개인 정보 차등 보호를 멀티 클라우드 데이터 소스 및 외부 개인 정보 차등 보호 라이브러리로 확장할 수 있습니다. 이 문서에서는 BigQuery Omni를 사용하여 AWS S3와 같은 멀티 클라우드 데이터 소스에 개인 정보 차등 보호를 적용하는 방법, 원격 함수를 사용하여 외부 개인 정보 차등 보호 라이브러리를 호출하는 방법, Apache Spark 및 Apache Beam으로 실행할 수 있는 Python 라이브러리인 PipelineDP를 사용하여 개인 정보 차등 보호 집계를 수행하는 방법의 예시를 제공합니다.

개인 정보 차등 보호에 대한 자세한 내용은 개인 정보 차등 보호개인 정보 차등 보호 사용을 참조하세요.

BigQuery Omni를 사용한 개인 정보 차등 보호

BigQuery 개인 정보 차등 보호는 AWS S3와 같은 멀티 클라우드 데이터 소스에 대한 호출을 지원합니다. 다음 예시는 외부 데이터 소스인 foo.wikidata를 쿼리하고 개인 정보 차등 보호를 적용합니다. 개인 정보 차등 보호 절의 구문에 대한 자세한 내용은 개인 정보 차등 보호 절을 참조하세요.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

이 예시에서는 다음과 유사한 결과를 반환합니다.

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

BigQuery Omni 제한사항에 대한 자세한 내용은 제한사항을 참조하세요.

원격 함수로 외부 개인 정보 차등 보호 라이브러리 호출

원격 함수를 사용하여 외부 개인 정보 차등 보호 라이브러리를 호출할 수 있습니다. 다음 링크는 소매 판매 데이터 세트에 무집중 개인 정보 차등 보호를 사용할 수 있도록 원격 함수를 사용하여 Tumult Analytics가 호스팅하는 외부 라이브러리를 호출합니다.

Tumult Analytics 작업에 대한 자세한 내용은 Tumult Analytics 출시 게시물{: .external}을 참조하세요.

PipelineDP를 사용한 개인 정보 차등 보호 집계

PipelineDP는 개인 정보 차등 보호 집계를 수행하고 Apache Spark 및 Apache Beam으로 실행할 수 있는 Python 라이브러리입니다. BigQuery는 Python으로 작성된 Apache Spark 저장 프로시져를 실행할 수 있습니다. Apache Spark 저장 프로시져 실행에 대한 자세한 내용은 Apache Spark 저장 프로시져 작업을 참조하세요.

다음 예시는 PipelineDP 라이브러리를 사용하여 개인 정보 차등 보호 집계를 수행합니다. 여기에는 시카고 택시 운행 공개 데이터 세트를 사용하고 각 택시마다 운행 횟수, 운행 시 팁의 합계 및 평균을 계산합니다.

시작하기 전에

표준 Apache Spark 이미지에는 PipelineDP가 포함되지 않습니다. PipelineDP 저장 프로시져를 실행하기 전에 필요한 모든 종속 항목을 포함하는 Docker 이미지를 만들어야 합니다. 이 섹션에서는 Docker 이미지를 만들고 Google Cloud로 푸시하는 방법을 설명합니다.

시작하기 전에 로컬 머신에 Docker를 설치하고 Docker 이미지를 gcr.io로 푸시하도록 인증을 설정해야 합니다. Docker 이미지 푸시에 대한 자세한 내용은 이미지 내보내기 및 가져오기를 참조하세요.

Docker 이미지 만들기 및 푸시

필요한 종속 항목이 포함된 Docker 이미지를 만들고 푸시하려면 다음 단계를 따르세요.

  1. 로컬 폴더 DIR을 만듭니다.
  2. Python 3.9 버전의 Miniconda 설치 프로그램DIR에 다운로드합니다.
  3. 다음 텍스트를 Dockerfile에 저장합니다.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      #
      # Use mamba to install packages quickly.
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
      && ${CONDA_HOME}/bin/mamba install \
        conda \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        openblas \
        orc \
        pandas \
        pyarrow \
        pysal \
        pytables \
        python \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        virtualenv
    
      RUN apt install -y python3-pip
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
    
  4. 다음 명령어를 실행합니다.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    
    

    다음을 바꿉니다.

    • PROJECT_ID: Docker 이미지를 만들 프로젝트입니다.
    • DOCKER_IMAGE: Docker 이미지 이름입니다.

    이미지가 업로드됩니다.

PipelineDP 저장 프로시져 실행

  1. 저장 프로시져를 만들려면 CREATE PROCEDURE 문을 사용합니다.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;
    

    다음을 바꿉니다.

    • PROJECT_ID: 저장 프로시져를 만들려는 프로젝트입니다.
    • DATASET_ID: 저장 프로시져를 만들려는 데이터 세트입니다.
    • REGION: 프로젝트가 있는 리전입니다.
    • DOCKER_IMAGE: Docker 이미지 이름입니다.
    • CONNECTION_ID: 연결의 이름입니다.
    • TABLE_NAME: 테이블의 이름입니다.
  2. CALL 문을 사용하여 프로시져를 호출합니다.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
    

    다음을 바꿉니다.

    • PROJECT_ID: 저장 프로시져를 만들려는 프로젝트입니다.
    • DATASET_ID: 저장 프로시져를 만들려는 데이터 세트입니다.

다음 단계