Skip to content

Commit

Permalink
feat: add remote function options to routines (#1558)
Browse files Browse the repository at this point in the history
* feat: add remote function options

This PR adds support for defining routines as remote UDFs.

* basic integration test

* augment tests

* rename prop

* augment tests

* more testing

* cover shenanigans

---------

Co-authored-by: Tim Swast <[email protected]>
  • Loading branch information
shollyman and tswast committed May 19, 2023
1 parent 9ea2e21 commit 84ad11d
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 0 deletions.
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
from google.cloud.bigquery.routine import RoutineArgument
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.routine import RoutineType
from google.cloud.bigquery.routine import RemoteFunctionOptions
from google.cloud.bigquery.schema import PolicyTagList
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.standard_sql import StandardSqlDataType
Expand Down Expand Up @@ -154,6 +155,7 @@
"Routine",
"RoutineArgument",
"RoutineReference",
"RemoteFunctionOptions",
# Shared helpers
"SchemaField",
"PolicyTagList",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/routine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from google.cloud.bigquery.routine.routine import RoutineArgument
from google.cloud.bigquery.routine.routine import RoutineReference
from google.cloud.bigquery.routine.routine import RoutineType
from google.cloud.bigquery.routine.routine import RemoteFunctionOptions


__all__ = (
Expand All @@ -28,4 +29,5 @@
"RoutineArgument",
"RoutineReference",
"RoutineType",
"RemoteFunctionOptions",
)
153 changes: 153 additions & 0 deletions google/cloud/bigquery/routine/routine.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Routine(object):
"type_": "routineType",
"description": "description",
"determinism_level": "determinismLevel",
"remote_function_options": "remoteFunctionOptions",
}

def __init__(self, routine_ref, **kwargs) -> None:
Expand Down Expand Up @@ -297,6 +298,37 @@ def determinism_level(self):
def determinism_level(self, value):
self._properties[self._PROPERTY_TO_API_FIELD["determinism_level"]] = value

@property
def remote_function_options(self):
"""Optional[google.cloud.bigquery.routine.RemoteFunctionOptions]: Configures remote function
options for a routine.
Raises:
ValueError:
If the value is not
:class:`~google.cloud.bigquery.routine.RemoteFunctionOptions` or
:data:`None`.
"""
prop = self._properties.get(
self._PROPERTY_TO_API_FIELD["remote_function_options"]
)
if prop is not None:
return RemoteFunctionOptions.from_api_repr(prop)

@remote_function_options.setter
def remote_function_options(self, value):
api_repr = value
if isinstance(value, RemoteFunctionOptions):
api_repr = value.to_api_repr()
elif value is not None:
raise ValueError(
"value must be google.cloud.bigquery.routine.RemoteFunctionOptions "
"or None"
)
self._properties[
self._PROPERTY_TO_API_FIELD["remote_function_options"]
] = api_repr

@classmethod
def from_api_repr(cls, resource: dict) -> "Routine":
"""Factory: construct a routine given its API representation.
Expand Down Expand Up @@ -563,3 +595,124 @@ def __str__(self):
This is a fully-qualified ID, including the project ID and dataset ID.
"""
return "{}.{}.{}".format(self.project, self.dataset_id, self.routine_id)


class RemoteFunctionOptions(object):
"""Configuration options for controlling remote BigQuery functions."""

_PROPERTY_TO_API_FIELD = {
"endpoint": "endpoint",
"connection": "connection",
"max_batching_rows": "maxBatchingRows",
"user_defined_context": "userDefinedContext",
}

def __init__(
self,
endpoint=None,
connection=None,
max_batching_rows=None,
user_defined_context=None,
_properties=None,
) -> None:
if _properties is None:
_properties = {}
self._properties = _properties

if endpoint is not None:
self.endpoint = endpoint
if connection is not None:
self.connection = connection
if max_batching_rows is not None:
self.max_batching_rows = max_batching_rows
if user_defined_context is not None:
self.user_defined_context = user_defined_context

@property
def connection(self):
"""string: Fully qualified name of the user-provided connection object which holds the authentication information to send requests to the remote service.
Format is "projects/{projectId}/locations/{locationId}/connections/{connectionId}"
"""
return _helpers._str_or_none(self._properties.get("connection"))

@connection.setter
def connection(self, value):
self._properties["connection"] = _helpers._str_or_none(value)

@property
def endpoint(self):
"""string: Endpoint of the user-provided remote service
Example: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"
"""
return _helpers._str_or_none(self._properties.get("endpoint"))

@endpoint.setter
def endpoint(self, value):
self._properties["endpoint"] = _helpers._str_or_none(value)

@property
def max_batching_rows(self):
"""int64: Max number of rows in each batch sent to the remote service.
If absent or if 0, BigQuery dynamically decides the number of rows in a batch.
"""
return _helpers._int_or_none(self._properties.get("maxBatchingRows"))

@max_batching_rows.setter
def max_batching_rows(self, value):
self._properties["maxBatchingRows"] = _helpers._str_or_none(value)

