Achieving Open Lakehouse Interoperability with Delta UniForm

How to integrate Snowflake with Databricks Unity Catalog to refresh Iceberg table metadata

Usman Zubair
13 min readFeb 9, 2024
Lakehouse Interoperability with Delta UniForm

In recent years, organizations have undergone a significant architectural shift, prioritizing owning their data through the adoption of open storage formats to facilitate data democratization and interoperability. Pioneered by Databricks, this modern open architecture, known as the Lakehouse, seamlessly combines the flexibility, cost-efficiency, and scalability of data lakes with the robust data management capabilities of data warehouses. This empowers organizations to enable Business Intelligence and Generative AI use cases on all your data, at scale.

With the lakehouse as the foundation, Databricks recently launched the next evolution of data systems; the Databricks Data Intelligence Platform. It provides an open, unified foundation for all data and governance, and is powered by an Intelligence Engine that understands the unique semantics of your data. While the Databricks Data Intelligence Platform can meet the end-to-end Data and AI needs of any organization, many find themselves dependent on existing consumption patterns relying on cloud warehouses (i.e. BigQuery, Snowflake) for critical reporting and BI needs. How can these organizations accelerate towards owning their data (i.e. the open lakehouse architecture), without causing disruption to their reporting and BI systems?

Delta UniForm: a Universal Format for Lakehouse Interoperability

The emergence of the lakehouse architecture has seen multiple data storage formats take center stage. Linux Foundation Delta Lake, Apache Iceberg, and Apache Hudi all provide the necessary capabilities to enable data democratization and interoperability. So what gives?

With platforms and systems choosing to standardize on different table formats (i.e. Iceberg → Snowflake and BigQuery, Delta → Databricks and Microsoft Fabric), organizations are often faced with a dilemma; which format to standardize their Lakehouse on?

Delta UniForm (short for Delta Lake Universal Format) solves this problem with one format to rule them all. It offers a simple, easy to implement, seamless unification of table formats without creating additional data copies or silos. “Metadata” is generated automatically for Iceberg (Hudi coming soon) alongside Delta Lake. This allows for use of the most suitable tool for each data workload while operating on a single data source, with perfect interoperability across the three formats.

Delta UniForm — one format to rule them all

As organizations continue the shift towards owning all their data, Delta UniForm is the solution to address any apprehensions related to supporting existing consumption patterns (i.e. BigQuery / Snowflake).

Query Delta UniForm Tables as Iceberg using Snowflake

With your data centralized in your Lakehouse as Delta UniForm tables (how to enable UniForm), let’s discuss how to query these tables from Snowflake as Iceberg tables using SF’s catalog integration (aka external, unmanaged).

Catalog Integration facilitates read-only access to Iceberg tables stored in your cloud storage by specifying the source of the Iceberg metadata. This allows for curation of data in an upstream platform such as the Databricks Data Intelligence Platform while still providing the ability to support existing downstream consumption patterns using Snowflake.

Snowflake currently supports two flavors of catalog integrations:

  • Object Storage — for Iceberg metadata that can be referenced from an external cloud storage location (i.e. Delta UniForm table with automatically generated Iceberg metadata)
  • AWS Glue — for fetching Iceberg metadata location reference from an Amazon Glue Catalog table

Both flavors can be used for querying Delta UniForm tables from Snowflake but for the purpose of this blog we will focus on object storage.

A key component of managing an Iceberg table in Snowflake, using object storage, is the METADATA_FILE_PATH that must point to the latest metadata.json for the Iceberg dataset. While this may appear straightforward initially (i.e. when creating a table), the complexity arises when this has to be updated for every new commit (i.e. append, upsert, delete etc.) and at scale for thousands of tables. How can we streamline management of external Iceberg tables in Snowflake?

Databricks Unity Catalog to the rescue!

Databricks Unity Catalog

Unity Catalog provides a unified governance solution for all your data and AI needs, making it a key pillar of the Lakehouse architecture. Specifically for Iceberg metadata management, Unity Catalog implements the open Iceberg Catalog REST API in accordance with the Apache Iceberg specification. This enables interoperability with other systems through auto-refresh support for fetching the latest Iceberg metadata location, without any charges for Databricks compute.

