Application Integration でサポートされているコネクタをご覧ください。

Apache Kafka トリガー

Apache Kafka トリガーは、Apache Kafka 接続に登録している Apache Kafka イベントに基づいて統合を呼び出すことができるコネクタ イベント トリガーです。

始める前に

Apache Kafka トリガーの新しい接続を作成または構成する場合は、プロジェクトに対する次の Identity and Access Management(IAM)ロールがあることを確認してください。

  • Connectors 管理者( roles/connectors.admin
  • ロールの付与については、アクセスの管理をご覧ください。

Apache Kafka トリガーに使用するサービス アカウントに、次の IAM ロールを付与します。

制約事項

Apache Kafka トリガーには次の制限があります。

  • イベント サブスクリプション用に SASL_SSL セキュリティ プロトコルをサポートします。
  • イベント サブスクリプションの mTLS はサポートしません。

Apache Kafka トリガーを追加する

統合に Apache Kafka トリガーを追加する手順は次のとおりです。

  1. Google Cloud コンソールで [Application Integration] ページに移動します。

    Application Integration に移動

  2. ナビゲーション メニューで [統合] をクリックします。

    [統合リスト] ページが開き、Google Cloud プロジェクトで使用可能なすべての統合が一覧表示されます。

  3. 既存の統合を選択するか、[統合の作成] をクリックして新しい統合を作成します。

    新しい統合を作成する場合:

    1. [統合の作成] ペインで名前と説明を入力します。
    2. 統合のリージョンを選択します。
    3. 統合用のサービス アカウントを選択します。統合のサービス アカウントの詳細は、統合ツールバーの [統合の概要] ペインでいつでも変更または更新できます。
    4. [作成] をクリックします。

    統合エディタで統合が開きます。

  4. 統合エディタのナビゲーション バーで、[トリガー] をクリックして、使用可能なトリガーのリストを表示します。
  5. 統合エディタで、[Apache Kafka トリガー] 要素をクリックして配置します。
  6. Apache Kafka トリガーを構成するには、Integration Connectors で使用可能な既存の Apache Kafka 接続を使用するか、インライン接続作成オプションを使用して新しい Apache Kafka 接続を作成します。

既存の接続を使用して Apache Kafka トリガーを構成する

Integration Connectors の既存の Apache Kafka 接続を使用して、Apache Kafka トリガーを構成できます。なお、Apache Kafka トリガーコネクタ イベント トリガーであるため、トリガーの構成にはイベント サブスクリプションを有効にした Apache Kafka 接続のみが使用できます。

新しい Apache Kafka 接続を使用して Apache Kafka トリガーを構成する方法については、新しい接続を使用して Apache Kafka トリガーを構成するをご覧ください。

既存の Apache Kafka 接続を使用して Apache Kafka トリガーを構成するには、次の手順を行います。

  1. 統合エディタで [Apache Kafka トリガー] 要素をクリックして、トリガー構成ペインを開きます。
  2. [トリガーの構成] をクリックします。
  3. [Connector Event Trigger Editor] ページで、構成の詳細を次のとおりに入力します。
    1. リージョン: Apache Kafka 接続のリージョンを選択します。
    2. 接続: 使用する Apache Kafka 接続を選択します。

      Application Integration には、アクティブでイベント サブスクリプションが有効になっている Apache Kafka 接続のみが表示されます。

    3. トピック名: トピック名を入力します。
    4. コンシューマ グループ ID: 必要に応じて、Kafka コンシューマが属するコンシューマー グループの名前を指定するコンシューマ グループ ID を入力します。
    5. パーティション: 特定のパーティションに登録する場合は、そのパーティションを入力します。パーティションは、トピックを複数のパーツに分割し、各パーツをクラスタ内の異なるノードに格納する方法です。
    6. Initial Offset: オフセット リセット戦略を選択します。
      • [最も早い] を選択すると、コンシューマ グループの存続前に生成されたメッセージを含む、消費されていないメッセージが接続で消費されます。
      • [最新] を選択すると、コンシューマ グループの作成後に生成されたメッセージが接続で使用されます。
    7. サービス アカウント: Apache Kafka トリガーに必要な IAM ロールを持つサービス アカウントを選択します。
  4. [完了] をクリックします。

新しい接続を使用して Apache Kafka トリガーを構成する

新しい Apache Kafka 接続を使用して Apache Kafka トリガーを構成するには、次の手順を行います。

  1. 統合エディタで [Apache Kafka トリガー] 要素をクリックして、トリガー構成ペインを開きます。
  2. [Apache Kafka トリガーの構成] をクリックします。
  3. [リージョン] フィールドはスキップします。
  4. [接続] をクリックし、リストから [接続の作成] オプションを選択します。[接続を作成] ペインが表示されます。
  5. [ロケーション] ステップで、新しい Apache Kafka 接続のロケーションを選択します。
    1. リージョン:リストからリージョンを選択します。
    2. [次へ] をクリックします。
  6. [接続の詳細] ステップで、新しい Apache Kafka 接続の詳細を入力します。
    1. コネクタのバージョン: リストから Apache Kafka コネクタの使用可能なバージョンを選択します。
    2. 接続名: Apache Kafka 接続の名前を入力します。
    3. (省略可)説明文: 接続の説明を入力します。
    4. (省略可)Cloud Logging を有効にする: 接続のすべてのログデータを保存するには、このチェックボックスをオンにします。
    5. サービス アカウント: Apache Kafka 接続に必要な IAM ロールを持つサービス アカウントを選択します。
    6. Apache Kafka 接続では、[イベント サブスクリプション、エンティティ、アクションを有効にする] オプションがデフォルトで選択されています。
    7. 型検出スキーム: MessageOnly を選択します。
    8. レジストリ サービス: トピック スキーマの操作に使用される Schema Registry サービス。
    9. レジストリタイプ: 特定のトピックに指定されたスキーマのタイプ。
    10. レジストリ バージョン: 指定したトピックの RegistryUrl から読み取られたスキーマのバージョン。
    11. レジストリ ユーザー: RegistryUrl で指定されたサーバーを承認するユーザー名またはアクセスキーの値。
    12. レジストリ パスワード: RegistryUrl で指定されたサーバーで承認するパスワードと秘密鍵のキー値を含む Secret Manager シークレット。
    13. 必要に応じて、接続ノードの設定を構成します。

      • ノードの最小数: 接続ノードの最小数を入力します。
      • ノードの最大数: 接続ノードの最大数を入力します。

      ノードは、トランザクションを処理する接続の単位(またはレプリカ)です。1 つの接続でより多くのトランザクションを処理するには、より多くのノードが必要になります。逆に、より少ないトランザクションを処理するには、より少ないノードが必要になります。ノードがコネクタの料金に与える影響については、接続ノードの料金をご覧ください。値を入力しない場合は、デフォルトで最小ノード数は 2 に設定され(可用性を高めるため)、最大ノード数は 50 に設定されます。

    14. 必要に応じて、[+ ラベルを追加] をクリックして キーと値ペアの形式でラベルを接続に追加します。
    15. SSL を有効にする: このフィールドは SSL を有効にするかどうかを設定します。
    16. [次へ] をクリックします。
  7. [宛先] セクションに、接続するリモートホスト(バックエンド システム)の詳細を入力します。
    1. Destination Type: 宛先の種類を選択します。
      1. [ホストアドレス] フィールドに、宛先のホスト名または IP アドレスを指定します。
        1. バックエンド システムへのプライベート接続を確立する場合は、次のようにします。
          1. PSC サービス アタッチメントを作成します。
          2. エンドポイント アタッチメント作成してから、ホストアドレスフィールドにあるエンドポイント アタッチメントの詳細を入力します。
        2. セキュリティをさらに強化してバックエンド システムへのパブリック接続を確立する場合は、接続用の静的送信 IP アドレスの構成を検討してから、特定の静的 IP アドレスのみを許可リストに登録するようファイアウォール ルールを構成します。

      他の宛先を入力するには、[+ 宛先を追加] をクリックします。

    2. [次へ] をクリックします。
  8. [認証] セクションで、認証の詳細を入力します。
    1. [認証タイプ] を選択し、関連する詳細を入力します。

      Apache Kafka 接続でサポートされる認証タイプは次のとおりです。

      • ユーザー名とパスワード
        • ユーザー名: 接続に使用する Apache Cassandra ユーザー名。
        • パスワード: Apache Kafka ユーザー名に関連付けられているパスワードを含む Secret Manager のシークレット。
        • Auth Scheme: 認証に使用されるスキーム

          Apache Kafka 接続でサポートされる認証スキームは次のとおりです。

          • Plain
          • SCRAM-SHA-1
          • SCRAM-SHA-256
      • 利用不可

        匿名ログインを使用する場合は、[Not Available] を選択します。

    2. [次へ] をクリックします。
  9. デッドレター構成を入力します。デッドレターを構成すると、指定された Pub/Sub トピックに未処理のイベントが書き込まれます。次の詳細情報を入力します。
    1. デッドレター プロジェクト ID: デッドレター Pub/Sub トピックを構成した Google Cloud プロジェクト ID。
    2. デッドレター トピック: 未処理イベントの詳細を書き込む Pub/Sub トピック。
  10. [次へ] をクリックします。
  11. Review: 接続と認証の詳細を確認します。
  12. [作成] をクリックします。

トリガー出力

Apache Kafka トリガーがイベント サブスクリプションの構成を完了するまでに数分かかります。イベント サブスクリプションのステータスは、トリガー構成ペインの [Event Subscription details] で確認できます。

Apache Kafka トリガーは、次の状態を使用して、イベント サブスクリプションのステータスを示します。

  • Creating: トリガーがイベント サブスクリプションに登録中であることを示します。
  • Active: トリガーがイベント サブスクリプションに正常に登録されたことを示します。
  • Error: 構成されたイベント サブスクリプションに問題があることを示します。

[Event Subscription details] セクションには、イベント サブスクリプションのステータスに加えて、接続リージョン、接続名、イベント サブスクリプション名などの詳細が表示されます。

トリガー出力変数

各イベントでは、Apache Kafka トリガーにより、ダウンストリーム タスクで使用できる ConnectorEventPayload 出力変数(JSON 形式)が生成されます。出力変数には、Apache Kafka イベントのペイロードが含まれます。例:

  {
    "message": "{\"key\":\"18\",\"topic\":\"test\",\"value\":\"hello world\",\"partition\":0,\"offset\":12,\"headers\":\"{}\",\"timestamp\":1712755076824,\"serializedMessagesize\":14}",
    "contentType": "text/plain"
 }

イベント サブスクリプションを表示する

Integration Connectors で接続に関連付けられているすべてのイベント サブスクリプションの表示と管理を行う手順は、次のとおりです。

  1. [Integration Connectors] > [接続] ページに移動します。

    [接続] ページに移動

  2. サブスクリプションを表示する接続をクリックします。
  3. [イベント サブスクリプション] タブをクリックします。

    ここでは、接続のすべてのサブスクリプションが表示されます。

Apache Kafka トリガーを編集する

Apache Kafka トリガーを編集して、接続構成とイベント サブスクリプションの詳細を変更または更新できます。

Apache Kafka トリガーを編集するには、次の手順を行います。

  1. 統合エディタで [Apache Kafka トリガー] 要素をクリックして、トリガー構成ペインを開きます。
  2. [Apache Kafka トリガーの構成] をクリックします。
  3. [Connector Event Trigger Editor] ページで、次の操作を行います。
    1. 構成済みのイベント サブスクリプションを保持するには、[保持] をクリックします。それ以外の場合は、[削除] をクリックします。
    2. 必要に応じて、接続構成とイベント サブスクリプションの詳細を更新します。
    3. [完了] をクリックします。
  4. 更新された接続とイベント サブスクリプションの詳細は、トリガー構成ペインの [Event Subscription details] で確認できます。