August 2, 2020

A robust RabbitMQ client in Go

A few months ago I started working on a project that relies heavily on RabbitMQ as a message broker. Two clients communicate with our Go service, one via AMQP (RabbitMQ) and the other via HTTP. Because we depend so heavily on RabbitMQ, I had to write a robust client that shuts down gracefully, recovers from panics, is multithreaded, logs everything nicely, and more.

10th October 2020: Fixed reconnection logic in example

Modern cloud applications tend to be decoupled into smaller, independent services (microservices) that, compared to classic monolithic applications, are easier to develop, deploy, and maintain. Message queues provide communication and coordination for these distributed applications. They can significantly simplify the coding of decoupled applications while improving performance, reliability, and scalability. Message queues provide asynchronous communication: services produce and consume messages through the queue rather than calling each other directly. Producers can add new messages to the queue without waiting for them to be processed, and consumers process messages only when they are available. This optimizes the data flow by removing the need for services to wait for each other.

RabbitMQ is the most widely deployed open-source message broker. RabbitMQ is lightweight and easy to deploy on-premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Go doesn’t have an official RabbitMQ library. Instead, there’s a very popular one maintained by Sean Treadway, located under streadway/amqp. While it does provide some examples, and there are plenty of tutorials explaining how to use it for basic tasks, this post wraps it all up and explains how to build a robust RabbitMQ consumer/publisher in Go. An honorable mention goes to harrisonturton’s Gist, where a big part of this code originates.

The code is available as a Gist.

Custom errors needed for proper handling of failures:

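var (
    // ErrDisconnected is returned when an operation needs a connection
    // but the client is currently disconnected from RabbitMQ.
    ErrDisconnected = errors.New("disconnected from rabbitmq, trying to reconnect")
)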
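
Because ErrDisconnected is a sentinel value, callers can match it with errors.Is even when an intermediate layer wraps it using %w (Go 1.13+). A minimal, stdlib-only sketch; consume and shouldRetry are hypothetical helpers, not part of the client:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrDisconnected = errors.New("disconnected from rabbitmq, trying to reconnect")

// consume is a hypothetical stand-in for a call that fails and wraps
// ErrDisconnected with extra context using %w.
func consume() error {
	return fmt.Errorf("consumer 3: %w", ErrDisconnected)
}

// shouldRetry reports whether err means the caller should start consuming again.
func shouldRetry(err error) bool {
	return errors.Is(err, ErrDisconnected)
}

func main() {
	err := consume()
	fmt.Println(err == ErrDisconnected) // false: err is a wrapper
	fmt.Println(shouldRetry(err))       // true: errors.Is unwraps %w
}
```

Comparing with == only works when the sentinel is returned unwrapped, so errors.Is is the safer check in the Stream caller.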

The client, which holds all the methods needed for connecting to RabbitMQ and for streaming/pushing messages, has the following fields:

// Client holds necessary information for rabbitMQ
type Client struct {
    pushQueue     string
    streamQueue   string
    logger        zerolog.Logger
    connection    *amqp.Connection
    channel       *amqp.Channel
    done          chan os.Signal
    notifyClose   chan *amqp.Error
    notifyConfirm chan amqp.Confirmation
    isConnected   bool
    alive         bool
    threads       int
    wg            *sync.WaitGroup
}
  • The pushQueue is the name of the queue the client pushes to. Instead of storing it in the client, it could be passed as an argument to the Push method.
  • The streamQueue is the name of the queue the client consumes from (the listen queue passed to New).
  • The logger is not mandatory and could be replaced with any other (structured) logger such as Zap or Logrus.
  • Connection and Channel are the AMQP connection and channel. More info is available in the streadway/amqp GoDoc.
  • The done channel is triggered once the server has received the shutdown signal, which stops the client from trying to reconnect to the RabbitMQ server.
  • notifyClose tells the reconnect method that the connection was closed and the client needs to reconnect.
  • notifyConfirm is used to confirm that the data was pushed to the server.
  • The wg WaitGroup is used to shut down the server gracefully once all messages are processed.
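
The client only reads from done; creating it is left to the caller. A minimal sketch of wiring it to SIGINT/SIGTERM with os/signal, assuming that is the shutdown signal you want (newShutdownChannel is a hypothetical helper name). Note that a value received in handleReconnect is consumed there, so any other goroutine receiving from the same channel will not see that particular signal.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// newShutdownChannel returns a channel that receives SIGINT and SIGTERM.
// It is buffered because signal.Notify never blocks: if the receiver is
// not ready and the buffer is full, the signal is dropped.
func newShutdownChannel() chan os.Signal {
	done := make(chan os.Signal, 1)
	signal.Notify(done, os.Interrupt, syscall.SIGTERM)
	return done
}

func main() {
	done := newShutdownChannel()
	defer signal.Stop(done)

	// Simulate a shutdown request so the example terminates on its own;
	// in a real service the OS delivers the signal.
	done <- syscall.SIGTERM

	sig := <-done
	fmt.Println("received", sig)
}
```

The resulting channel is what would be passed as the done argument to New.
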
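const (
    // When reconnecting to the server after connection failure
    reconnectDelay = 5 * time.Second

    // When resending messages the server didn't confirm (used by Push;
    // the value is an assumption, tune it for your workload)
    resendDelay = 5 * time.Second
)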
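
handleReconnect (below) waits reconnectDelay plus one extra second per failed attempt, which is a linear backoff. A minimal sketch of that schedule as a standalone function, with an optional cap; maxReconnectDelay is a hypothetical addition, not part of the original client, meant to keep the wait from growing without bound during a long outage:

```go
package main

import (
	"fmt"
	"time"
)

const (
	reconnectDelay    = 5 * time.Second
	maxReconnectDelay = 30 * time.Second // hypothetical cap, not in the original client
)

// retryDelay mirrors the wait used in handleReconnect:
// reconnectDelay + retryCount seconds, capped at maxReconnectDelay.
func retryDelay(retryCount int) time.Duration {
	d := reconnectDelay + time.Duration(retryCount)*time.Second
	if d > maxReconnectDelay {
		return maxReconnectDelay
	}
	return d
}

func main() {
	for i := 0; i < 4; i++ {
		fmt.Printf("attempt %d: wait %v\n", i+1, retryDelay(i))
	}
}
```

A linear schedule is gentle on a broker that is restarting; an exponential one backs off faster if many clients reconnect at once.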

The New function is a constructor that takes the listen and push queue names, the server address, a logger, and a channel that notifies the RabbitMQ client on server shutdown. It calculates the number of threads, creates the client, and starts the connection process in the background. The connect method connects to the RabbitMQ server and declares the push and listen queues if they don’t exist.

func New(listenQueue, pushQueue, addr string, l zerolog.Logger, done chan os.Signal) *Client {
    // One consumer goroutine per CPU or per GOMAXPROCS, whichever is larger.
    threads := runtime.GOMAXPROCS(0)
    if numCPU := runtime.NumCPU(); numCPU > threads {
        threads = numCPU
    }

    client := Client{
        logger:      l,
        threads:     threads,
        pushQueue:   pushQueue,
        streamQueue: listenQueue,
        done:        done,
        alive:       true,
        wg:          &sync.WaitGroup{},
    }
    // One WaitGroup slot per consumer goroutine started by Stream.
    client.wg.Add(threads)

    go client.handleReconnect(listenQueue, addr)
    return &client
}

// handleReconnect will wait for a connection error on
// notifyClose, and then continuously attempt to reconnect.
func (c *Client) handleReconnect(listenQueue, addr string) {
    for c.alive {
        c.isConnected = false
        t := time.Now()
        c.logger.Printf("Attempting to connect to rabbitMQ: %s", addr)
        var retryCount int
        for !c.connect(listenQueue, addr) {
            if !c.alive {
                return
            }
            select {
            case <-c.done:
                return
            case <-time.After(reconnectDelay + time.Duration(retryCount)*time.Second):
                c.logger.Printf("disconnected from rabbitMQ and failed to connect")
                retryCount++
            }
        }
        c.logger.Printf("Connected to rabbitMQ in: %vms", time.Since(t).Milliseconds())
        select {
        case <-c.done:
            return
        case <-c.notifyClose:
        }
    }
}

// connect will make a single attempt to connect to
// RabbitMQ. It returns the success of the attempt.
func (c *Client) connect(listenQueue, addr string) bool {
    conn, err := amqp.Dial(addr)
    if err != nil {
        c.logger.Printf("failed to dial rabbitMQ server: %v", err)
        return false
    }
    ch, err := conn.Channel()
    if err != nil {
        c.logger.Printf("failed connecting to channel: %v", err)
        conn.Close() // don't leak the connection on a failed attempt
        return false
    }
    // Enable publisher confirms so Push can wait for the server's ack.
    if err := ch.Confirm(false); err != nil {
        c.logger.Printf("failed to put channel into confirm mode: %v", err)
        conn.Close()
        return false
    }
    _, err = ch.QueueDeclare(
        listenQueue,
        true,  // Durable
        false, // Delete when unused
        false, // Exclusive
        false, // No-wait
        nil,   // Arguments
    )
    if err != nil {
        c.logger.Printf("failed to declare listen queue: %v", err)
        conn.Close()
        return false
    }

    _, err = ch.QueueDeclare(
        c.pushQueue,
        true,  // Durable
        false, // Delete when unused
        false, // Exclusive
        false, // No-wait
        nil,   // Arguments
    )
    if err != nil {
        c.logger.Printf("failed to declare push queue: %v", err)
        conn.Close()
        return false
    }
    c.changeConnection(conn, ch)
    c.isConnected = true
    return true
}

// changeConnection takes a new connection to the queue,
// and updates the channel listeners to reflect this.
func (c *Client) changeConnection(connection *amqp.Connection, channel *amqp.Channel) {
    c.connection = connection
    c.channel = channel
    // A buffer of one lets the library deliver a notification
    // without waiting for a receiver to be ready.
    c.notifyClose = make(chan *amqp.Error, 1)
    c.notifyConfirm = make(chan amqp.Confirmation, 1)
    c.channel.NotifyClose(c.notifyClose)
    c.channel.NotifyPublish(c.notifyConfirm)
}

// Push will push data onto the queue, and wait for a confirmation.
// If no confirm is received within resendDelay, it resends the
// message until a confirmation is received.
// This will block until the server sends a confirm.
func (c *Client) Push(data []byte) error {
    if !c.isConnected {
        return errors.New("failed to push: not connected")
    }
    for {
        err := c.UnsafePush(data)
        if err != nil {
            if err == ErrDisconnected {
                // Give handleReconnect time to restore the connection
                // instead of retrying in a tight loop.
                time.Sleep(resendDelay)
                continue
            }
            return err
        }
        select {
        case confirm := <-c.notifyConfirm:
            if confirm.Ack {
                return nil
            }
        case <-time.After(resendDelay):
        }
    }
}

// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (c *Client) UnsafePush(data []byte) error {
    if !c.isConnected {
        return ErrDisconnected
    }
    return c.channel.Publish(
        "",          // Exchange (default exchange)
        c.pushQueue, // Routing key: the queue name on the default exchange
        false,       // Mandatory
        false,       // Immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        data,
        },
    )
}
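
The control flow of handleReconnect does not depend on AMQP: retry connecting until it succeeds or shutdown is requested, then block until a close notification arrives and start over. A minimal, stdlib-only sketch of that loop, assuming a connect function that reports success and returns a close-notification channel. The names supervise, connectFunc and noDelay are hypothetical, and a single stop channel stands in for the original's done/alive pair.

```go
package main

import (
	"fmt"
	"time"
)

// connectFunc makes one connection attempt. On success it returns a channel
// that becomes ready (closed or sent to) when the connection is lost.
type connectFunc func() (closed <-chan struct{}, ok bool)

// noDelay is used by the demo so retries happen immediately.
func noDelay(int) time.Duration { return 0 }

// supervise keeps a connection alive until stop is closed. Failed attempts
// are retried after delay(retry); a lost connection triggers a reconnect.
// It returns the total number of connection attempts made.
func supervise(connect connectFunc, delay func(int) time.Duration, stop <-chan struct{}) int {
	attempts := 0
	for {
		var closed <-chan struct{}
		for retry := 0; ; retry++ {
			attempts++
			c, ok := connect()
			if ok {
				closed = c
				break
			}
			select {
			case <-stop:
				return attempts
			case <-time.After(delay(retry)):
			}
		}
		select {
		case <-stop:
			return attempts
		case <-closed:
			// Connection lost: loop around and reconnect.
		}
	}
}

func main() {
	stop := make(chan struct{})
	calls := 0
	connect := func() (<-chan struct{}, bool) {
		calls++
		if calls < 3 {
			return nil, false // the first two attempts fail
		}
		close(stop) // request shutdown once connected, so the demo ends
		return make(chan struct{}), true
	}
	fmt.Println("attempts:", supervise(connect, noDelay, stop)) // attempts: 3
}
```

Injecting connect and delay this way makes the reconnect logic testable without a running broker.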

The Push method is quite simple. If you need to push to multiple or different queues, which is often the case, pass the queue name as an argument to Push instead of providing it in the constructor. Push blocks until it succeeds, which may not always be what you’re looking for.

Stream starts c.threads goroutines that listen for messages from the server and process each one in the parseEvent method.

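func (c *Client) Stream(cancelCtx context.Context) error {
    // Wait until handleReconnect has established a connection.
    for !c.isConnected {
        time.Sleep(1 * time.Second)
    }

    // Prefetch one unacknowledged message per consumer.
    err := c.channel.Qos(1, 0, false)
    if err != nil {
        return err
    }

    // Consumers whose delivery channel closes report it here. The buffer
    // holds one value per consumer, so sends never block and no shared
    // bool is written from several goroutines.
    dropped := make(chan struct{}, c.threads)

    for i := 1; i <= c.threads; i++ {
        msgs, err := c.channel.Consume(
            c.streamQueue,
            consumerName(i), // Consumer
            false,           // Auto-Ack
            false,           // Exclusive
            false,           // No-local
            false,           // No-Wait
            nil,             // Args
        )
        if err != nil {
            return err
        }

        go func() {
            defer c.wg.Done()
            for {
                select {
                case <-cancelCtx.Done():
                    return
                case msg, ok := <-msgs:
                    if !ok {
                        // The channel or connection was closed.
                        dropped <- struct{}{}
                        return
                    }
                    c.parseEvent(msg)
                }
            }
        }()
    }

    // Block until every consumer goroutine has returned.
    c.wg.Wait()

    if len(dropped) > 0 {
        // New adds c.threads to the WaitGroup only once, so re-arm it
        // before the caller calls Stream again after reconnecting.
        c.wg.Add(c.threads)
        return ErrDisconnected
    }

    return nil
}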

The call to the Stream method looks something like this:

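go func() {
	for {
		err := rmq.Stream(cancelCtx)
		if errors.Is(err, rabbitmq.ErrDisconnected) {
			// The connection dropped; consume again once the client has reconnected.
			continue
		}
		if err != nil {
			log.Printf("rabbitMQ stream stopped: %v", err)
		}
		break
	}
}()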

A stripped-down version of the event received from the stream. It can include any data you need.

type event struct {
    Job  string `json:"job"`
    Data string `json:"data"`
}
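
The struct tags map the JSON keys job and data onto the fields. A minimal sketch of decoding and validating a message body the same way parseEvent does (reject malformed JSON and events without data); decodeEvent is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type event struct {
	Job  string `json:"job"`
	Data string `json:"data"`
}

// decodeEvent unmarshals a message body and applies the same check as
// parseEvent: an event without data is rejected.
func decodeEvent(body []byte) (event, error) {
	var evt event
	if err := json.Unmarshal(body, &evt); err != nil {
		return event{}, fmt.Errorf("unmarshalling body: %w", err)
	}
	if evt.Data == "" {
		return event{}, errors.New("received event without data")
	}
	return evt, nil
}

func main() {
	evt, err := decodeEvent([]byte(`{"job":"job1","data":"hello"}`))
	fmt.Println(evt.Job, evt.Data, err) // job1 hello <nil>

	_, err = decodeEvent([]byte(`{"job":"job1"}`))
	fmt.Println(err) // received event without data
}
```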

parseEvent is responsible for the actual application logic. It acts on the job received in the event and calls other services/interfaces injected into the client to do the actual work. Panic recovery is responsible for nacking the message and logging the panic.

func (c *Client) parseEvent(msg amqp.Delivery) {
    l := c.logger.Log().Timestamp()
    startTime := time.Now()

    var evt event
    err := json.Unmarshal(msg.Body, &evt)
    if err != nil {
        logAndNack(msg, l, startTime, "unmarshalling body: %s - %s", string(msg.Body), err.Error())
        return
    }

    if evt.Data == "" {
        logAndNack(msg, l, startTime, "received event without data")
        return
    }

    // Recover from panics in the job handlers: log the stack and nack the
    // message so one bad message doesn't crash the consumer goroutine.
    defer func() {
        if r := recover(); r != nil {
            stack := make([]byte, 8096)
            stack = stack[:runtime.Stack(stack, false)]
            l.Bytes("stack", stack).Str("level", "fatal").Interface("error", r).Msg("panic recovery for rabbitMQ message")
            msg.Nack(false, false)
        }
    }()

    switch evt.Job {
    case "job1":
        // Call an actual function; doJob1 and doJob2 are hypothetical
        // placeholders for your own application logic.
        err = doJob1(evt.Data)
    case "job2":
        err = doJob2(evt.Data)
    default:
        msg.Reject(false)
        return
    }

    if err != nil {
        logAndNack(msg, l, startTime, "%s", err.Error())
        return
    }

    l.Str("level", "info").Int64("took-ms", time.Since(startTime).Milliseconds()).Msgf("%s succeeded", evt.Job)
    msg.Ack(false)
}
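
The switch plus the deferred recover can also be expressed as a lookup table of handlers, called through a wrapper that turns a panic into an error so every failure path ends in a single nack. A minimal, stdlib-only sketch; the handlers map, runJob, action and the job names are hypothetical, and the returned string stands in for the Ack/Nack/Reject calls on amqp.Delivery.

```go
package main

import (
	"errors"
	"fmt"
)

// handlers maps a job name to the function that performs it (hypothetical jobs).
var handlers = map[string]func(data string) error{
	"job1": func(data string) error { return nil },
	"job2": func(data string) error { panic("job2 exploded") },
}

// errUnknownJob marks events whose job has no handler; parseEvent rejects those.
var errUnknownJob = errors.New("unknown job")

// runJob calls the handler for job and converts a panic into an error,
// like the deferred recover in parseEvent.
func runJob(job, data string) (err error) {
	h, ok := handlers[job]
	if !ok {
		return errUnknownJob
	}
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic in %s: %v", job, r)
		}
	}()
	return h(data)
}

// action reports what the consumer would do with the delivery.
func action(job, data string) string {
	switch err := runJob(job, data); {
	case err == nil:
		return "ack"
	case errors.Is(err, errUnknownJob):
		return "reject"
	default:
		return "nack"
	}
}

func main() {
	fmt.Println(action("job1", "x"), action("job2", "x"), action("job3", "x")) // ack nack reject
}
```

A map keeps the dispatch open to new jobs without editing the consumer, at the cost of losing the compiler's check for duplicate case labels.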

logAndNack sends a negative acknowledgment to RabbitMQ and logs the error:

func logAndNack(msg amqp.Delivery, l *zerolog.Event, t time.Time, format string, args ...interface{}) {
    // requeue=false: the broker discards (or dead-letters) the message instead of redelivering it.
    msg.Nack(false, false)
    l.Int64("took-ms", time.Since(t).Milliseconds()).Str("level", "error").Msgf(format, args...)
}

// Close cleanly shuts down the channel and connection once all consumer goroutines
// have finished. Cancel the context passed to Stream first, otherwise they never return.
func (c *Client) Close() error {
    if !c.isConnected {
        return nil
    }
    c.alive = false
    fmt.Println("Waiting for current messages to be processed...")
    c.wg.Wait()
    for i := 1; i <= c.threads; i++ {
        fmt.Println("Closing consumer: ", i)
        err := c.channel.Cancel(consumerName(i), false)
        if err != nil {
            return fmt.Errorf("error canceling consumer %s: %v", consumerName(i), err)
        }
    }
    err := c.channel.Close()
    if err != nil {
        return err
    }
    err = c.connection.Close()
    if err != nil {
        return err
    }
    c.isConnected = false
    fmt.Println("gracefully stopped rabbitMQ connection")
    return nil
}

func consumerName(i int) string {
    return fmt.Sprintf("go-consumer-%v", i)
}

2024 © Emir Ribic - Some rights reserved; please attribute properly and link back. Code snippets are MIT Licensed
