Open
Description
Description
-
Removing the need to explicitly include the
"api/"
prefix in endpoint strings.- The
_endpoint_url()
method now automatically prepends"api/"
if it's not present. - This makes calling the API more ergonomic and less error-prone.
- The
-
Refactoring HTTP method handling to use constants instead of repeated string literals.
- Improves readability and reduces risk of typos.
- Makes method validation more explicit.
def _do_api_call(
self,
endpoint_info: tuple[str, str],
json: dict[str, Any] | None = None,
wrap_http_errors: bool = True,
):
"""
Perform an API call with retries.
:param endpoint_info: Tuple of method and endpoint
:param json: Parameters for this API call.
:return: If the api call returns a OK status code,
this function returns the response in JSON. Otherwise,
we throw an AirflowException.
"""
method, endpoint = endpoint_info
# TODO: get rid of explicit 'api/' in the endpoint specification
url = self._endpoint_url(endpoint)
aad_headers = self._get_aad_headers()
headers = {**self.user_agent_header, **aad_headers}
auth: AuthBase
token = self._get_token()
if token:
auth = _TokenAuth(token)
else:
self.log.info("Using basic auth.")
auth = HTTPBasicAuth(self.databricks_conn.login, self.databricks_conn.password)
request_func: Any
if method == "GET":
request_func = requests.get
elif method == "POST":
request_func = requests.post
elif method == "PATCH":
request_func = requests.patch
elif method == "DELETE":
request_func = requests.delete
else:
raise AirflowException("Unexpected HTTP Method: " + method)
try:
for attempt in self._get_retry_object():
with attempt:
self.log.debug(
"Initiating %s request to %s with payload: %s, headers: %s",
method,
url,
json,
headers,
)
response = request_func(
url,
json=json if method in ("POST", "PATCH") else None,
params=json if method == "GET" else None,
auth=auth,
headers=headers,
timeout=self.timeout_seconds,
)
self.log.debug("Response Status Code: %s", response.status_code)
self.log.debug("Response text: %s", response.text)
response.raise_for_status()
return response.json()
except RetryError:
raise AirflowException(f"API requests to Databricks failed {self.retry_limit} times. Giving up.")
except requests_exceptions.HTTPError as e:
if wrap_http_errors:
msg = f"Response: {e.response.content.decode()}, Status Code: {e.response.status_code}"
raise AirflowException(msg)
raise
Use case/motivation
No response
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct