Skip to content

Commit

Permalink
Fix: Silence invalid_ack_id warnings for receipt modacks (#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
acocuzzo committed Oct 6, 2022
1 parent bb42c6f commit 17feea5
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ def _get_initial_request(
return request

def _send_lease_modacks(
self, ack_ids: Iterable[str], ack_deadline: float
self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True
) -> List[str]:
exactly_once_enabled = False
with self._exactly_once_enabled_lock:
Expand All @@ -1010,10 +1010,14 @@ def _send_lease_modacks(
assert req.future is not None
req.future.result()
except AcknowledgeError as ack_error:
_LOGGER.warning(
"AcknowledgeError when lease-modacking a message.",
exc_info=True,
)
if (
ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID
or warn_on_invalid
):
_LOGGER.warning(
"AcknowledgeError when lease-modacking a message.",
exc_info=True,
)
if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
expired_ack_ids.append(req.ack_id)
return expired_ack_ids
Expand Down Expand Up @@ -1078,7 +1082,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# modack the messages we received, as this tells the server that we've
# received them.
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = set(self._send_lease_modacks(ack_id_gen, self.ack_deadline))
expired_ack_ids = set(
self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
)

with self._pause_resume_lock:
assert self._scheduler is not None
Expand Down
85 changes: 83 additions & 2 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,7 @@ def test__on_response_disable_exactly_once():
assert manager._stream_ack_deadline == 60


def test__on_response_exactly_once_immediate_modacks_fail():
def test__on_response_exactly_once_immediate_modacks_fail(caplog):
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback

Expand Down Expand Up @@ -1890,7 +1890,8 @@ def complete_futures_with_error(*args, **kwargs):

fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10)

manager._on_response(response)
with caplog.at_level(logging.WARNING):
manager._on_response(response)

# The second messages should be scheduled, and not the first.

Expand All @@ -1902,13 +1903,93 @@ def complete_futures_with_error(*args, **kwargs):
assert call_args[1].message_id == "2"

assert manager._messages_on_hold.size == 0

expected_warnings = [
record.message.lower()
for record in caplog.records
if "AcknowledgeError when lease-modacking a message." in record.message
]
assert len(expected_warnings) == 1

# No messages available
assert manager._messages_on_hold.get() is None

# do not add message
assert manager.load == 0.001


def test__on_response_exactly_once_immediate_modacks_fail_non_invalid(caplog):
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback

def complete_futures_with_error(*args, **kwargs):
modack_requests = args[0]
for req in modack_requests:
if req.ack_id == "fack":
req.future.set_exception(
subscriber_exceptions.AcknowledgeError(
subscriber_exceptions.AcknowledgeStatus.OTHER, None
)
)
else:
req.future.set_exception(
subscriber_exceptions.AcknowledgeError(
subscriber_exceptions.AcknowledgeStatus.SUCCESS, None
)
)

dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error

# Set up the messages.
response = gapic_types.StreamingPullResponse(
received_messages=[
gapic_types.ReceivedMessage(
ack_id="fack",
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
),
gapic_types.ReceivedMessage(
ack_id="good",
message=gapic_types.PubsubMessage(data=b"foo", message_id="2"),
),
],
subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties(
exactly_once_delivery_enabled=True
),
)

# Actually run the method and prove that modack and schedule are called in
# the expected way.

fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10)

with caplog.at_level(logging.WARNING):
manager._on_response(response)

# The second messages should be scheduled, and not the first.

schedule_calls = scheduler.schedule.mock_calls
assert len(schedule_calls) == 2
call_args = schedule_calls[0][1]
assert call_args[0] == mock.sentinel.callback
assert isinstance(call_args[1], message.Message)
assert call_args[1].message_id == "1"

assert manager._messages_on_hold.size == 0

expected_warnings = [
record.message.lower()
for record in caplog.records
if "AcknowledgeError when lease-modacking a message." in record.message
]
assert len(expected_warnings) == 2

# No messages available
assert manager._messages_on_hold.get() is None

# do not add message
assert manager.load == 0.002


def test__should_recover_true():
manager = make_manager()

Expand Down

0 comments on commit 17feea5

Please sign in to comment.