Introducción a la programación asíncrona

Anteriormente estuvimos creando un plan de contingencia con AWS S3 para realizar backups de nuestros proyectos usando crontab y aprendimos a planificar las copias de seguridad para que se ejecuten en un momento determinado. Ahora bien, en contraposición a este caso, ¿sabríamos cómo ejecutar tareas en segundo plano o background, no periódicas y que dependan del flujo de nuestra aplicación?

Introducción

El desarrollo de software se basa principalmente en dos metodologías:

  • Programación síncrona, donde el código de cada instrucción se ejecuta una vez haya terminado la ejecución de la instrucción inmediatamente anterior (si la hubiera).
  • Programación asíncrona, donde el código de una instrucción se ejecuta con independencia de las instrucciones anteriores (si las hubieran).

Así pues un desarrollo mediante programación síncrona ofrece algunas ventajas para el desarrollador ya que facilita la labor de depuración o realización de trazas, puesto que todo el flujo de la aplicación se desarrolla en un mismo proceso y de manera secuencial. Por contra, puede favorecer la aparición de códigos bloqueantes o cuellos de botella donde una tarea centraliza los recursos y paraliza el flujo de la aplicación.

En cambio, la programación asíncrona permite que varias tareas se realicen en paralelo, lo cual posibilita realizar tareas secundarias en segundo plano sin tener que bloquear el flujo principal de la aplicación pero, por contra, hace mucho más compleja la labor de depuración y de trazabilidad ya que el flujo ahora no es secuencial y las tareas finalizan sin un orden preestablecido.

Para realizar aplicaciones asíncronas en PHP existe varias alternativas a nuestra disposición:

  • Usando threads
  • Usando sockets
  • Usando coroutines mediante algún framework asíncrono (Swoole, ReactPHP, Amp, etc.)
  • Usando una cola de mensajes
  • ...

Dentro de los enfoques posibles, veremos cómo sacar provecho a Redis y su soporte PUB/SUB para disparar un evento y realizar un proceso en background.

¿Qué es PUB/SUB?

PUB/SUB es un protocolo de comunicación acrónimo de publicación y subscripción (PUBLISH/SUBSCRIBE) y permite desacoplar servicios que producen eventos de los servicios que procesan eventos. Es decir, es un sistema que permite publicar un mensaje en un canal y éste es inmediatamente recibido por todos los subscriptores de dicho canal, permitiendo así desarrollar event-driven architectures o bien, desacoplar aplicaciones y así mejorar su rendimiento y escalabilidad al delegar las tareas más complejas o pesadas en background.

Un caso de uso típico de PUB/SUB es la realización de un chat donde se emite un mensaje en un canal y todos los subscritores del canal reciben dicho mensaje a la vez.

Ejemplo de adaptador PUB/SUB con Redis y PHP

Análisis preliminar

Nuestro pequeño proyecto estará formado por los siguientes actores:

  • Definir cómo serán los mensajes que vamos a transmitir, qué propiedades tendrán, con qué estructura, etc.
  • Necesitamos un script que actuará como subscriptor y que estará conectado en todo momento a un canal de Redis para comprobar si se publican nuevos mensajes.
  • Necesitamos un script que actuará como publicador de mensajes y que nos permitirá publicar mensajes en dicho canal.
  • Necesitamos definir qué tareas serán ejecutadas cuando un determinado mensaje se recibe en dicho canal.

Para conectarnos a Redis haremos uso del cliente de conexión Predis y como todo este proceso es más dificil de depurar y de hacerle seguimiento, haremos uso de Monolog para llevar un log de todo el proceso y volcarlo a la salida estandard.

LLegados a este punto ya estamos listos para comenzar con el desarrollo. Comencemos...

Mensaje

Un mensaje es una estructura que tiene una serie de atributos y propiedades específicas e inmutables que actuará a modo de contrato y que representa un evento o acción.

