Symfony Messenger et rabbitmq

Symfony 4 apporte un nouveau composant qui va nous permettre de brancher notre application sur un, ou des, brokers de messages. Grâce à ce composant, on va pouvoir accélérer notre application en traitant de façon asynchrone tout ce qui n’est pas strictement nécessaire à l’affichage de nos pages. Ce module maintenu par la Core Team Symfony va à terme remplacer les bundles existants. 

Use case

Un exemple couramment utilisé pour expliquer pourquoi c’est nécessaire d’utiliser ce genre de technique est le suivant:

Un utilisateur passe une commande sur mon site de e-commerce. Il a payé sa commande et attend juste le message de confirmation. Si je traite tout en synchrone je vais devoir réaliser plusieurs traitements avant de lui rendre la main:

  1. Changer le statut de sa commande
  2. Mettre à jour les stocks et invalider les caches qui correspondent
  3. Envoyer un email de confirmation au client
  4. Prévenir le service de préparation de cette nouvelle commande
  5. Envoyer des metrics pour suivre le volume des ventes

Sur les 5 tâches citées, le client n’a besoin que de la première de façon immédiate. Les autres peuvent se faire dans les secondes ou minutes qui suivent. 

Au-delà du simple point de performance il y a aussi le problème de responsabilité unique. Si toute cette logique est dans le contrôleur, vous allez devoir gérer tout un ensemble de règles disparates. Le contrôleur va forcement devenir inmaintenable au bout d’un moment.

AMQP

Pour pouvoir différer les autres tâches, il va falloir que je pousse des messages dans des queues (type rabbitmq, SQS) et que des workers viennent les consommer pour réaliser les traitements.

Sur cet exemple, de Microsoft, on voit bien le principe d’un broker de message. Un ou plusieurs producteurs (senders) envoient des messages dans une queue et ils sont dépilés par un ou plusieurs consommateurs (receivers).

Dans le meilleur des mondes, vous n’avez qu’une technologie de service de queue et peut-être même qu’un seul serveur à interroger. Dans ce cas, c’est assez simple et vous utilisez sûrement déjà un client php pour le faire.

Mais si vous êtes à cheval sur plusieurs technos/ serveurs, c’est vite un casse-tête car il va falloir gérer plusieurs protocoles et serializers.

Symfony Messenger

Le composant Messenger va vous permettre de gérer les problèmes de techno et de serveurs grâce à de la configuration YML. Dans votre implémentation vous n’aurez pas à vous soucier de comment va partir votre message, ni de comment le sérializer. Le composant est compatible avec les brokers de messages AMQP (la plupart).

  • Message : Un objet PHP serializable
  • Bus : Le composant qui va s’occuper de la communication avec le queue manager et d’appliquer les middlewares que l’on aura pu enregistrer (logs, debug,…)
  • Handler : La classe qui va recevoir un message à exécuter. C’est cette classe qui va tenir la logique métier.
  • Receiver : Déserialize le message reçu via le bus et le transmet au bon handler.
  • Sender : Sérialize le message et le transmet au queue manager via le bus.

Nous allons appliquer ce principe pour le cas de l’envoi du mail de confirmation de commande. Il faudra répéter le pattern pour chacun des autres types d’action à effectuer.

Mise en application

Installation du composant

Le composant s’installe à l’aide de composer via la commande :

composer require symfony/messenger

Symfony flex va s’occuper automatiquement d’enregistrer le bundle et créer le fichier de configuration par défaut.  Le composant vient avec son intégration à la Symfony toolbar et au profiler. Il est donc possible de voir en détail les messages dispatchés lors d’un hit.

Configuration du composant

Je vais prendre le cas d’un rabbitmq. Il va falloir veiller à installer et activer l’extension php amqp.

Une fois que c’est fait, nous pouvons éditer le fichier de configuration de messenger pour y ajouter nos transports.

parameters:
messenger.transport.default_serialization_context: {groups: [messenger]}
framework:
messenger:
transports:
# les Data Source Name pour mes différents transports
amqp_mailer: '%env(MESSENGER_TRANSPORT_DSN_MAILER)%'
amqp_default: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
# En fonction du type de message on peut router le message
# pour ma part j'utilise des interface
'App\Message\Interfaces\Mailer': amqp_mailer
# Tout ce qui n'a pas matché avance passera ici
'*': amqp_default

Dans votre fichier .env à la racine de votre dépôt, il faudra ajouter les différents Data Source Name.

###> symfony/messenger ###
MESSENGER_TRANSPORT_DSN_MAILER=amqp://LOGIN:PASSWORD@HOST:5672/%2f/mails
MESSENGER_TRANSPORT_DSN=amqp://LOGIN:PASSWORD@HOST:5672/%2f/default
###< symfony/messenger ###
view raw .env hosted with ❤ by GitHub

Création de notre message

Le message est un simple objet php.

namespace App\Message\Interfaces;
interface Message
{
}
view raw message hosted with ❤ by GitHub
L’interface par défaut de mes messages (Optionnel mais pratique à l’usage)
<?php
namespace App\Message\Interfaces;
interface Mailer extends Message
{
}
view raw mailer hosted with ❤ by GitHub
L’interface des objets de type mail
<?php
namespace App\Message;
use App\Message\Interfaces\Mailer;
class ConfirmCommandMailer implements Mailer
{
/**
* @var int
*/
protected $commandId;
/**
* ConfirmCommandMailer constructor.
* @param int $commandId
*/
public function __construct(int $commandId)
{
$this->commandId = $commandId;
}
/**
* @return int
*/
public function getCommandId() :int
{
return $this->commandId;
}
}

Dans mon message j’envoie les id des différentes entités. Selon vos besoins vous pouvez directement mettre les entités.

Envoie du message dans le broker

Maintenant que nous avons notre objet de message nous allons pouvoir le pousser.

use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\Messenger\MessageBusInterface;
use App\Message\ConfirmCommandMailer;
class CommandController extends Controller
{
/**
* @var MessageBusInterface
*/
private $bus;
/**
* CommandController constructor.
*/
public function __construct(MessageBusInterface $bus)
{
$this->bus = $bus;
}
/**
* @Route("/customer/{idCustomer}/command/{idCommand}/confirm", name="confirm_command", methods={"POST"})
* @param $name
*/
public function index($idCustomer, $idCommand)
{
// check des params
// changement du statut de la commande
//…
// on envoie notre message dans le broker
$this->bus->dispatch(new ConfirmCommandMailer($idCommand));
// rendu du tpl
}
}

Traitement du message

Pour le moment, votre application ne fonctionnera pas car Symfony Messenger refusera de prendre en compte un message dont il ne connait pas le handler.

<?php
namespace App\Message\Handler;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use App\Message\ConfirmCommandMailer;
use \Swift_Mailer;
/**
* Class ConfirmCommandMailerHandler
*/
class ConfirmCommandMailerHandler
{
/**
* @var Swift_Mailer
*/
protected $mailer;
/**
* @var EntityManagerInterface
*/
protected $entityManager;
/**
* ConfirmCommandMailerHandler constructor.
* @param \Swift_Mailer $mailer
* @param EntityManagerInterface $entityManager
*/
public function __construct(Swift_Mailer $mailer, EntityManagerInterface $entityManager)
{
$this->mailer = $mailer;
$this->entityManager = $entityManager;
}
/**
* @param ConfirmCommandMailer $message
*/
public function __invoke(ConfirmCommandMailer $message)
{
$command = $this->entityManager->getRepository(Command::class)->find($message->getCommandId());
// verifier que les données sont cohérantes
$message = (new \Swift_Message('Commande confirmée'))
->setFrom('no-reply@example.com')
->setTo($command->getCustomer()->getEmail())
->setBody(
"Commande #" . $command->getId() . " confirmée"
)
;
$this->mailer->send($message);
}
}

Il faut maintenant enregistrer notre handler dans le container avec le tag ‘messenger.message_handler’.

App\Message\Handler\ConfirmCommandMailerHandler:
tags: ['messenger.message_handler']
view raw service.yml hosted with ❤ by GitHub

À partir de ce moment, Symfony va vous autoriser à dispatcher les messages ConfirmCommandMailer. Pour cet exemple, j’ai utilisé  Swift Mailer mais libre à vous d’utiliser une autre librairie.

Le composant va utiliser la reflection PHP pour détecter le handler qui doit être utilisé pour un message. Il va regarder le type du paramètre passé à la fonction __invoke. 

Lancer le worker

Maintenant que l’on a tout ce qu’il nous faut, il ne reste plus qu’à lancer notre consommateur. Il faudra lancer au minimum autant de workers que de channels.

bin/console messenger:consume-messages amqp_mailer

Il y a pas mal d’options disponibles pour limiter la durée de vie du daemon, la mémoire allouée, le temps de pause entre chaque message traité… Pour ma part je lance toujours au moins deux consommateurs pour un type de message et je fais en sorte qu’ils se tuent automatiquement tous les n messages traités.

Voici une démonstration sous forme de GIF de ce que l’on vient de faire. Pour simplifier la démonstration, j’ai fait une seconde commande Symfony qui pousse notre message dans la queue.

En production

En production il faut automatiser le lancement du daemon et le relancer en cas de crash. Pour ce faire vous pouvez utiliser supervisor avec la config suivante :

