Skip to content

Commit

Permalink
fix: remove expired ack_ids (#787)
Browse files Browse the repository at this point in the history
  • Loading branch information
acocuzzo committed Sep 22, 2022
1 parent f3ebbae commit b4b809d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,9 @@ def _get_initial_request(
# Return the initial request.
return request

def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float):
def _send_lease_modacks(
self, ack_ids: Iterable[str], ack_deadline: float
) -> List[str]:
exactly_once_enabled = False
with self._exactly_once_enabled_lock:
exactly_once_enabled = self._exactly_once_enabled
Expand All @@ -1002,22 +1004,27 @@ def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float):
assert self._dispatcher is not None
self._dispatcher.modify_ack_deadline(items)

expired_ack_ids = []
for req in items:
try:
assert req.future is not None
req.future.result()
except AcknowledgeError:
except AcknowledgeError as ack_error:
_LOGGER.warning(
"AcknowledgeError when lease-modacking a message.",
exc_info=True,
)
if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
expired_ack_ids.append(req.ack_id)
return expired_ack_ids
else:
items = [
requests.ModAckRequest(ack_id, self.ack_deadline, None)
for ack_id in ack_ids
]
assert self._dispatcher is not None
self._dispatcher.modify_ack_deadline(items)
return []

def _exactly_once_delivery_enabled(self) -> bool:
"""Whether exactly-once delivery is enabled for the subscription."""
Expand Down Expand Up @@ -1071,28 +1078,32 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# modack the messages we received, as this tells the server that we've
# received them.
ack_id_gen = (message.ack_id for message in received_messages)
self._send_lease_modacks(ack_id_gen, self.ack_deadline)
expired_ack_ids = set(self._send_lease_modacks(ack_id_gen, self.ack_deadline))

with self._pause_resume_lock:
assert self._scheduler is not None
assert self._leaser is not None

for received_message in received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
self._exactly_once_delivery_enabled,
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
ack_id=message.ack_id,
byte_size=message.size,
ordering_key=message.ordering_key,
)
self._leaser.add([req])
if (
not self._exactly_once_delivery_enabled()
or received_message.ack_id not in expired_ack_ids
):
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
self._exactly_once_delivery_enabled,
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
ack_id=message.ack_id,
byte_size=message.size,
ordering_key=message.ordering_key,
)
self._leaser.add([req])

self._maybe_release_messages()

Expand Down
46 changes: 37 additions & 9 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,7 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog):
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
)
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):

manager = make_manager()

with mock.patch.object(
Expand Down Expand Up @@ -1852,11 +1853,18 @@ def test__on_response_exactly_once_immediate_modacks_fail():
def complete_futures_with_error(*args, **kwargs):
modack_requests = args[0]
for req in modack_requests:
req.future.set_exception(
subscriber_exceptions.AcknowledgeError(
subscriber_exceptions.AcknowledgeStatus.SUCCESS, None
if req.ack_id == "fack":
req.future.set_exception(
subscriber_exceptions.AcknowledgeError(
subscriber_exceptions.AcknowledgeStatus.INVALID_ACK_ID, None
)
)
else:
req.future.set_exception(
subscriber_exceptions.AcknowledgeError(
subscriber_exceptions.AcknowledgeStatus.SUCCESS, None
)
)
)

dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error

Expand All @@ -1866,19 +1874,39 @@ def complete_futures_with_error(*args, **kwargs):
gapic_types.ReceivedMessage(
ack_id="fack",
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
)
),
gapic_types.ReceivedMessage(
ack_id="good",
message=gapic_types.PubsubMessage(data=b"foo", message_id="2"),
),
],
subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties(
exactly_once_delivery_enabled=True
),
)

# adjust message bookkeeping in leaser
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)
# Actually run the method and prove that modack and schedule are called in
# the expected way.

fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10)

# exactly_once should be enabled
manager._on_response(response)
# exceptions are logged, but otherwise no effect

# The second messages should be scheduled, and not the first.

schedule_calls = scheduler.schedule.mock_calls
assert len(schedule_calls) == 1
call_args = schedule_calls[0][1]
assert call_args[0] == mock.sentinel.callback
assert isinstance(call_args[1], message.Message)
assert call_args[1].message_id == "2"

assert manager._messages_on_hold.size == 0
# No messages available
assert manager._messages_on_hold.get() is None

# do not add message
assert manager.load == 0.001


def test__should_recover_true():
Expand Down

0 comments on commit b4b809d

Please sign in to comment.