Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel Notify Close not receive event when connection is closed by RMQ server. #241

Closed
ngochd7 opened this issue Jan 28, 2024 · 1 comment
Labels
bug Something isn't working

Comments

@ngochd7
Copy link

ngochd7 commented Jan 28, 2024

Describe the bug

I'm setting up a special case: the client retains the message without processing longer than the server's maximum wait time.
When the timeout expires, the server will display the error: 'delivery acknowledgment on channel 1 timed out' >> Immediately the Connection was closed but Channel Notify Close on the client did not receive any events.

Is this an error? please check it for me

Reproduction steps

  1. Setup Rabbitmq docker, and set consumer_timeout 10s with command 'rabbitmqctl eval 'application:set_env(rabbit, consumer_timeout, 10000).'
  2. Create Consumer Client and set the time processing for each message is ~ 20s
  3. Start Consumer
  4. Open Rabbitmq dashboard and publish message to the queue manually.
  5. Wait for RMQ server log err

Expected behavior

Channel Notify Close of Connection returns an error when Connection is closed by server.

Additional context

Code sample

package main

import (
	"fmt"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// start docker: docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management"
// setup consumer_timeout: rabbitmqctl eval 'application:set_env(rabbit, consumer_timeout, 10000).'

func main() {
	rmq, _ := NewRabbitMQ("amqp://guest:guest@localhost:5672/")

	err := rmq.StartConsumeMessage()
	if err != nil {
		fmt.Printf("error start rabbitmq: %v\n", err)
	}

	_ = rmq.Close()
}

type rabbitMQ struct {
	uri                 string
	conn                *amqp.Connection
	connCloseNotifyChan chan *amqp.Error
}

func NewRabbitMQ(uri string) (*rabbitMQ, error) {
	return &rabbitMQ{
		uri: uri,
	}, nil
}

func (r *rabbitMQ) Connect() error {
	conn, err := amqp.Dial(r.uri)
	if err != nil {
		return fmt.Errorf("error dial rabbitmq: %w", err)
	}

	r.conn = conn

	r.connCloseNotifyChan = make(chan *amqp.Error, 1)
	r.conn.NotifyClose(r.connCloseNotifyChan) // note: this channel will be closed when connection closed >> we need to create new channel when reconnect

	return nil
}

func (r *rabbitMQ) StartConsumeMessage() error {
	fmt.Println("Start rabbitmq")
	err := r.Connect()
	if err != nil {
		return fmt.Errorf("error connect rabbitmq: %w", err)
	}

	// thread to listen connection close
	go func() {
		fmt.Println("Start thread listen connection close")
		defer fmt.Println("Stop thread listen connection close")
		for {
			select {
			case err := <-r.connCloseNotifyChan:
				if err != nil {
					fmt.Printf("Received connection event: err  : %v - %v\n", err.Code, err.Reason)
				}
			}
		}
	}()

	ch, err := r.conn.Channel()
	if err != nil {
		return fmt.Errorf("error create channel: %v\n", err)
	}
	fmt.Println("DEBUG: create channel success")

	// create queue

	_, err = ch.QueueDeclare(
		"test_queue",
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return fmt.Errorf("error declare queue: %w", err)
	}

	// set qos
	err = ch.Qos(1, 0, false)
	if err != nil {
		return fmt.Errorf("error set qos: %w", err)
	}

	internalQueue := make(chan amqp.Delivery, 100)
	wg := sync.WaitGroup{}
	wg.Add(2)

	// thread consume message
	go func() {
		defer wg.Done()
		defer close(internalQueue)
		defer func() {
			fmt.Printf("Stop thread consume message: Sleep 20s \n")
			time.Sleep(time.Second * 20)
		}()

		q, err := ch.QueueDeclare(
			"test_queue",
			false,
			false,
			false,
			false,
			nil,
		)
		if err != nil {
			fmt.Printf("error declare queue: %w", err)
		}

		deliveries, err := ch.Consume(
			q.Name,
			"",
			false,
			false,
			false,
			false,
			nil,
		)
		if err != nil {
			fmt.Printf("error consume message: %w", err)
			return
		}

		for msg := range deliveries {
			internalQueue <- msg
		}

	}()

	// thread process message
	go func() {
		defer wg.Done()
		defer func() {
			fmt.Printf("Stop thread handle message: Sleep 20s \n")
			time.Sleep(time.Second * 20)
		}()

		for msg := range internalQueue {
			fmt.Println("start", string(msg.Body))
			time.Sleep(time.Second * 20)
			fmt.Println("done", string(msg.Body))
			err := msg.Ack(false)
			if err != nil {
				fmt.Printf("error ack message: %v\n", err)
			}
		}
	}()

	wg.Wait()

	return nil
}

func (r *rabbitMQ) Close() error {
	return r.conn.Close()
}
@ngochd7 ngochd7 added the bug Something isn't working label Jan 28, 2024
@ngochd7 ngochd7 closed this as completed Jan 30, 2024
@ngochd7
Copy link
Author

ngochd7 commented Jan 30, 2024

Channel closed but connection is not closed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant