Apache Kafka to BigQuery template

The Apache Kafka to BigQuery template is a streaming pipeline that ingests text data from Apache Kafka, executes a user-defined function (UDF), and writes the resulting records to BigQuery. Any errors that occur while transforming the data, executing the UDF, or inserting into the output table are written to a separate errors table in BigQuery. If the errors table does not exist before the pipeline runs, it is created.

Pipeline requirements

  • The output BigQuery table must exist.
  • The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.
  • The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format.

Template parameters

Required parameters

  • outputTableSpec : The BigQuery output table location to write the output to. For example, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Depending on the createDisposition specified, the output table might be created automatically using the user-provided Avro schema.

Optional parameters

  • bootstrapServers : The host addresses of the running Apache Kafka broker servers, in a comma-separated list. Each host address must include the port, in the format 35.70.252.199:9092. (Example: localhost:9092,127.0.0.1:9093).
  • inputTopics : The Apache Kafka input topics to read from in a comma-separated list. (Example: topic1,topic2).
  • outputDeadletterTable : The BigQuery table for failed messages. Messages that fail to reach the output table for any reason (for example, a mismatched schema or malformed JSON) are written to this table. If the table doesn't exist, it is created during pipeline execution. If not specified, "outputTableSpec_error_records" is used instead. (Example: your-project-id:your-dataset.your-table-name).
  • messageFormat : The message format. Can be AVRO or JSON. Defaults to: JSON.
  • avroSchemaPath : Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.
  • useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled, the Storage Write API uses at-least-once semantics; otherwise, it uses exactly-once semantics. Defaults to: false.
  • readBootstrapServers : Kafka Bootstrap Server list, separated by commas. (Example: localhost:9092,127.0.0.1:9093).
  • kafkaReadTopics : Kafka topic(s) to read input from. (Example: topic1,topic2).
  • kafkaReadOffset : The Kafka Offset to read from. Defaults to: latest.
  • kafkaReadUsernameSecretId : Secret Manager secret ID for the SASL_PLAIN username. Should be in the format projects/{project}/secrets/{secret}/versions/{secret_version}. (Example: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
  • kafkaReadPasswordSecretId : Secret Manager secret ID for the SASL_PLAIN password. Should be in the format projects/{project}/secrets/{secret}/versions/{secret_version}. (Example: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
  • javascriptTextTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is 0, UDF reloading is disabled. The default value is 0.
  • writeDisposition : The BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) value. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Defaults to WRITE_APPEND.
  • createDisposition : The BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). For example, CREATE_IF_NEEDED and CREATE_NEVER. Defaults to CREATE_IF_NEEDED.
  • useStorageWriteApi : If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. Defaults to: 0.
  • storageWriteApiTriggeringFrequencySec : When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
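
Several of the parameters above depend on each other, so a quick local check before launching can catch a misconfigured job. The following is a minimal sketch and not part of the template: checkTemplateParams is a hypothetical helper, and the host:port pattern is an assumption that only covers simple hostnames and IPv4 addresses.

```javascript
// Hypothetical pre-flight check for the template parameters described above.
// Template parameters are passed as strings, so booleans are compared to "true".
function checkTemplateParams(params) {
  var problems = [];
  var useApi = params.useStorageWriteApi === "true";
  var atLeastOnce = params.useStorageWriteApiAtLeastOnce === "true";

  function isSet(name) {
    return params[name] !== undefined && params[name] !== "";
  }

  // Exactly-once Storage Write API mode needs both tuning parameters.
  if (useApi && !atLeastOnce) {
    ["numStorageWriteApiStreams", "storageWriteApiTriggeringFrequencySec"]
      .forEach(function (name) {
        if (!isSet(name)) {
          problems.push(name + " must be set when useStorageWriteApi is true " +
              "and useStorageWriteApiAtLeastOnce is false");
        }
      });
  }

  // useStorageWriteApiAtLeastOnce only takes effect with the Storage Write API.
  if (atLeastOnce && !useApi) {
    problems.push("useStorageWriteApiAtLeastOnce has no effect unless " +
        "useStorageWriteApi is true");
  }

  // Assumed pattern: each broker is host:port (simple hostnames or IPv4 only).
  if (isSet("bootstrapServers")) {
    params.bootstrapServers.split(",").forEach(function (server) {
      if (!/^[^\s,:]+:\d{1,5}$/.test(server)) {
        problems.push("bootstrapServers entry is not host:port: " + server);
      }
    });
  }
  return problems;
}
```

An empty array means that none of the checked rules were violated; it does not guarantee that the job will launch successfully.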

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Kafka record value, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
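
As an illustration of this contract, the following minimal sketch parses the record value, reshapes it, and returns a JSON string. The function name transform and the field names (userId, amount, user_id, amount_usd) are hypothetical; the output fields would need to match your own destination table's schema. The sketch sticks to ES5 syntax on the assumption that the UDF runtime might not support newer JavaScript features.

```javascript
/**
 * Hypothetical UDF: field names are illustrative, not part of the template.
 * @param {string} inJson Kafka record value, serialized as a JSON string.
 * @return {string} JSON string shaped for the (assumed) destination table.
 */
function transform(inJson) {
  var record = JSON.parse(inJson); // throws on malformed JSON

  var amount = Number(record.amount);
  if (isNaN(amount)) {
    // An exception thrown here surfaces as a UDF execution error.
    throw new Error("amount is not numeric: " + record.amount);
  }

  var out = {
    user_id: String(record.userId),
    amount_usd: amount
  };
  return JSON.stringify(out);
}
```

To use a function like this one, you would upload the file to Cloud Storage, pass its URI in javascriptTextTransformGcsPath, and pass the name transform in javascriptTextTransformFunctionName.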

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Kafka to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • BIGQUERY_TABLE: your BigQuery table name
  • KAFKA_TOPICS: the Apache Kafka topic list. If multiple topics are provided, follow instructions on how to escape commas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address must include the port number on which the server is reachable. For example: 35.70.252.199:9092. If multiple addresses are provided, follow instructions on how to escape commas.
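
Because --parameters separates its key-value pairs with commas, a value that itself contains commas (such as several topics or brokers) needs an alternate delimiter. The sketch below builds the flag value using the ^DELIM^ escaping syntax that gcloud describes under gcloud topic escaping. buildParametersFlag is a hypothetical helper, and ~ is an arbitrary delimiter choice that assumes no parameter value contains it.

```javascript
// Hypothetical helper: joins template parameters with an alternate delimiter
// so that commas inside values are passed through unchanged.
function buildParametersFlag(params, delimiter) {
  delimiter = delimiter || "~";
  var pairs = Object.keys(params).map(function (key) {
    var value = String(params[key]);
    if (value.indexOf(delimiter) !== -1) {
      throw new Error("value for " + key + " contains the delimiter " + delimiter);
    }
    return key + "=" + value;
  });
  // The ^DELIM^ prefix tells gcloud which delimiter separates the pairs.
  return "^" + delimiter + "^" + pairs.join(delimiter);
}

var flag = buildParametersFlag({
  outputTableSpec: "BIGQUERY_TABLE",
  inputTopics: "topic1,topic2",
  bootstrapServers: "localhost:9092,127.0.0.1:9093"
});
console.log("--parameters '" + flag + "'");
```

The printed value can be pasted into the gcloud command in place of the comma-separated parameter list; the single quotes keep the shell from interpreting the delimiter characters.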

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery"
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • BIGQUERY_TABLE: your BigQuery table name
  • KAFKA_TOPICS: the Apache Kafka topic list. If multiple topics are provided, follow instructions on how to escape commas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use

    For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.

  • KAFKA_SERVER_ADDRESSES: the Apache Kafka broker server IP address list. Each IP address must include the port number on which the server is reachable. For example: 35.70.252.199:9092. If multiple addresses are provided, follow instructions on how to escape commas.
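
The launch request can also be assembled programmatically. The following minimal sketch only builds the URL and JSON body shown in this section; it does not send anything. buildLaunchRequest and its accessToken argument are hypothetical, and obtaining an OAuth 2.0 access token is outside the scope of this sketch.

```javascript
// Hypothetical helper: assembles the flexTemplates:launch request shown above.
// It returns a plain object and performs no network calls.
function buildLaunchRequest(opts, accessToken) {
  var url = "https://dataflow.googleapis.com/v1b3/projects/" +
      encodeURIComponent(opts.projectId) + "/locations/" +
      encodeURIComponent(opts.location) + "/flexTemplates:launch";

  var body = {
    launch_parameter: {
      jobName: opts.jobName,
      parameters: opts.parameters,
      containerSpecGcsPath: "gs://dataflow-templates-" + opts.location + "/" +
          opts.version + "/flex/Kafka_to_BigQuery"
    }
  };

  return {
    url: url,
    method: "POST",
    headers: {
      "Authorization": "Bearer " + accessToken, // token acquisition not shown
      "Content-Type": "application/json"
    },
    body: JSON.stringify(body)
  };
}
```

Because the body is a JSON string, commas inside values such as inputTopics need no escaping here, unlike in the gcloud command. The returned object could then be handed to an HTTP client of your choice.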

For more information, see Write data from Kafka to BigQuery with Dataflow.

What's next