Symfony Messenger et rabbitmq

Temps de lecture : 5 minutes

Symfony 4 apporte un nouveau composant qui va nous permettre de brancher notre application sur un, ou des, brokers de messages. Grâce à ce composant, on va pouvoir accélérer notre application en traitant de façon asynchrone tout ce qui n’est pas strictement nécessaire à l’affichage de nos pages. Ce module maintenu par la Core Team Symfony va à terme remplacer les bundles existants. 

Use case

Un exemple couramment utilisé pour expliquer pourquoi c’est nécessaire d’utiliser ce genre de technique est le suivant:

Un utilisateur passe une commande sur mon site de e-commerce. Il a payé sa commande et attend juste le message de confirmation. Si je traite tout en synchrone je vais devoir réaliser plusieurs traitements avant de lui rendre la main:

  1. Changer le statut de sa commande
  2. Mettre à jour les stocks et invalider les caches qui correspondent
  3. Envoyer un email de confirmation au client
  4. Prévenir le service de préparation de cette nouvelle commande
  5. Envoyer des metrics pour suivre le volume des ventes

Sur les 5 tâches citées, le client n’a besoin que de la première de façon immédiate. Les autres peuvent se faire dans les secondes ou minutes qui suivent. 

Au-delà du simple point de performance il y a aussi le problème de responsabilité unique. Si toute cette logique est dans le contrôleur, vous allez devoir gérer tout un ensemble de règles disparates. Le contrôleur va forcement devenir inmaintenable au bout d’un moment.

AMQP

Pour pouvoir différer les autres tâches, il va falloir que je pousse des messages dans des queues (type rabbitmq, SQS) et que des workers viennent les consommer pour réaliser les traitements.

Sur cet exemple, de Microsoft, on voit bien le principe d’un broker de message. Un ou plusieurs producteurs (senders) envoient des messages dans une queue et ils sont dépilés par un ou plusieurs consommateurs (receivers).

Dans le meilleur des mondes, vous n’avez qu’une technologie de service de queue et peut-être même qu’un seul serveur à interroger. Dans ce cas, c’est assez simple et vous utilisez sûrement déjà un client php pour le faire.

Mais si vous êtes à cheval sur plusieurs technos/ serveurs, c’est vite un casse-tête car il va falloir gérer plusieurs protocoles et serializers.

Symfony Messenger

Le composant Messenger va vous permettre de gérer les problèmes de techno et de serveurs grâce à de la configuration YML. Dans votre implémentation vous n’aurez pas à vous soucier de comment va partir votre message, ni de comment le sérializer. Le composant est compatible avec les brokers de messages AMQP (la plupart).

  • Message : Un objet PHP serializable
  • Bus : Le composant qui va s’occuper de la communication avec le queue manager et d’appliquer les middlewares que l’on aura pu enregistrer (logs, debug,…)
  • Handler : La classe qui va recevoir un message à exécuter. C’est cette classe qui va tenir la logique métier.
  • Receiver : Déserialize le message reçu via le bus et le transmet au bon handler.
  • Sender : Sérialize le message et le transmet au queue manager via le bus.

Nous allons appliquer ce principe pour le cas de l’envoi du mail de confirmation de commande. Il faudra répéter le pattern pour chacun des autres types d’action à effectuer.

Mise en application

Installation du composant

Le composant s’installe à l’aide de composer via la commande :

composer require symfony/messenger

Symfony flex va s’occuper automatiquement d’enregistrer le bundle et créer le fichier de configuration par défaut.  Le composant vient avec son intégration à la Symfony toolbar et au profiler. Il est donc possible de voir en détail les messages dispatchés lors d’un hit.

Configuration du composant

Je vais prendre le cas d’un rabbitmq. Il va falloir veiller à installer et activer l’extension php amqp.

Une fois que c’est fait, nous pouvons éditer le fichier de configuration de messenger pour y ajouter nos transports.

Dans votre fichier .env à la racine de votre dépôt, il faudra ajouter les différents Data Source Name.

Création de notre message

Le message est un simple objet php.

L’interface par défaut de mes messages (Optionnel mais pratique à l’usage)
L’interface des objets de type mail

Dans mon message j’envoie les id des différentes entités. Selon vos besoins vous pouvez directement mettre les entités.

Envoie du message dans le broker

Maintenant que nous avons notre objet de message nous allons pouvoir le pousser.

Traitement du message

Pour le moment, votre application ne fonctionnera pas car Symfony Messenger refusera de prendre en compte un message dont il ne connait pas le handler.

Il faut maintenant enregistrer notre handler dans le container avec le tag ‘messenger.message_handler’.

