diff --git a/pyproject.toml b/pyproject.toml
index 587c3459..9c6d3cef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,7 +16,9 @@ dependencies = [
     "sniffio",
     "cached-property; python_version < '3.8'",
     "pandas; python_version >= '3.7'",
+    "pyarrow>=11.0.0",
     "pyyaml>=6.0",
+    "requests_toolbelt>=1.0.0",
 ]
 requires-python = ">= 3.7"
 classifiers = [
diff --git a/src/openlayer/lib/data/__init__.py b/src/openlayer/lib/data/__init__.py
index 89cdc091..a4e035ff 100644
--- a/src/openlayer/lib/data/__init__.py
+++ b/src/openlayer/lib/data/__init__.py
@@ -8,5 +8,8 @@
 ]
 
 from ._upload import StorageType
-from .batch_inferences import update_batch_inferences, upload_batch_inferences
+from .batch_inferences import (
+    update_batch_inferences,
+    upload_batch_inferences,
+)
 from .reference_dataset import upload_reference_dataframe
diff --git a/src/openlayer/lib/data/_upload.py b/src/openlayer/lib/data/_upload.py
index 2695133e..fd90ef2b 100644
--- a/src/openlayer/lib/data/_upload.py
+++ b/src/openlayer/lib/data/_upload.py
@@ -5,7 +5,6 @@
 """
 
 import os
-import shutil
 from enum import Enum
 from typing import Optional
 
@@ -104,7 +103,7 @@ def upload_blob_s3(
         with open(file_path, "rb") as f:
             # Avoid logging here as it will break the progress bar
             fields = presigned_url_response.fields
-            fields["file"] = (object_name, f, "application/x-tar")
+            fields["file"] = (object_name, f, "application/vnd.apache.arrow.file")
             e = MultipartEncoder(fields=fields)
             m = MultipartEncoderMonitor(
                 e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n)
diff --git a/src/openlayer/lib/data/batch_inferences.py b/src/openlayer/lib/data/batch_inferences.py
index dbc7d805..b623f798 100644
--- a/src/openlayer/lib/data/batch_inferences.py
+++ b/src/openlayer/lib/data/batch_inferences.py
@@ -1,17 +1,14 @@
 """Upload a batch of inferences to the Openlayer platform."""
 
-import os
 import time
-import shutil
-import tarfile
 import tempfile
 from typing import Optional
 
 import httpx
 import pandas as pd
+import pyarrow as pa
 
 from . import StorageType, _upload
-from .. import utils
 from ... import Openlayer
 from ..._utils import maybe_transform
 from ...types.inference_pipelines import data_stream_params
@@ -25,6 +22,7 @@ def upload_batch_inferences(
     dataset_path: Optional[str] = None,
     storage_type: Optional[StorageType] = None,
     merge: bool = False,
+    verbose: bool = False,
 ) -> None:
     """Uploads a batch of inferences to the Openlayer platform."""
     if dataset_df is None and dataset_path is None:
@@ -33,7 +31,7 @@ def upload_batch_inferences(
         raise ValueError("Only one of dataset_df or dataset_path should be provided.")
 
     uploader = _upload.Uploader(client, storage_type)
-    object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.tar.gz"
+    object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.arrow"
 
     # Fetch presigned url
     presigned_url_response = client.storage.presigned_url.create(
@@ -42,29 +40,41 @@ def upload_batch_inferences(
 
     # Write dataset and config to temp directory
     with tempfile.TemporaryDirectory() as tmp_dir:
-        temp_file_path = f"{tmp_dir}/dataset.csv"
+        # If a DataFrame is provided, convert it to an Arrow Table and write
+        # it using the IPC writer
         if dataset_df is not None:
-            dataset_df.to_csv(temp_file_path, index=False)
-        else:
-            shutil.copy(dataset_path, temp_file_path)
+            temp_file_path = f"{tmp_dir}/dataset.arrow"
+            if verbose:
+                print("Converting DataFrame to pyarrow Table...")
+            pa_table = pa.Table.from_pandas(dataset_df)
+            pa_schema = pa_table.schema
 
-        # Copy relevant files to tmp dir
-        config["label"] = "production"
-        utils.write_yaml(
-            maybe_transform(config, data_stream_params.Config),
-            f"{tmp_dir}/dataset_config.yaml",
-        )
+            if verbose:
+                print(
+                    "Writing Arrow Table using RecordBatchStreamWriter to "
+                    f"{temp_file_path}"
+                )
+            with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer:
+                writer.write_table(pa_table, max_chunksize=16384)
+        else:
+            object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.csv"
+            temp_file_path = dataset_path
 
-        tar_file_path = os.path.join(tmp_dir, object_name)
-        with tarfile.open(tar_file_path, mode="w:gz") as tar:
-            tar.add(tmp_dir, arcname=os.path.basename("monitoring_data"))
+        # camelCase the config
+        config = maybe_transform(config, data_stream_params.Config)
 
-        # Upload to storage
-        uploader.upload(
-            file_path=tar_file_path,
+        # Upload the dataset file to storage
+        if verbose:
+            print("Uploading dataset to storage via presigned URL...")
+        response = uploader.upload(
+            file_path=temp_file_path,
             object_name=object_name,
             presigned_url_response=presigned_url_response,
         )
+        if verbose:
+            print(response.status_code)
+            print(response.text)
+            print(response.content)
 
     # Notify the backend
     client.post(
@@ -73,6 +83,7 @@ def upload_batch_inferences(
         body={
            "storageUri": presigned_url_response.storage_uri,
            "performDataMerge": merge,
+           "config": config,
         },
     )
 
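
For reference, a minimal standalone sketch of the Arrow IPC streaming write that the new `upload_batch_inferences` path performs, plus a read-back check. The file name and DataFrame contents are illustrative only:

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"prediction": [0.1, 0.9], "label": [0, 1]})
table = pa.Table.from_pandas(df)

# Write in the Arrow IPC streaming format, chunked the same way the diff
# does (max_chunksize caps the number of rows per record batch).
with pa.ipc.RecordBatchStreamWriter("dataset.arrow", table.schema) as writer:
    writer.write_table(table, max_chunksize=16384)

# Read the stream back and confirm the roundtrip preserved the data.
with pa.OSFile("dataset.arrow", "rb") as source:
    roundtrip = pa.ipc.open_stream(source).read_all()
assert roundtrip.equals(table)
```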
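
And a hedged end-to-end usage sketch of the updated entry point. The pipeline ID and config key are placeholders assumed for illustration, not values taken from this diff:

```python
import pandas as pd

from openlayer import Openlayer
from openlayer.lib.data import upload_batch_inferences

client = Openlayer()  # assumes OPENLAYER_API_KEY is set in the environment

df = pd.DataFrame(
    {
        "input": ["What is the Arrow IPC format?"],
        "output": ["A columnar serialization format."],
    }
)

upload_batch_inferences(
    client=client,
    inference_pipeline_id="YOUR_PIPELINE_ID",  # placeholder, not from this diff
    dataset_df=df,  # converted to an Arrow table and uploaded, per this diff
    config={"output_column_name": "output"},  # assumed config key
    verbose=True,  # new flag introduced by this diff
)
```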