Brauchst du einen Experten, der dir bei deinem Symfony- oder PHP-Entwicklungsprojekt hilft? Fordere jetzt ein Angebot an


Wie man Nachrichten beim Aufbau asynchroner Anwendungen mit dem Symfony-Messenger priorisiert

· Thibaut Chieux · 7 Minuten zum Lesen
Blue sign on a building with several Now What? letters

Die asynchrone Verarbeitung bietet Vorteile wie entkoppelte Prozesse und schnellere Reaktionszeiten. Die Verwaltung von Nachrichtenprioritäten kann jedoch zu einer Herausforderung werden. Bei Aufgaben, die vom Zurücksetzen von Passwörtern bis hin zu komplexen Exporten reichen, ist die rechtzeitige Zustellung kritischer Nachrichten unerlässlich. Dieser Artikel befasst sich mit häufigen Problemen bei der asynchronen Verarbeitung und zeigt Lösungen mit Symfony Messenger auf, mit denen Sie Ihre Anwendung ohne umfangreiches Refactoring optimieren können.

Asynchronität klingt wie ein Traum: entkoppelte Prozesse, schnellere Reaktionszeiten und keine Benutzer mehr, die auf Ladebalken starren. Doch schnell holt einen die Realität ein – manche Nachrichten brauchen ewig, andere sind viel zu wichtig, um verzögert zu werden, und plötzlich versinkt man in einem Sumpf von Prioritäten.

Ganz gleich, ob Sie Passwörter zurücksetzen oder komplexe Exporte auslösen, Sie müssen sicherstellen, dass die richtigen Nachrichten zur richtigen Zeit ankommen. Dieser Artikel beleuchtet die Probleme, denen Sie begegnen werden – und wie Sie sie mit Symfony Messenger lösen können, ohne Ihre Anwendung von Grund auf neu zu schreiben oder um 3 Uhr morgens in Ihre Logs zu weinen.

Das Problem: Jede Nachricht dynamisch priorisieren

Animated gif with a lady with red glasses talking with the text "We have a priorities problem" belowWenn Sie beginnen, Nachrichten in Ihrer Symfony-Anwendung in eine Warteschlange zu stellen, wird eines schnell offensichtlich: Nicht alle Nachrichten sind gleich. Einige sind kritisch und zeitkritisch. Andere... eher weniger.

Einige Transportsysteme bieten bereits eine Möglichkeit, Prioritäten zu behandeln, wie zum Beispiel:

🐰 RabbitMQ hat x-priority

🌱 Beanstalkd hat eine integrierte Warteschlangenpriorität

Schön – aber was, wenn ich morgen zu einem anderen Transport wechseln möchte, ohne die Hälfte meines Codes neu zu schreiben?

Symfony Messenger bietet eine Lösung

Die offizielle Dokumentation zeigt, wie man Nachrichten basierend auf Priorität auf mehrere Transportsysteme aufteilt. Stellen Sie sich das wie das Zuweisen von Fahrspuren auf einer Autobahn vor: eine für Krankenwagen, eine für Roller.

framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
            async_priority_medium:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: medium
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
            async_priority_very_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: very_low

        routing:
            'App\Message\ExportMessage': async_priority_low
            'App\Message\UpdateStateMessage': async_priority_high

💡 Hinweis: Die Warteschlangennamen können Sie selbst wählen – stellen Sie einfach sicher, dass sie Ihren tatsächlichen Anwendungsfall widerspiegeln. Dies sind fiktive Beispiele.

Nachdem wir die Nachrichten nach Priorität aufgeteilt haben, ist das Konsumieren in der richtigen Reihenfolge genauso wichtig. Glücklicherweise macht Symfony Messenger das super einfach:

php bin/console messenger:consume async_priority_high async_priority_medium async_priority_low async_priority_very_low

Der Worker wird zuerst von async_priority_high konsumieren. Wenn diese leer ist, versucht er die nächste. Und so weiter. Selbst wenn es einen Rückstand an nicht dringenden Nachrichten gibt, bleiben Nachrichten mit hoher Priorität nicht dahinter hängen.

