Apache Hive からスキーマとデータを移行する

このドキュメントでは、Apache Hive から BigQuery にデータ、セキュリティ設定、パイプラインを移行する方法について説明します。

また、一括 SQL 変換を使用して複数の SQL スクリプトを一括で移行することも、インタラクティブ SQL 変換を使用してアドホック クエリ変換することもできます。Apache HiveQL は、両方の SQL 変換サービスで完全にサポートされています。

移行の準備を行う

以降のセクションでは、Hive から BigQuery へのデータ ウェアハウスの移行に役立つテーブル統計情報、メタデータ、セキュリティ設定などの情報を収集する方法について説明します。

ソーステーブル情報の収集

行の数、列の数、列のデータ型、サイズ、データの入力形式、ロケーションなど、Hive ソーステーブルに関する情報を収集します。この情報は移行プロセスで役立つほか、データの移行を検証する際にも有用です。corp という名前のデータベースに employees という名前の Hive テーブルが存在する場合、次のコマンドを使用してテーブル情報を収集します。

# Find the number of rows in the table
hive> SELECT COUNT(*) FROM corp.employees;

# Output all the columns and their data types
hive> DESCRIBE corp.employees;

# Output the input format and location of the table
hive> SHOW CREATE TABLE corp.employees;
Output:
…
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  'hdfs://demo_cluster/user/hive/warehouse/corp/employees'
TBLPROPERTIES (
…

# Get the total size of the table data in bytes
shell> hdfs dfs -du -s TABLE_LOCATION

ソーステーブルの形式の変換

Hive でサポートされる形式の中には、BigQuery に直接取り込むことができないものもあります。

Hive では、次の形式でデータを保存できます。

  • テキスト ファイル
  • RC ファイル
  • シーケンス ファイル
  • Avro ファイル
  • ORC ファイル
  • Parquet ファイル

BigQuery では、以下の任意のファイル形式で Cloud Storage からデータを読み込むことができます。

  • CSV
  • JSON(改行区切り)
  • Avro
  • ORC
  • Parquet

BigQuery では、スキーマ ファイルを使用せずに Avro、ORC、Parquet 形式でデータファイルを直接読み込むことができます。CSV や JSON(改行区切り)としてフォーマットされていないテキスト ファイルの場合、Avro 形式で Hive テーブルにデータをコピーできます。または、テーブル スキーマを BigQuery JSON スキーマに変換して取り込み時に提供できます。

Hive のアクセス制御設定の収集

Hive と BigQuery ではアクセス制御のメカニズムが異なります。役割、グループ、メンバー、それらに付与されている権限など、Hive アクセス制御のすべての設定を収集してください。BigQuery でデータセットごとのレベルでセキュリティ モデルをマッピングし、詳細な ACL を実装できます。たとえば、Hive ユーザーを Google アカウントに、HDFS グループを Google グループにマッピングできます。データセット レベルでアクセス権を設定できます。Hive のアクセス制御の設定を収集するためのコマンドは、以下のようになります。

# List all the users
> hdfs dfs -ls /user/ | cut -d/ -f3

# Show all the groups that a specific user belongs to
> hdfs groups user_name

# List all the roles
hive> SHOW ROLES;

# Show all the roles assigned to a specific group
hive> SHOW ROLE GRANT GROUP group_name

# Show all the grants for a specific role
hive> SHOW GRANT ROLE role_name;

# Show all the grants for a specific role on a specific object
hive> SHOW GRANT ROLE role_name on object_type object_name;

Hive では、必要な権限を持つユーザーであれば、テーブルの背後にある HDFS ファイルに直接アクセスできます。標準の BigQuery テーブルでは、データがテーブルに読み込まれた後、BigQuery ストレージに保存されます。BigQuery Storage Read API を使用してデータを読み取ることもできますが、IAM、行レベル、列レベルのすべてのセキュリティが引き続き適用されます。BigQuery の外部テーブルを使用して Cloud Storage のデータに対するクエリを実行する場合は、Cloud Storage へのアクセスも IAM によって制御されます。

コネクタを使って Apache Spark、Trino、Apache Hive でデータをクエリすることのできる BigLake テーブルを作成できます。BigQuery Storage API により、Cloud Storage や BigQuery のすべての BigLake テーブルに行レベルと列レベルのガバナンス ポリシーが適用されます。

データの移行

Hive データをオンプレミスまたは他のクラウドベースのソースクラスタから BigQuery に移行するには、次の 2 つのステップがあります。

  1. ソースクラスタから Cloud Storage へデータをコピーする
  2. Cloud Storage から BigQuery にデータを読み込む

以降のセクションでは、Hive データの移行、移行されたデータの検証、および継続的に取り込まれるデータの移行処理について説明します。これらの例は、非 ACID テーブル向けに作成されています。

列データの分割

Hive では、パーティション分割テーブル内のデータはディレクトリ構造で保存されます。テーブルの各パーティションは、パーティション列の特定の値に関連付けられます。データファイルそのものには、パーティション列のデータは含まれません。パーティション分割テーブル内のさまざまなパーティションを一覧表示するには SHOW PARTITIONS コマンドを使用します。

次の例では、ソース Hive テーブルが列 joining_datedepartment でパーティション分割されています。このテーブル内のデータファイルには、この 2 つの列に関するデータが含まれません。

hive> SHOW PARTITIONS corp.employees_partitioned
joining_date="2018-10-01"/department="HR"
joining_date="2018-10-01"/department="Analyst"
joining_date="2018-11-01"/department="HR"

これらの列をコピーする方法の一つは、BigQuery に読み込む前に、パーティション分割テーブルをパーティションに分割されないテーブルに変換することです。

  1. パーティション分割テーブルに似たスキーマで、非パーティション分割テーブルを作成します。
  2. ソース パーティション分割テーブルから非パーティション分割テーブルにデータを読み込みます。
  3. ステージング済みの非パーティション分割テーブルにあるデータファイルを Cloud Storage にコピーします。
  4. bq load コマンドを使用してデータを BigQuery に読み込み、time_partitioning_field 引数として、TIMESTAMP 型または DATE 型パーティション列(存在する場合)の名前を指定します。

Cloud Storage にデータをコピーする

データ移行の最初の手順は、Cloud Storage へのデータのコピーです。Hadoop DistCp を使用して、オンプレミスまたは他のクラウド クラスタから Cloud Storage にデータをコピーします。BigQuery でデータの保存場所となるデータセットと同じリージョンまたはマルチリージョンにあるバケットに、データを保存します。たとえば、東京リージョンにある既存の BigQuery データセットを宛先として使用するには、東京にある Cloud Storage リージョン バケットを選択してデータを格納する必要があります。

Cloud Storage バケットのロケーションを選択した後、次のコマンドを使用して、employees Hive テーブル ロケーションにあるすべてのデータファイルを一覧表示できます。

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

上記のすべてのファイルを Cloud Storage にコピーします。

> hadoop distcp
hdfs://demo_cluster/user/hive/warehouse/corp/employees
gs://hive_data/corp/employees

なお、Cloud Storage にデータを保存するとデータ ストレージ料金に沿って課金されます。

場合によっては、クエリジョブ用に作成された中間ファイルを保持するステージング ディレクトリが存在することがあります。bq load コマンドを実行する前に、このようなすべてのディレクトリを必ず削除する必要があります。

データの読み込み

BigQuery では、Cloud Storage から多くの形式でデータの一括読み込みを行うことができます。読み込みジョブを作成する前に、データの読み込み先となる BigQuery データセットが存在することを確認してください。

次のコマンドは、ACID ではないテーブルで Hive からコピーされるデータを表示します。

> gsutil ls gs://hive_data/corp/employees/
gs://hive-migration/corp/employees/
gs://hive-migration/corp/employees/000000_0
gs://hive-migration/corp/employees/000001_0
gs://hive-migration/corp/employees/000002_0

Hive データを BigQuery に読み込むには、bq load コマンドを使用します。URL でワイルドカード文字 * を使用すると、共通のオブジェクト接頭辞を持つ複数のファイルからデータを読み込むことができます。たとえば、同じ接頭辞 gs://hive_data/corp/employees/ を持つすべてのファイルを読み込むには、次のコマンドを使用します。

bq load --source_format=AVRO corp.employees gs://hive_data/corp/employees/*

ジョブが完了するまでに長時間を要する場合があるため、--sync フラグを False に設定すると、ジョブを非同期的に実行できます。bq load コマンドを実行すると、作成された読み込みジョブのジョブ ID が出力されます。このコマンドを使ってジョブのステータスをポーリングできます。これにはジョブの種類、ジョブの状態、そのジョブを実行したユーザーなどの詳細情報が含まれます。

それぞれのジョブ ID を使用して各読み込みジョブのステータスをポーリングし、エラーが発生して失敗したジョブがないかをチェックします。テーブルへのデータの読み込み時に障害が発生した場合、BigQuery では「All or None」方式の処理が行われます。エラーの解消に取り組んだ後で、別の読み込みジョブを再作成するのが安全です。詳細については、エラーのトラブルシューティングをご覧ください。

テーブルごと、プロジェクトごとに、読み込みジョブの割り当てが十分であることを確認してください。割り当てを超過すると、読み込みジョブは quotaExceeded エラーによって失敗します。

Cloud Storage から BigQuery へのデータの読み込み処理には、料金はかかりません。BigQuery に読み込まれた後のデータには、BigQuery のストレージ料金が適用されます。読み込みジョブが正常に終了したら、Cloud Storage に残っているファイルを削除することで、冗長データの保存に対して課金されないようにできます。

検証

データを正常に読み込んだ後、Hive テーブルの行数と BigQuery テーブルの行数を比較することで、移行後のデータを検証できます。行数、列数、パーティショニング フィールド、クラスタリング フィールドなど、BigQuery テーブルの詳細情報を取得するには、テーブル情報をご覧ください。その他の検証については、データ検証ツールを試してみることができます。

継続的な取り込み

継続的に Hive テーブルにデータを取り込む場合は、最初の移行を実行した後、増分的なデータ変更のみを BigQuery に移行します。一般的には、新しいデータを見つけて読み込む、繰り返し実行されるスクリプトを作成します。これを行うには多くの方法があります。以降のセクションでは、可能な 1 つの手法について説明します。

移行の進捗状況を 1 つの Cloud SQL データベース テーブルで追跡できます。以降のセクションでは、これをトラッキング テーブルと呼びます。最初に移行を実行するとき、進捗状況をトラッキング テーブルに保存します。その後の移行の実行では、トラッキング テーブル情報を使用することで、追加のデータが取り込まれて BigQuery に移行可能であるかどうか検出します。

増分データを区別するには INT64TIMESTAMP、または DATE タイプの識別子列を選択します。これは増分列と呼ばれます。

次の表は、増分列に TIMESTAMP 型を使用する、パーティションに分割されないテーブルの例を示しています。

+-----------------------------+-----------+-----------+-----------+-----------+
| timestamp_identifier        | column_2  | column_3  | column_4  | column_5  |
+-----------------------------+-----------+-----------+-----------+-----------+
| 2018-10-10 21\:56\:41       |           |           |           |           |
| 2018-10-11 03\:13\:25       |           |           |           |           |
| 2018-10-11 08\:25\:32       |           |           |           |           |
| 2018-10-12 05\:02\:16       |           |           |           |           |
| 2018-10-12 15\:21\:45       |           |           |           |           |
+-----------------------------+-----------+-----------+-----------+-----------+

次の表は、DATE 型の列 partition_column でパーティション分割されたテーブルの例を示しています。各パーティションに整数型の増分列 int_identifier があります。

+---------------------+---------------------+----------+----------+-----------+
| partition_column    | int_identifier      | column_3 | column_4 | column_5  |
+---------------------+---------------------+----------+----------+-----------+
| 2018-10-01          | 1                   |          |          |           |
| 2018-10-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-10-01          | 1000                |          |          |           |
| 2018-11-01          | 1                   |          |          |           |
| 2018-11-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-11-01          | 2000                |          |          |           |
+---------------------+---------------------+----------+----------+-----------+

以降のセクションでは、パーティション分割の有無、および増分列があるかどうかに基づいて、Hive データの移行を説明します。

増分列がないパーティション分割されていないテーブル

Hive にファイル圧縮がないと仮定します。Hive は新しいデータを取り込む際に新しいデータファイルを作成します。初回実行時にファイルのリストをトラッキング テーブルに保存します。これらのファイルを Cloud Storage にコピーして BigQuery に読み込むことで、Hive テーブルの初期移行を完了します。

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 3 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

最初の移行後に、一部のデータが Hive に取り込まれます。この増分データを BigQuery に移行するだけで済みます。その後の移行の実行では、データファイルを再度リストアップしてトラッキング テーブルの情報と比較し、まだ移行されていない新しいデータファイルを検出します。

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 5 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000003_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000004_0

この例では、テーブルの場所に 2 つの新しいファイルが存在します。これらの新しいデータファイルを Cloud Storage にコピーして既存の BigQuery テーブルに読み込むことで、データを移行します。

増分列があるパーティション分割されていないテーブル

この場合は、増分列の最大値を使用して、新しいデータが追加されたかどうかを判別できます。最初の移行を実行するときに、Hive テーブルに対するクエリを実行して増分列の最大値を取得し、それをトラッキング テーブルに保存します。

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2018-12-31 22:15:04

その後の移行の実行では、同じクエリを再び実行し、その時点での増分列の最大値を取得します。それをトラッキング テーブルの前回の最大値と比較することで、増分データの有無をチェックします。

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2019-01-04 07:21:16

現在の最大値が前回の最大値より大きい場合は、この例のように、増分データが Hive テーブルに取り込まれていることがわかります。増分データを移行するには、ステージング テーブルを作成し、増分データだけをそこに読み込みます。

hive> CREATE TABLE stage_employees LIKE corp.employees;
hive> INSERT INTO TABLE stage_employees SELECT * FROM corp.employees WHERE timestamp_identifier>"2018-12-31 22:15:04" and timestamp_identifier<="2019-01-04 07:21:16"

ステージング テーブルを移行します。そうするには、HDFS データファイルをリストアップして Cloud Storage にコピーし、既存の BigQuery テーブルに読み込みます。

増分列のないパーティション分割テーブル

パーティション分割テーブルへのデータの取り込みでは、新しいパーティションの作成、既存のパーティションへの増分データの追加、またはその両方が行われることがあります。こうしたシナリオでは、更新されたパーティションを特定することはできますが、それらの既存のパーティションに追加されたデータを特定するのは簡単ではありません。区別するための増分列がないからです。もう一つのオプションは HDFS スナップショットを取得して維持することですが、スナップショット処理では Hive のパフォーマンス上の懸念が生じるため、この機能は通常、無効に設定されています。

最初にテーブルを移行する際に、SHOW PARTITIONS コマンドを実行し、さまざまなパーティションに関する情報をトラッキング テーブルに格納します。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

上記の出力では、テーブル employees に 2 つのパーティションがあることがわかります。以下の簡略化されたトラッキング テーブルは、この情報を格納する方法を示しています。

partition_information file_path gcs_copy_status gcs_file_path bq_job_id ...
partition_column =2018-10-01
partition_column =2018-11-01

その後の移行の実行では、SHOW PARTITIONS コマンドを再び実行してすべてのパーティションをリストアップし、それをトラッキング テーブルのパーティション情報と比較することで、まだ移行されていない新しいパーティションの有無をチェックします。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

この例のように新しいパーティションが特定された場合は、ステージング テーブルを作成し、ソーステーブルから新しいパーティションのみを読み込みます。ファイルを Cloud Storage にコピーして既存の BigQuery テーブルに読み込むことで、ステージング テーブルを移行します。

増分列のあるパーティション分割テーブル

このシナリオでは、Hive テーブルがパーティションに分割されており、各パーティションに増分列があります。データが継続的に取り込まれ、この列の値が増加していきます。この場合、前のセクションで説明した方法で新しいパーティションを移行できます。さらに、既存のパーティションに取り込まれた増分データを移行することもできます。

テーブルを初めて移行するときに、各パーティションの増分列の最小値と最大値、およびテーブル パーティションに関する情報をトラッキング テーブルに保存します。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";
1 1000

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-11-01";
1 2000

上記の出力では、テーブル employees に 2 つのパーティションがあり、各パーティションの増分列に最小値と最大値があることがわかります。以下の簡略化されたトラッキング テーブルは、この情報を格納する方法を示しています。

partition_information inc_col_min inc_col_max file_path gcs_copy_status ...
partition_column =2018-10-01 1 1000
partition_column =2018-11-01 1 2000

その後の移行の実行では、同じクエリを実行して各パーティションのその時点での最大値を取得し、それをトラッキング テーブルの前回の最大値と比較します。

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";

この例では、2 つの新しいパーティションが識別され、既存のパーティション partition_column=2018-10-01 に増分データが取り込まれています。増分データがある場合は、ステージング テーブルを作成して増分データだけをそこに読み込み、データを Cloud Storage にコピーして、既存の BigQuery テーブルにそれを読み込みます。

セキュリティ設定

BigQuery では、IAM を使用してリソースへのアクセスを管理します。BigQuery の事前定義ロールは、特定のサービスへのアクセスを詳細に制御します。また、一般的なユースケースやアクセス制御パターンをサポートすることを目的としています。カスタムロールを使用して権限セットをカスタマイズすると、さらにきめ細かなアクセス制御を行うことができます。

テーブルとデータセットのアクセス制御では、ユーザー、グループ、サービス アカウントがテーブル、ビュー、データセットに対して実行できる処理を指定します。承認済みのビューを使用すると、元のソースデータへのアクセス権を付与することなく、特定のユーザーやグループとの間でクエリ結果を共有できます。行レベルのセキュリティ列レベルのセキュリティを使用すると、テーブル内の行または列にアクセスできるユーザーを制限できます。データ マスキングを使用すると、列へのアクセスを許可しながら、ユーザー グループに対して列データを選択的に難読化できます。

アクセス制御を適用するときには、次のユーザーとグループにアクセス権を付与できます。

  • User by e-mail: 個々の Google アカウントに、データセットへのアクセスが許可されます。
  • Group by e-mail: ある Google グループのすべてのメンバーに、データセットへのアクセスが許可されます。
  • Domain: 1 つの Google ドメイン内のすべてのユーザーとグループに、データセットへのアクセスが許可されます。
  • All Authenticated Users: すべての Google アカウント所有者にデータセットへのアクセスが許可されます(つまりデータセットが公開されます)。
  • Project Owners: すべてのプロジェクト オーナーにデータセットへのアクセスが許可されます。
  • Project Viewers: すべてのプロジェクト閲覧者にデータセットへのアクセスが許可されます。
  • Project Editors: すべてのプロジェクト編集者にデータセットへのアクセスが許可されます。
  • Authorized View: データセットに対する表示アクセスだけが許可されます。

データ パイプラインの変更

以降のセクションでは、Hive から BigQuery への移行時にデータ パイプラインを変更する方法について説明します。

Sqoop

既存のパイプラインで Sqoop を使って HDFS または Hive にデータをインポートして処理している場合は、データを Cloud Storage にインポートするようにジョブを修正します。

HDFS にデータをインポートしている場合は、次のいずれかを選択します。

Google Cloud で実行されている Hive の中にデータをインポートするよう Sqoop に指示するには、Hive テーブルを直接指定し、HDFS の代わりに Cloud Storage を Hive ウェアハウスとして使用します。これを行うには、プロパティ hive.metastore.warehouse.dir を Cloud Storage バケットに設定します。

Dataproc を使用して Sqoop ジョブを送信しデータを BigQuery にインポートすることによって、Hadoop クラスタを管理せずに Sqoop ジョブを実行できます。

Spark SQL と HiveQL

バッチ SQL トランスレータまたはインタラクティブ SQL トランスレータは、Spark SQL または HiveQL を GoogleSQL に自動的に変換できます。

Spark SQL や HiveQL を BigQuery に移行したくない場合は、Dataproc を使用するか、BigQuery コネクタを Apache Spark と併用することもできます。

Hive ETL

Hive に既存の ETL ジョブがある場合は、次の方法でそれらを変更して Hive から移行できます。

  • バッチ SQL トランスレータを使用して、Hive ETL ジョブを BigQuery ジョブに変換します。
  • BigQuery コネクタを使用することで、Apache Spark を使って BigQuery との間で読み取り/書き込みを行います。Dataproc を使用すると、エフェメラル クラスタを活用してコスト効率に優れた方法で Spark ジョブを実行できます。
  • Apache Beam SDK を使用してパイプラインを書き換え、それを Dataflow で実行します。
  • Apache Beam SQL を使用してパイプラインを書き換えます。

ETL パイプラインを管理するには、Cloud Composer(Apache Airflow)と Dataproc ワークフロー テンプレートを使用できます。Cloud Composer には、Oozie ワークフローを Cloud Composer ワークフローに変換するためのツールが用意されています。

Dataflow

Hive ETL パイプラインをフルマネージド クラウド サービスに移行したい場合は、Apache Beam SDK を使ってデータ パイプラインを作成し、Dataflow でそれを実行することを検討してください。

Dataflow は、データ処理パイプラインを実行するためのマネージド サービスです。これは、オープンソース フレームワーク Apache Beam を使って記述されたプログラムを実行します。Apache Beam は、バッチとストリーミングの両方のパイプラインを開発できる統合プログラミング モデルです。

標準的なデータ移動が行われるパイプラインの場合、Dataflow テンプレートを使用すれば、コードを記述しなくても Dataflow パイプラインを速やかに作成できます。この Google 提供のテンプレートを使用すると、Cloud Storage からテキスト ファイルを読み取り、変換を適用し、結果を BigQuery テーブルに書き込むことができます。

データ処理をさらに簡略化するには、Beam SQL もお試しください。SQL に似たステートメントを使用してデータを処理できます。