Add RabbitMQ and go-cron to your DigitalOcean droplet [Part 3]

(This is the last part of a three-part series. You can read the first part here.)

In the previous chapter, we built our message producer. The enqueue() function is invoked when our application server receives traffic, and it publishes a message to RabbitMQ. Depending on your configuration, you can call enqueue() elsewhere.

Next, we will build our cron job, which runs every few minutes or hours (whichever you prefer) and checks for messages in our queue. To keep things simple, when a message is consumed we will send a dummy notification email using Mailgun’s Go package.

(You can find the repo of the simple consumer from here)

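func main() {
	// Load the AMQP and Mailgun settings from the .env file.
	if err := godotenv.Load(".env"); err != nil {
		log.Fatalf("Error loading .env file: %v", err)
	}

	// Schedule processMessages to run every 2 minutes.
	gocron.Every(2).Minutes().Do(processMessages)
	// Assuming a gocron API where Start() returns a channel:
	// receiving from it blocks, so main does not exit right away.
	<-gocron.Start()
}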

At a high level, the worker will run in the background and execute the processMessages function every 2 minutes. We will talk about setting up this worker to run as a systemd service later.

As in the previous example, we create a Connection and a Channel so that we can work with our queue.

	amqpServerURL := os.Getenv("AMQP_SERVER_URL")
	connectRabbitMQ, err := amqp.Dial(amqpServerURL)
	if err != nil {
		panic(err)
	}
	defer connectRabbitMQ.Close()
	channelRabbitMQ, err := connectRabbitMQ.Channel()
	if err != nil {
		panic(err)
	}
	defer channelRabbitMQ.Close()

We will use the Consume() function on our channel to retrieve the messages. The comments describe the options. To keep things simple, we will automatically acknowledge the messages as we consume them.


	messages, err := channelRabbitMQ.Consume(
		"FallbackAPIQueue", // queue name
		"",                 // consumer tag (empty: a unique one is generated)
		true,               // auto acknowledge
		false,              // exclusive
		false,              // no local
		false,              // no wait
		nil,                // arguments
	)
	if err != nil {
		// Without a consumer there is nothing to read, so stop here.
		log.Println(err)
		return
	}
	log.Println("You've connected to RabbitMQ")
	log.Println("Waiting for messages")

	// len() on the delivery channel does not report the queue depth,
	// so we simply wait for deliveries. Note that this loop blocks
	// until the channel is closed.
	for message := range messages {
		if string(message.Body) == "A request has been sent via fallback API" {
			sendSimpleMessage()
		}
	}

Lastly, for each message we consume, we will call sendSimpleMessage().

Using Mailgun’s Go client library, mailgun-go, we can send simple emails. This requires a Mailgun account. You will need to save your Mailgun domain and your API key in your environment variables.

func sendSimpleMessage() {
	// Mailgun credentials are read from the environment (loaded from .env).
	apiKey := os.Getenv("MAILGUN_API_KEY")
	sandboxDomain := os.Getenv("MAILGUN_DOMAIN")

	mg := mailgun.NewMailgun(sandboxDomain, apiKey)
	sender := "[email protected]"
	subject := "security warning"
	body := "Your fallback api is exposed"
	recipient := "[email protected]"
	message := mg.NewMessage(sender, subject, body, recipient)

	// Give up if Mailgun does not respond within 10 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	resp, id, err := mg.Send(ctx, message)
	if err != nil {
		// Log and return so that one failed email does not stop the worker.
		log.Println(err)
		return
	}
	fmt.Printf("ID: %s Resp: %s\n", id, resp)
}

Now, let’s configure systemd to run this application as a service.

Systemd is an initialisation system and service manager that ships with most Linux distributions. It comes with its own management tool, systemctl.

If you want to learn more about systemd or if you are new to it, you can find a link below in the resources. Let’s get back to creating our service.

First, we will create a service file at /lib/systemd/system/simple_consumer.service:

$ sudo vim /lib/systemd/system/simple_consumer.service

Next, we will add the following configuration options to our simple_consumer.service:

[Unit]
Description=Simple Consumer
ConditionPathExists=/home/asungur/go/src/simple_consumer
Requires=postgresql.service
After=network.target
After=postgresql.service

[Service]
Type=simple
Restart=always
RestartSec=10
WorkingDirectory=/home/asungur/go/src/simple_consumer
ExecStart=/home/asungur/go/src/simple_api_postgres/simple_consumer

[Install]
WantedBy=multi-user.target

Before we move on, let’s talk about some of these configuration options:

  1. On line 3, ConditionPathExists makes systemd check that our application directory is present on the server. If it is missing, the service is not started.
  2. On line 6, After tells systemd to start this service only after the postgresql service has been started. I need this because I’m integrating the consumer with my simple API server. Depending on how you are building your demo, you might not need it.
  3. On line 12, we specify WorkingDirectory. This is important because the application loads its environment variables from the .env file in that directory.
  4. Lastly, ExecStart specifies the location of our executable file.

Before we enable our service, we need to make the application binary executable. Use the same path you set in ExecStart (the path below is only an example):

$ sudo chmod +x /opt/appDir/simple_consumer

Now, using systemctl, we can enable the service and then start it. The last command checks the status of the application:

$ sudo systemctl enable simple_consumer      # Enable the service
$ sudo systemctl start simple_consumer       # Start the service
$ sudo systemctl status simple_consumer      # Check service status

Now let’s trigger enqueue() a couple of times and look at our queue before and after the cron job runs:

$ sudo rabbitmqctl list_queues

In our inbox, we should have one email for each message that is dequeued from our queue.