Skip to content

Commit

Permalink
fix: set x-goog-request-params for streaming pull request (#884)
Browse files Browse the repository at this point in the history
* samples: schema evolution

* Add command-line commands

* Fix tag for rollback

* Make formatting fixes

* Formatting fixes

* Fix exceptions

* fix: Set x-goog-request-params for streaming pull request
  • Loading branch information
kamalaboulhosn committed Mar 14, 2023
1 parent a6df376 commit 0d247e6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ def __init__(
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._stream_metadata = [
["x-goog-request-params", "subscription=" + subscription]
]

# If max_duration_per_lease_extension is the default
# we set the stream_ack_deadline to the default of 60
Expand Down Expand Up @@ -845,6 +848,7 @@ def open(
initial_request=get_initial_request,
should_recover=self._should_recover,
should_terminate=self._should_terminate,
metadata=self._stream_metadata,
throttle_reopen=True,
)
self._rpc.add_done_callback(self._on_rpc_done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def test__wrap_callback_errors_error():


def test_constructor_and_default_state():
mock.sentinel.subscription = str()
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client, mock.sentinel.subscription
)
Expand All @@ -113,6 +114,7 @@ def test_constructor_and_default_state():


def test_constructor_with_default_options():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl()
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
Expand All @@ -128,6 +130,7 @@ def test_constructor_with_default_options():


def test_constructor_with_min_and_max_duration_per_lease_extension_():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=15, max_duration_per_lease_extension=20
)
Expand All @@ -142,6 +145,7 @@ def test_constructor_with_min_and_max_duration_per_lease_extension_():


def test_constructor_with_min_duration_per_lease_extension_too_low():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=9, max_duration_per_lease_extension=9
)
Expand All @@ -156,6 +160,7 @@ def test_constructor_with_min_duration_per_lease_extension_too_low():


def test_constructor_with_max_duration_per_lease_extension_too_high():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl(
max_duration_per_lease_extension=601, min_duration_per_lease_extension=601
)
Expand Down Expand Up @@ -1181,6 +1186,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
initial_request=mock.ANY,
should_recover=manager._should_recover,
should_terminate=manager._should_terminate,
metadata=manager._stream_metadata,
throttle_reopen=True,
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
Expand Down

0 comments on commit 0d247e6

Please sign in to comment.