En nuestro caso los mensajes tendrán los siguientes atributos:

  • Nombre. Nombre de nuestro mensaje o evento, generalmente asociado a alguna entidad y representa una acción sobre la misma. Por ejemplo: image:add-watermark
  • UUID. Identificador único universal basado en marcas de tiempo. Se autogenera en el momento de creación del mensaje. Permite hacer un seguimiento de los eventos y replicar el proceso en el mismo órden en que fueron ejecutados. Por ejemplo: 123e4567-e89b-12d3-a456-426614174000
  • Timestamp. Marca de tiempo. Se autogenera en el momento de creación del mensaje. Nos permite visualizar las marcas de tiempo en un formato más amigable. Por ejemplo: 2021-04-22T05:24:56.111845+00:00
  • Properties. Array con todas aquellas propiedades adicionales relevantes para complementar el mensaje. Puede ser el nombre de la entidad, el identificador de registro... Por ejemplo: ["path" => "/tmp/fqw3r23", ...]
¿Cómo crearemos los mensajes?

Se pueden crear mensajes de las siguientes maneras:

// Mediante el constructor
$message1 = new Message('demo:event', ['id' => 123456]);

// Mediante el uso del método mágico __set() 
$message2 = new Message('demo:event');
$message2->id = 123456;

print_r($message1);

App\Models\Message Object
(
    [name] => demo:event
    [uuid] => f877a3b8-ecf5-42c2-9758-8cfa7fde1957
    [properties] => Array
        (
            [id] => 12345
        )

    [timestamp] => 1619073155
)

print_r($message2);

App\Models\Message Object
(
    [name] => demo:event
    [uuid] => 6d7e2b17-683c-4d22-bfd0-59eae58112f1
    [properties] => Array
        (
            [id] => 12345
        )

    [timestamp] => 1619073175
)

En este ejemplo $message1 y $message2 son equivalentes que no iguales pues tienen marcas de tiempo diferentes, lo que quiere decir que se ejecutó la misma órden en dos momentos temporales diferentes.

¿Cómo leer nuestros mensajes?
echo 'Mensaje: '. $message->getName() . PHP_EOL;
echo 'UUID: '. $message->getUUID() . PHP_EOL;
echo 'Timestamp como valor entero: '. $message->getTimestamp() . PHP_EOL;
echo 'Timestamp con formato: '. $message->getTimestamp('Y-m-d H:i:s') . PHP_EOL;

echo 'Mensaje como array: '. PHP_EOL;
print_r($message->toArray());

echo 'Mensaje como objeto: '. PHP_EOL;
print_r($message->toObject());

echo 'Mensaje como string: '. $message;

echo 'Propiedad ID del mensaje: '. $message->id . PHP_EOL;

echo 'Propiedades del mensaje como array: '. PHP_EOL;
print_r($message->toArray()['properties']);
Código fuente

Para simplificar la gestión de las propiedades adicinales del mensaje y para facilitar la depuración de esta entidad se ha hecho uso de los métodos mágicos de PHP y debe considerarse esta implementación como un ejemplo teórico funcional.

Publisher

Veamos ahora cómo desarrollar nuestro script que publicará nuestros mensajes en Redis.

[...]

$channel1 = 'channel:default';
$channel2 = 'channel:logs';
//...

$message1 = new Message('demo:event1', ['id' => 12345]);
$message2 = new Message('demo:event2', ['id' => 98765]);
//...

(new Redis($client, $logger))
    ->withMessages($message1, $message2...)
    ->publish($channel1, $channel2...);

// Para simplificar podremos hacer uso del patrón factoría y así reducir algo de código...
(new Redis($client, $logger))
    ->addMessage('demo:event1', ['id' => 12345])
    ->addMessage('demo:event2', ['id' => 98765])
    ->publish($channel1, $channel2...);

Puedes conocer más sobre el Patrón Factoría en el fantástico recurso Refactoring Guru donde explican con detalle cada patrón de diseño con ejemplos y casos de uso de aplicación real.

Código resultante
<?php

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use PhpRedisPubSub\PhpRedisPubSub;
use Predis\Client;

$client = new Client(
    parameters: [
        'scheme'             => 'tcp',
        'host'               => 'redis',
        'port'               => 6379,
        'database'           => 0,
        'read_write_timeout' => 0,
    ],
    options: [
        'cluster' => 'redis',
        'prefix'  => 'redis_pubsub_database_',
    ],
);

$logger = (new Logger(
    name: 'php-redis-pubsub',
))->pushHandler(
    handler: new StreamHandler(
        stream: 'php://stdout',
        level: Logger::INFO,
    )
);

(new PhpRedisPubSub(client: $client, logger: $logger))
    ->addMessage('custom-event-name', ['id' => 9999])
    ->publish('channel:name1', 'channel:name2');
