Notre promotion d'été sur les formations PHP et Symfony est disponible: obtenez -25% pour toute formation ayant lieu du 7 juillet au 29 août Demandez-nous un devis dès maintenant


Comment prioriser les messages lors du développement d'applications asynchrones avec Symfony Messenger

· Thibaut Chieux · Temps de lecture: 8 minutes
Blue sign on a building with several Now What? letters

Le traitement asynchrone offre des avantages tels que la découplage des processus et des temps de réponse plus rapides, mais la gestion des priorités des messages peut s'avérer complexe. Pour traiter des tâches allant de la réinitialisation de mot de passe à des exports complexes, il est essentiel de garantir la livraison rapide des messages critiques. Cet article examine les problèmes fréquents liés au traitement asynchrone et propose des solutions avec Symfony Messenger pour optimiser votre application sans refonte majeure.

Le passage à l'asynchrone peut sembler être un rêve : des processus découplés, un temps de réponse plus rapides et plus aucun utilisateur à fixer leur écran en attendant une réponse. Mais très vite, la réalité reprend le dessus : certains messages prennent une éternité, d'autres sont trop importants pour être retardés, et vous vous retrouvez soudain submergé par une multitude de priorités.

Que vous envoyiez des e-mails de réinitialisation de mot de passe ou que vous déclenchiez des exportations complexes, vous devez vous assurer que les bons messages sont traités au bon moment. Cet article se penche sur les problèmes auxquels vous serez confronté et vous explique comment les résoudre à l'aide de Symfony Messenger, sans avoir à réécrire votre application à partir de zéro ni à pleurer sur vos logs à trois heures du matin.

🧠 Le problème : Prioriser de manière dynamique et traiter chaque message pendant une période donnée

Animated gif with a lady with red glasses talking with the text "We have a priorities problem" belowLorsque vous commencez à mettre des messages en file d'attente dans votre application Symfony, vous vous rendez vite compte que tous les messages ne se valent pas. Certains sont critiques et urgents. D'autres le sont beaucoup moins.

Certains transports offrent déjà un moyen de gérer les priorités, par exemple :

🐰 RabbitMQ dispose du header ‘x-priority’

🌱 Beanstalkd dispose d'un système de “priorité par tube” intégré

C'est bien, mais que faire si je veux passer à un autre transport demain sans réécrire la moitié de mon code ?

Symfony Messenger a la solution

La documentation officielle explique comment répartir les messages entre plusieurs transports en fonction de leur priorité. Imaginez cela comme l'attribution de voies sur une autoroute : une pour les ambulances, une pour les scooters.

framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
            async_priority_medium:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: medium
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
            async_priority_very_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: very_low

        routing:
            'App\Message\ExportMessage': async_priority_low
            'App\Message\UpdateStateMessage': async_priority_high

💡 Remarque : vous pouvez librement choisir les noms des files d'attente, mais veillez à ce qu'ils correspondent à votre cas d'utilisation réel. Il s'agit ici d'exemples fictifs.

Maintenant que nous avons classé les messages par priorité, il est tout aussi important de les traiter dans le bon ordre. Heureusement, Symfony Messenger facilite grandement cette tâche :

php bin/console messenger:consume async_priority_high async_priority_medium async_priority_low async_priority_very_low

Le worker consommera d'abord à partir de async_priority_high. S'il est vide, il passera au suivant. Et ainsi de suite. Ainsi, même s'il y a beaucoup de messages non urgents, les messages hautement prioritaires ne restent pas bloqués.

Comment choisir la bonne queue?

Honnêtement, c'était la partie la plus délicate pour moi. Il n'existe pas de formule universelle, et ce tableau est crucial, car tout le reste de cet article en dépend.

👉 Il doit évidemment être rempli avec le client ou le propriétaire du produit, et pas seulement par les développeurs.

La question à laquelle vous devez répondre ici est simple, mais fondamentale :

