
GCP Vertex AI RAG Engine concurrent import request limit makes rag.import_files_async almost pointless when uploading to multiple corpora #5424

Open
@jsteinberg-rbi

Description


The code below imports the first 50 files from `bucket_dir_1` and then stops. It imports no more files from that directory, nor any files at all from the other bucket directories. The Vertex AI RAG Engine documentation says that up to 10,000 files can be imported at one time, so I don't see why I would be hitting a limit.

```python
import asyncio

import vertexai
from vertexai import rag  # older SDK versions: from vertexai.preview import rag

vertexai.init(project="$PROJECT_NUMBER", location="us-central1")

corpora_ids_and_display_names = [
    ("bucket_dir_1", "projects/$PROJECT_NUMBER/locations/us-central1/ragCorpora/$CORPUS_ID_1"),
    ("bucket_dir_2", "projects/$PROJECT_NUMBER/locations/us-central1/ragCorpora/$CORPUS_ID_2"),
]

async def import_files_async(corpus_id_and_display_name: tuple):
    try:
        # Starts the import and returns an AsyncOperation handle; it does not
        # wait for the files to finish importing.
        response = await rag.import_files_async(
            corpus_name=corpus_id_and_display_name[1],
            paths=[f"gs://some-bucket/{corpus_id_and_display_name[0]}"],
        )
        return response
    except Exception as e:
        print(f"error: {e}")

async def create_import_files_task_group_async():
    coros = [import_files_async(corpus_id_and_display_name) for corpus_id_and_display_name in corpora_ids_and_display_names]
    import_files_task_group = [asyncio.create_task(coro) for coro in coros]
    results = await asyncio.gather(*import_files_task_group, return_exceptions=True)
    print(f"results: {results}")

asyncio.run(create_import_files_task_group_async())
```
Final `asyncio.gather`/`asyncio.run` output (no errors):

```
results: [<google.api_core.operation_async.AsyncOperation object at 0x10eaff4d0>, <google.api_core.operation_async.AsyncOperation object at 0x10eaf2fd0>, <google.api_core.operation_async.AsyncOperation object at 0x10ec48590>, <google.api_core.operation_async.AsyncOperation object at 0x10eaf1ba0>, <google.api_core.operation_async.AsyncOperation object at 0x10eafee90>]
```
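The output shows that `gather` returned `AsyncOperation` handles, so when `asyncio.run` finished the imports had only been started, not completed. A minimal sketch of one way to wait for each import and cap how many run at once, assuming (1) each handle is a `google.api_core` `AsyncOperation` whose `result()` coroutine waits for the long-running import to finish, and (2) the service limits concurrent imports (the title suggests a limit; its value is not stated in this issue, so `max_in_flight` is a hypothetical knob). `import_all`, `StartImport`, and `start_import` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable

# Hypothetical signature: a coroutine function that starts one import for a
# (bucket_dir, corpus_name) pair and returns an operation handle whose async
# result() waits for completion, like the AsyncOperation objects printed above.
StartImport = Callable[[str, str], Awaitable[Any]]


async def import_all(
    jobs: Iterable[tuple[str, str]],
    start_import: StartImport,
    max_in_flight: int = 1,
) -> list[Any]:
    """Run imports with at most `max_in_flight` operations in progress at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def run_one(bucket_dir: str, corpus_name: str) -> Any:
        # Hold the slot until the operation finishes, not just until it starts:
        # starting an import returns as soon as the request is accepted.
        async with semaphore:
            operation = await start_import(bucket_dir, corpus_name)
            return await operation.result()

    # Exceptions (from starting or from result()) are returned in place of
    # results, in the same order as `jobs`.
    return await asyncio.gather(
        *(run_one(d, c) for d, c in jobs), return_exceptions=True
    )


# Example wiring with the Vertex AI SDK (not executed here; reuses `rag` and
# `corpora_ids_and_display_names` from the script above):
#
# async def start_import(bucket_dir, corpus_name):
#     return await rag.import_files_async(
#         corpus_name=corpus_name, paths=[f"gs://some-bucket/{bucket_dir}"]
#     )
#
# results = asyncio.run(import_all(corpora_ids_and_display_names, start_import))
# for r in results:
#     print(r)
```

With `max_in_flight=1` the corpora are imported strictly one after another, which is the conservative choice; raising it only helps if the service actually accepts that many concurrent imports.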
```python
import subprocess

# Listing the files in each corpus works, so the corpus resource names are valid.
for corpus_id_and_display_name in corpora_ids_and_display_names:
    print(rag.list_files(corpus_name=corpus_id_and_display_name[1]))

# Listing each bucket path works, so the `paths` argument passed to
# import_files_async is valid. Each argv element must be a separate token.
for corpus_id_and_display_name in corpora_ids_and_display_names:
    subprocess.run(["gcloud", "storage", "ls", f"gs://some-bucket/{corpus_id_and_display_name[0]}"])
```
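To check whether imports actually stop at 50 files, it can help to count both sides instead of printing them. A sketch, assuming that `rag.list_files` returns a standard GAPIC pager (iterating it follows page tokens across every page, whereas printing it typically shows only the first page's response) and that `gcloud storage ls --recursive` prints directory headers ending in `:` between object URLs. `count_listed_objects`, `count_bucket_objects`, and `count_items` are hypothetical helper names.

```python
import subprocess
from typing import Iterable


def count_listed_objects(listing: str) -> int:
    """Count object URLs in `gcloud storage ls --recursive` output.

    Assumption: directory headers end with ':' and bare prefixes end with '/';
    every other line starting with 'gs://' is treated as one object.
    """
    return sum(
        1
        for line in listing.splitlines()
        if line.startswith("gs://") and not line.endswith((":", "/"))
    )


def count_bucket_objects(prefix_url: str) -> int:
    # Runs the real CLI; requires gcloud to be installed and authenticated.
    proc = subprocess.run(
        ["gcloud", "storage", "ls", "--recursive", prefix_url],
        capture_output=True,
        text=True,
        check=True,
    )
    return count_listed_objects(proc.stdout)


def count_items(pager: Iterable) -> int:
    # Iterating consumes every item, fetching further pages as needed.
    return sum(1 for _ in pager)


# Example comparison (not executed here; needs the Vertex AI SDK and gcloud):
#
# for bucket_dir, corpus_name in corpora_ids_and_display_names:
#     in_corpus = count_items(rag.list_files(corpus_name=corpus_name))
#     in_bucket = count_bucket_objects(f"gs://some-bucket/{bucket_dir}")
#     print(bucket_dir, in_corpus, in_bucket)
```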

Metadata

Labels: api: vertex-ai (issues related to the googleapis/python-aiplatform API)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions