Pub/Sub to MongoDB template

The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF).

If errors occur while processing records, the template writes them to a BigQuery table, along with the input message. For example, errors might occur due to schema mismatch, malformed JSON, or while executing transforms. Specify the table name in the deadletterTable parameter. If the table doesn't exist, the pipeline automatically creates it.

Pipeline requirements

  • The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.
  • The MongoDB cluster must exist and should be accessible from the Dataflow worker machines.

Template parameters

Required parameters

  • inputSubscription : Name of the Pub/Sub subscription. (Example: projects/your-project-id/subscriptions/your-subscription-name).
  • mongoDBUri : Comma separated list of MongoDB servers. (Example: host1:port,host2:port,host3:port).
  • database : Database in MongoDB to store the collection. (Example: my-db).
  • collection : Name of the collection in the MongoDB database. (Example: my-collection).
  • deadletterTable : The BigQuery table that stores messages caused by failures, such as mismatched schema, malformed JSON, and so on. (Example: your-project-id:your-dataset.your-table-name).

Optional parameters

  • batchSize : Batch size used for batch insertion of documents into MongoDB. Defaults to: 1000.
  • batchSizeBytes : Batch size in bytes. Defaults to: 5242880.
  • maxConnectionIdleTime : Maximum idle time allowed in seconds before connection timeout occurs. Defaults to: 60000.
  • sslEnabled : Boolean value indicating whether the connection to MongoDB is SSL enabled. Defaults to: true.
  • ignoreSSLCertificate : Boolean value indicating whether to ignore the SSL certificate. Defaults to: true.
  • withOrdered : Boolean value enabling ordered bulk insertions into MongoDB. Defaults to: true.
  • withSSLInvalidHostNameAllowed : Boolean value indicating whether an invalid hostname is allowed for the SSL connection. Defaults to: true.
  • javascriptTextTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is 0, UDF reloading is disabled. The default value is 0.

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: a single line from an input CSV file.
  • Output: a stringified JSON document to insert into MongoDB.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Pub/Sub to MongoDB template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: the name of the MongoDB database (for example, users)
  • COLLECTION: the name of the MongoDB collection (for example, profiles)
  • UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • INPUT_SUBSCRIPTION: the Pub/Sub subscription (for example, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: the MongoDB server addresses (for example, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: the name of the MongoDB database (for example, users)
  • COLLECTION: the name of the MongoDB collection (for example, profiles)
  • UNPROCESSED_TABLE: the name of the BigQuery table (for example, your-project:your-dataset.your-table-name)

What's next