Call to Delivery.Ack blocks indefinitely in case of disconnection #19

Closed
corcus opened this issue Sep 20, 2021 · 4 comments

@corcus

corcus commented Sep 20, 2021

Steps to reproduce

  1. Create a queue in the RabbitMQ server
  2. Call consume and start consuming messages
  3. While processing a message and BEFORE Delivery.Ack is called disconnect from the RabbitMQ server (for example turn off your internet)

The call to Delivery.Ack blocks indefinitely instead of returning an error.

@DanielePalaia
Contributor

Hi, this issue is a bit old; are you still experiencing this problem? I'm trying to reproduce it, but so far I haven't been able to. I wrote a sort of unit test that consumes messages and closes the connection (I tried closing it from the RabbitMQ UI, turning off the internet, and stopping the broker) while consuming and before Delivery.Ack is called, but Ack returns an error as expected.
I will keep investigating the code in the meantime.

This is the code I was testing:

func TestShouldNotWaitOnDeliveryAckAfterConnectionCloseIssue19(t *testing.T) {
	connPublish := integrationConnection(t, "TestShouldNotWaitOnDeliveryAckAfterConnectionCloseIssue19")
	chPublish, err := connPublish.Channel()
	if err != nil {
		t.Fatalf("channel error: %v", err)
	}

	exchangeName := "exchangeTestIssue19"
	queueName := "queueTestIssue19"
	nMsg := 10000

	if err := chPublish.ExchangeDeclare(exchangeName, "fanout", false, true, false, false, nil); err != nil {
		t.Fatalf("cannot declare %v: got: %v", exchangeName, err)
	}

	if _, err := chPublish.QueueDeclare(queueName, false, true, false, false, nil); err != nil {
		t.Fatalf("cannot declare %v: got: %v", queueName, err)
	}

	if err := chPublish.QueueBind(queueName, "#", exchangeName, false, nil); err != nil {
		t.Fatalf("cannot bind %v to %v: got: %v", queueName, exchangeName, err)
	}

	// Publish nMsg messages so the consumer has a backlog to work through
	for i := 0; i < nMsg; i++ {
		err = chPublish.Publish(exchangeName, "#", false, false, Publishing{Body: []byte("abc")})
		if err != nil {
			t.Fatalf("Publish error: %v", err)
		}
	}

	chPublish.Close()
	connPublish.Close()

	connConsume := integrationConnection(t, "TestShouldNotWaitOnDeliveryAckAfterConnectionCloseIssue19")
	chConsume, err := connConsume.Channel()
	if err != nil {
		t.Fatalf("channel error: %v", err)
	}

	msgs, err := chConsume.Consume(queueName, "#", false, false, false, false, nil)
	if err != nil {
		t.Fatalf("could not consume: %v", err)
	}

	i := 0

	for d := range msgs {
		i++
		t.Logf(
			"got %dB delivery: [%v] %q",
			len(d.Body),
			d.DeliveryTag,
			d.Body,
		)

		// Simulate a network issue here (e.g. close the connection from the management UI)
		err := d.Ack(true)

		if err == nil {
			t.Logf("Ack returned nil, expected a network error")
		}

	}

	t.Logf("ending %v", err)

}

@corcus
Author

corcus commented Mar 21, 2022

Hey thanks for looking into this. It has been a long time and I don't remember exactly how this issue came up.
To the best of my memory it happened like this.

In the range loop consuming the messages there was some long-running task (or a breakpoint during debugging):

for d := range msgs {
	i++
	t.Logf(
		"got %dB delivery: [%v] %q",
		len(d.Body),
		d.DeliveryTag,
		d.Body,
	)

	// Long-running task (or a debugger breakpoint) here;
	// disconnect from the network while it is in progress.
	err := d.Ack(true)

	if err == nil {
		t.Logf("Ack returned nil, expected a network error")
	}
}

Then, while the long-running task is running (or while execution is paused at the breakpoint), disconnect from the internet, and let the task complete (or resume from the breakpoint) so that execution reaches the Ack call.
That's how I remember the issue happening.

@DanielePalaia
Contributor

Hi @corcus, I tried putting a sleep in place of the breakpoint, and simulating a heavy load with a loop, then closing the connection from the UI and continuing with the Ack, but I wasn't able to reproduce it. Let's close this one since it is old by now. If it happens to you again, please open a new issue, ideally with a trace if you are able to get one. Thanks!

@DanielePalaia
Contributor

Closing as not reproducible at the moment.
