Importthema erstellen

Mit einem Importthema können Sie Daten aus externen Quellen in Pub/Sub aufnehmen. Anschließend können Sie die Daten an jedes von Pub/Sub unterstützte Ziel streamen.

Pub/Sub unterstützt Amazon Kinesis Data Streams als externe Quelle für die Aufnahme von Daten in ein Importthema.

Themenübersicht importieren

Für ein Importthema ist die Aufnahme des Themas als Property aktiviert. So kann ein Importthema Streamingdaten aufnehmen. Sie können die Aufnahme zu einem Thema mithilfe der Console, der Google Cloud CLI, REST-Aufrufe oder der Clientbibliotheken aktivieren. Im Rahmen der Verwaltung des Importthemas bietet Google Cloud das Monitoring und die Skalierung der Aufnahmepipeline.

Ohne Importthema ist für das Streamen von Daten aus einer Datenquelle in Pub/Sub ein zusätzlicher Dienst erforderlich. Dieser zusätzliche Dienst ruft Daten aus der ursprünglichen Quelle ab und veröffentlicht sie in Pub/Sub. Der zusätzliche Dienst kann eine Streaming-Engine wie Apache Spark oder ein benutzerdefinierter Dienst sein. Außerdem müssen Sie diesen Dienst konfigurieren, bereitstellen, ausführen, skalieren und überwachen.

Im Folgenden finden Sie eine Liste mit wichtigen Informationen zu Importthemen:

  • Ähnlich wie bei einem Standardthema können Sie weiterhin manuell in einem Importthema veröffentlichen.

  • Sie können nur eine einzelne Aufnahmequelle an ein Importthema anhängen.

Wir empfehlen, Themen für Streamingdaten zu importieren. Wenn Sie eine Batch-Datenaufnahme in BigQuery anstelle einer Streamingdatenaufnahme in Betracht ziehen, können Sie den BigQuery Data Transfer Service (BQ DTS) ausprobieren. Wenn Sie Daten in Cloud Storage aufnehmen möchten, ist Storage Transfer Service (STS) eine gute Option.

Hinweise

Erforderliche Rollen und Berechtigungen zum Verwalten von Importthemen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle „Pub/Sub Editor“ (roles/pubsub.editor) für Ihr Thema oder Projekt zu gewähren, damit Sie die Berechtigungen erhalten, die Sie zum Erstellen und Verwalten von Importthemen benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen und Verwalten von Importthemen erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um Importthemen zu erstellen und zu verwalten:

  • Importthema erstellen: pubsub.topics.create
  • Importthema löschen: pubsub.topics.delete
  • Importthema abrufen: pubsub.topics.get
  • Importthema auflisten: pubsub.topics.list
  • In einem Importthema veröffentlichen: pubsub.topics.publish
  • Importthema aktualisieren: pubsub.topics.update
  • Rufen Sie die IAM-Richtlinie für ein Importthema ab: pubsub.topics.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Importthema: pubsub.topics.setIamPolicy

Möglicherweise können Sie diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können die Zugriffssteuerung auf Projektebene und auf der Ebene einzelner Ressourcen konfigurieren.

Föderierte Identität für den Zugriff auf Kinesis Data Streams einrichten

Mit der Workload Identity-Föderation können Google Cloud-Dienste auf Arbeitslasten zugreifen, die außerhalb von Google Cloud ausgeführt werden. Mit der Identitätsföderation müssen Sie keine Anmeldedaten verwalten oder an Google Cloud weitergeben, um auf Ihre Ressourcen in anderen Clouds zuzugreifen. Stattdessen können Sie die Identitäten der Arbeitslasten selbst verwenden, um sich bei Google Cloud zu authentifizieren und auf Ressourcen zuzugreifen.

Dienstkonto in Google Cloud erstellen

Dieser Schritt ist optional. Wenn Sie bereits ein Dienstkonto haben, können Sie es in diesem Verfahren verwenden, anstatt ein neues Dienstkonto zu erstellen. Wenn Sie ein vorhandenes Dienstkonto verwenden, fahren Sie im nächsten Schritt mit Eindeutige ID des Dienstkontos aufzeichnen fort.

Bei einem Importthema verwendet Pub/Sub das Dienstkonto als Identität, um auf Ressourcen von AWS zuzugreifen.

Weitere Informationen zum Erstellen eines Dienstkontos, einschließlich der Voraussetzungen, erforderlichen Rollen und Berechtigungen sowie Richtlinien zur Benennung finden Sie unter Dienstkonten erstellen. Nachdem Sie ein Dienstkonto erstellt haben, müssen Sie möglicherweise 60 Sekunden oder länger warten, bevor Sie das Dienstkonto verwenden können. Dieses Verhalten ist darauf zurückzuführen, dass Lesevorgänge letztendlich konsistent sind. Es kann einige Zeit dauern, bis das neue Dienstkonto sichtbar wird.

Eindeutige ID des Dienstkontos notieren

Sie benötigen eine eindeutige Dienstkonto-ID, um eine Rolle in der AWS-Konsole einzurichten.

  1. Rufen Sie in der Google Cloud Console die Seite Dienstkontodetails auf.

    Zum Dienstkonto

  2. Klicken Sie auf das Dienstkonto, das Sie gerade erstellt haben, oder auf das Dienstkonto, das Sie verwenden möchten.

  3. Notieren Sie sich auf der Seite Dienstkontodetails die eindeutige ID.

    Sie benötigen die ID im Abschnitt Rolle in AWS mit einer benutzerdefinierten Vertrauensrichtlinie erstellen.

Dem Pub/Sub-Dienstkonto die Rolle „Ersteller von Dienstkonto-Tokens“ hinzufügen

Mit der Rolle „Dienstkontotoken-Ersteller“ (roles/iam.serviceAccountTokenCreator) können Hauptkonten kurzlebige Anmeldedaten für ein Dienstkonto erstellen. Diese Tokens oder Anmeldedaten werden verwendet, um die Identität des Dienstkontos zu übernehmen.

Weitere Informationen zu Identitätswechsel für Dienstkonten.

Dabei können Sie auch die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) hinzufügen. Weitere Informationen zu der Rolle und warum Sie sie hinzufügen, finden Sie unter Dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Publisher“ hinzufügen.

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Aktivieren Sie die Option Von Google bereitgestellte Rollenzuweisungen einschließen.

  3. Suche nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Wählen Sie die Rolle Ersteller von Dienstkontotokens (roles/iam.serviceAccountTokenCreator) aus.

  7. Klicken Sie auf Speichern.

Richtlinie in AWS erstellen

Sie benötigen eine Richtlinie in AWS, damit sich Pub/Sub bei AWS authentifizieren kann, damit Pub/Sub Daten aus einem AWS Kinesis-Datenstream aufnehmen kann. Bevor Sie eine AWS-Richtlinie erstellen, erstellen Sie einen Kinesis-Datenstream und einen registrierten Nutzer dafür. Wir empfehlen diese Vorgehensweise, damit Sie die Berechtigungen auf den jeweiligen Stream einschränken können.

  • Weitere Informationen zum Erstellen eines AWS Kinesis-Datenstreams finden Sie unter Kinesis-Datenstream.

  • Weitere Informationen zur AWS Kinesis Data Stream API, die zum Registrieren von Nutzern verwendet wird, finden Sie unter RegisterStreamConsumer.

  • Weitere Methoden und Informationen zum Erstellen einer Richtlinie in AWS finden Sie unter IAM-Richtlinien erstellen.

