Add RabbitMQ and go-cron to your DigitalOcean droplet [Part 3]
(This is the last part of a 3 part series. You can read the first part from here)
In the previous chapter, we built our message producer. The enqueue
function is invoked when our application server receives traffic and publishes a message to RabbitMQ. Depending on your configuration you can call enqueue()
function call elsewhere.
Next, we will build our cron-job to run on a per hours/minute basis(depending on which you prefer) and check messages in our queue. To keep things simple, when a message is consumed we will send a dummy notification email using Mailgun’s go package.
(You can find the repo of the simple consumer from here)
func main() {
err := godotenv.Load(".env")
if err != nil {
log.Fatalf("Error loading .env file")
}
gocron.Every(2).Minutes().Do(processMessages)
gocron.Start()
}
At a high level, the worker will run the background and every 2 minutes it will execute processMessages
function. We will talk about setting up this worker to run as a systemd
service later.
Similar to the previous example where we create a Connection
and a Channel
to be able to execute tasks on our queue.
amqpServerURL := os.Getenv("AMQP_SERVER_URL")
connectRabbitMQ, err := amqp.Dial(amqpServerURL)
if err != nil {
panic(err)
}
defer connectRabbitMQ.Close()
channelRabbitMQ, err := connectRabbitMQ.Channel()
if err != nil {
panic(err)
}
defer channelRabbitMQ.Close()
We will be using Consume()
function on our channel to retrieve the messages. You can see the comments for the options. To keep things simple we will automatically acknowledge the messages as we consume them.
messages, err := channelRabbitMQ.Consume(
"FallbackAPIQueue",
"",
true, // auto acknowledge
false, // exclusive
false, // no local
false, // no wait
nil,
)
if err != nil {
log.Println(err)
}
log.Println("You've connected to RabbitMQ")
log.Println("Waiting for messages")
if len(messages) == 0 {
log.Println("You don't have any messages in the queue")
}
for message := range messages {
if string(message.Body) == "A request has been sent via fallback API" {
sendSimpleMessage()
}
}
Lastly, for each message we consume, we will call sendSimpleMessage()
Using Mailgun’s go client library mailgun-go
we can send simple emails. This will require you to have an account with Mailgun. You will need your Mailgun domain and your API key saved in your environment variables.
func sendSimpleMessage() {
api_key := os.Getenv("MAILGUN_API_KEY")
sandbox_domain := os.Getenv("MAILGUN_DOMAIN")
mg := mailgun.NewMailgun(sandbox_domain, api_key)
sender := "[email protected]"
subject := "security warning"
body := "Your fallback api is exposed"
recipient := "[email protected]"
message := mg.NewMessage(sender, subject, body, recipient)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
resp, id, err := mg.Send(ctx, message)
if err != nil {
log.Fatal(err)
}
fmt.Printf("ID: %s Resp: %s\n", id, resp)
}
Now, let’s configure systemd
to run this application as a service.
Systemd is an initialisation system and system manager that comes with most of the Linux distributions. It comes with its management tool systemctl
If you want to learn more about systemd
or if you are new to it, you can find a link below in the resources. Let’s get back to creating our service.
First, we will create service file in /lib/systemd/system/simple_consumer.service
$ sudo vim /lib/systemd/system/simple_consumer.service
Next, we will add the following configuration options in our simple_consumer.service
:
[Unit]
Description=Simple Consumer
ConditionPathExists=/home/asungur/go/src/simple_consumer
Requires=postgresql.service
After=network.target
After=postgresql.service
[Service]
Type=simple
Restart=always
RestartSec=10
WorkingDirectory=/home/asungur/go/src/simple_consumer
ExecStart=/home/asungur/go/src/simple_api_postgres/simple_consumer
[Install]
WantedBy=multi-user.target
Before we move on, let’s talk about some of these configuration steps:
- On line 3,
ConditionPathExists
is required to make sure that our application directory is present in our server before it starts running After
on line 6, tellssystemd
to wait untilpostgresql
service is started. This is required, because I’m integrating this to my simple API server. Depending on how you are building your demo, this might not be required.- On line 12, we specify
WorkingDirectory
. This is important to retrieve our environment variables from our.env
file. - Lastly,
ExecStart
specifies the location of our executable file
Before we enable our system, we need to give relevant permissions to run our application:
$ sudo chmod +x /opt/appDir/simple_consumer
Now using systemctl
we can first enable the service and start it. We can check the status of the application using the last command
$ sudo systemctl enable simple_consumer # Enable the service
$ sudo systemctl start simple_consumer # Start the service
$ sudo systemctl status simple_consumer # Check service status
Now lets trigger enqueue a couple of times and see our queue before and after the cron-job is run:
$ sudo rabbitmqctl list_queues
In our inbox we should have an email for each message that are dequeued from our queue.