Add RabbitMQ and go-cron to your DigitalOcean droplet [Part 2]
(This is part 2 of a 3-part series. You can read the first part here)
Let’s start configuring RabbitMQ on our server. For this, you need to SSH into your server. I advise you to create a new admin user while doing this. Using the root user is fine if you know what you are doing. If you are unsure, you will find some useful links in the references section.
RabbitMQ may already be installed on your droplet, depending on the image and Linux distribution you are running.
You can confirm this by running the status command and checking the output:
$ sudo rabbitmqctl status | grep rabbit
.
.
.
{rabbit,"RabbitMQ",....}
If RabbitMQ is not installed, simply run:
$ sudo apt-get install rabbitmq-server
Now that RabbitMQ is installed, we need to create a user before establishing a connection. We will use RabbitMQ’s command line management tool, rabbitmqctl, for this:
$ sudo rabbitmqctl add_user <USERNAME> <PASSWORD>
Using the same tool, give admin rights to your user:
$ sudo rabbitmqctl set_user_tags <USERNAME> administrator
In this last step, we will give permissions to our admin user. RabbitMQ splits permissions into configure, write and read, and set_permissions takes one regular expression for each, in that order. For this demo application it is acceptable to grant everything with ".*", but doing so in a production application is a security risk.
$ sudo rabbitmqctl set_permissions -p / <USERNAME> ".*" ".*" ".*"
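To get a feel for what tighter patterns would do, here is a minimal Go sketch that treats each of the three permissions as an ordinary regular expression and checks resource names against it. The permission type and the allows helper are hypothetical, written only for this illustration, and the sketch assumes that simple anchored patterns like these behave the same way in RabbitMQ as they do in Go’s regexp package. Which patterns a real deployment needs depends on the exchanges and queues it touches.

```go
package main

import (
	"fmt"
	"regexp"
)

// permission mirrors the three patterns passed to `rabbitmqctl set_permissions`.
// The type and its helper are illustrative only; they are not part of RabbitMQ.
type permission struct {
	configure, write, read *regexp.Regexp
}

// newPermission compiles the patterns in the order the CLI expects:
// configure, write, read.
func newPermission(configure, write, read string) permission {
	return permission{
		configure: regexp.MustCompile(configure),
		write:     regexp.MustCompile(write),
		read:      regexp.MustCompile(read),
	}
}

// allows reports whether the operation is permitted on the named resource.
func (p permission) allows(op, resource string) bool {
	switch op {
	case "configure":
		return p.configure.MatchString(resource)
	case "write":
		return p.write.MatchString(resource)
	case "read":
		return p.read.MatchString(resource)
	}
	return false
}

func main() {
	grantAll := newPermission(".*", ".*", ".*")
	narrow := newPermission("^FallbackAPIQueue$", "^FallbackAPIQueue$", "^FallbackAPIQueue$")

	for _, queue := range []string{"FallbackAPIQueue", "SomeOtherQueue"} {
		fmt.Println(queue, "grant-all write:", grantAll.allows("write", queue))
		fmt.Println(queue, "narrow write:", narrow.allows("write", queue))
	}
}
```

The grant-all patterns accept every resource name, while the anchored pattern accepts only the one queue it spells out.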
Now that we have created our user, let’s save the connection string in our application’s environment variables. We will be using a .env file for this. Add the line below to your .env file:
AMQP_SERVER_URL=amqp://<USERNAME>:<PASSWORD>@localhost:5672/
See the amqp prefix? It stands for Advanced Message Queuing Protocol, the default protocol used by RabbitMQ. We will be using it through the amqp client library for Go (github.com/streadway/amqp).
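One practical detail about this connection string: if the password contains characters such as "@" or "/", they have to be percent-encoded, otherwise the URL becomes ambiguous. The sketch below uses only Go’s standard net/url package to build and read back such a URL. The helper names buildAMQPURL and credentialsFrom are made up for this example, and the sample credentials are placeholders.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildAMQPURL assembles a connection string and percent-encodes the
// credentials, so characters like '@' or '/' in a password stay unambiguous.
func buildAMQPURL(user, password, hostPort string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   hostPort,
		Path:   "/",
	}
	return u.String()
}

// credentialsFrom parses a connection string and returns the decoded
// user name, password and host:port.
func credentialsFrom(raw string) (string, string, string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return "", "", "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	password, _ := u.User.Password()
	return u.User.Username(), password, u.Host, nil
}

func main() {
	// Placeholder credentials, chosen to include characters that need encoding.
	raw := buildAMQPURL("app", "p@ss/word", "localhost:5672")
	fmt.Println(raw)

	user, _, host, err := credentialsFrom(raw)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("user:", user, "host:", host)
}
```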
Before we implement the enqueue functionality, let’s confirm that we can list our queues:
$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
We expect this command to list no queues yet, since we have not created any.
We will now create an enqueue function that handles the whole message production process. You can add an enqueue() call to one of your API controllers if you want to test this with your server. I am using the POST /fallback/* endpoint to test this.
func enqueue() error {
	amqpServerURL := os.Getenv("AMQP_SERVER_URL")
	connectRabbitMQ, err := amqp.Dial(amqpServerURL)
	if err != nil {
		return err
	}
	defer connectRabbitMQ.Close()
	channelRabbitMQ, err := connectRabbitMQ.Channel()
	if err != nil {
		return err
	}
	defer channelRabbitMQ.Close()
	_, err = channelRabbitMQ.QueueDeclare(
		"FallbackAPIQueue",
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}
	message := amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte("A request has been sent via fallback API"),
	}
	if err := channelRabbitMQ.Publish(
		"",
		"FallbackAPIQueue",
		false,
		false,
		message,
	); err != nil {
		return err
	}
	return nil
}
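If you want to call enqueue() from an HTTP endpoint as mentioned above, the wiring could look roughly like the sketch below. It assumes the standard net/http package rather than any particular web framework, and it takes the enqueue function as a parameter so the handler can be tried out without a running broker. The names fallbackHandler, statusFor and errBrokerDown are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// fallbackHandler returns a handler that enqueues one message per POST request.
// The enqueue function is injected so the handler works without a real broker.
func fallbackHandler(enqueue func() error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		if err := enqueue(); err != nil {
			http.Error(w, "could not enqueue message", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}
}

// errBrokerDown is a stand-in failure used only by the demo below.
var errBrokerDown = errors.New("broker unavailable")

// statusFor sends one in-memory request through the handler and
// returns the resulting HTTP status code.
func statusFor(method string, enqueue func() error) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(method, "/fallback/orders", nil)
	fallbackHandler(enqueue).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(http.MethodPost, func() error { return nil }))           // 202
	fmt.Println(statusFor(http.MethodPost, func() error { return errBrokerDown })) // 503
	fmt.Println(statusFor(http.MethodGet, func() error { return nil }))            // 405
}
```

In a real server you would register it with something like http.Handle("/fallback/", fallbackHandler(enqueue)) and pass in the enqueue function from this article.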
Let’s break down the key parts of the function that sends messages to "FallbackAPIQueue".
On line 2, we retrieve our connection URL, AMQP_SERVER_URL, from the environment variables.
Next, we establish a connection using amqp.Dial(). We are not done yet: with RabbitMQ, most operations are handled over channels, so on line 8 we open one by calling Channel() on our connection.
We also defer the Close() calls on the connection and the channel to make sure both are closed properly after we submit our message to the queue.
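One thing to keep in mind about the lookup on line 2: os.Getenv returns an empty string when the variable is not set, so a missing AMQP_SERVER_URL only shows up later as a connection error. A small guard like the following sketch can make that failure more obvious. The helper amqpURLFrom and the error errMissingURL are hypothetical names, and the sketch does not cover how the .env file itself gets loaded into the process.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errMissingURL is returned when AMQP_SERVER_URL is unset or empty.
var errMissingURL = errors.New("AMQP_SERVER_URL is not set")

// amqpURLFrom reads the connection URL through the given lookup function.
// The parameter has the same shape as os.LookupEnv, so a fake can be passed in.
func amqpURLFrom(lookup func(string) (string, bool)) (string, error) {
	value, ok := lookup("AMQP_SERVER_URL")
	if !ok || value == "" {
		return "", errMissingURL
	}
	return value, nil
}

func main() {
	serverURL, err := amqpURLFrom(os.LookupEnv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	// Print only the length so credentials never end up in logs.
	fmt.Println("connection URL loaded, length:", len(serverURL))
}
```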
On line 13, we declare the queue to which we want to submit messages. For this, we use the amqp package’s [func (*Channel) QueueDeclare](https://github.com/streadway/amqp/blob/v1.0.0/channel.go#L751) function.
_, err = channelRabbitMQ.QueueDeclare(
	"FallbackAPIQueue",
	true,  // durable
	false, // auto delete
	false, // exclusive (only the declaring connection can access the queue)
	false, // no wait
	nil,   // arguments (table)
)
This function creates the queue with the given name if it is not already present.
On line 24, we define our message using amqp.Publishing. We will be using a simple plain text message.
message := amqp.Publishing{
	ContentType: "text/plain",
	Body:        []byte("A request has been sent via fallback API"),
}
Lastly, we can publish our message by calling Publish on the channel.
channelRabbitMQ.Publish(
	"",                 // exchange (empty string selects the default exchange)
	"FallbackAPIQueue", // routing key (the queue name)
	false,              // mandatory
	false,              // immediate
	message,
)
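Why does an empty exchange name work here? In AMQP 0-9-1, every queue is automatically bound to the default exchange with a routing key equal to the queue name, so publishing to "" with the queue name as the routing key delivers straight to that queue. The toy model below mimics that behavior in plain Go to make the routing rule concrete. It is only an in-memory illustration with made-up types, not how RabbitMQ is implemented.

```go
package main

import "fmt"

// broker is a toy in-memory model of default-exchange routing.
// It only mirrors the observable behavior described in the text.
type broker struct {
	queues map[string][]string
}

func newBroker() *broker {
	return &broker{queues: make(map[string][]string)}
}

// declare creates the queue if it does not exist yet;
// declaring an existing queue again changes nothing.
func (b *broker) declare(name string) {
	if _, ok := b.queues[name]; !ok {
		b.queues[name] = nil
	}
}

// publish models the default exchange (""): the routing key is compared with
// queue names, and the message is delivered only if such a queue exists.
func (b *broker) publish(routingKey, body string) bool {
	if _, ok := b.queues[routingKey]; !ok {
		return false // unroutable; with mandatory=false the message is dropped
	}
	b.queues[routingKey] = append(b.queues[routingKey], body)
	return true
}

// depth returns the number of messages waiting in the named queue.
func (b *broker) depth(name string) int {
	return len(b.queues[name])
}

func main() {
	b := newBroker()
	b.declare("FallbackAPIQueue")
	for i := 0; i < 3; i++ {
		b.publish("FallbackAPIQueue", "A request has been sent via fallback API")
	}
	b.publish("NoSuchQueue", "this message has nowhere to go")
	fmt.Println("FallbackAPIQueue", b.depth("FallbackAPIQueue")) // prints "FallbackAPIQueue 3"
}
```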
When the function is triggered, the messages will be enqueued to our "FallbackAPIQueue".
We can see the queue and the number of messages by using the same command:
➜ ~ sudo rabbitmqctl list_queues
Password:
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
FallbackAPIQueue 3
With this, we have now completed the message publishing mechanism 🚀
In the next chapter, we will look into consuming these messages on a daily basis using the go-cron package.