扩展差分隐私

本文档举例说明了如何扩展 BigQuery 差分隐私。

概览

BigQuery 可让您将差分隐私扩展到多云数据源和外部差分隐私库。本文档举例说明如何使用 BigQuery Omni 为 AWS S3 等多云数据源应用差分隐私,如何使用远程函数调用外部差分隐私库,以及如何使用 PipelineDP(一种可以使用 Apache Spark 和 Apache Beam 运行的 Python 库)执行差分隐私聚合。

如需详细了解差分隐私,请参阅使用差分隐私

使用 BigQuery Omni 实现差分隐私

BigQuery 差分隐私支持调用 AWS S3 等多云数据源。以下示例查询外部数据源 foo.wikidata,并应用差分隐私。如需详细了解差分隐私子句的语法,请参阅差分隐私子句

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

该示例会返回类似于以下内容的结果:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

如需详细了解 BigQuery Omni 的限制,请参阅限制

使用远程函数调用外部差分隐私库

您可以使用远程函数调用外部差分隐私库。以下链接使用远程函数调用由 Tumult Analytics 托管的外部库,以对零售销售数据集使用零集中差分隐私。

如需了解如何使用 Tumult Analytics,请参阅 Tumult Analytics 发布博文 {: .external}。

使用 PipelineDP 进行差分隐私聚合

PipelineDP 是一个 Python 库,用于执行差分隐私聚合,并且可以使用 Apache Spark 和 Apache Beam 运行。BigQuery 可以运行用 Python 编写的 Apache Spark 存储过程。如需详细了解如何运行 Apache Spark 存储过程,请参阅使用 Apache Spark 的存储过程

以下示例使用 PipelineDP 库执行差分隐私聚合。它使用芝加哥出租车行程公共数据集并计算每辆出租车的行程数量以及这些行程的总和与平均值。

须知事项

标准 Apache Spark 映像不包含 PipelineDP。在运行 PipelineDP 存储过程之前,您必须创建包含所有必要依赖项的 Docker 映像。本部分介绍如何创建 Docker 映像并将其推送到 Google Cloud。

在开始之前,请确保已在本地机器上安装 Docker 并设置用于将 Docker 映像推送到 gcr.io 的身份验证。如需详细了解如何推送 Docker 映像,请参阅推送和拉取映像

创建和推送 Docker 映像

如需创建和推送具有所需依赖项的 Docker 映像,请按照以下步骤操作:

  1. 创建本地文件夹 DIR
  2. 将具有 Python 3.9 版本的 Miniconda 安装程序下载到 DIR
  3. 将以下文本保存到 Dockerfile

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      #
      # Use mamba to install packages quickly.
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
      && ${CONDA_HOME}/bin/mamba install \
        conda \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        openblas \
        orc \
        pandas \
        pyarrow \
        pysal \
        pytables \
        python \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        virtualenv
    
      RUN apt install -y python3-pip
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
    
  4. 运行以下命令:

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    
    

    请替换以下内容:

    • PROJECT_ID:您要在其中创建 Docker 映像的项目。
    • DOCKER_IMAGE:Docker 映像名称。

    映像已上传。

运行 PipelineDP 存储过程

  1. 如需创建存储过程,请使用 CREATE PROCEDURE 语句。

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;
    

    请替换以下内容:

    • PROJECT_ID:要在其中创建存储过程的项目。
    • DATASET_ID:要在其中创建存储过程的数据集。
    • REGION:您的项目所在的区域。
    • DOCKER_IMAGE:Docker 映像名称。
    • CONNECTION_ID:连接的名称。
    • TABLE_NAME:表的名称。
  2. 使用 CALL 语句调用过程。

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
    

    请替换以下内容:

    • PROJECT_ID:要在其中创建存储过程的项目。
    • DATASET_ID:要在其中创建存储过程的数据集。

后续步骤