Skip to content

Commit a2520ca

Browse files
authored
feat: add default LoadJobConfig to Client (#1526)
1 parent aa0fa02 commit a2520ca

File tree

5 files changed

+621
-56
lines changed

5 files changed

+621
-56
lines changed

google/cloud/bigquery/client.py

Lines changed: 71 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ class Client(ClientWithProject):
210210
default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
211211
Default ``QueryJobConfig``.
212212
Will be merged into job configs passed into the ``query`` method.
213+
default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
214+
Default ``LoadJobConfig``.
215+
Will be merged into job configs passed into the ``load_table_*`` methods.
213216
client_info (Optional[google.api_core.client_info.ClientInfo]):
214217
The client info used to send a user-agent string along with API
215218
requests. If ``None``, then default info will be used. Generally,
@@ -235,6 +238,7 @@ def __init__(
235238
_http=None,
236239
location=None,
237240
default_query_job_config=None,
241+
default_load_job_config=None,
238242
client_info=None,
239243
client_options=None,
240244
) -> None:
@@ -260,6 +264,7 @@ def __init__(
260264
self._connection = Connection(self, **kw_args)
261265
self._location = location
262266
self._default_query_job_config = copy.deepcopy(default_query_job_config)
267+
self._default_load_job_config = copy.deepcopy(default_load_job_config)
263268

264269
@property
265270
def location(self):
@@ -277,6 +282,17 @@ def default_query_job_config(self):
277282
def default_query_job_config(self, value: QueryJobConfig):
278283
self._default_query_job_config = copy.deepcopy(value)
279284

285+
@property
286+
def default_load_job_config(self):
287+
"""Default ``LoadJobConfig``.
288+
Will be merged into job configs passed into the ``load_table_*`` methods.
289+
"""
290+
return self._default_load_job_config
291+
292+
@default_load_job_config.setter
293+
def default_load_job_config(self, value: LoadJobConfig):
294+
self._default_load_job_config = copy.deepcopy(value)
295+
280296
def close(self):
281297
"""Close the underlying transport objects, releasing system resources.
282298
@@ -2330,8 +2346,8 @@ def load_table_from_uri(
23302346
23312347
Raises:
23322348
TypeError:
2333-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2334-
class.
2349+
If ``job_config`` is not an instance of
2350+
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
23352351
"""
23362352
job_id = _make_job_id(job_id, job_id_prefix)
23372353

@@ -2348,11 +2364,14 @@ def load_table_from_uri(
23482364

23492365
destination = _table_arg_to_table_ref(destination, default_project=self.project)
23502366

2351-
if job_config:
2352-
job_config = copy.deepcopy(job_config)
2353-
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
2367+
if job_config is not None:
2368+
_verify_job_config_type(job_config, LoadJobConfig)
2369+
else:
2370+
job_config = job.LoadJobConfig()
23542371

2355-
load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
2372+
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2373+
2374+
load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
23562375
load_job._begin(retry=retry, timeout=timeout)
23572376

23582377
return load_job
@@ -2424,8 +2443,8 @@ def load_table_from_file(
24242443
mode.
24252444
24262445
TypeError:
2427-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2428-
class.
2446+
If ``job_config`` is not an instance of
2447+
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
24292448
"""
24302449
job_id = _make_job_id(job_id, job_id_prefix)
24312450

@@ -2437,10 +2456,15 @@ def load_table_from_file(
24372456

24382457
destination = _table_arg_to_table_ref(destination, default_project=self.project)
24392458
job_ref = job._JobReference(job_id, project=project, location=location)
2440-
if job_config:
2441-
job_config = copy.deepcopy(job_config)
2442-
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
2443-
load_job = job.LoadJob(job_ref, None, destination, self, job_config)
2459+
2460+
if job_config is not None:
2461+
_verify_job_config_type(job_config, LoadJobConfig)
2462+
else:
2463+
job_config = job.LoadJobConfig()
2464+
2465+
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2466+
2467+
load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
24442468
job_resource = load_job.to_api_repr()
24452469

24462470
if rewind:
@@ -2564,43 +2588,40 @@ def load_table_from_dataframe(
25642588
If a usable parquet engine cannot be found. This method
25652589
requires :mod:`pyarrow` to be installed.
25662590
TypeError:
2567-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2568-
class.
2591+
If ``job_config`` is not an instance of
2592+
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
25692593
"""
25702594
job_id = _make_job_id(job_id, job_id_prefix)
25712595

2572-
if job_config:
2573-
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
2574-
# Make a copy so that the job config isn't modified in-place.
2575-
job_config_properties = copy.deepcopy(job_config._properties)
2576-
job_config = job.LoadJobConfig()
2577-
job_config._properties = job_config_properties
2578-
2596+
if job_config is not None:
2597+
_verify_job_config_type(job_config, LoadJobConfig)
25792598
else:
25802599
job_config = job.LoadJobConfig()
25812600

2601+
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2602+
25822603
supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
2583-
if job_config.source_format is None:
2604+
if new_job_config.source_format is None:
25842605
# default value
2585-
job_config.source_format = job.SourceFormat.PARQUET
2606+
new_job_config.source_format = job.SourceFormat.PARQUET
25862607

25872608
if (
2588-
job_config.source_format == job.SourceFormat.PARQUET
2589-
and job_config.parquet_options is None
2609+
new_job_config.source_format == job.SourceFormat.PARQUET
2610+
and new_job_config.parquet_options is None
25902611
):
25912612
parquet_options = ParquetOptions()
25922613
# default value
25932614
parquet_options.enable_list_inference = True
2594-
job_config.parquet_options = parquet_options
2615+
new_job_config.parquet_options = parquet_options
25952616

2596-
if job_config.source_format not in supported_formats:
2617+
if new_job_config.source_format not in supported_formats:
25972618
raise ValueError(
25982619
"Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
2599-
job_config.source_format
2620+
new_job_config.source_format
26002621
)
26012622
)
26022623

2603-
if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
2624+
if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET:
26042625
# pyarrow is now the only supported parquet engine.
26052626
raise ValueError("This method requires pyarrow to be installed")
26062627

@@ -2611,8 +2632,8 @@ def load_table_from_dataframe(
26112632
# schema, and check if dataframe schema is compatible with it - except
26122633
# for WRITE_TRUNCATE jobs, the existing schema does not matter then.
26132634
if (
2614-
not job_config.schema
2615-
and job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
2635+
not new_job_config.schema
2636+
and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
26162637
):
26172638
try:
26182639
table = self.get_table(destination)
@@ -2623,7 +2644,7 @@ def load_table_from_dataframe(
26232644
name
26242645
for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
26252646
)
2626-
job_config.schema = [
2647+
new_job_config.schema = [
26272648
# Field description and policy tags are not needed to
26282649
# serialize a data frame.
26292650
SchemaField(
@@ -2637,11 +2658,11 @@ def load_table_from_dataframe(
26372658
if field.name in columns_and_indexes
26382659
]
26392660

2640-
job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
2641-
dataframe, job_config.schema
2661+
new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
2662+
dataframe, new_job_config.schema
26422663
)
26432664

2644-
if not job_config.schema:
2665+
if not new_job_config.schema:
26452666
# the schema could not be fully detected
26462667
warnings.warn(
26472668
"Schema could not be detected for all columns. Loading from a "
@@ -2652,13 +2673,13 @@ def load_table_from_dataframe(
26522673
)
26532674

26542675
tmpfd, tmppath = tempfile.mkstemp(
2655-
suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower())
2676+
suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
26562677
)
26572678
os.close(tmpfd)
26582679

26592680
try:
26602681

2661-
if job_config.source_format == job.SourceFormat.PARQUET:
2682+
if new_job_config.source_format == job.SourceFormat.PARQUET:
26622683
if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
26632684
msg = (
26642685
"Loading dataframe data in PARQUET format with pyarrow "
@@ -2669,13 +2690,13 @@ def load_table_from_dataframe(
26692690
)
26702691
warnings.warn(msg, category=RuntimeWarning)
26712692

2672-
if job_config.schema:
2693+
if new_job_config.schema:
26732694
if parquet_compression == "snappy": # adjust the default value
26742695
parquet_compression = parquet_compression.upper()
26752696

26762697
_pandas_helpers.dataframe_to_parquet(
26772698
dataframe,
2678-
job_config.schema,
2699+
new_job_config.schema,
26792700
tmppath,
26802701
parquet_compression=parquet_compression,
26812702
parquet_use_compliant_nested_type=True,
@@ -2715,7 +2736,7 @@ def load_table_from_dataframe(
27152736
job_id_prefix=job_id_prefix,
27162737
location=location,
27172738
project=project,
2718-
job_config=job_config,
2739+
job_config=new_job_config,
27192740
timeout=timeout,
27202741
)
27212742

@@ -2791,22 +2812,22 @@ def load_table_from_json(
27912812
27922813
Raises:
27932814
TypeError:
2794-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2795-
class.
2815+
If ``job_config`` is not an instance of
2816+
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
27962817
"""
27972818
job_id = _make_job_id(job_id, job_id_prefix)
27982819

2799-
if job_config:
2800-
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
2801-
# Make a copy so that the job config isn't modified in-place.
2802-
job_config = copy.deepcopy(job_config)
2820+
if job_config is not None:
2821+
_verify_job_config_type(job_config, LoadJobConfig)
28032822
else:
28042823
job_config = job.LoadJobConfig()
28052824

2806-
job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
2825+
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2826+
2827+
new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
28072828

2808-
if job_config.schema is None:
2809-
job_config.autodetect = True
2829+
if new_job_config.schema is None:
2830+
new_job_config.autodetect = True
28102831

28112832
if project is None:
28122833
project = self.project
@@ -2828,7 +2849,7 @@ def load_table_from_json(
28282849
job_id_prefix=job_id_prefix,
28292850
location=location,
28302851
project=project,
2831-
job_config=job_config,
2852+
job_config=new_job_config,
28322853
timeout=timeout,
28332854
)
28342855

google/cloud/bigquery/job/base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ def to_api_repr(self) -> dict:
269269
"""
270270
return copy.deepcopy(self._properties)
271271

272-
def _fill_from_default(self, default_job_config):
272+
def _fill_from_default(self, default_job_config=None):
273273
"""Merge this job config with a default job config.
274274
275275
The keys in this object take precedence over the keys in the default
@@ -283,6 +283,10 @@ def _fill_from_default(self, default_job_config):
283283
Returns:
284284
google.cloud.bigquery.job._JobConfig: A new (merged) job config.
285285
"""
286+
if not default_job_config:
287+
new_job_config = copy.deepcopy(self)
288+
return new_job_config
289+
286290
if self._job_type != default_job_config._job_type:
287291
raise TypeError(
288292
"attempted to merge two incompatible job types: "

tests/system/test_client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2319,7 +2319,7 @@ def _table_exists(t):
23192319
return False
23202320

23212321

2322-
def test_dbapi_create_view(dataset_id):
2322+
def test_dbapi_create_view(dataset_id: str):
23232323

23242324
query = f"""
23252325
CREATE VIEW {dataset_id}.dbapi_create_view
@@ -2332,7 +2332,7 @@ def test_dbapi_create_view(dataset_id):
23322332
assert Config.CURSOR.rowcount == 0, "expected 0 rows"
23332333

23342334

2335-
def test_parameterized_types_round_trip(dataset_id):
2335+
def test_parameterized_types_round_trip(dataset_id: str):
23362336
client = Config.CLIENT
23372337
table_id = f"{dataset_id}.test_parameterized_types_round_trip"
23382338
fields = (
@@ -2358,7 +2358,7 @@ def test_parameterized_types_round_trip(dataset_id):
23582358
assert tuple(s._key()[:2] for s in table2.schema) == fields
23592359

23602360

2361-
def test_table_snapshots(dataset_id):
2361+
def test_table_snapshots(dataset_id: str):
23622362
from google.cloud.bigquery import CopyJobConfig
23632363
from google.cloud.bigquery import OperationType
23642364

@@ -2429,7 +2429,7 @@ def test_table_snapshots(dataset_id):
24292429
assert rows == [(1, "one"), (2, "two")]
24302430

24312431

2432-
def test_table_clones(dataset_id):
2432+
def test_table_clones(dataset_id: str):
24332433
from google.cloud.bigquery import CopyJobConfig
24342434
from google.cloud.bigquery import OperationType
24352435

tests/unit/job/test_base.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,7 @@ def test_ctor_with_unknown_property_raises_error(self):
11041104
config = self._make_one()
11051105
config.wrong_name = None
11061106

1107-
def test_fill_from_default(self):
1107+
def test_fill_query_job_config_from_default(self):
11081108
from google.cloud.bigquery import QueryJobConfig
11091109

11101110
job_config = QueryJobConfig()
@@ -1120,6 +1120,22 @@ def test_fill_from_default(self):
11201120
self.assertTrue(final_job_config.use_query_cache)
11211121
self.assertEqual(final_job_config.maximum_bytes_billed, 1000)
11221122

1123+
def test_fill_load_job_from_default(self):
1124+
from google.cloud.bigquery import LoadJobConfig
1125+
1126+
job_config = LoadJobConfig()
1127+
job_config.create_session = True
1128+
job_config.encoding = "UTF-8"
1129+
1130+
default_job_config = LoadJobConfig()
1131+
default_job_config.ignore_unknown_values = True
1132+
default_job_config.encoding = "ISO-8859-1"
1133+
1134+
final_job_config = job_config._fill_from_default(default_job_config)
1135+
self.assertTrue(final_job_config.create_session)
1136+
self.assertTrue(final_job_config.ignore_unknown_values)
1137+
self.assertEqual(final_job_config.encoding, "UTF-8")
1138+
11231139
def test_fill_from_default_conflict(self):
11241140
from google.cloud.bigquery import QueryJobConfig
11251141

@@ -1132,6 +1148,17 @@ def test_fill_from_default_conflict(self):
11321148
with self.assertRaises(TypeError):
11331149
basic_job_config._fill_from_default(conflicting_job_config)
11341150

1151+
def test_fill_from_empty_default_conflict(self):
1152+
from google.cloud.bigquery import QueryJobConfig
1153+
1154+
job_config = QueryJobConfig()
1155+
job_config.dry_run = True
1156+
job_config.maximum_bytes_billed = 1000
1157+
1158+
final_job_config = job_config._fill_from_default(default_job_config=None)
1159+
self.assertTrue(final_job_config.dry_run)
1160+
self.assertEqual(final_job_config.maximum_bytes_billed, 1000)
1161+
11351162
@mock.patch("google.cloud.bigquery._helpers._get_sub_prop")
11361163
def test__get_sub_prop_wo_default(self, _get_sub_prop):
11371164
job_config = self._make_one()

0 commit comments

Comments (0)