Wie wähle ich die richtige Warteschlange?

Das war ehrlich gesagt der kniffligste Teil für mich. Es gibt keine Patentlösung, und diese Tabelle ist entscheidend – alles andere in diesem Artikel hängt davon ab.

👉 Sie muss offensichtlich mit dem Kunden oder dem Produktverantwortlichen gefüllt werden und nicht nur mit dem Bauchgefühl des Entwicklers.

Die Frage, die Sie hier beantworten, ist einfach, aber aussagekräftig:

„Was ist die maximal akzeptable Verzögerung (einschließlich Warteschlangenzeit und tatsächlicher Bearbeitung) für jede Art von Nachricht?“

Und daraus ergibt sich Ihre Prioritätszuordnung:

Priorität

Hoch

Mittel

Niedrig

Sehr niedrig

Bearbeitungszeit

1 Minute

10 Minuten

1 Stunde

1 Tag

Beispiel

- Status-Update- E-Mail

- CMS-Update

- Preis-Update

- Analyse-Verarbeitung

- Export- Anonymisierung

Nachdem ich die Nachrichten in Warteschlangen aufgeteilt habe, sollte alles perfekt sein, oder?

Nun ja… nicht ganz.

🙃 Ein Problem bleibt: Nachrichtentypen können zu generisch sein

Nehmen wir an, ich habe eine EmailMessage. Klingt gut. Aber ich könnte sie verwenden für:

  • Eine Passwortrücksetzung → 🟥 Hohe Priorität

  • Eine Lieferbenachrichtigung → 🟨 Mittlere Priorität

  • Einen „Bewerten Sie Ihren Kauf“-Ping → 🟦 Niedrige oder sehr niedrige Priorität

Wie weise ich also einen Transport zu, wenn dieselbe Nachrichtenklasse völlig unterschiedliche Dringlichkeitsstufen darstellen kann?

Ein weiteres Problem: Eine Nachricht sollte mehr als eine Priorität haben können

Enter: TransportNamesStamp und und unser benutzerdefinierter PriorityStamp

Hier kommen TransportNamesStamp und unser benutzerdefinierter PriorityStamp ins Spiel. Glücklicherweise bietet Symfony Messenger bereits eine integrierte Möglichkeit, eine Nachricht zu zwingen, zu einem bestimmten Transport zu gehen: TransportNamesStamp. Aber um die Dinge sauberer (und semantischer) zu gestalten, führen wir unseren eigenen PriorityStamp ein:

namespace App\Messenger\Stamp;

use Symfony\Component\Messenger\Stamp\StampInterface;

readonly class PriorityStamp implements StampInterface
{
    public function __construct(private string $priority) {}

    public function getPriority(): string
    {
        return $this->priority;
    }
}

Und jetzt eine benutzerdefinierte Middleware, die sich in den Dispatch-Fluss einklinkt:

