Google Cloud Pub/Sub Connector

This connector provides access to reading data from and writing data to Google Cloud Pub/Sub.

Usage

This library is currently not published to any repositories. Usage requires building it from source and packaging it with your Flink application.

Prerequisites:

  • Unix-like environment (we use Linux, Mac OS X)
  • Git
  • Maven (we recommend version 3.8.6)
  • Java 11

git clone https://github.com/GoogleCloudPlatform/pubsub.git
cd pubsub/flink-connector
mvn clean package -DskipTests

The resulting jars can be found in the target directory of the respective module.

Flink applications built with Maven can include the connector as a dependency in their pom.xml file by adding:

<dependency>
  <groupId>com.google.pubsub.flink</groupId>
  <artifactId>flink-connector-gcp-pubsub</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

An example Flink application can be found under flink-connector/flink-examples-gcp-pubsub/.

Learn more about connector packaging in the Flink documentation.

Configuring Access to Google Cloud Pub/Sub

Requests sent to Google Cloud Pub/Sub must be authenticated. By default, the connector library authenticates requests using Application Default Credentials.

Credentials can also be set in the source and sink builders. The connector library prioritizes credentials set in builders over Application Default Credentials. The snippet below shows how to authenticate using an OAuth 2.0 token.

final String tokenValue = "...";

// Authenticate with OAuth 2.0 token when pulling messages from a subscription.
PubSubSource.<String>builder().setCredentials(
    GoogleCredentials.create(new AccessToken(tokenValue, /* expirationTime= */ null)))

// Authenticate with OAuth 2.0 token when publishing messages to a topic.
PubSubSink.<String>builder().setCredentials(
    GoogleCredentials.create(new AccessToken(tokenValue, /* expirationTime= */ null)))

The authenticating principal must be authorized to pull messages from a subscription when using Pub/Sub source or publish messages to a topic when using Pub/Sub sink. Authorization is managed through Google IAM and can be configured either at the Google Cloud project-level or the Pub/Sub resource-level.

Pub/Sub Source

Pub/Sub source streams data from a single Google Cloud Pub/Sub subscription with an at-least-once guarantee. The sample below shows the minimal configurations required to build Pub/Sub source.

PubSubSource.<String>builder()
    .setDeserializationSchema(
        PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
    .setProjectName("my-project-name")
    .setSubscriptionName("my-subscription-name")
    .build()

Subscription

Pub/Sub source only supports streaming messages from pull subscriptions. Push subscriptions are not supported.

Pub/Sub source can create parallel readers to the same subscription. Since Google Cloud Pub/Sub has no notion of subscription partitions or splits, a message can be received by any reader. Google Cloud Pub/Sub automatically load balances message delivery across readers.

Deserialization Schema

PubSubDeserializationSchema<T> is required to define how PubsubMessage is deserialized into an output that is suitable for processing.

For convenience, PubSubDeserializationSchema.dataOnly(DeserializationSchema<T> schema) can be used if the field PubsubMessage.data stores all of the data to be processed. Additionally, Flink provides basic schemas for further convenience.

// Deserialization output is PubsubMessage.data field converted to String type.
PubSubSource.<String>builder()
    .setDeserializationSchema(PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))

Implementing PubSubDeserializationSchema<T> is required to process data stored in fields other than PubsubMessage.data.

Boundedness

Pub/Sub source streams unbounded data, only stopping when a Flink job stops or fails.

Checkpointing

Checkpointing is required to use Pub/Sub source. When a checkpoint completes, all messages delivered before that checkpoint are acknowledged to Google Cloud Pub/Sub. If the job fails, messages delivered after the last successful checkpoint remain unacknowledged and are automatically redelivered. Because no message delivery state is stored in checkpoints, retained checkpoints are not necessary to resume using Pub/Sub source.
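
As an illustration of the requirement above, the following sketch enables checkpointing on a Flink job before attaching Pub/Sub source. It rests on assumptions not confirmed in this section: that the connector classes live in the package com.google.pubsub.flink (inferred from the Maven groupId), and that the object returned by build() can be passed to StreamExecutionEnvironment.fromSource. The class name, job name, and 30 second interval are illustrative only.

```java
// Assumed package for the connector classes (inferred from the Maven groupId).
import com.google.pubsub.flink.PubSubDeserializationSchema;
import com.google.pubsub.flink.PubSubSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PubSubSourceJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Messages are acknowledged only when a checkpoint completes, so
    // checkpointing must be enabled. The interval (in milliseconds) is illustrative.
    env.enableCheckpointing(30_000L);

    // Assumes the built source is accepted by fromSource.
    DataStream<String> messages =
        env.fromSource(
            PubSubSource.<String>builder()
                .setDeserializationSchema(
                    PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
                .setProjectName("my-project-name")
                .setSubscriptionName("my-subscription-name")
                .build(),
            WatermarkStrategy.noWatermarks(),
            "PubSubSource");

    // Print each message payload; a real job would apply its own processing here.
    messages.print();
    env.execute("pubsub-source-example");
  }
}
```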

StreamingPull Connections and Flow Control

Each Pub/Sub source subtask opens and manages StreamingPull connections to Google Cloud Pub/Sub. The number of connections per subtask can be set using PubSubSource.<OutputT>builder().setParallelPullCount (defaults to 1). Opening more connections can increase the message throughput delivered to each subtask. Note that the underlying subscriber client library creates an executor with 5 threads for each connection opened, so too many connections can be detrimental to performance.

Google Cloud Pub/Sub servers pause message delivery to a StreamingPull connection when a flow control limit is exceeded. There are two forms of flow control:

  1. Message delivery throughput
  2. Outstanding message count / bytes

StreamingPull connections are limited to pulling messages at 10 MB/s. This limit cannot be configured. Opening more connections is recommended when observing message throughput flow control. See Pub/Sub quotas and limits for a full list of limitations.

The other form of flow control is based on outstanding messages, that is, messages that have been delivered but not yet acknowledged. Since outstanding messages are acknowledged when a checkpoint completes, flow control limits for outstanding messages are effectively per-checkpoint interval limits. Infrequent checkpointing can cause connections to be flow controlled due to too many outstanding messages.
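
To see why infrequent checkpointing leads to flow control, consider a rough estimate: if every delivered message stays outstanding until the next checkpoint completes, the sustained message rate of one connection is bounded by approximately the outstanding message limit divided by the checkpoint interval. The sketch below only performs that arithmetic. The class and method names are hypothetical and not part of the connector, and the estimate ignores checkpoint duration and redeliveries.

```java
import java.time.Duration;

/** Hypothetical helper, not part of the connector: rough flow control arithmetic. */
final class OutstandingBudget {

  /**
   * Approximate upper bound on messages per second for one StreamingPull connection,
   * assuming messages are acknowledged only when a checkpoint completes.
   */
  public static double maxSustainedMessagesPerSecond(
      long maxOutstandingMessages, Duration checkpointInterval) {
    double intervalSeconds = checkpointInterval.toMillis() / 1000.0;
    return maxOutstandingMessages / intervalSeconds;
  }

  public static void main(String[] args) {
    // Limit of 1000 outstanding messages with a 10 second checkpoint interval.
    System.out.println(maxSustainedMessagesPerSecond(1_000L, Duration.ofSeconds(10))); // 100.0
    // Same limit with a 100 second checkpoint interval.
    System.out.println(maxSustainedMessagesPerSecond(1_000L, Duration.ofSeconds(100))); // 10.0
  }
}
```

Under this estimate, lengthening the checkpoint interval tenfold without raising the limit cuts the sustainable rate tenfold.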

The snippet below shows how to configure connection count and flow control settings.

PubSubSource.<OutputT>builder()
    // Open 5 StreamingPull connections.
    .setParallelPullCount(5)
    // Allow up to 10,000 outstanding messages per connection per checkpoint interval.
    .setMaxOutstandingMessagesCount(10_000L)
    // Allow up to 1000 MB in cumulative message size per connection per checkpoint interval.
    .setMaxOutstandingMessagesBytes(1000L * 1024L * 1024L)

A Pub/Sub source subtask with these options is able to:

  • Pull messages at up to 50 MB/s
  • Receive up to 50,000 messages or 5000 MB in cumulative message size per checkpoint interval
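
The per-subtask figures above follow from multiplying the per-connection limits by the connection count. A minimal sketch of that arithmetic, using a hypothetical helper class (not connector API) and the 10 MB/s per-connection limit stated earlier:

```java
/** Hypothetical helper, not part of the connector: per-subtask capacity from per-connection limits. */
final class SubtaskCapacity {
  // StreamingPull throughput limit per connection, as stated in this section.
  static final long PER_CONNECTION_MB_PER_SECOND = 10L;

  public static long maxThroughputMbPerSecond(int parallelPullCount) {
    return parallelPullCount * PER_CONNECTION_MB_PER_SECOND;
  }

  public static long maxOutstandingMessages(int parallelPullCount, long maxOutstandingMessagesCount) {
    return parallelPullCount * maxOutstandingMessagesCount;
  }

  public static long maxOutstandingBytes(int parallelPullCount, long maxOutstandingMessagesBytes) {
    return parallelPullCount * maxOutstandingMessagesBytes;
  }

  public static void main(String[] args) {
    int connections = 5;
    long mb = 1024L * 1024L;
    System.out.println(maxThroughputMbPerSecond(connections) + " MB/s"); // 50 MB/s
    System.out.println(maxOutstandingMessages(connections, 10_000L) + " messages"); // 50000 messages
    System.out.println(maxOutstandingBytes(connections, 1000L * mb) / mb + " MB"); // 5000 MB
  }
}
```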

Message Leasing

Pub/Sub source automatically extends the acknowledgement deadline of messages. This means a checkpointing interval can be longer than a message's acknowledgement deadline without causing message redelivery. Note that acknowledgement deadlines are extended for up to 1 hour, after which messages are redelivered by Google Cloud Pub/Sub.

All Options

Required Builder Options

| Builder Method | Default Value | Description |
| --- | --- | --- |
| setSubscriptionName(String subscriptionName) | (none) | The ID of the subscription from which Pub/Sub source consumes messages. |
| setProjectName(String projectName) | (none) | The ID of the GCP project that owns the subscription from which Pub/Sub source consumes messages. |
| setDeserializationSchema(PubSubDeserializationSchema<OutputT> deserializationSchema) | (none) | How PubsubMessage is deserialized when Pub/Sub source receives a message. |

Optional Builder Options

| Builder Method | Default Value | Description |
| --- | --- | --- |
| setMaxOutstandingMessagesCount(Long count) | 1000L | The maximum number of messages that can be delivered to a StreamingPull connection within a checkpoint interval. |
| setMaxOutstandingMessagesBytes(Long bytes) | 100L * 1024L * 1024L (100 MB) | The maximum number of cumulative bytes that can be delivered to a StreamingPull connection within a checkpoint interval. |
| setParallelPullCount(Integer parallelPullCount) | 1 | The number of StreamingPull connections to open for pulling messages from Google Cloud Pub/Sub. |
| setCredentials(Credentials credentials) | (none) | The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to pull messages from the subscription. If not set, then Pub/Sub source uses Application Default Credentials. |
| setEndpoint(String endpoint) | pubsub.googleapis.com:443 | The Google Cloud Pub/Sub gRPC endpoint from which messages are pulled. Defaults to the global endpoint, which routes requests to the nearest regional endpoint. |

Pub/Sub Sink

Pub/Sub sink publishes data to a single Google Cloud Pub/Sub topic with an at-least-once guarantee. The sample below shows the minimal configurations required to build Pub/Sub sink.

PubSubSink.<String>builder()
    .setSerializationSchema(
        PubSubSerializationSchema.dataOnly(new SimpleStringSchema()))
    .setProjectName("my-project-name")
    .setTopicName("my-topic-name")
    .build()

Serialization Schema

PubSubSerializationSchema<T> is required to define how incoming data is serialized to PubsubMessage.

For convenience, PubSubSerializationSchema.dataOnly(SerializationSchema<T> schema) can be used to write output data to the field PubsubMessage.data. The output of schema.serialize(T ...) must be a type accepted by ByteString.copyFrom(...).

Implementing PubSubSerializationSchema<T> is required to publish messages with attributes or ordering keys.
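
The methods of PubSubSerializationSchema<T> are not shown in this document, so the sketch below covers only the part that is certain: building the PubsubMessage that such an implementation would produce, with an attribute and an ordering key. The class name, method name, and the attribute key "source" are hypothetical. It uses the standard PubsubMessage protobuf builder.

```java
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

/** Hypothetical helper: builds the message a custom serialization schema could return. */
final class OrderedMessages {

  /** Builds a message carrying a UTF-8 payload, one attribute, and an ordering key. */
  static PubsubMessage toMessage(String payload, String source, String orderingKey) {
    return PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(payload))
        // Attribute key chosen for illustration only.
        .putAttributes("source", source)
        // Messages sharing an ordering key are delivered in publish order.
        .setOrderingKey(orderingKey)
        .build();
  }
}
```

When messages carry an ordering key, the sink must also be built with setEnableMessageOrdering(true), as listed in the optional builder options.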

All Options

Required Builder Options

| Builder Method | Default Value | Description |
| --- | --- | --- |
| setTopicName(String topicName) | (none) | The ID of the topic to which Pub/Sub sink publishes messages. |
| setProjectName(String projectName) | (none) | The ID of the GCP project that owns the topic to which Pub/Sub sink publishes messages. |
| setSerializationSchema(PubSubSerializationSchema<T> serializationSchema) | (none) | How incoming data is serialized to PubsubMessage. |

Optional Builder Options

| Builder Method | Default Value | Description |
| --- | --- | --- |
| setCredentials(Credentials credentials) | (none) | The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to publish messages to the topic. If not set, then Pub/Sub sink uses Application Default Credentials. |
| setEnableMessageOrdering(Boolean enableMessageOrdering) | false | This must be set to true when publishing messages with an ordering key. |
| setEndpoint(String endpoint) | pubsub.googleapis.com:443 | The Google Cloud Pub/Sub gRPC endpoint to which messages are published. Defaults to the global endpoint, which routes requests to the nearest regional endpoint. |

Integration Testing

Instead of integration tests reading from and writing to production Google Cloud Pub/Sub, tests can run against a local instance of the Pub/Sub emulator. Pub/Sub source and sink will automatically try connecting to the emulator if the environment variable PUBSUB_EMULATOR_HOST is set. Alternatively, you can manually set the emulator endpoint in your builder by calling .setEndpoint(EmulatorEndpoint.toEmulatorEndpoint("localhost:8085")).

Steps to run tests against the Pub/Sub emulator:

  1. Ensure that the required dependencies and emulator are installed.
  2. Start the emulator using the Google Cloud CLI.
  3. Run the test with the environment variable PUBSUB_EMULATOR_HOST set to where the emulator is running. For example, if the emulator is listening on port 8085 and running on the same machine as the test, set PUBSUB_EMULATOR_HOST=localhost:8085.
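
Because a test that runs without PUBSUB_EMULATOR_HOST set would talk to production Google Cloud Pub/Sub, it can help to check the variable before the test starts. The sketch below is one possible precondition check using only the Java standard library. The class and method names are hypothetical and not part of the connector.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical test helper: detects whether the emulator environment variable is set. */
final class EmulatorEnv {
  static final String EMULATOR_HOST_VARIABLE = "PUBSUB_EMULATOR_HOST";

  /** Returns the emulator host:port from the given environment, if set and non-blank. */
  public static Optional<String> emulatorHost(Map<String, String> environment) {
    return Optional.ofNullable(environment.get(EMULATOR_HOST_VARIABLE))
        .map(String::trim)
        .filter(host -> !host.isEmpty());
  }

  public static void main(String[] args) {
    Optional<String> host = emulatorHost(System.getenv());
    if (host.isPresent()) {
      System.out.println("Running against emulator at " + host.get());
    } else {
      System.out.println("PUBSUB_EMULATOR_HOST is not set; skipping emulator tests");
    }
  }
}
```

Passing the environment as a map keeps the check easy to exercise in unit tests without modifying the process environment.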

The emulator can also be started within a Docker container while testing. The tests under flink-connector/flink-connector-gcp-pubsub-e2e-tests/ illustrate how to do this.