Alcides Ramos Cordero

Software Engineer

Introducción a la programación asíncrona

Anteriormente estuvimos creando un plan de contingencia con AWS S3 para realizar backups de nuestros proyectos usando crontab y aprendimos a planificar las copias de seguridad para que se ejecuten en un momento determinado. Ahora bien, en contraposición a este caso, ¿sabríamos cómo ejecutar tareas en segundo plano o background, no periódicas y que dependan del flujo de nuestra aplicación?

Introducción

El desarrollo de software se basa principalmente en dos metodologías:

Así pues un desarrollo mediante programación síncrona ofrece algunas ventajas para el desarrollador ya que facilita la labor de depuración o realización de trazas, puesto que todo el flujo de la aplicación se desarrolla en un mismo proceso y de manera secuencial. Por contra, puede favorecer la aparición de códigos bloqueantes o cuellos de botella donde una tarea centraliza los recursos y paraliza el flujo de la aplicación.

En cambio, la programación asíncrona permite que varias tareas se realicen en paralelo, lo cual posibilita realizar tareas secundarias en segundo plano sin tener que bloquear el flujo principal de la aplicación pero, por contra, hace mucho más compleja la labor de depuración y de trazabilidad ya que el flujo ahora no es secuencial y las tareas finalizan sin un orden preestablecido.

Para realizar aplicaciones asíncronas en PHP existe varias alternativas a nuestra disposición:

Dentro de los enfoques posibles, veremos cómo sacar provecho a Redis y su soporte PUB/SUB para disparar un evento y realizar un proceso en background.

¿Qué es PUB/SUB?

PUB/SUB es un protocolo de comunicación acrónimo de publicación y subscripción (PUBLISH/SUBSCRIBE) y permite desacoplar servicios que producen eventos de los servicios que procesan eventos. Es decir, es un sistema que permite publicar un mensaje en un canal y éste es inmediatamente recibido por todos los subscriptores de dicho canal, permitiendo así desarrollar event-driven architectures o bien, desacoplar aplicaciones y así mejorar su rendimiento y escalabilidad al delegar las tareas más complejas o pesadas en background.

Un caso de uso típico de PUB/SUB es la realización de un chat donde se emite un mensaje en un canal y todos los subscritores del canal reciben dicho mensaje a la vez.

Ejemplo de adaptador PUB/SUB con Redis y PHP

Análisis preliminar

Nuestro pequeño proyecto estará formado por los siguientes actores:

Para conectarnos a Redis haremos uso del cliente de conexión Predis y como todo este proceso es más dificil de depurar y de hacerle seguimiento, haremos uso de Monolog para llevar un log de todo el proceso y volcarlo a la salida estandard.

LLegados a este punto ya estamos listos para comenzar con el desarrollo. Comencemos...

Mensaje

Un mensaje es una estructura que tiene una serie de atributos y propiedades específicas e inmutables que actuará a modo de contrato y que representa un evento o acción.

En nuestro caso los mensajes tendrán los siguientes atributos:

¿Cómo crearemos los mensajes?

Se pueden crear mensajes de las siguientes maneras:

// Mediante el constructor
$message1 = new Message('demo:event', ['id' => 123456]);

// Mediante el uso del método mágico __set() 
$message2 = new Message('demo:event');
$message2->id = 123456;

print_r($message1);
 
App\Models\Message Object
(
    [name] => demo:event
    [uuid] => f877a3b8-ecf5-42c2-9758-8cfa7fde1957
    [properties] => Array
        (
            [id] => 12345
        )

    [timestamp] => 1619073155
)

print_r($message2);

App\Models\Message Object
(
    [name] => demo:event
    [uuid] => 6d7e2b17-683c-4d22-bfd0-59eae58112f1
    [properties] => Array
        (
            [id] => 12345
        )

    [timestamp] => 1619073175
)