Führen Sie die folgenden Schritte aus, um eine Richtlinie in AWS zu erstellen:

  1. Melden Sie sich in der AWS-Verwaltungskonsole an und öffnen Sie die IAM-Konsole.

  2. Klicken Sie im Navigationsbereich der Console für IAM auf Zugriffsverwaltung > Richtlinien.

  3. Klicken Sie auf Richtlinie erstellen.

  4. Wählen Sie unter Dienst auswählen die Option Kinesis aus.

  5. Wählen Sie für Aktion zulässig Folgendes aus:

    • List > ListShards.

      Durch diese Aktion wird die Berechtigung zum Auflisten der Shards in einem Stream gewährt und es werden Informationen zu jedem Shard bereitgestellt.

    • Lesen > SubscribeToShard.

      Diese Aktion gewährt die Berechtigung, einen bestimmten Shard mit verbessertem Fan-Out zu überwachen.

    • Lesen > DescribeStreamConsumer beschreiben.

      Mit dieser Aktion wird die Berechtigung zum Abrufen der Beschreibung eines registrierten Streamnutzers gewährt.

    Diese Berechtigungen umfassen das Lesen aus dem Stream. Pub/Sub unterstützt nur das Lesen aus einem Kinesis-Stream mit erweitertem Fan-Out mithilfe der Streaming SubscribeToShard API.

  6. Wenn Sie unter Ressourcen die Richtlinie auf einen bestimmten Stream oder Nutzer beschränken möchten (empfohlen), geben Sie den Nutzer-ARN und den Stream-ARN an.

  7. Klicken Sie auf Weitere Berechtigungen hinzufügen.

  8. Geben Sie unter Dienst auswählen die Option STS ein und wählen Sie die Option aus.

  9. Wählen Sie für Aktion zulässig die Option Schreiben > AssumeRoleWithWebIdentity aus.

    Diese Aktion gewährt die Berechtigung zum Abrufen eines Satzes temporärer Sicherheitsanmeldedaten, damit Pub/Sub sich über die Identitätsföderation beim Kinesis-Datenstream authentifizieren kann.

  10. Klicken Sie auf Next (Weiter).

  11. Geben Sie einen Namen und eine Beschreibung für die Richtlinie ein.

  12. Klicken Sie auf Richtlinie erstellen.

Rolle in AWS mithilfe einer benutzerdefinierten Vertrauensrichtlinie erstellen

Sie müssen in AWS eine Rolle erstellen, damit sich Pub/Sub bei AWS authentifizieren kann, um Daten aus Kinesis Data Streams aufzunehmen.

Führen Sie die folgenden Schritte aus, um eine Rolle mithilfe einer benutzerdefinierten Vertrauensrichtlinie zu erstellen:

  1. Melden Sie sich in der AWS-Verwaltungskonsole an und öffnen Sie die IAM-Konsole.

  2. Klicken Sie im Navigationsbereich der Console für IAM auf Rollen.

  3. Klicken Sie auf Rolle erstellen.

  4. Wählen Sie unter Vertrauenswürdige Entität auswählen die Option Benutzerdefinierte Vertrauensrichtlinie aus.

  5. Geben oder fügen Sie im Abschnitt Benutzerdefinierte Vertrauensrichtlinie Folgendes ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    Ersetzen Sie <SERVICE_ACCOUNT_UNIQUE_ID> durch die eindeutige ID des Dienstkontos, die Sie unter Eindeutige ID des Dienstkontos notieren notiert haben.

  6. Klicken Sie auf Next (Weiter).

  7. Wählen Sie unter Berechtigungen hinzufügen die soeben erstellte benutzerdefinierte Richtlinie aus.

  8. Klicken Sie auf Next (Weiter).

  9. Geben Sie einen Rollennamen und eine Rollenbeschreibung ein.

  10. Klicken Sie auf Rolle erstellen.

Pub/Sub-Publisher-Rolle dem Pub/Sub-Dienstkonto hinzufügen

Sie müssen dem Pub/Sub-Dienstkonto eine Publisher-Rolle zuweisen, damit Pub/Sub aus AWS Kinesis Data Streams im Importthema veröffentlichen kann.

Veröffentlichung für alle Themen aktivieren

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Aktivieren Sie die Option Von Google bereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Wählen Sie die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) aus.

  7. Klicken Sie auf Speichern.

Veröffentlichung aus einem einzelnen Thema aktivieren

Wenn Sie die Berechtigung zum Veröffentlichen nur für ein bestimmtes Importthema erteilen möchten, gehen Sie so vor:

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell-Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung, in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den Befehl gcloud pubsub topics add-iam-policy-binding aus:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"
    

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die Themen-ID des Importthemas.

    • ist PROJECT_NUMBER die Projektnummer. Informationen zum Anzeigen der Projektnummer finden Sie unter Projekte identifizieren.

