High Availability Cron
In a high availability cluster, we may need to run cron jobs on a single "master" node, i.e., only once for the whole cluster.
Some approaches are:
- designate one of the nodes as "master", e.g., Ansible pushes our cron playbook only to this node; the downside of this approach is that if the "master" node goes down, we may need to manually re-push to a new "master"
- set up a distributed cron; there are a variety of solutions, each of which requires analysis
In this example, we propose leveraging existing infrastructure (RabbitMQ). We will push the cron playbook to all nodes in the cluster. A process (e.g., daemon, supervisor worker, etc.) on each node connects to a queue configured for single active consumer. As a result, only one consumer will be active at any time. Each cron job is prefixed with a check of whether the active consumer is running on the same node. If so, the cron job continues; otherwise the cron job terminates.
In this way, no node is explicitly designated as the master. If the active consumer terminates, RabbitMQ automatically fails over to another registered consumer.
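The failover behaviour described above can be sketched with a toy in-memory model. This is not RabbitMQ code: `SingleActiveQueueModel` and the node names are hypothetical, and the model assumes the next consumer in registration order takes over, which is a simplification you should not rely on for a real broker.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of a single-active-consumer queue (hypothetical class,
 * not part of RabbitMQ or any AMQP library).
 */
final class SingleActiveQueueModel
{
    /** @var string[] hostnames of registered consumers, in registration order */
    private array $consumers = [];

    public function register(string $host): void
    {
        $this->consumers[] = $host;
    }

    public function disconnect(string $host): void
    {
        // Drop the host and reindex so position 0 is the next consumer in line.
        $this->consumers = array_values(
            array_filter($this->consumers, fn (string $h): bool => $h !== $host)
        );
    }

    /** In this model, the first remaining consumer is the active one. */
    public function activeHost(): ?string
    {
        return $this->consumers[0] ?? null;
    }

    /** Mirrors the cron prefix check: 0 if $host is active, 1 otherwise. */
    public function exitStatusFor(string $host): int
    {
        return $this->activeHost() === $host ? 0 : 1;
    }
}

$queue = new SingleActiveQueueModel();
foreach (['node-a', 'node-b', 'node-c'] as $host) {
    $queue->register($host);
}

echo $queue->activeHost(), PHP_EOL;   // node-a
$queue->disconnect('node-a');         // the active consumer goes away
echo $queue->activeHost(), PHP_EOL;   // node-b
```

At any moment exactly one node gets exit status 0 from the check, so the cron job runs on that node only, and the role moves on without any re-push when the node drops out.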
References:
- RabbitMQ Single Active Consumer
Code snippet follows. Feel free to imagineer the rest.
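// $exchange is assumed to be set up earlier (connection code is not shown here)
$queueName = 'rpc_queue';
$client = new RpcClient($exchange);
// ask whichever consumer is currently active for its hostname
$result = $client->call('gethostname', null, $queueName);
// if the single active consumer has the same hostname, then 0 (success); otherwise 1 (error)
$status = ($result === gethostname()) ? 0 : 1;
exit($status);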
In the crontab, we use a logical AND (&&) to make execution of subsequent commands conditional. Here, the first command is the samehost client above.
0 16 1,15 1 0 samehost && cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1
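To see why the exit status of the first command matters, here is a small sketch that runs two && chains through the shell and reports what happened. It assumes a POSIX shell where the `true` and `false` commands exist; `runChain` is a hypothetical helper, and `true`/`false` simply stand in for a samehost check that succeeds or fails.

```php
<?php
declare(strict_types=1);

/**
 * Runs a command line through the shell and returns [output lines, exit status].
 * Hypothetical helper for illustration only.
 */
function runChain(string $commandLine): array
{
    $output = [];
    $status = 0;
    exec($commandLine, $output, $status);

    return [$output, $status];
}

// "true" exits 0 (like samehost on the active node), "false" exits 1.
foreach (['true', 'false'] as $guard) {
    [$lines, $status] = runChain($guard . ' && echo "job ran"');
    printf("%s: status=%d, output=%s\n", $guard, $status, json_encode($lines));
}
// true: status=0, output=["job ran"]
// false: status=1, output=[]
```

When the guard exits non-zero, the shell never reaches the commands to the right of &&, which is how the job is skipped on nodes that do not host the active consumer.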
The main change on the server side is to declare the queue as single active consumer.
<?php
// Single Active Consumer RPC server
require_once __DIR__ . '/vendor/autoload.php';

use VIPSoft\Amqp\RpcServer;

class SingleActiveConsumer
{
    private $queue;

    public function __construct()
    {
        $connection = new \AMQPConnection();
        $connection->setHost('localhost');
        $connection->setLogin('guest');
        $connection->setPassword('guest');
        $connection->connect();

        $channel = new \AMQPChannel($connection);
        $channel->setPrefetchCount(1);

        $queueName = 'rpc_queue';

        $this->queue = new \AMQPQueue($channel);
        $this->queue->setName($queueName);
        $this->queue->setArgument('x-single-active-consumer', true);
        $this->queue->declareQueue();
    }

    public function dispatch()
    {
        $server = new RpcServer($this->queue);
        $server->answer(
            function ($from, $service, $arguments) {
                switch ($service) {
                    case 'gethostname':
                        return gethostname();
                    default:
                        throw new \Exception('No such service');
                }
            }
        );
    }
}

$server = new SingleActiveConsumer;
$server->dispatch();