The Pub/Sub to BigQuery with Python UDF template is a streaming pipeline that reads JSON-formatted messages from Pub/Sub and writes them to a BigQuery table. Optionally, you can provide a user-defined function (UDF) written in Python to process the incoming messages.
Pipeline requirements
- The BigQuery table must exist and have a schema.
- The Pub/Sub message data must use the JSON format, or you must provide a UDF that converts the message data to JSON. The JSON data must match the BigQuery table schema. For example, if the JSON payload is formatted as `{"k1":"v1", "k2":"v2"}`, the BigQuery table must have two string columns named `k1` and `k2` (a sketch of a matching table definition follows this list).
- Specify either the `inputSubscription` or the `inputTopic` parameter, but not both.
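A minimal sketch of creating such a table with the `bq` command-line tool; the dataset and table names are illustrative:

```
# Hypothetical dataset and table; k1 and k2 are STRING columns matching the payload keys.
bq mk --table my_dataset.my_table k1:STRING,k2:STRING
```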
Template parameters
| Parameter | Description |
|---|---|
| `outputTableSpec` | The BigQuery table to write to, formatted as `"PROJECT_ID:DATASET_NAME.TABLE_NAME"`. |
| `inputSubscription` | Optional: the Pub/Sub subscription to read from, formatted as `"projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"`. |
| `inputTopic` | Optional: the Pub/Sub topic to read from, formatted as `"projects/PROJECT_ID/topics/TOPIC_NAME"`. |
| `outputDeadletterTable` | The BigQuery table for messages that failed to reach the output table, formatted as `"PROJECT_ID:DATASET_NAME.TABLE_NAME"`. If the table doesn't exist, it is created when the pipeline runs. If this parameter is not specified, the value `"OUTPUT_TABLE_SPEC_error_records"` is used instead. |
| `pythonExternalTextTransformGcsPath` | Optional: the Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, `gs://my-bucket/my-udfs/my_file.py`. |
| `pythonExternalTextTransformFunctionName` | Optional: the name of the Python user-defined function (UDF) that you want to use. |
| `useStorageWriteApi` | Optional: if `true`, the pipeline uses the BigQuery Storage Write API. The default value is `false`. For more information, see Using the Storage Write API. |
| `useStorageWriteApiAtLeastOnce` | Optional: when using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to `true`. To use exactly-once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`. |
| `numStorageWriteApiStreams` | Optional: when using the Storage Write API, specifies the number of write streams. You must set this parameter if `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`. |
| `storageWriteApiTriggeringFrequencySec` | Optional: when using the Storage Write API, specifies the triggering frequency, in seconds. You must set this parameter if `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`. |
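When you use a Python UDF, the two UDF parameters are passed alongside the required ones. A sketch of how those flags might appear inside a `--parameters` value; the bucket path and function name are illustrative, and the full command syntax is shown under Run the template below:

```
# Fragment of a --parameters value; my-bucket, my_file.py, and process are hypothetical.
pythonExternalTextTransformGcsPath=gs://my-bucket/my-udfs/my_file.py,\
pythonExternalTextTransformFunctionName=process
```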
User-defined functions
Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.
Function specification
The UDF has the following specification:
- Input: the Pub/Sub message `data` field, serialized as a JSON string.
- Output: a JSON string that matches the schema of the BigQuery destination table.
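A minimal sketch of a UDF that satisfies this contract; the function name `process` and the added field are illustrative, not part of the template:

```python
import json

def process(value):
    """Hypothetical UDF: parse the incoming JSON payload, add a field,
    and return the transformed record as a JSON string."""
    data = json.loads(value)
    # Example enrichment; the destination table needs a matching column.
    data["source"] = "pubsub"
    return json.dumps(data)
```

You would then upload the file to Cloud Storage and pass its URI as `pythonExternalTextTransformGcsPath` and the name `process` as `pythonExternalTextTransformFunctionName`.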
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is `us-central1`. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub to BigQuery with Python UDF template.
- In the provided parameter fields, enter your parameter values.
- Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
- Click Run job.
gcloud
In your shell or terminal, run the template:
```
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME
```
Replace the following (a fully substituted sketch follows this list):
- `JOB_NAME`: a unique job name of your choice
- `REGION_NAME`: the region where you want to deploy your Dataflow job, for example `us-central1`
- `VERSION`: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
  - a version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)
- `STAGING_LOCATION`: the location for staging local files (for example, `gs://your-bucket/staging`)
- `TOPIC_NAME`: your Pub/Sub topic name
- `DATASET`: your BigQuery dataset
- `TABLE_NAME`: your BigQuery table name
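A hypothetical invocation with the placeholders filled in; the job, project, bucket, topic, dataset, and table names are illustrative:

```
gcloud dataflow flex-template run my-pubsub-to-bq-job \
    --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Xlang \
    --region us-central1 \
    --staging-location gs://my-bucket/staging \
    --parameters \
inputTopic=projects/my-project/topics/my-topic,\
outputTableSpec=my-project:my_dataset.my_table
```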
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see `projects.templates.launch`.
```
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
      "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang"
  }
}
```
Replace the following (a runnable sketch follows this list):
- `PROJECT_ID`: the Google Cloud project ID where you want to run the Dataflow job
- `JOB_NAME`: a unique job name of your choice
- `LOCATION`: the region where you want to deploy your Dataflow job, for example `us-central1`
- `VERSION`: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
  - a version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)
- `STAGING_LOCATION`: the location for staging local files (for example, `gs://your-bucket/staging`)
- `TOPIC_NAME`: your Pub/Sub topic name
- `DATASET`: your BigQuery dataset
- `TABLE_NAME`: your BigQuery table name
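To send the request from a shell, a minimal sketch using curl; it assumes gcloud credentials are available for the access token, and all names and values are illustrative:

```
# Hypothetical request; project, location, and parameter values are substituted inline.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/flexTemplates:launch" \
  -d '{
    "launch_parameter": {
      "jobName": "my-pubsub-to-bq-job",
      "parameters": {
        "inputTopic": "projects/my-project/topics/my-topic",
        "outputTableSpec": "my-project:my_dataset.my_table"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Xlang"
    }
  }'
```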
Template source code
Java

```java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a>
* for instructions on how to use or modify this template.
*/
@MultiTemplate({
@Template(
name = "PubSub_to_BigQuery_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to BigQuery",
description =
"The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
+ "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
+ "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
optionsClass = Options.class,
skipOptions = {
"pythonExternalTextTransformGcsPath",
"pythonExternalTextTransformFunctionName",
},
flexContainerName = "pubsub-to-bigquery",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
"The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
},
streaming = true,
supportsAtLeastOnce = true,
supportsExactlyOnce = true),
@Template(
name = "PubSub_to_BigQuery_Xlang",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to BigQuery with Python UDFs",
type = Template.TemplateType.XLANG,
description =
"The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
+ "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
+ "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
optionsClass = Options.class,
skipOptions = {
"javascriptTextTransformGcsPath",
"javascriptTextTransformFunctionName",
"javascriptTextTransformReloadIntervalMinutes"
},
flexContainerName = "pubsub-to-bigquery-xlang",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
"The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
},
streaming = true,
supportsAtLeastOnce = true,
supportsExactlyOnce = true)
})
public class PubSubToBigQuery {
/** The log to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
/** The tag for the main output for the UDF. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the main output of the json transformation. */
public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
/** The tag for the dead-letter output of the udf. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The tag for the dead-letter output of the json to table row transform. */
public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
/** The default suffix for error tables if dead letter table is not specified. */
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/** Pubsub message/string coder for pipeline. */
public static final FailsafeElementCoder<PubsubMessage, String> CODER =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
/** String/String Coder for FailsafeElement. */
public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options
extends PipelineOptions,
BigQueryStorageApiStreamingOptions,
PythonExternalTextTransformerOptions,
DataflowPipelineWorkerPoolOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
helpText =
"The BigQuery table to write to, formatted as `\"PROJECT_ID:DATASET_NAME.TABLE_NAME\"`.")
String getOutputTableSpec();
void setOutputTableSpec(String value);
@TemplateParameter.PubsubTopic(
order = 2,
optional = true,
description = "Input Pub/Sub topic",
helpText =
"The Pub/Sub topic to read from, formatted as `\"projects/<PROJECT_ID>/topics/<TOPIC_NAME>\"`.")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 3,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"The Pub/Sub subscription to read from, "
+ "formatted as `\"projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>\"`.")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.BigQueryTable(
order = 4,
optional = true,
description =
"Table for messages failed to reach the output table (i.e., Deadletter table)",
helpText =
"The BigQuery table to use for messages that failed to reach the output table, "
+ "formatted as `\"PROJECT_ID:DATASET_NAME.TABLE_NAME\"`. If the table "
+ "doesn't exist, it is created when the pipeline runs. "
+ "If this parameter is not specified, "
+ "the value `\"OUTPUT_TABLE_SPEC_error_records\"` is used instead.")
String getOutputDeadletterTable();
void setOutputDeadletterTable(String value);
@TemplateParameter.Boolean(
order = 5,
optional = true,
parentName = "useStorageWriteApi",
parentTriggerValues = {"true"},
description = "Use at at-least-once semantics in BigQuery Storage Write API",
helpText =
"When using the Storage Write API, specifies the write semantics. "
+ "To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"
+ ", set this parameter to true. "
+ "To use exactly-once semantics, set the parameter to `false`. "
+ "This parameter applies only when `useStorageWriteApi` is `true`. "
+ "The default value is `false`.")
@Default.Boolean(false)
@Override
Boolean getUseStorageWriteApiAtLeastOnce();
void setUseStorageWriteApiAtLeastOnce(Boolean value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
// options.setWorkerDiskType(
//
// "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
/*
* Steps:
* 1) Read messages in from Pub/Sub
* 2) Transform the PubsubMessages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write failed records out to BigQuery
*/
/*
* Step #1: Read messages in from Pub/Sub
* Either from a Subscription or Topic
*/
PCollection<PubsubMessage> messages = null;
if (useInputSubscription) {
messages =
pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"ReadPubSubTopic",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
PCollectionTuple convertedTableRows =
messages
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
* Step #4: Write records that failed table row transformation
* or conversion out to BigQuery deadletter table.
*/
PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
!Strings.isNullOrEmpty(options.getOutputDeadletterTable())
? options.getOutputDeadletterTable()
: options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
// 5) Insert records that failed insert into deadletter table
failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
!Strings.isNullOrEmpty(options.getOutputDeadletterTable())
? options.getOutputDeadletterTable()
: options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
/**
* The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
* {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
* applying an optional UDF to the input. The execution of the UDF and transformation to {@link
* TableRow} objects is done in a fail-safe way by wrapping the element with its original payload
* inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
* output a {@link PCollectionTuple} which contains all output and dead-letter {@link
* PCollection}.
*
* <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
*
* <ul>
* <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
* successfully processed by the optional UDF.
* <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which failed processing during the UDF execution.
* <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
* JSON to {@link TableRow} objects.
* <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
* records which couldn't be converted to table rows.
* </ul>
*/
static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
private final Options options;
PubsubMessageToTableRow(Options options) {
this.options = options;
}
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
boolean useJavascriptUdf =
!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
boolean usePythonUdf =
!Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
if (useJavascriptUdf && usePythonUdf) {
throw new IllegalArgumentException(
"Either javascript or Python gcs path must be provided, but not both.");
}
PCollectionTuple udfOut;
if (usePythonUdf) {
PCollection<Row> udfRowsOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply(
"MapToRecord",
PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
.pubSubMappingFunction())
.setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
.apply(
"InvokeUDF",
PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
.setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
.setFunctionName(options.getPythonExternalTextTransformFunctionName())
.build());
udfOut =
udfRowsOut.apply(
"MapRowsToFailsafeElements",
ParDo.of(new RowToPubSubFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
.withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
} else {
udfOut =
input
// Map the incoming messages into FailsafeElements so we can recover from failures
// across multiple transforms.
.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
.apply(
"InvokeUDF",
FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setReloadIntervalMinutes(
options.getJavascriptTextTransformReloadIntervalMinutes())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
}
// Convert the records which were successfully processed by the UDF into TableRow objects.
PCollectionTuple jsonToTableRowOut =
udfOut
.get(UDF_OUT)
.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
// Re-wrap the PCollections so we can return a single PCollectionTuple
return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
.and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
.and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
.and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
}
}
/**
* The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
* {@link FailsafeElement} class so errors can be recovered from and the original message can be
* output to a error records table.
*/
static class PubsubMessageToFailsafeElementFn
extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
}
}
}
```
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.