Skip to content

Commit

Permalink
fix(pubsub/pstest): fix panic on undelivered message (#7377)
Browse files Browse the repository at this point in the history
* refactor(pubsub/pstest): replace global now atomic with struct field

It is counterintuitive to plumb through a time.Now replacement function
only to have it refer to a global variable. Instead, make the Server
timeNowFunc an atomic.Value field and add a now method accessor that
loads the function value atomically and invokes it. Use that in tests,
and also in the exported SetTimeNowFunc variable.

* fix(pubsub/pstest): prevent panic on undelivered messages

If a running pstest Server has a subscription without a dead letter
topic configured, and there's a pending message in the subscription that
is not received after the retention duration (10 minutes), the deliver
loop for the subscription will trigger a panic.

This commit prevents the panic by not trying to deliver expired messages
to the dead letter topic, matching the live service, which does not
forward expired messages to the dead letter topic either.

---------

Co-authored-by: Alex Hong <[email protected]>
  • Loading branch information
adg and hongalex committed Feb 27, 2023
1 parent f425a60 commit 98dd29d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 41 deletions.
33 changes: 11 additions & 22 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,6 @@ type publishResponse struct {
err error
}

// For testing. Note that even though changes to the now variable are atomic, a call
// to the stored function can race with a change to that function. This could be a
// problem if tests are run in parallel, or even if concurrent parts of the same test
// change the value of the variable.
var now atomic.Value

// init seeds the replaceable clock with the real time.Now and resets
// the minimum ack deadline to its package default.
func init() {
now.Store(time.Now)
ResetMinAckDeadline()
}

// timeNow invokes the currently stored clock function, returning the
// fake server's notion of the current time.
func timeNow() time.Time {
return now.Load().(func() time.Time)()
}

// Server is a fake Pub/Sub server.
type Server struct {
srv *testutil.Server
Expand All @@ -95,6 +80,8 @@ type GServer struct {
pb.UnimplementedSubscriberServer
pb.UnimplementedSchemaServiceServer

timeNowFunc atomic.Value

mu sync.Mutex
topics map[string]*topic
subs map[string]*subscription
Expand All @@ -103,7 +90,6 @@ type GServer struct {
wg sync.WaitGroup
nextID int
streamTimeout time.Duration
timeNowFunc func() time.Time
reactorOptions ReactorOptions
// schemas is a map of schemaIDs to a slice of schema revisions.
// the last element in the slice is the most recent schema.
Expand Down Expand Up @@ -139,13 +125,13 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
topics: map[string]*topic{},
subs: map[string]*subscription{},
msgsByID: map[string]*Message{},
timeNowFunc: timeNow,
reactorOptions: reactorOptions,
publishResponses: make(chan *publishResponse, 100),
autoPublishResponse: true,
schemas: map[string][]*pb.Schema{},
},
}
s.GServer.timeNowFunc.Store(time.Now)
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer)
Expand All @@ -156,7 +142,11 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
// SetTimeNowFunc registers f as the function to be used instead of
// time.Now for this server. The function is stored atomically, so it
// may be replaced while the server is running.
func (s *Server) SetTimeNowFunc(f func() time.Time) {
	// Diff artifact resolved: keep only the atomic Store of the new
	// timeNowFunc field (the old plain assignment was removed by
	// this commit).
	s.GServer.timeNowFunc.Store(f)
}

// now returns the server's current time by atomically loading and
// invoking the registered replacement for time.Now (see SetTimeNowFunc).
func (s *GServer) now() time.Time {
	nowFn := s.timeNowFunc.Load().(func() time.Time)
	return nowFn()
}

// Publish behaves as if the Publish RPC was called with a message with the given
Expand Down Expand Up @@ -501,7 +491,7 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
deadLetterTopic = dlTopic
}

sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps)
sub := newSubscription(top, &s.mu, s.now, deadLetterTopic, ps)
top.subs[ps.Name] = sub
s.subs[ps.Name] = sub
sub.start(&s.wg)
Expand Down Expand Up @@ -737,7 +727,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis
id := fmt.Sprintf("m%d", s.nextID)
s.nextID++
pm.MessageId = id
pubTime := s.timeNowFunc()
pubTime := s.now()
tsPubTime := timestamppb.New(pubTime)
pm.PublishTime = tsPubTime
m := &Message{
Expand Down Expand Up @@ -1131,7 +1121,7 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (
return 0, false
}

// retentionDuration is how long an undelivered, non-outstanding message
// is retained by a subscription before maintainMessages drops it.
// Diff artifact resolved: the commit changed this from `var` to `const`;
// only the const form is kept here.
const retentionDuration = 10 * time.Minute

// Must be called with the lock held.
func (s *subscription) maintainMessages(now time.Time) {
Expand All @@ -1143,7 +1133,6 @@ func (s *subscription) maintainMessages(now time.Time) {
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
s.publishToDeadLetter(m)
delete(s.msgs, id)
}
}
Expand Down
68 changes: 49 additions & 19 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,10 @@ func TestClearMessages(t *testing.T) {
}

// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".
func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
func publish(t *testing.T, srv *Server, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
pubTime := time.Now()
now.Store(func() time.Time { return pubTime })
defer func() { now.Store(time.Now) }()
srv.SetTimeNowFunc(func() time.Time { return pubTime })
defer srv.SetTimeNowFunc(time.Now)

res, err := pclient.Publish(context.Background(), &pb.PublishRequest{
Topic: topic.Name,
Expand All @@ -517,7 +517,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages
}

func TestPull(t *testing.T) {
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -527,7 +527,7 @@ func TestPull(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -548,7 +548,7 @@ func TestPull(t *testing.T) {

func TestStreamingPull(t *testing.T) {
// A simple test of streaming pull.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -558,7 +558,7 @@ func TestStreamingPull(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -572,7 +572,7 @@ func TestStreamingPull(t *testing.T) {
// This test acks each message as it arrives and makes sure we don't see dups.
func TestStreamingPullAck(t *testing.T) {
minAckDeadlineSecs = 1
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -582,7 +582,7 @@ func TestStreamingPullAck(t *testing.T) {
AckDeadlineSeconds: 1,
})

_ = publish(t, pclient, top, []*pb.PubsubMessage{
_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestAcknowledge(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestAcknowledge(t *testing.T) {

func TestModAck(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -672,7 +672,7 @@ func TestModAck(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -698,7 +698,7 @@ func TestModAck(t *testing.T) {

func TestAckDeadline(t *testing.T) {
// Messages should be resent after they expire.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

minAckDeadlineSecs = 2
Expand All @@ -709,7 +709,7 @@ func TestAckDeadline(t *testing.T) {
AckDeadlineSeconds: minAckDeadlineSecs,
})

_ = publish(t, pclient, top, []*pb.PubsubMessage{
_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -745,7 +745,7 @@ func TestAckDeadline(t *testing.T) {

func TestMultiSubs(t *testing.T) {
// Each subscription gets every message.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -760,7 +760,7 @@ func TestMultiSubs(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -782,7 +782,7 @@ func TestMultiSubs(t *testing.T) {
func TestMultiStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pclient, sclient, _, cleanup := newFake(ctx, t)
pclient, sclient, srv, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand Down Expand Up @@ -813,7 +813,7 @@ func TestMultiStreams(t *testing.T) {
close(st2Received)
}()

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
})
Expand Down Expand Up @@ -941,7 +941,7 @@ func TestModAck_Race(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, server, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -1599,3 +1599,33 @@ func TestSubscriptionMessageOrdering(t *testing.T) {
ids = ids[len(pull.ReceivedMessages):]
}
}

// TestSubscriptionRetention verifies that a subscription holding
// undelivered messages past the retention deadline does not panic
// when the maintenance loop drops them.
func TestSubscriptionRetention(t *testing.T) {
	ctx := context.Background()
	srv := NewServer()
	defer srv.Close()

	// Freeze the fake clock so the published message's publish time
	// is pinned to a known instant.
	frozen := time.Now()
	srv.SetTimeNowFunc(func() time.Time { return frozen })

	const topicName = "projects/p/topics/t"
	topic, err := srv.GServer.CreateTopic(ctx, &pb.Topic{Name: topicName})
	if err != nil {
		t.Fatal(err)
	}
	sub := &pb.Subscription{
		Name:                  "projects/p/subscriptions/s",
		Topic:                 topic.Name,
		AckDeadlineSeconds:    30,
		EnableMessageOrdering: true,
	}
	if _, err := srv.GServer.CreateSubscription(ctx, sub); err != nil {
		t.Fatal(err)
	}
	srv.Publish(topicName, []byte("payload"), nil)

	// Jump the clock past the retention window and give the delivery
	// loop a moment to run maintenance; a panic here fails the test.
	srv.SetTimeNowFunc(func() time.Time { return frozen.Add(retentionDuration + 1) })
	time.Sleep(1 * time.Second)
}

0 comments on commit 98dd29d

Please sign in to comment.