Datastream to BigQuery (Stream) template

The Datastream to BigQuery template is a streaming pipeline that reads Datastream data and replicates it into BigQuery. The template reads data from Cloud Storage using Pub/Sub notifications and replicates it into a time-partitioned BigQuery staging table. Following replication, the template executes a MERGE in BigQuery to upsert all change data capture (CDC) changes into a replica of the source table.

The template handles creating and updating the BigQuery tables managed by the replication. When data definition language (DDL) is required, a callback to Datastream extracts the source table schema and translates it into BigQuery data types. Supported operations include the following:

  • New tables are created as data is inserted.
  • New columns are added to BigQuery tables with null initial values.
  • Dropped columns are ignored in BigQuery and future values are null.
  • Renamed columns are added to BigQuery as new columns.
  • Type changes are not propagated to BigQuery.
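
If you want to confirm how the template has applied one of these schema changes, you can inspect the resulting table schema with the bq command-line tool. This is an optional check rather than part of the template, and the project, dataset, and table names below are placeholders:

    bq show --schema --format=prettyjson MY_PROJECT:MY_DATASET.MY_TABLE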

It's recommended to run this pipeline using at-least-once streaming mode, because the template deduplicates records when it merges data from the staging BigQuery table into the main BigQuery table. Because of this step, exactly-once streaming mode provides no additional benefit.
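
For example, with the gcloud CLI you can request at-least-once streaming mode by passing the streaming_mode_at_least_once service option as an additional experiment when you launch the job. The following is a hedged sketch, assuming that mechanism; all values are placeholders and most parameters are omitted for brevity (see the full gcloud example later on this page):

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --additional-experiments=streaming_mode_at_least_once \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters inputFilePattern=GCS_FILE_PATH,gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME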

Pipeline requirements

  • A Datastream stream that is ready to replicate or is already replicating data.
  • Cloud Storage Pub/Sub notifications are enabled for the Datastream data, as shown in the example commands after this list.
  • BigQuery destination datasets are created and the Compute Engine Service Account has been granted administrator access to them.
  • A primary key is necessary in the source table for the destination replica table to be created.
  • A MySQL or Oracle source database. PostgreSQL databases are not supported.
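
The following commands sketch one way to set up the Pub/Sub notification and BigQuery prerequisites. Every bucket, topic, subscription, dataset, and project value is a placeholder, and the project-level roles/bigquery.admin grant is only one option; your organization might prefer a narrower role or a dataset-level grant:

    # Enable Cloud Storage Pub/Sub notifications for the Datastream output path.
    gsutil notification create -t projects/PROJECT_ID/topics/DATASTREAM_TOPIC -f json -p ROOT_PATH/ gs://BUCKET_NAME

    # Create the subscription that the template reads file notifications from.
    gcloud pubsub subscriptions create DATASTREAM_SUBSCRIPTION --topic=DATASTREAM_TOPIC

    # Create the destination dataset and grant the Compute Engine service account access.
    bq mk --dataset PROJECT_ID:BIGQUERY_DATASET
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/bigquery.admin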

Template parameters

Required parameters

  • inputFilePattern : The file location for Datastream file output in Cloud Storage, in the format: gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat : The format of the output files produced by Datastream. Value can be 'avro' or 'json'. Defaults to: avro.
  • gcsPubSubSubscription : The Pub/Sub subscription used by Cloud Storage to notify Dataflow of new files available for processing, in the format: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate : The name of the dataset that contains staging tables. This parameter supports templates, for example {_metadata_dataset}_log or my_dataset_log. Normally, this parameter is a dataset name. Defaults to: {_metadata_dataset}.
  • outputDatasetTemplate : The name of the dataset that contains the replica tables. This parameter supports templates, for example {_metadata_dataset} or my_dataset. Normally, this parameter is a dataset name. Defaults to: {_metadata_dataset}.
  • deadLetterQueueDirectory : The path that Dataflow uses to write the dead-letter queue output. This path must not be the same as the Datastream file output path. Defaults to empty.

Optional parameters

  • streamName : The name or the template for the stream to poll for schema information. Defaults to: {_metadata_stream}. The default value is usually enough.
  • rfcStartDateTime : The starting DateTime to use to fetch data from Cloud Storage (https://tools.ietf.org/html/rfc3339). Defaults to: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : The number of concurrent Datastream files to read. Defaults to: 10.
  • outputProjectId : The ID of the Google Cloud project that contains the BigQuery datasets to output data into. The default for this parameter is the project where the Dataflow pipeline is running.
  • outputStagingTableNameTemplate : The template to use to name the staging tables. For example, {_metadata_table}. Defaults to: {_metadata_table}_log.
  • outputTableNameTemplate : The template to use for the name of the replica tables, for example {_metadata_table}. Defaults to: {_metadata_table}.
  • ignoreFields : Comma-separated fields to ignore in BigQuery. Defaults to: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. (Example: _metadata_stream,_metadata_schema).
  • mergeFrequencyMinutes : The number of minutes between merges for a given table. Defaults to: 5.
  • dlqRetryMinutes : The number of minutes between DLQ Retries. Defaults to: 10.
  • dataStreamRootUrl : The Datastream API root URL. Defaults to: https://datastream.googleapis.com/.
  • applyMerge : Whether to apply MERGE queries for the job. Set this to false to disable MERGE queries. Defaults to: true.
  • mergeConcurrency : The number of concurrent BigQuery MERGE queries. Only effective when applyMerge is set to true. Defaults to: 30.
  • partitionRetentionDays : The number of days to use for partition retention when running BigQuery merges. Defaults to: 1.
  • useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If true, at-least-once semantics are used for the Storage Write API. Otherwise, exactly-once semantics are used. Defaults to: false.
  • javascriptTextTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is 0, UDF reloading is disabled. The default value is 0.
  • pythonTextTransformGcsPath : The Cloud Storage path pattern for the Python code containing your user-defined functions. (Example: gs://your-bucket/your-transforms/*.py).
  • pythonRuntimeVersion : The runtime version to use for this Python UDF.
  • pythonTextTransformFunctionName : The name of the function to call from your Python file. Use only letters, digits, and underscores. (Example: transform_udf1).
  • runtimeRetries : The number of times a runtime will be retried before failing. Defaults to: 5.
  • useStorageWriteApi : If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. Defaults to: 0.
  • storageWriteApiTriggeringFrequencySec : When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
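
As a hedged illustration of how some of these optional parameters fit together, the following launch command tunes the merge cadence and attaches a JavaScript UDF. All values are placeholders, and the required parameters are abbreviated; see the full gcloud example later on this page:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
mergeFrequencyMinutes=2,\
mergeConcurrency=10,\
javascriptTextTransformGcsPath=gs://my-bucket/my-udfs/my_file.js,\
javascriptTextTransformFunctionName=myTransform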

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the CDC data, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
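
For example, a minimal JavaScript UDF might mask a sensitive column before the row is written to BigQuery. This is an illustrative sketch only: the file name, bucket path, and email column are hypothetical, and the staged file is what you would reference with the javascriptTextTransformGcsPath and javascriptTextTransformFunctionName parameters:

# Write a minimal JavaScript UDF to a local file, then stage it in Cloud Storage.
cat > my_udf.js <<'EOF'
function myTransform(inJson) {
  var row = JSON.parse(inJson);
  // Hypothetical column: mask it if present so the output still matches the table schema.
  if (row.email !== undefined) {
    row.email = 'REDACTED';
  }
  return JSON.stringify(row);
}
EOF
gsutil cp my_udf.js gs://my-bucket/my-udfs/my_udf.js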

Run the template

    Console

    1. Go to the Dataflow Create job from template page.
    2. In the Job name field, enter a unique job name.
    3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

      For a list of regions where you can run a Dataflow job, see Dataflow locations.

    4. From the Dataflow template drop-down menu, select the Datastream to BigQuery template.
    5. In the provided parameter fields, enter your parameter values.
    6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
    7. Click Run job.

    gcloud

    In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE_log,\
outputTableNameTemplate=BIGQUERY_TABLE
      

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

    • GCS_FILE_PATH: the Cloud Storage path to Datastream data. For example: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: your BigQuery dataset name.
    • BIGQUERY_TABLE: your BigQuery table template. For example, {_metadata_schema}_{_metadata_table}

    API

    To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

    • GCS_FILE_PATH: the Cloud Storage path to Datastream data. For example: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: your BigQuery dataset name.
    • BIGQUERY_TABLE: your BigQuery table template. For example, {_metadata_schema}_{_metadata_table}
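
    To send this request from a shell, you can save the request body shown above to a file and call the endpoint with curl, authenticating with an access token from the gcloud CLI. This is a sketch; request.json is a placeholder file name:

    # Save the JSON request body shown above as request.json, then launch the job.
    curl -X POST \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      -d @request.json \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"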

What's next