[program:amqp_mailer]
command=php /symfony/bin/console messenger:consume-messages amqp_mailer
startsecs = 0
stdout_logfile=/tmp/supervisord-amqp.log
stdout_logfile_maxbytes=10MB
view raw gistfile1.txt hosted with ❤ by GitHub

Généralement je fais tourner les workers dans des conteners docker dans un cluster swarm. De cette manière je peux gérer le nombre de consommateurs par type de message à la volée. Voici un exemple de Docker file qui peut faire tourner un worker symfony messenger. Je l’ai fait pour un projet perso, il n’est donc pas parfaitement optimisé pour de la vraie prod. 

FROM php:7.2
# Composer
RUN php -r "copy('https://getcomposer.org/installer&#39;, 'composer-setup.php');" \
&& php -r "if (hash_file('SHA384', 'composer-setup.php') === '544e09ee996cdf60ece3804abc52599c22b1f40f4323403c44d44fdfdd586475ca9813a858088ffbc1f233e9b180f061') { echo 'Installer verified'; } else { echo 'Installer corrupt'; unlink('composer-setup.php'); } echo PHP_EOL;" \
&& php composer-setup.php –install-dir=/bin –filename=composer \
&& php -r "unlink('composer-setup.php');"
# amqp
RUN apt-get update && apt-get install -y librabbitmq-dev libssh-dev \
&& docker-php-ext-install opcache bcmath sockets \
&& pecl install amqp \
&& docker-php-ext-enable amqp
# core tools
RUN apt-get install -y git-core
# php exts
RUN docker-php-ext-install pdo pdo_mysql zip mbstring
RUN mkdir /app
WORKDIR /app
env QUEUE_NAME amqp_default
ENTRYPOINT ["sh", "-c", "/app/bin/console messenger:consume-messages ${QUEUE_NAME} –limit=100 –time-limit=900 -n"]
Exemple de docker file simple pour un worker Symfony messenger

Et voici comment l’utiliser dans le cadre d’un docker compose :

version: '3'
services:
worker_mailer:
image: ma-super-image-de-worker:latest
environment:
– QUEUE_NAME=amqp_mailer
volumes:
– "/PATH/VERS/MON/APP/SF:/app:rw"
deploy:
mode: replicated
replicas: 2

Vous pouvez maintenant faire un docker-compose up ou le lancer sur un cluster swarm via la commande docker stack deploy.

Limitation

Pour le moment il n’y a pas de solution « out of the box » pour gérer le re-jeux des messages en cas d’erreur. C’est à vous de catcher les erreurs et soit de les pousser dans une autre queue, d’écrire un log, une métrique, ou de les stocker en base pour les identifier au besoin. C’est une lacune assez importante de la librairie qui devrait être corrigée.

Les problèmes possibles

Que veux dire l’erreur : Attempted to load class « AMQPConnection » from the global namespace.
Did you forget a « use » statement?

Ce message vous signale que vous n’avez pas l’extension amqp d’activée sur votre machine. Il faut installer et activer l’extension php-amqp.

Comment résoudre l’erreur: [ErrorException] Declaration of SymfonyFlexParallelDownloader::getRemoteContents($originUrl, $fileUrl, $context) should be compatible with ComposerUtilRemoteFilesystem::getRemoteContents($originUrl, $fileUrl, $context, ?array & $responseHeaders = NULL)

Il faut exécuter composer update –no-plugins pour mettre Symfony flex à jour.

Comment corriger l’erreur : [SymfonyComponentMessengerExceptionNoHandlerForMessageException]
No handler for message « AppMessageConfirmCommandMailer ».

Il faut veiller à deux points pour trouver la source de cette erreur. Premièrement que vous avez bel et bien créé un handler avec une méthode __invoke qui prend un objet de type AppMessageConfirmCommandMailer en premier et unique paramètre. Et dans un second temps que vous avez bien ajouté votre handler dans votre fichier services.yml avec le tag « messenger.message_handler »

Cocktailand – Ajouter du cache HTTP dans mon symfony

C’est quoi un ESI ?

Les ESI ou Edge Side Includes sont un balisage supporté par Varnish qui permet de gérer des temps de cache différents pour des blocs de la même page.

Dans le cadre de Cocktailand, certains blocs sont actualisés régulièrement comme le « Cocktail du jour » mais d’autres ne changent quasiment jamais comme la liste des catégories.

Voici donc le découpage que j’ai fait sur la page principale. Pour la barre de menu, c’est bien évidemment le contenu du méga menu que j’ai voulu mettre en évidence.