Subscriber

Veamos ahora cómo desarrollar nuestro script que se mantendrá escuchando en Redis a uno o varios canales:

[...]

function messageHandler(MessageInterface $message) 
{
    // Lógica de negocio a ejecutar cuando se recibe cada mensaje
}

(new Redis($client, $logger))
    ->withHandler(messageHandler(MessageInterface $message))
    ->subscribe($channel1, $channel2...);
Código resultante
<?php

require_once __DIR__ .'/../vendor/autoload.php';

use App\Handlers\DemoEventHandler;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use PhpRedisPubSub\Interfaces\Message as MessageInterface;
use PhpRedisPubSub\PhpRedisPubSub;
use Predis\Client;

$REGISTERED_EVENTS = [
    DemoEventHandler::eventName() => DemoEventHandler::class,
];

$client = new Client(
    parameters: [
        'scheme'             => 'tcp',
        'host'               => 'redis',
        'port'               => 6379,
        'database'           => 0,
        'read_write_timeout' => 0,
    ],
    options: [
        'cluster' => 'redis',
        'prefix'  => 'redis_pubsub_database_',
    ],
);

$logger = (new Logger(
    name: 'php-redis-pubsub',
))->pushHandler(
    handler: new StreamHandler(
        stream: 'php://stdout',
        level: Logger::INFO,
    )
);

(new PhpRedisPubSub(client: $client, logger: $logger))
    ->withHandler(function (MessageInterface $message) use ($logger, $REGISTERED_EVENTS) {
        // Prevent dealing with messages wrongly posted on this channel
        if (! array_key_exists($message->getName(), $REGISTERED_EVENTS)) {
            return;
        }

        $className = $REGISTERED_EVENTS[$message->getName()];

        // Invokable class
        (new $className(logger: $logger, message: $message))();
    })
    ->subscribe('channel:name1', 'channel:name2');

Poniéndolo en funcionamiento

Para hacerlo funcionar vamos a tener que jugar con dos terminales:

Terminal 1: Subscriber
php subscriber.php

// Vemos por consola...
[2021-04-22T11:51:06.966738+00:00] redis-pubsub.INFO: Successfully subscribed to channel {"channel":"channel:name1"} []
[2021-04-22T11:51:06.967366+00:00] redis-pubsub.INFO: Successfully subscribed to channel {"channel":"channel:name2"} []

Vemos como nuestro script de subscripción se ha subscrito a dos canales diferentes; esto es, se mantendrá a la espera de que llegue algún mensaje en dichos canales.

Terminal 2: Publisher
php publisher.php

// Vemos por consola...
[2021-04-22T11:52:25.912148+00:00] redis-pubsub.INFO: Successfully published to channel {"channel":"channel:name1","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []
[2021-04-22T11:52:25.912712+00:00] redis-pubsub.INFO: Successfully published to channel {"channel":"channel:name2","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []

Aquí podemos ver cómo hemos publicado un mensaje en dichos canales. Mismo mensaje pero en los canales definidos previamente.

Terminal 1: Subscriber
// Vemos por consola...
[2021-04-22T11:52:25.912299+00:00] redis-pubsub.INFO: DemoEventHandler has been fired! {"id":"9999"} []
[2021-04-22T11:52:25.912336+00:00] redis-pubsub.INFO: Message successfully received from channel {"channel":"pubsub_database_channel:name1","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []
[2021-04-22T11:52:25.912695+00:00] redis-pubsub.INFO: DemoEventHandler has been fired! {"id":"9999"} []
[2021-04-22T11:52:25.912724+00:00] redis-pubsub.INFO: Message successfully received from channel {"channel":"pubsub_database_channel:name2","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []

Aquí podemos ver cómo los eventos asociados al mensaje se han disparado y a posteriori, la confirmación de que los mensajes se han recibido por nuestro subscriptor.

Como vemos, la asincronizidad dificulta el seguimiento y flujo y puede provocar que los procesos finalicen en un órden inesperado. De ahí la importancia de llevar un log con marcas de tiempo únicos que permita ordenar los eventos y realizar una correcta trazabilidad.

Descarga

Puedes descargar el proyecto de ejemplo desde aquí.

Versión del documento

[^v1.0]: Última Modificación: 22/04/2021