Rolle „Dienstkontonutzer“ zum Dienstkonto hinzufügen

Die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser) enthält die Berechtigung iam.serviceAccounts.actAs, mit der ein Hauptkonto ein Dienstkonto an die Aufnahmeeinstellungen des Importthemas anhängen und dieses Dienstkonto für die föderierte Identität verwenden kann.

Führen Sie diese Schritte aus:

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Klicken Sie für das Hauptkonto, das die Themenaufrufe zum Erstellen oder Aktualisieren ausgibt, auf die Schaltfläche Hauptkonto bearbeiten.

  3. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  4. Wählen Sie die Rolle „Dienstkontonutzer“ (roles/iam.serviceAccountUser) aus.

  5. Klicken Sie auf Speichern.

Importthema erstellen

Weitere Informationen zu den Attributen eines Themas finden Sie unter Attribute eines Themas.

Prüfen Sie, ob Sie die folgenden Schritte ausgeführt haben:

So erstellen Sie ein Importthema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf Thema erstellen.

  3. Geben Sie im Feld Themen-ID eine ID für das Importthema ein.

    Weitere Informationen zur Benennung von Themen finden Sie in den Benennungsrichtlinien.

  4. Wählen Sie Standardabo hinzufügen aus.

  5. Wählen Sie Aufnahme aktivieren aus.

  6. Wählen Sie als Aufnahmequelle Amazon Kinesis Data Streams aus.

  7. Geben Sie die folgenden Informationen ein:

    • Kinesis-Stream-ARN: Der ARN für den Kinesis-Datenstream, den Sie in Pub/Sub aufnehmen möchten. Das ARN-Format sieht so aus: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • Kinesis-Nutzer-ARN: Der ARN der Nutzerressource, die beim AWS Kinesis Data Stream registriert ist. Das ARN-Format sieht so aus: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN der AWS-Rolle: Der ARN der AWS-Rolle. Das ARN-Format der Rolle sieht so aus: arn:aws:iam:${Account}:role/${RoleName}.

    • Dienstkonto: Das Dienstkonto, das Sie im Schritt Dienstkonto in Google Cloud erstellen erstellt haben.

  8. Klicken Sie auf Thema erstellen.

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell-Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung, in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den Befehl gcloud pubsub topics create aus:

    gcloud pubsub topics create TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die Themen-ID.

    • KINESIS_STREAM_ARN ist der ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Das ARN-Format sieht so aus: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN ist der ARN der Nutzerressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format sieht so aus: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN ist der ARN der AWS-Rolle. Das ARN-Format der Rolle sieht so aus: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellen erstellt haben.

Go

Folgen Sie der Einrichtungsanleitung für Go in der Pub/Sub-Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Pub/Sub-Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Pub/Sub-Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Pub/Sub-Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Pub/Sub-Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Folgen Sie der Einrichtungsanleitung für Node.js in der Pub/Sub-Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Weitere Informationen zu ARNs finden Sie unter Amazon Resource Names (ARNs) und IAM-Kennungen.

Falls Probleme auftreten, lesen Sie den Hilfeartikel Fehlerbehebung beim Importieren.

Importthema bearbeiten

Sie können die Einstellungen für die Datenquelle der Aufnahme eines Importthemas bearbeiten. Führen Sie diese Schritte aus:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Importthema.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Aktualisieren Sie die Felder, die Sie ändern möchten.

  5. Klicken Sie auf Aktualisieren.

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell-Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung, in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

      gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die Themen-ID. Dieses Feld kann nicht aktualisiert werden.

    • KINESIS_STREAM_ARN ist der ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Das ARN-Format sieht so aus: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN ist der ARN der Nutzerressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format sieht so aus: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN ist der ARN der AWS-Rolle. Das ARN-Format der Rolle sieht so aus: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellen erstellt haben.

Kontingente und Limits für Importthemen

Der Publisher-Durchsatz für Importthemen ist an das Veröffentlichungskontingent des Themas gebunden. Weitere Informationen finden Sie unter Pub/Sub-Kontingente und -Limits.

Nächste Schritte