There are two methods for leveraging Unity Catalog to fetch and refresh the Iceberg metadata location for a Snowflake Iceberg table:

  • Push — the data producer writes data to a Delta UniForm table, fetches the latest Iceberg metadata location from Unity Catalog and executes a ALTER ICEBERG TABLE…REFRESH statement using the Snowflake python connector. This method would be suitable in cases where the data curation process is centrally governed and the metadata refresh step can be coupled with the data load. Implementation of this method is comprehensively covered in an excellent blog by Jason Drew.
  • Pull — the consumption tool (i.e. Snowflake) fetches the latest Iceberg metadata location from Unity Catalog and executes a ALTER ICEBERG TABLE…REFRESH statement, either on a certain cadence or before executing a query against a Snowflake Iceberg table. This method can be used when appropriate controls cannot be enforced during upstream data curation.
High-level process flow for the “pull” method

The remainder of this blog will cover the implementation details of the “pull” method, using Snowflake and Databricks Unity Catalog.

Integrate Snowflake with Unity Catalog to refresh Iceberg table metadata location

We will use AWS as the cloud service provider for this example but the same can be replicated in Azure or GCP. Snowflake’s catalog integration currently lacks support for retrieving the source of Iceberg metadata through the open Iceberg Catalog REST API. We will navigate this by leveraging Snowflake’s API integration service to integrate with Unity Catalog’s Iceberg catalog endpoint, with AWS API Gateway serving as a proxy.

Here are the high level steps:

  1. Create Table (Databricks) — Create a table with UniForm enabled for Iceberg
  2. Create External Volume (Snowflake | AWS) — Create an external volume in Snowflake that grants Snowflake read-only access to the AWS S3 bucket where our table was created
  3. Set up API Integration (Snowflake | AWS) — Create an API integration object in Snowflake that uses AWS API Gateway as a proxy service to invoke an AWS Lambda function. The Lambda function is responsible for calling UC’s Iceberg Rest Catalog API and fetching the latest Iceberg metadata location for our table
  4. Create External Function (Snowflake) — Create an external function in Snowflake that will pass our table name and invoke the proxy API service
  5. Create Iceberg Table (Snowflake) — Create an external (object_storage source) Iceberg table in Snowflake using output from our external function to populate METADATA_FILE_PATH
  6. Streamline Metadata Refresh (Snowflake) Create a procedure in Snowflake that automates the refresh of Iceberg metadata location for a given table

Prerequisites

Appropriate access to following services will be needed for the purposes of this integration:

  • (Databricks) Create Table permissions in Unity Catalog
  • (AWS) Create / Update IAM role permissions — we will be creating an IAM role to allow Snowflake access to our S3 bucket as well as execute permissions to AWS API Gateway
  • (AWS) Create permissions for API Gateway and Lambda
  • (Snowflake) Create object permissions for External Volume, API Integration, External Functions, Procedure and Table

Step 1 (Databricks): Create Table

Create a table in Databricks with uniForm enabled to generate Iceberg metadata (along with Delta metadata). In this example we will use CTAS to populate our table using a sample dataset available in Unity Catalog. If needed, UniForm can also be enabled on existing tables.

CREATE TABLE customer TBLPROPERTIES(
'delta.universalFormat.enabledFormats' = 'iceberg',
'delta.enableIcebergCompatV2' = 'true'
) AS
SELECT *
FROM samples.tpch.customer;

Step 2 (Snowflake | AWS): Create External Volume

An external volume is an account-level Snowflake object that is required to read Iceberg tables from external cloud storage in AWS, Azure, or GCP. The cloud storage location and IAM information you designate for your Iceberg tables are stored in this external volume object.

Creating an external volume consists of multiple steps, including configuration of access permissions and IAM role in AWS. These steps are well documented here.

  • Once access permissions (IAM policy) and an IAM role are set up in AWS, create an external volume in Snowflake.
CREATE
OR REPLACE EXTERNAL VOLUME ext_vol_uniform STORAGE_LOCATIONS = (
(
NAME = 'uniform-s3-bucket'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://path_to_bucket/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws_account_id>:role/snowflake_role_for_uniform'
)
);
  • Once the external volume is created, execute DESC EXTERNAL VOLUME <external_volume_name> in Snowflake to retrieve the params; STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID. Use these params to edit the trust relationship on the IAM role created earlier.

Step 3 (Snowflake | AWS): Set up API Integration

A Snowflake API integration object stores information about an HTTPS proxy service (i.e. AWS API Gateway). The API integration object will allow us to invoke an AWS Lambda function that fetches the latest Iceberg metadata location for our table using UC’s Iceberg Rest Catalog API. Following are the steps required to set up this integration:

a. Set up AWS Lambda

The Lambda will be responsible for calling the Unity Catalog’s Iceberg Rest Catalog API and fetching the latest Iceberg metadata location for a table name (passed in by the external function in Step 4).