@property
def user_defined_context(self):
"""Dict[str, str]: User-defined context as a set of key/value pairs,
which will be sent as function invocation context together with
batched arguments in the requests to the remote service. The total
number of bytes of keys and values must be less than 8KB.
"""
return self._properties.get("userDefinedContext")

@user_defined_context.setter
def user_defined_context(self, value):
if not isinstance(value, dict):
raise ValueError("value must be dictionary")
self._properties["userDefinedContext"] = value

@classmethod
def from_api_repr(cls, resource: dict) -> "RemoteFunctionOptions":
"""Factory: construct remote function options given its API representation.
Args:
resource (Dict[str, object]): Resource, as returned from the API.
Returns:
google.cloud.bigquery.routine.RemoteFunctionOptions:
Python object, as parsed from ``resource``.
"""
ref = cls()
ref._properties = resource
return ref

def to_api_repr(self) -> dict:
"""Construct the API resource representation of this RemoteFunctionOptions.
Returns:
Dict[str, object]: Remote function options represented as an API resource.
"""
return self._properties

def __eq__(self, other):
if not isinstance(other, RemoteFunctionOptions):
return NotImplemented
return self._properties == other._properties

def __ne__(self, other):
return not self == other

def __repr__(self):
all_properties = [
"{}={}".format(property_name, repr(getattr(self, property_name)))
for property_name in sorted(self._PROPERTY_TO_API_FIELD)
]
return "RemoteFunctionOptions({})".format(", ".join(all_properties))
128 changes: 128 additions & 0 deletions tests/unit/routine/test_remote_function_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
#
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

ENDPOINT = "https://some.endpoint"
CONNECTION = "connection_string"
MAX_BATCHING_ROWS = 50
USER_DEFINED_CONTEXT = {
"foo": "bar",
}


@pytest.fixture
def target_class():
from google.cloud.bigquery.routine import RemoteFunctionOptions

return RemoteFunctionOptions


def test_ctor(target_class):

options = target_class(
endpoint=ENDPOINT,
connection=CONNECTION,
max_batching_rows=MAX_BATCHING_ROWS,
user_defined_context=USER_DEFINED_CONTEXT,
)
assert options.endpoint == ENDPOINT
assert options.connection == CONNECTION
assert options.max_batching_rows == MAX_BATCHING_ROWS
assert options.user_defined_context == USER_DEFINED_CONTEXT


def test_empty_ctor(target_class):
options = target_class()
assert options._properties == {}
options = target_class(_properties=None)
assert options._properties == {}
options = target_class(_properties={})
assert options._properties == {}


def test_ctor_bad_context(target_class):
with pytest.raises(ValueError, match="value must be dictionary"):
target_class(user_defined_context=[1, 2, 3, 4])


def test_from_api_repr(target_class):
resource = {
"endpoint": ENDPOINT,
"connection": CONNECTION,
"maxBatchingRows": MAX_BATCHING_ROWS,
"userDefinedContext": USER_DEFINED_CONTEXT,
"someRandomField": "someValue",
}
options = target_class.from_api_repr(resource)
assert options.endpoint == ENDPOINT
assert options.connection == CONNECTION
assert options.max_batching_rows == MAX_BATCHING_ROWS
assert options.user_defined_context == USER_DEFINED_CONTEXT
assert options._properties["someRandomField"] == "someValue"


def test_from_api_repr_w_minimal_resource(target_class):
resource = {}
options = target_class.from_api_repr(resource)
assert options.endpoint is None
assert options.connection is None
assert options.max_batching_rows is None
assert options.user_defined_context is None


def test_from_api_repr_w_unknown_fields(target_class):
resource = {"thisFieldIsNotInTheProto": "just ignore me"}
options = target_class.from_api_repr(resource)
assert options._properties is resource


def test_eq(target_class):
options = target_class(
endpoint=ENDPOINT,
connection=CONNECTION,
max_batching_rows=MAX_BATCHING_ROWS,
user_defined_context=USER_DEFINED_CONTEXT,
)
other_options = target_class(
endpoint=ENDPOINT,
connection=CONNECTION,
max_batching_rows=MAX_BATCHING_ROWS,
user_defined_context=USER_DEFINED_CONTEXT,
)
assert options == other_options
assert not (options != other_options)

empty_options = target_class()
assert not (options == empty_options)
assert options != empty_options

notanarg = object()
assert not (options == notanarg)
assert options != notanarg


def test_repr(target_class):
options = target_class(
endpoint=ENDPOINT,
connection=CONNECTION,
max_batching_rows=MAX_BATCHING_ROWS,
user_defined_context=USER_DEFINED_CONTEXT,
)
actual_repr = repr(options)
assert actual_repr == (
"RemoteFunctionOptions(connection='connection_string', endpoint='https://some.endpoint', max_batching_rows=50, user_defined_context={'foo': 'bar'})"
)

0 comments on commit 84ad11d

Please sign in to comment.