-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pubsub): support exactly once delivery #6506
Conversation
* feat(pubsub): read exactly once for SubscriptionProperties * rename vars to be specific this is exactly once delivery
…to pubsub-exactly-once
…oogleapis#6157 (googleapis#6162) * add RWMutex for guarding exactly once bool * feat(pubsub): send stream ack deadline seconds on exactly once change * remove extra test
…apis#6201) * add AckResult and related methods * feat(pubsub): add AckWithResult and NackWithResult to message * feat(pubsub): add AckWithResult and NackWithResult to message * add comments for AckResult and bring over AcknowledgeStatus from internal * update function definition for IgnoreExported in tests * temporarily update internal/pubsub for samples test * change enum naming to AcknowledgeStatus * remove extra enums in temp internal message.go * remove internal/pubsub/message.go * fix style issues with variadic function options * add back comment format to exported const * keep track of AckResults if exactly once is enabled
* add AckResult and related methods * feat(pubsub): add AckWithResult and NackWithResult to message * feat(pubsub): add AckWithResult and NackWithResult to message * add comments for AckResult and bring over AcknowledgeStatus from internal * update function definition for IgnoreExported in tests * temporarily update internal/pubsub for samples test * add process results * change enum naming to AcknowledgeStatus * remove extra enums in temp internal message.go * remove internal/pubsub/message.go * add process results * update process info with new enum names * add tests to process error info * add process results * update process info with new enum names * add process results * add tests to process error info * clean up iterator from merge * cleanup comments * add list of retriable errors to test * simplify testing of completed/retry slice lengths * remove getStatus/ackErrors methods * address code review comments * remove error string conversion step
* refactor sendAck to pipe errors to AckResult map * rewrite sendAck/sendModAck for exactly once * add AckResult to list of uncompared methods * use ackResultWithID in all locations
…to pubsub-exactly-once
* retry acks in goroutine * retry acks/modacks with transient errors * add retry test * add nack tests and support shorter timeouts * add integration tests * remove extra comment * add commnets to ack/modack methods in iterator
…gle-cloud-go into pubsub-exactly-once
pubsub/iterator.go
Outdated
} | ||
// If exactly once is enabled, keep track of all pending AckResults | ||
// so we can cleanly close them all at shutdown. | ||
it.eoMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any concern about performance impact to non exactly once delivery subs due to these locks, here and below?
can the critical section be reduced if we read the value of it.enableExactlyOnceDelivery
into a local variable inside a reader lock, and then execute the rest of the logic using that variable? Would that impact the logical correctness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah the lock acquisition should've been a RLock() + RUnlock()
here and outside of the loop. I made a similar optimization (moving the mutex locking) in other places as well. The only place where the mutex is locked for writing is when receiving on the stream so impact on non-exactly once subscriptions should be minimal.
pubsub/iterator.go
Outdated
|
||
const ( | ||
transientErrStringPrefix = "TRANSIENT_" | ||
transientInvalidAckErrString = transientErrStringPrefix + "FAILURE_INVALID_ACK_ID" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should never happen. Invalid ack id is a permanent failure, always. I do not see this variable used anywhere apart from tests so maybe the logic is right, just that we don't need this variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I wasn't aware of this. I was mostly copying Python's error strings though I removed the variable anyway and made it a generic "TRANSIENT_FAILURE" error string for tests.
…d-go into pubsub-exactly-once
…d-go into pubsub-exactly-once
In googleapis#6506 changes to internal were made that are not apart of the pubsub module. The pubsub module was releated before the internal changes were released resulting in build errors for those that pulled the latest pubsub version. Fixes: googleapis#6555
The
pubsub-exactly-once
branch has code that has been reviewed in individual PRS already.