« Quel est le délai maximal acceptable (y compris le temps d'attente et le traitement effectif) pour chaque type de message ? »

Et à partir de là, vous obtenez votre cartographie des priorités :

Priorité

Élevée

Moyenne

Faible

Très faible

Délais avant traitement

1 minute

10 minutes

1 heure

1 jour

Exemple

- Mise à jour de l'état

- E-mail

Mise à jour du CMS

- Mise à jour des prix

- Traitement analytique

- Exportation

- Anonymisation

Maintenant que j'ai réparti les messages dans des files d'attente, tout devrait être parfait, n'est-ce pas ?

Eh bien... Pas tout à fait.

🙃 Un problème subsiste : les types de messages peuvent être trop génériques

Imaginons que j'ai un EmailMessage. Ça semble correct. Mais je pourrais l'utiliser pour :

  • Une réinitialisation de mot de passe 🟥 Priorité élevée

  • Une notification de livraison 🟨 Moyenne

  • Une demande d'évaluation de votre achat 🟦 Faible ou très faible

Comment attribuer un transport lorsque la même classe de message peut représenter des niveaux d'urgence totalement différents ?

Problème n°2 : un message devrait pouvoir avoir plusieurs priorités

💡 Introduisons : TransportNamesStamp et notre PriorityStamp personnalisé

Heureusement, Symfony Messenger dispose déjà d'un moyen intégré pour forcer un message à passer par un transport spécifique : TransportNamesStamp. Mais pour rendre les choses plus claires (et plus sémantiques), introduisons notre propre PriorityStamp :

namespace App\Messenger\Stamp;

use Symfony\Component\Messenger\Stamp\StampInterface;

readonly class PriorityStamp implements StampInterface
{
    public function __construct(private string $priority) {}

    public function getPriority(): string
    {
        return $this->priority;
    }
}

Et maintenant, un middleware personnalisé pour s'intégrer dans le flux de répartition :

readonly class PriorityRoutingMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Check if the message has a PriorityStamp
        $priorityStamp = $envelope->last(PriorityStamp::class);

        if ($priorityStamp instanceof PriorityStamp) {
            $priority = $priorityStamp->getPriority();

            // Determine the transport based on priority
            $transport = match ($priority) {
                'high' => 'high_priority',
                'medium' => 'medium_priority',
                'low' => 'low_priority',
                'very_low' => 'very_low_priority',
                default => throw new \RuntimeException('Unknow priority level')
            };

            // Add a TransportNamesStamp to redirect the message
            $envelope = $envelope->with(new TransportNamesStamp([$transport]));
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

Ensuite, n'oubliez pas d'ajouter notre nouveau middleware à la configuration du messenger :

framework:
    messenger:
      # ...
      buses:
        messenger.bus.default:
          middleware:
            - 'App\Messenger\Middleware\PriorityRoutingMiddleware'

Désormais, lorsque j'envoie un message, je peux facilement remplacer sa priorité, sans avoir à créer une nouvelle classe de message ou à tout refactoriser.

$this->messageBus->dispatch(
    new SendEmailMessage($notificationEmail),
    [new PriorityStamp('medium')],
)

Dans ce cas, on se dit :

💬 « Hé, cet e-mail n'est pas si urgent que ça, 10 minutes d’attente suffisent. »

Cela permet aux messages critiques d’être traités rapidement, sans encombrer votre file d'attente hautement prioritaire avec des messages moins importants.

Alors, où en sommes-nous maintenant ?

Je peux choisir le mode de transport de façon dynamique.

Je peux ajuster la priorité au moment de l'envoi.

Je peux m'assurer que chaque message est traité dans le délai maximal autorisé ← Nous n'en sommes pas encore là.

Quel est le problème à présent ?

Imaginons que j'envoie un message pour mettre à jour le prix de toutes les variations de produits de mon catalogue, ou d'une grande partie d'entre elles.

Ce n'est pas grave si je n'ai que quelques variations de produits. Mais si j'en ai 100 000 ? 500 000 ? Un million ? Et que chacune d'entre elles effectue un appel API à distance pour récupérer le prix ?

Voici ce qui se passe :

📨 Un message entre dans la file d'attente.

🧠 Le traitement commence.

Cela prend 5, 10, 30... 60 minutes.

🧵 Pendant ce temps, le worker PHP est bloqué.

Moone Boy watching his watch sitting on the ground with a suitcase next to himJe pourrais essayer de regrouper ces appels API. Bien sûr, cela aiderait, mais cela ne ferait que réduire le problème sans le résoudre. Et ce n'est pas une solution vraiment scalable.

C'est un problème, même avec toute notre belle logique de priorisation, car les messages à exécution longue ne fonctionnent pas bien dans ce modèle.

Problème n°3 : certains messages prennent une éternité à être traités

Supposons que j'ai ce message et le handler correspondant dans mon projet correspondant au processus de mise à jour des prix des variations de produit :

readonly class UpdatePrices
{
    public function __construct(
        public array $filters,
    ) {
    }
}
#[AsMessageHandler]
readonly class UpdatePricesHandler
{
    public function __invoke(UpdatePrices $message): void
    {
        foreach ($this->productVariantRepository->findAllByRegex($message->filters) as $product) {
            $this->priceUpdater->updatePriceForVariant($variant);
        }
        
        $this->em->flush();
    }
}

Soyons honnêtes : le problème nous saute aux yeux.

Si 200 000 variantes correspondent à cette expression régulière et que chaque mise à jour prend 0,1 seconde, cela représente environ 5 heures de traitement en une seule fois, ce qui dépasse largement notre limite d'une heure.

Pour éviter d'encombrer la file d'attente, nous allons diviser cette tâche en messages plus petits. Allons même jusqu'à l'extrême : un message = une seule variante.

Conservez le message principal :

readonly class UpdatePrices
{
    public function __construct(
        public array $filters,
    ) {
    }
}

Ajoutez un message unitaire :

readonly class UpdateVariantPrice
{
    public function __construct(
        public int $variantId,
    ) {
    }
}

Modifiez maintenant le handler pour qu'il envoie un message par variation de produit :

public function __invoke(UpdatePrices $message): void
{
	foreach ($this->variantRepository->findByComplexQuery($message->filters) as $variant) {
        $this->messageBus->dispatch(new UpdateVariantPrice($product->getId()));
    }
}

En bonus, nous pouvons même intégrer notre logique de priorité dynamique :

public function __invoke(UpdatePrices $message): void
{
	foreach ($this->variantRepository->findByComplexQuery($message->filters) as $variant) {
    	if($variant->isSoldVeryOften()) {
        	$this->messageBus->dispatch(
            	new UpdateVariantPrice($variant->getId()),
            	[new PriorityStamp('medium')]
            );
        } else {
        	$this->messageBus->dispatch(new UpdateVariantPrice($variant->getId()));
        }
    }
}

Et le nouveau handler pour le message unitaire :

public function __invoke(UpdateVariantPrice $message): void
{
    $variant = $this->variantRepository->find($message->variantId);
    if (!$variant instanceof ProductVariant) {
        throw new UnrecoverableMessageHandlingException("Impossible to find the variant");
    }
    
    $this->priceUpdater->update($variant);
    $this->em->flush();
}

Bien sûr, le délai total peut légèrement augmenter, mais la file d'attente reste fluide. Si un message prioritaire arrive, il est immédiatement pris en charge.

Alors, qu'en est-il maintenant ?

Je peux choisir le mode de transport de manière dynamique.

Je peux ajuster la priorité au moment de l'envoi.

Je peux m'assurer que chaque message est traité dans le délai maximal autorisé ← Ce n'est pas encore le cas... Attendez. Rien n'a changé ?

Anakin Skywalker angry screaming "Liar" Supposons que j'ai envoyé :

  • 3 mises à jour de prix

  • 10 exportations

  • Des centaines de mises à jour d'état

  • Quelques anonymisations

📈 Avons-nous besoin de scaler ?

Probablement. Mais c'est un sujet pour un autre article — d'autres sont bien plus qualifiés que moi pour approfondir les stratégies de scalabilité.

Voici tout de même mon avis :

  • Si vous scalez en fonction du nombre de messages en attente, attribuez un poids à chaque priorité (par exemple, je compte 1 message prioritaire comme 1 200 messages non urgents).

  • Au début, ce ne sera pas parfait. Faites preuve de vigilance.

  • Lorsque vous réduisez le nombre de worker, utilisez l'hystérésis pour éviter les fluctuations entre un nombre trop élevé et un nombre trop faible de worker.

Ah voilà qui est mieux

Je peux enfin le dire, pour de vrai :

Je suis capable de prioriser de manière dynamique et de m'assurer que chaque message est traité pendant une période donnée.

gif problem solved📋 Aide-mémoire pour établir les priorités

Vous souhaitez que vos messages soient traités à temps ? Établissez des priorités.

  • Répartissez vos messages par priorité : définissez des queues telles que haute, moyenne, faible et très faible. Traitez-les dans l'ordre.

  • Routage dynamique avec un stamp : Utilisation de TransportNamesStamp ou d'un stamp personnalisé.

  • Divisez les grandes tâches en petites tâches : ne laissez pas un seul message monopoliser un worker pendant des heures. Divisez-le en plusieurs petits messages et répartissez-les.

🏁 En conclusion

La priorisation des messages dans Symfony Messenger n'est pas une tâche facile, mais ce n'est pas non plus sorcier. Avec un peu de planification, quelques lignes de code personnalisées et une approche axée sur le temps de traitement (et pas seulement sur le débit), vous pouvez créer un système qui traite les tâches importantes en priorité, sans pour autant bloquer les autres.

Et une fois que vous avez mis cela en place ? Vous ne vous contentez plus d'envoyer des messages, vous orchestrez le flux.

Prêt à optimiser les performances de votre application Symfony ?

Mettez en place dès aujourd'hui une hiérarchisation des messages avec l'équipe du créateur de Symfony et assurez-vous que vos tâches les plus critiques soient toujours traitées en priorité.

Image