En este ejemplo $message1 y $message2 son equivalentes que no iguales pues tienen marcas de tiempo diferentes, lo que quiere decir que se ejecutó la misma órden en dos momentos temporales diferentes.

¿Cómo leer nuestros mensajes?
echo 'Mensaje: '. $message->getName() . PHP_EOL;
echo 'UUID: '. $message->getUUID() . PHP_EOL;
echo 'Timestamp como valor entero: '. $message->getTimestamp() . PHP_EOL;
echo 'Timestamp con formato: '. $message->getTimestamp('Y-m-d H:i:s') . PHP_EOL;

echo 'Mensaje como array: '. PHP_EOL;
print_r($message->toArray());

echo 'Mensaje como objeto: '. PHP_EOL;
print_r($message->toObject());

echo 'Mensaje como string: '. $message;

echo 'Propiedad ID del mensaje: '. $message->id . PHP_EOL;

echo 'Propiedades del mensaje como array: '. PHP_EOL;
print_r($message->toArray()['properties']);
Código fuente

Para simplificar la gestión de las propiedades adicinales del mensaje y para facilitar la depuración de esta entidad se ha hecho uso de los métodos mágicos de PHP y debe considerarse esta implementación como un ejemplo teórico funcional.

Publisher

Veamos ahora cómo desarrollar nuestro script que publicará nuestros mensajes en Redis.

[...]

$channel1 = 'channel:default';
$channel2 = 'channel:logs';
//...

$message1 = new Message('demo:event1', ['id' => 12345]);
$message2 = new Message('demo:event2', ['id' => 98765]);
//...

(new Redis($client, $logger))
    ->withMessages($message1, $message2...)
    ->publish($channel1, $channel2...);

// Para simplificar podremos hacer uso del patrón factoría y así reducir algo de código...
(new Redis($client, $logger))
    ->addMessage('demo:event1', ['id' => 12345])
    ->addMessage('demo:event2', ['id' => 98765])
    ->publish($channel1, $channel2...);

Puedes conocer más sobre el Patrón Factoría en el fantástico recurso Refactoring Guru donde explican con detalle cada patrón de diseño con ejemplos y casos de uso de aplicación real.

Código resultante
<?php

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use PhpRedisPubSub\PhpRedisPubSub;
use Predis\Client;

$client = new Client(
    parameters: [
        'scheme'             => 'tcp',
        'host'               => 'redis',
        'port'               => 6379,
        'database'           => 0,
        'read_write_timeout' => 0,
    ],
    options: [
        'cluster' => 'redis',
        'prefix'  => 'redis_pubsub_database_',
    ],
);

$logger = (new Logger(
    name: 'php-redis-pubsub',
))->pushHandler(
    handler: new StreamHandler(
        stream: 'php://stdout',
        level: Logger::INFO,
    )
);

(new PhpRedisPubSub(client: $client, logger: $logger))
    ->addMessage('custom-event-name', ['id' => 9999])
    ->publish('channel:name1', 'channel:name2');
Subscriber

Veamos ahora cómo desarrollar nuestro script que se mantendrá escuchando en Redis a uno o varios canales:

[...]

function messageHandler(MessageInterface $message) 
{
	// Lógica de negocio a ejecutar cuando se recibe cada mensaje
}

(new Redis($client, $logger))
    ->withHandler(messageHandler(MessageInterface $message))
    ->subscribe($channel1, $channel2...);
Código resultante
<?php

require_once __DIR__ .'/../vendor/autoload.php';

use App\Handlers\DemoEventHandler;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use PhpRedisPubSub\Interfaces\Message as MessageInterface;
use PhpRedisPubSub\PhpRedisPubSub;
use Predis\Client;

$REGISTERED_EVENTS = [
    DemoEventHandler::eventName() => DemoEventHandler::class,
];