À partir de ce moment, Symfony va vous autoriser à dispatcher les messages ConfirmCommandMailer. Pour cet exemple, j’ai utilisé  Swift Mailer mais libre à vous d’utiliser une autre librairie.

Le composant va utiliser la reflection PHP pour détecter le handler qui doit être utilisé pour un message. Il va regarder le type du paramètre passé à la fonction __invoke. 

Lancer le worker

Maintenant que l’on a tout ce qu’il nous faut, il ne reste plus qu’à lancer notre consommateur. Il faudra lancer au minimum autant de workers que de channels.

bin/console messenger:consume-messages amqp_mailer

Il y a pas mal d’options disponibles pour limiter la durée de vie du daemon, la mémoire allouée, le temps de pause entre chaque message traité… Pour ma part je lance toujours au moins deux consommateurs pour un type de message et je fais en sorte qu’ils se tuent automatiquement tous les n messages traités.

Voici une démonstration sous forme de GIF de ce que l’on vient de faire. Pour simplifier la démonstration, j’ai fait une seconde commande Symfony qui pousse notre message dans la queue.

En production

En production il faut automatiser le lancement du daemon et le relancer en cas de crash. Pour ce faire vous pouvez utiliser supervisor avec la config suivante :

Généralement je fais tourner les workers dans des conteners docker dans un cluster swarm. De cette manière je peux gérer le nombre de consommateurs par type de message à la volée. Voici un exemple de Docker file qui peut faire tourner un worker symfony messenger. Je l’ai fait pour un projet perso, il n’est donc pas parfaitement optimisé pour de la vraie prod. 

Exemple de docker file simple pour un worker Symfony messenger

Et voici comment l’utiliser dans le cadre d’un docker compose :

Vous pouvez maintenant faire un docker-compose up ou le lancer sur un cluster swarm via la commande docker stack deploy.

Limitation

Pour le moment il n’y a pas de solution « out of the box » pour gérer le re-jeux des messages en cas d’erreur. C’est à vous de catcher les erreurs et soit de les pousser dans une autre queue, d’écrire un log, une métrique, ou de les stocker en base pour les identifier au besoin. C’est une lacune assez importante de la librairie qui devrait être corrigée.

Les problèmes possibles

Que veux dire l’erreur : Attempted to load class « AMQPConnection » from the global namespace.
Did you forget a « use » statement?

Ce message vous signale que vous n’avez pas l’extension amqp d’activée sur votre machine. Il faut installer et activer l’extension php-amqp.

Comment résoudre l’erreur: [ErrorException] Declaration of SymfonyFlexParallelDownloader::getRemoteContents($originUrl, $fileUrl, $context) should be compatible with ComposerUtilRemoteFilesystem::getRemoteContents($originUrl, $fileUrl, $context, ?array & $responseHeaders = NULL)

Il faut exécuter composer update –no-plugins pour mettre Symfony flex à jour.

Comment corriger l’erreur : [SymfonyComponentMessengerExceptionNoHandlerForMessageException]
No handler for message « AppMessageConfirmCommandMailer ».

Il faut veiller à deux points pour trouver la source de cette erreur. Premièrement que vous avez bel et bien créé un handler avec une méthode __invoke qui prend un objet de type AppMessageConfirmCommandMailer en premier et unique paramètre. Et dans un second temps que vous avez bien ajouté votre handler dans votre fichier services.yml avec le tag « messenger.message_handler »

Précédent

Du machine learning dans mes cocktails

Suivant

Intervention à Epitech Nancy

  1. Jorge luis Acosta

    Bonjour, Meci pour votre explication,

    Vu que vous utilisez « entityManager » dans le Handler « ConfirmCommandMailerHandler » sans vérifier la connection vers la base de données, comment gerez-vous le « Long Running Process » afin de ne pas avoir le problème « Lost Connection » de Doctrine?

    Car j’utitlise SupervisordBundle et j’ai ce problème et donc en passant vers Messager je crois que j’aurais quand même l’erreur.

    Merci

    • Bonjour Jorge,

      Je contourne le problème en gérant un pool de consommateurs qui vont se redémarrer automatiquement toutes les n minutes.
      C’est la méthode recommandée par SF :

      Don’t Let Workers Run Forever
      Some services (like Doctrine’s EntityManager) will consume more memory over time. So, instead of allowing your worker to run forever, use a flag like messenger:consume –limit=10 to tell your worker to only handle 10 messages before exiting (then Supervisor will create a new process). There are also other options like –memory-limit=128M and –time-limit=3600.

      Source: https://symfony.com/doc/current/messenger.html#deploying-to-production

      Arnaud

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Fièrement propulsé par WordPress & Thème par Anders Norén