Il est donc intéressant de ne pas avoir à invalider toute la page lorsque le cocktail du jour est changé. Le second avantage en termes de performance est que les blocs peuvent être utilisés sur différentes pages. Cela signifie qu’un ESI présent sur toutes les pages du site ne sera généré qu’une seule fois. Lors des autres appels, Varnish utilisera son cache.

Configuration de varnish

Pour Cocktailand, la configuration de varnish est assez simple car je ne fais pas de purge et parce qu’il n’y a pas d’espace connecté sur le site.

Voici la configuration que j’ai:

vcl 4.0;
import std;
backend default {
// le hostname du nginx dans ma stack
.host = "front";
.port = "80";
}
sub vcl_recv {
set req.http.Surrogate-Capability = "abc=ESI/1.0";
unset req.http.Cookie;
}
//Ensuite, ce block est appelé après la réception des headers de réponse.
//Nous supprimons le header et activons les ESI
sub vcl_backend_response {
if (beresp.http.Surrogate-Control ~ "ESI/1.0") {
unset beresp.http.Surrogate-Control;
set beresp.do_esi = true;
}
if (bereq.url ~ "\.(jpe?g|png|gif|pdf|tiff?|css|js|ttf|woff2?|otf|eot|svg)$") {
set beresp.ttl = std.duration(beresp.http.age+"s",0s) + 24h;
}
}
sub vcl_deliver {
if (obj.hits > 0) {
set resp.http.X-Cache = "HIT";
} else {
set resp.http.X-Cache = "MISS";
}
}
view raw varnish vcl hosted with ❤ by GitHub

La mise en place dans Symfony 4

ESI

Le support des ESI dans Symfony est intégré nativement dans le framework.

Dans le fichier config/packages/framework.yaml il faut activer les ESIs.

framework:
esi: { enabled: true }
fragments: { path: /_fragment }
view raw framework.yaml hosted with ❤ by GitHub

Toutes vos routes qui servent des ESI devront commencer par _fragment.

Dans vos vues twig, vous avez des helpers à disposition pour poser vos tags.

{{ render_esi(url('popular_cocktails')) }}
view raw render_esi hosted with ❤ by GitHub

Cache http

En utilisant le package sensio/framework-extra-bundle on peut gérer le cache sur les controller avec des annotations.

/**
* @Route("/cocktail/recette/{name}-{id}", name="cocktail_detail", requirements={"id"="\d+", "name"="[0-9a-z-]+"})
* @Cache(public=true, maxage=86400, mustRevalidate=false, maxStale=86400)
**/
public function index($name, $id)
{
// …
}
view raw @Cache hosted with ❤ by GitHub

Temps de réponse

Une fois les ESIs et le cache HTTP mis en place, on peut analyser les performances avec Webpagetest.

L’outil va nous donner des informations sur le temps de réponse de l’application et surtout des indications sur ce qu’il faudrait améliorer.

Par exemple, sur ce test, il me dit que je peux potentiellement améliorer la gestion des fonts et des images.

// @todo activer gzip sur les fonts

Ce que l’on peut remarquer, c’est que le site commence à envoyer le HTML après 231ms. Dans ce temps il y a en moyenne 40ms de DNS Lookup, 30ms de connexion et 80ms de négociation SSL. Malheureusement sur cette partie je n’ai pas la main, c’est donc l’overhead de base pour toute page du site. Mais comme j’utilise HTTP2 je vais mutualiser toute cette partie pour les images assets servies sur le même domaine.

Les nouvelles problématiques

Tout mettre en cache, c’est bien pour les performances, mais malheureusement certaines informations ont besoin de « temps réel ».

Quand un visiteur ajoute une note sur une recette, il est nécessaire que cette note mise à jour si le visiteur rafraîchit la page.

Il y a plusieurs solutions:

  • Faire des bans de cache lors de l’ajout d’une note
  • Afficher les données du cache et rafraîchir les données en AJAX

Bien évidemment, la première solution est celle qui devrait être implémentée. Maintenant, c’est pas mal de code et de configuration sur varnish. C’est d’autant plus compliqué que, dans sa version gratuite, Varnish ne permet pas de faire un ban sur plusieurs instances.

Faire les bans impliquerait de maintenir une liste des varnishs qui tournent (si jamais je fais scaller cette brique) et de faire n appels curl pour faire un ban partout.

Pour le moment, j’ai fait le choix de la requête AJAX non cachée. Si jamais le site gagne en popularité, il faudra retravailler sur ce point.

Conclusion

Sur un blog ou un site dans lequel le visiteur ne peut quasiment pas interagir avec vos données, c’est très simple de mettre en place du cache varnish et le gain de performance est énorme. Maintenant, si vous avez des besoins plus complexes (site transactionnel, forum,…) vous allez devoir mettre en place une mécanique d’invalidation de cache.