$client = new Client(
    parameters: [
        'scheme'             => 'tcp',
        'host'               => 'redis',
        'port'               => 6379,
        'database'           => 0,
        'read_write_timeout' => 0,
    ],
    options: [
        'cluster' => 'redis',
        'prefix'  => 'redis_pubsub_database_',
    ],
);

$logger = (new Logger(
    name: 'php-redis-pubsub',
))->pushHandler(
    handler: new StreamHandler(
        stream: 'php://stdout',
        level: Logger::INFO,
    )
);

(new PhpRedisPubSub(client: $client, logger: $logger))
    ->withHandler(function (MessageInterface $message) use ($logger, $REGISTERED_EVENTS) {
        // Prevent dealing with messages wrongly posted on this channel
        if (! array_key_exists($message->getName(), $REGISTERED_EVENTS)) {
            return;
        }

        $className = $REGISTERED_EVENTS[$message->getName()];

        // Invokable class
        (new $className(logger: $logger, message: $message))();
    })
    ->subscribe('channel:name1', 'channel:name2');

Poniéndolo en funcionamiento

Para hacerlo funcionar vamos a tener que jugar con dos terminales:

Terminal 1: Subscriber
php subscriber.php

// Vemos por consola...
[2021-04-22T11:51:06.966738+00:00] redis-pubsub.INFO: Successfully subscribed to channel {"channel":"channel:name1"} []
[2021-04-22T11:51:06.967366+00:00] redis-pubsub.INFO: Successfully subscribed to channel {"channel":"channel:name2"} []

Vemos como nuestro script de subscripción se ha subscrito a dos canales diferentes; esto es, se mantendrá a la espera de que llegue algún mensaje en dichos canales.

Terminal 2: Publisher
php publisher.php

// Vemos por consola...
[2021-04-22T11:52:25.912148+00:00] redis-pubsub.INFO: Successfully published to channel {"channel":"channel:name1","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []
[2021-04-22T11:52:25.912712+00:00] redis-pubsub.INFO: Successfully published to channel {"channel":"channel:name2","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []

Aquí podemos ver cómo hemos publicado un mensaje en dichos canales. Mismo mensaje pero en los canales definidos previamente.

Terminal 1: Subscriber
// Vemos por consola...
[2021-04-22T11:52:25.912299+00:00] redis-pubsub.INFO: DemoEventHandler has been fired! {"id":"9999"} []
[2021-04-22T11:52:25.912336+00:00] redis-pubsub.INFO: Message successfully received from channel {"channel":"pubsub_database_channel:name1","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []
[2021-04-22T11:52:25.912695+00:00] redis-pubsub.INFO: DemoEventHandler has been fired! {"id":"9999"} []
[2021-04-22T11:52:25.912724+00:00] redis-pubsub.INFO: Message successfully received from channel {"channel":"pubsub_database_channel:name2","payload":"O:29:\"PhpRedisPubSub\\Models\\Message\":4:{s:41:\"\u0000PhpRedisPubSub\\Models\\Message\u0000properties\";a:1:{s:2:\"id\";s:4:\"9999\";}s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000name\";s:10:\"demo:event\";s:35:\"\u0000PhpRedisPubSub\\Models\\Message\u0000uuid\";s:36:\"37fcd88d-e521-45d1-bb60-cb0c9634172e\";s:40:\"\u0000PhpRedisPubSub\\Models\\Message\u0000timestamp\";i:1619092345;}"} []

Aquí podemos ver cómo los eventos asociados al mensaje se han disparado y a posteriori, la confirmación de que los mensajes se han recibido por nuestro subscriptor.

Como vemos, la asincronizidad dificulta el seguimiento y flujo y puede provocar que los procesos finalicen en un órden inesperado. De ahí la importancia de llevar un log con marcas de tiempo únicos que permita ordenar los eventos y realizar una correcta trazabilidad.

Descarga

Puedes descargar el proyecto de ejemplo desde aquí.

Versión del documento

[^v1.0]: Última Modificación: 22/04/2021