Skip to content

Commit

Permalink
feat(pubsub/pstest): update max topic retention duration to 31 days (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Oct 5, 2023
1 parent ddbf0a6 commit d4f90ec
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
31 changes: 23 additions & 8 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)
if s.topics[t.Name] != nil {
return nil, status.Errorf(codes.AlreadyExists, "topic %q", t.Name)
}
if err := checkMRD(t.MessageRetentionDuration); err != nil {
if err := checkTopicMessageRetention(t.MessageRetentionDuration); err != nil {
return nil, err
}
top := newTopic(t)
Expand Down Expand Up @@ -357,7 +357,7 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
case "message_storage_policy":
t.proto.MessageStoragePolicy = req.Topic.MessageStoragePolicy
case "message_retention_duration":
if err := checkMRD(req.Topic.MessageRetentionDuration); err != nil {
if err := checkTopicMessageRetention(req.Topic.MessageRetentionDuration); err != nil {
return nil, err
}
t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration
Expand Down Expand Up @@ -493,7 +493,7 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.MessageRetentionDuration == nil {
ps.MessageRetentionDuration = defaultMessageRetentionDuration
}
if err := checkMRD(ps.MessageRetentionDuration); err != nil {
if err := checkSubMessageRetention(ps.MessageRetentionDuration); err != nil {
return nil, err
}
if ps.PushConfig == nil {
Expand Down Expand Up @@ -561,18 +561,33 @@ func checkAckDeadline(ads int32) error {
}

const (
minMessageRetentionDuration = 10 * time.Minute
maxMessageRetentionDuration = 31 * 24 * time.Hour // 31 days is the maximum supported duration (https://cloud.google.com/pubsub/docs/replay-overview#configuring_message_retention)
minTopicMessageRetentionDuration = 10 * time.Minute
// 31 days is the maximum topic supported duration (https://cloud.google.com/pubsub/docs/replay-overview#configuring_message_retention)
maxTopicMessageRetentionDuration = 31 * 24 * time.Hour
minSubMessageRetentionDuration = 10 * time.Minute
// 7 days is the maximum subscription supported duration (https://cloud.google.com/pubsub/docs/replay-overview#configuring_message_retention)
maxSubMessageRetentionDuration = 7 * 24 * time.Hour
)

var defaultMessageRetentionDuration = durpb.New(168 * time.Hour) // default is 7 days

func checkMRD(pmrd *durpb.Duration) error {
func checkTopicMessageRetention(pmrd *durpb.Duration) error {
if pmrd == nil {
return nil
}
mrd := pmrd.AsDuration()
if mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration {
if mrd < minTopicMessageRetentionDuration || mrd > maxTopicMessageRetentionDuration {
return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd)
}
return nil
}

func checkSubMessageRetention(pmrd *durpb.Duration) error {
if pmrd == nil {
return nil
}
mrd := pmrd.AsDuration()
if mrd < minSubMessageRetentionDuration || mrd > maxSubMessageRetentionDuration {
return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd)
}
return nil
Expand Down Expand Up @@ -644,7 +659,7 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
sub.proto.RetainAckedMessages = req.Subscription.RetainAckedMessages

case "message_retention_duration":
if err := checkMRD(req.Subscription.MessageRetentionDuration); err != nil {
if err := checkSubMessageRetention(req.Subscription.MessageRetentionDuration); err != nil {
return nil, err
}
sub.proto.MessageRetentionDuration = req.Subscription.MessageRetentionDuration
Expand Down
2 changes: 1 addition & 1 deletion pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ type TopicConfigToUpdate struct {
// and may change.
MessageStoragePolicy *MessageStoragePolicy

// If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed.
// If set to a positive duration between 10 minutes and 31 days, RetentionDuration is changed.
// If set to a negative value, this clears RetentionDuration from the topic.
// If nil, the retention duration remains unchanged.
RetentionDuration optional.Duration
Expand Down

0 comments on commit d4f90ec

Please sign in to comment.