We will use the “requests” module to call UC’s Iceberg Rest Catalog API. As this module is not included by default in python, create a .zip file by zipping the python script and the modules together, upload this .zip file to the AWS Lambda UI.

mkdir api_lambda
cd api_lambda
pip3 install requests -t ./ # this will install the requests module in this folder
touch lambda_function.py
vim lambda_function.py

Now insert the following python code in lambda_function.py. This code will parse the payload (body) sent by Snowflake’s API integration object, pass the table_name to UC’s Iceberg Rest Catalog API and compose the output in the format expected by Snowflake’s API integration object.

import os
import json
import requests


def lambda_handler(event, context):

status_code = 200
array_of_rows_to_return = []

try:
event_body = event["body"]
payload = json.loads(event_body)
rows = payload["data"]

# Fetch values from environment variables
databricks_host = os.environ.get("DATABRICKS_HOST")
bearer_token = os.environ.get("BEARER_TOKEN")
base_api_endpoint = databricks_host + "/api/2.1/unity-catalog/iceberg/"

headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json",
}

# For each input row in the JSON object...
for row in rows:
# Read the input row number (the output row number will be the same)
row_number = row[0]

# Read the first input parameter's value which contains catalog.schema.table
input_parameter = row[1]

# parse input parameter
catalog_schema = (
input_parameter.split(".")[0] + "." + input_parameter.split(".")[1]
)
table = input_parameter.split(".")[2]

response = requests.get(
base_api_endpoint
+ "v1/namespaces/"
+ catalog_schema
+ "/tables/"
+ table,
headers=headers,
)

if response.status_code == 200:

api_response = response.json()
metadata_location = api_response.get("metadata-location", {})
output_value = {"metadata_location": metadata_location}

else:
output_value = {"Error": response.text}

# Put the returned row number and the returned value into an array
row_to_return = [row_number, output_value]

# ... and add that array to the main array.
array_of_rows_to_return.append(row_to_return)

# Compose the output based on format expected by Snowflake
json_compatible_string_to_return = json.dumps({"data": array_of_rows_to_return})

except Exception as err:
# 400 implies an error
status_code = 400
json_compatible_string_to_return = event_body

# Return the return value and HTTP status code
return {"statusCode": status_code, "body": json_compatible_string_to_return}

Create a zip containing all the contents of this folder (do NOT zip the parent folder).

zip -r api_lambda.zip .

Create AWS Lambda Function.

In the top right, use the “Upload from” → .zip file option.

Under “configuration”, set up environment variables for DATABRICKS_HOST and BEARER_TOKEN. Deploy the Lambda function.

Note: The BEARER_TOKEN used above will determine which tables within Unity Catalog the Lambda function can fetch the Iceberg metadata location for.

b. Set up AWS API Gateway

API Gateway will be used as a proxy service by Snowflake’s API integration object to send data to our Lambda function. The response from the Lambda function will then be returned to Snowflake’s API integration object.

Start with building a REST API.

Create a resource and create a POST method under this resource. Enable the check box for Lambda proxy integration and select the Lambda function we created earlier.

Deploy API. Note down the “Invoke URL”.

c. Add API Gateway permissions to IAM role

Next we have to update the IAM role (from Step 2) to allow Snowflake access to our API Gateway resource as well as grant execute permissions on the API. Lets start by adding the following policy to the IAM role to grant access to API Gateway resources (this can be restricted to specific resource(s) by modifying the Resource key).

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "apigateway:*",
"Resource": "*"
}
]
}

Add execute API permissions to the IAM role using the following policy. The Resource key points to the Method Request ARN from the POST method we created earlier.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"execute-api:Invoke"
],
"Resource": "arn:aws:execute-api:us-east-1:<aws_account_id>:u*****9uy4/*/POST/getmetadataloc"
}
]
}

d. Create API Integration for AWS in Snowflake

Replace the resource_invocation_url with the API Gateway “Invoke URL” (from Step 3).

CREATE OR REPLACE api integration uc_rest_api_integration 
api_provider = aws_api_gateway
api_aws_role_arn = 'arn:aws:iam::<aws_account_id>:role/snowflake_role'
api_allowed_prefixes =('<resource_invocation_url>')
enabled = true;

e. Link theAPI Integration object to API Gateway

Once the API integration object is created, we have to link it to our proxy service (i.e. API Gateway). Execute DESCRIBE API INTEGRATION <api_integration_name> in Snowflake to retrieve the params; API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID. Use these params to edit the trust relationship on the IAM role created earlier (from Step 2).