readonly class PriorityRoutingMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Check if the message has a PriorityStamp
        $priorityStamp = $envelope->last(PriorityStamp::class);

        if ($priorityStamp instanceof PriorityStamp) {
            $priority = $priorityStamp->getPriority();

            // Determine the transport based on priority
            $transport = match ($priority) {
                'high' => 'high_priority',
                'medium' => 'medium_priority',
                'low' => 'low_priority',
                'very_low' => 'very_low_priority',
                default => throw new \RuntimeException('Unknow priority level')
            };

            // Add a TransportNamesStamp to redirect the message
            $envelope = $envelope->with(new TransportNamesStamp([$transport]));
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

Vergessen Sie dann nicht, unsere neue benutzerdefinierte Middleware zur Messenger-Konfiguration hinzuzufügen:

framework:
    messenger:
      # ...
      buses:
        messenger.bus.default:
          middleware:
            - 'App\Messenger\Middleware\PriorityRoutingMiddleware'

Beim Senden einer Nachricht kann ich jetzt einfach ihre Priorität überschreiben – ohne eine neue Nachrichtenklasse zu erstellen oder alles umzugestalten.

$this->messageBus->dispatch(
    new SendEmailMessage($notificationEmail),
    [new PriorityStamp('medium')],
)

In diesem Fall sagen wir:

Hey, diese E-Mail ist nicht so dringend – 10 Minuten sind in Ordnung.

Dies hält kritische Nachrichten schnell im Fluss, ohne Ihre Hochprioritätswarteschlange mit weniger wichtigen Nachrichten zu überfluten.

Wo stehen wir jetzt?

✅ Ich kann den Transport dynamisch wählen

✅ Ich kann die Priorität zum Zeitpunkt des Versands anpassen

❌ Ich kann sicherstellen, dass jede Nachricht innerhalb ihrer maximal zulässigen Zeit bearbeitet wird ← Immer noch nicht erreicht.

Was ist denn jetzt das Problem?

Nehmen wir an, ich sende eine Nachricht, um den Preis jeder Produktvariante in meinem Katalog zu aktualisieren – oder eines großen Teils davon.

Kein großes Problem, wenn ich nur wenige Produktvarianten habe. Aber wenn ich 100.000 Varianten habe? 500.000 Varianten? Eine Million? Und jede einzelne macht einen Remote-API-Aufruf, um den Preis abzurufen?

Das passiert dann:

📨 Eine Nachricht gelangt in die Warteschlange.

🧠 Sie beginnt mit der Verarbeitung.

⏳ Es dauert 5, 10, 30… 60 Minuten.

🧵 Währenddessen steckt ein PHP-Worker fest.

Moone Boy watching his watch sitting on the ground with a suitcase next to himIch könnte versuchen, diese API-Aufrufe zu bündeln. Sicher, das hilft – aber es reduziert das Problem nur. Es löst es nicht. Nicht auf eine Weise, die wirklich skaliert.

Und das ist ein Problem, selbst mit unserer schönen Priorisierungslogik – denn lang laufende Nachrichten passen nicht gut in dieses Modell.

Letztes Problem: Manche Nachrichten brauchen ewig, bis sie bearbeitet werden

Nehmen wir an, ich habe diese Nachricht und den entsprechenden Handler in meinem Projekt, die dem Prozess der Variantenpreisaktualisierung entsprechen:

readonly class UpdatePrices
{
    public function __construct(
        public array $filters,
    ) {
    }
}
#[AsMessageHandler]
readonly class UpdatePricesHandler
{
    public function __invoke(UpdatePrices $message): void
    {
        foreach ($this->productVariantRepository->findAllByRegex($message->filters) as $product) {
            $this->priceUpdater->updatePriceForVariant($variant);
        }
        
        $this->em->flush();
    }
}

Seien wir ehrlich: Das Problem schreit uns an. Wenn 200.000 Varianten diesem Regex entsprechen und jede Aktualisierung 0,1 Sekunden dauert, sind das ~5 Stunden Verarbeitung auf einmal – weit über unser 1-Stunden-Limit hinaus.

Um ein Verstopfen der Warteschlange zu vermeiden, werden wir diesen Job in kleinere Nachrichten aufteilen. Gehen wir ins Extrem: eine Nachricht = eine Variantenaktualisierung.

Behalten Sie die Hauptnachricht bei:

readonly class UpdatePrices
{
    public function __construct(
        public array $filters,
    ) {
    }
}

Fügen Sie eine unitäre hinzu:

readonly class UpdateVariantPrice
{
    public function __construct(
        public int $variantId,
    ) {
    }
}

Ändern Sie nun den Handler, um eine Nachricht pro Variante zu versenden:

public function __invoke(UpdatePrices $message): void
{
	foreach ($this->variantRepository->findByComplexQuery($message->filters) as $variant) {
        $this->messageBus->dispatch(new UpdateVariantPrice($product->getId()));
    }
}

Bonus: Wir können sogar unsere dynamische Prioritätslogik einstecken:

public function __invoke(UpdatePrices $message): void
{
	foreach ($this->variantRepository->findByComplexQuery($message->filters) as $variant) {
    	if($variant->isSoldVeryOften()) {
        	$this->messageBus->dispatch(
            	new UpdateVariantPrice($variant->getId()),
            	[new PriorityStamp('medium')]
            );
        } else {
        	$this->messageBus->dispatch(new UpdateVariantPrice($variant->getId()));
        }
    }
}

Und der neue Handler für die unitäre Nachricht:

public function __invoke(UpdateVariantPrice $message): void
{
    $variant = $this->variantRepository->find($message->variantId);
    if (!$variant instanceof ProductVariant) {
        throw new UnrecoverableMessageHandlingException("Impossible to find the variant");
    }
    
    $this->priceUpdater->update($variant);
    $this->em->flush();
}

Sicher, die Gesamtzeit kann leicht steigen, aber die Warteschlange bleibt flüssig. Wenn eine Nachricht mit höherer Priorität eingeht, wird sie sofort bearbeitet.

Was ist jetzt?

✅ Ich kann den Transport dynamisch wählen

✅ Ich kann die Priorität zum Zeitpunkt des Versands anpassen

❌ Ich kann sicherstellen, dass jede Nachricht innerhalb ihrer maximal zulässigen Zeit bearbeitet wird ← Immer noch nicht erreicht.

…Warten Sie. Nichts hat sich geändert?

Anakin Skywalker angry screaming "Liar"Nehmen wir an, ich habe gesendet:

  • 3 Preis-Updates

  • 10 Exporte

  • Hunderte von Status-Updates

  • Einige Anonymisierungen

Müssen wir skalieren?

Wahrscheinlich. Aber das ist ein Thema für einen anderen Artikel – andere sind viel qualifizierter als ich, um tief in Skalierungsstrategien einzusteigen. Trotzdem meine zwei Cents:

  • Wenn Sie basierend auf der Anzahl der ausstehenden Nachrichten skalieren, weisen Sie jeder Priorität ein Gewicht zu. (Z.B. zähle ich 1 hochpriorisierte Nachricht als 1.200 sehr niedrigpriorisierte.)

  • Es wird am Anfang nicht perfekt sein. Monitoring ist Ihr Freund.

  • Beim Herunterskalieren verwenden Sie Hysterese, um ein Flackern zwischen zu vielen und zu wenigen Workern zu vermeiden.

Jetzt reden wir!

Jetzt kann ich es endlich wirklich sagen:

Ich bin in der Lage, dynamisch zu priorisieren und sicherzustellen, dass jede Nachricht innerhalb eines bestimmten Zeitraums bearbeitet wird.

Spickzettel zur Priorisierung

Sie möchten, dass Ihre Nachrichten pünktlich verarbeitet werden? Priorisieren Sie.

  • Teilen Sie Ihre Nachrichten nach Priorität auf: Definieren Sie Warteschlangen wie high, medium, low und very_low. Konsumieren Sie sie der Reihe nach.

  • Routen Sie dynamisch mit einem „Stamp“: Verwenden Sie TransportNamesStamp oder einen benutzerdefinierten.

  • Brechen Sie große Aufgaben in kleine auf: Lassen Sie eine einzelne Nachricht einen Worker nicht stundenlang blockieren. Teilen Sie sie in kleinere auf und versenden Sie diese.

Fazit

Das Priorisieren von Nachrichten in Symfony Messenger ist nicht Plug-and-Play, aber auch keine Raketenwissenschaft. Mit ein wenig Planung, etwas benutzerdefiniertem Code und einer Denkweise, die sich auf die Bearbeitungszeit (nicht nur den Durchsatz) konzentriert, können Sie ein System aufbauen, in dem wichtige Dinge zuerst erledigt werden – ohne den Rest zu ersticken.

Und wenn das erst einmal läuft? Dann versenden Sie nicht mehr nur Nachrichten – Sie orchestrieren den Fluss.

Möchten Sie, dass Ihre Symfony-Anwendung besser läuft?

Machen Sie jetzt die Nachrichten-Priorisierung mit dem Team des Schöpfers von Symfony. So stellen Sie sicher, dass Ihre wichtigsten Aufgaben immer zuerst erledigt werden.

Image