Step 4 (Snowflake): Create External Function

Create an external function object in Snowflake that will be used to pass a table name and invoke the proxy API service. Replace the resource_invocation_url with the API Gateway “Invoke URL” (from Step 3).

CREATE OR REPLACE EXTERNAL FUNCTION  get_uc_iceberg_location(table_name VARCHAR)
RETURNS variant
api_integration = uc_rest_api_integration
AS '<resource_invocation_url>';

Step 5 (Snowflake): Create Iceberg Table

Before we can create an external Iceberg table in Snowflake, we have to create a catalog integration. For our scenario, this will be using “object storage” as our Iceberg metadata files are located in AWS S3.

CREATE OR REPLACE CATALOG INTEGRATION s3_catalog
CATALOG_SOURCE=OBJECT_STORE
TABLE_FORMAT=ICEBERG
ENABLED=TRUE;

Now we are ready to call our External Function (from Step 4) to fetch the latest Iceberg metadata location for our table. We will pass in the UC catalog, schema and table to the function.

SELECT get_uc_iceberg_location('main.default.customer') AS metadata_location

The metadata_file_path is the relative path of the Iceberg metadata file (*.metadata.json) in relation to the STORAGE_BASE_URL specified when creating the External Volume (in Step 2). Use the metadata_location path returned by our External Function to derive the metadata_file_path and create an Iceberg table in Snowflake.

CREATE iceberg TABLE customer 
EXTERNAL_VOLUME = 'ext_vol_uniform'
CATALOG = 's3_catalog'
METADATA_FILE_PATH = '<metadata_file_path>';

Time to query our table in Snowflake.

Now that the External Function is set up and functional, it can be used to fetch the latest Iceberg metadata location for any Delta UniForm table that we have access to in Unity Catalog.

Step 6 (Snowflake): Streamline Metadata Refresh

For any upstream changes to a table (i.e. insert, upsert, delete) we can execute the External Function and then refresh the table to point to the latest Iceberg metadata location. But let’s combine these 2 steps to streamline this process even further. Here’s how:

Create a Procedure in Snowflake

This procedure will execute our External Function, fetch the latest Iceberg table metadata location and then execute an ALTER ICEBERG TABLE command to refresh the metadata location.

CREATE OR REPLACE PROCEDURE refresh_iceberg_metadata (uc_catalog STRING, uc_schema STRING, table_name STRING)
RETURNS VARIANT
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
val STRING;
metadata_location STRING;
relative_refresh_path STRING;
result VARIANT;
status_message STRING;
sql_stmt STRING;
BEGIN
-- Attempt to get the metadata location
SELECT get_uc_iceberg_location(:uc_catalog || '.' || :uc_schema || '.' || :table_name):metadata_location::STRING
INTO :val;

-- Check if :val contains 'metadata_location'
IF (NOT contains(:val, 'Error')) THEN
-- If successful, set status message
status_message := 'metadata updated successfully';
metadata_location := :val;

-- Extract relative path using regexp_substr
SELECT regexp_substr(:val, 's3://[^/]+/(.*)', 1, 1, 'e')
INTO :relative_refresh_path;

ELSE
-- If not successful, set status message
status_message := 'metadata update failed';
END IF;

-- Alter Iceberg Table and Refresh metadata location
sql_stmt := 'ALTER ICEBERG TABLE ' || :table_name || ' REFRESH ''' || :relative_refresh_path || '''';
EXECUTE IMMEDIATE :sql_stmt;

-- Build the result VARIANT with JSON
result := OBJECT_INSERT(OBJECT_INSERT(OBJECT_CONSTRUCT('table_name', :table_name), 'metadata_location', :metadata_location), 'status_message', :status_message);

-- Return the result
RETURN :result;

END;
$$;

Let’s test this out. First we are going to insert new records into our Delta UniForm table and validate count.

INSERT INTO customer
SELECT *
FROM samples.tpch.customer
LIMIT 2000;

Call the Procedure to REFRESH the metadata location.

CALL refresh_iceberg_metadata('main', 'default', 'customer');

Validate the count in Snowflake.

Conclusion

In summary, the integration detailed in this blog streamlines Iceberg metadata management within Snowflake through the use of Databricks Unity Catalog. Beyond the technical details, Delta UniForm represents a significant advancement towards an open and interoperable data ecosystem. By simplifying the complex landscape of varied storage formats, Delta UniForm accelerates organizations’ progression towards owning all their data and establishing a Data Intelligence platform.

--

--