How To Prioritize Messages When Building Asynchronous Applications With Symfony Messenger

Asynchronous processing offers benefits like decoupled processes and faster response times, but managing message priorities can become a challenge. When dealing with tasks ranging from password resets to complex exports, ensuring timely delivery of critical messages is essential. This article explores common asynchronous processing issues and provides solutions using Symfony Messenger, allowing you to optimize your application without extensive refactoring.
Going asynchronous sounds like a dream: decoupled processes, faster response times, and no more users staring at spinning wheels. But pretty quickly, reality hits — some messages take forever, others are way too important to be delayed, and suddenly you’re drowning in a swamp of priorities.
Whether you're firing off password reset emails or triggering complex exports, you need to make sure the right messages get through at the right time. This article dives into the problems you’ll face — and how to solve them using Symfony Messenger, without rewriting your app from scratch or crying into your logs at 3 AM.
The problem: prioritizing every message dynamically
When you start queueing messages in your Symfony app, one thing becomes obvious real fast: not all messages are equal. Some are critical and time-sensitive. Others… not so much.
Some transports already offer a way to handle priorities like:
🐰 RabbitMQ has priority queues (the x-max-priority queue argument)
🌱 Beanstalkd has built-in job priorities
Nice — but what if I want to switch to another transport tomorrow without rewriting half my code?
Symfony Messenger has a way
The official documentation shows how to split messages into multiple transports based on priority. Think of it like assigning lanes on a highway: one for ambulances, one for scooters.
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
            async_priority_medium:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: medium
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
            async_priority_very_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: very_low
        routing:
            'App\Message\ExportMessage': async_priority_low
            'App\Message\UpdateStateMessage': async_priority_high
💡 Note: The queue names are up to you — just make sure they reflect your actual use case. These are fictional examples.
Now that we’ve split messages by priority, consuming them in the right order is just as important. Thankfully, Symfony Messenger makes this super easy:
php bin/console messenger:consume async_priority_high async_priority_medium async_priority_low async_priority_very_low
The worker will first consume from async_priority_high. If it’s empty, it will try the next one. And so on. So even if there’s a backlog of non-urgent messages, high-priority ones don't get stuck waiting behind them.
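To make that ordering concrete, here is a minimal sketch in plain PHP. It is a simplified model of the behaviour described above, not Symfony's actual worker code: the nextMessage() function and the message names are made up for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Simplified model of a worker reading several queues in priority order.
 * Returns the next message, or null when every queue is empty.
 *
 * @param array<string, list<string>> $queues queue name => pending messages, highest priority first
 */
function nextMessage(array &$queues): ?string
{
    foreach ($queues as $name => $pending) {
        if ($pending !== []) {
            // Take the oldest message of the first non-empty queue.
            return array_shift($queues[$name]);
        }
    }

    return null;
}

$queues = [
    'async_priority_high' => ['state-update-1'],
    'async_priority_medium' => [],
    'async_priority_low' => ['export-1', 'export-2'],
];

$order = [];
while (($message = nextMessage($queues)) !== null) {
    $order[] = $message;

    // A high-priority message arriving mid-run jumps ahead of the remaining export.
    if ($message === 'export-1') {
        $queues['async_priority_high'][] = 'state-update-2';
    }
}

// $order is ['state-update-1', 'export-1', 'state-update-2', 'export-2']
```

The second state update is handled before export-2 even though it was queued later, because the high-priority queue is checked again before each message is picked.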
How do I choose the right queue?
This was honestly the trickiest part for me. There’s no one-size-fits-all formula, and the table below is crucial: everything else in this article depends on it.
👉 It obviously needs to be filled in with the client or the product owner, not just from dev gut feelings.
The question you’re answering here is simple, but powerful:
“What’s the maximum acceptable delay (including queue time and actual handling) for each kind of message?”
And from there, you get your priority mapping:
| Priority | High | Medium | Low | Very low |
| --- | --- | --- | --- | --- |
| Max time before handling | 1 minute | 10 minutes | 1 hour | 1 day |
| Examples | State update | CMS update | Prices update, analytics processing | Export, anonymization |
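One possible way to keep this mapping in code is a backed enum. This is only a sketch: the Priority enum and its maxDelaySeconds() method are hypothetical names, and the values simply mirror the table above.

```php
<?php

declare(strict_types=1);

// Hypothetical enum; the cases and delays mirror the priority table.
enum Priority: string
{
    case High = 'high';
    case Medium = 'medium';
    case Low = 'low';
    case VeryLow = 'very_low';

    /** Maximum acceptable delay (queue time + handling), in seconds. */
    public function maxDelaySeconds(): int
    {
        return match ($this) {
            self::High => 60,
            self::Medium => 600,
            self::Low => 3_600,
            self::VeryLow => 86_400,
        };
    }
}
```

For instance, Priority::from('medium')->maxDelaySeconds() returns 600. Having the budgets in one place makes it easier to compare them later with what monitoring actually measures.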
Now that I’ve split the messages into queues, everything should be perfect, right?
Well… not quite.
🙃 One problem remains: message types can be too generic
Let’s say I have an EmailMessage. Sounds fine. But I might use it for:
A password reset 🟥 High priority
A delivery notification 🟨 Medium
A “rate your purchase” ping 🟦 Low or very low
So... how do I assign a transport when the same message class can represent totally different levels of urgency?
Another problem: a message should be able to have more than one priority
Enter: TransportNamesStamp and our custom PriorityStamp
Luckily, Symfony Messenger already has a built-in way to force a message to go to a specific transport: TransportNamesStamp. But to make things cleaner (and more semantic), let’s introduce our own PriorityStamp:
namespace App\Messenger\Stamp;

use Symfony\Component\Messenger\Stamp\StampInterface;

readonly class PriorityStamp implements StampInterface
{
    public function __construct(private string $priority) {}

    public function getPriority(): string
    {
        return $this->priority;
    }
}
And now, a custom middleware to hook into the dispatch flow:
namespace App\Messenger\Middleware;

use App\Messenger\Stamp\PriorityStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;

readonly class PriorityRoutingMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Check if the message has a PriorityStamp
        $priorityStamp = $envelope->last(PriorityStamp::class);

        if ($priorityStamp instanceof PriorityStamp) {
            // Map the priority to one of the transports declared in the configuration
            $transport = match ($priorityStamp->getPriority()) {
                'high' => 'async_priority_high',
                'medium' => 'async_priority_medium',
                'low' => 'async_priority_low',
                'very_low' => 'async_priority_very_low',
                default => throw new \RuntimeException('Unknown priority level'),
            };

            // Add a TransportNamesStamp to redirect the message
            $envelope = $envelope->with(new TransportNamesStamp([$transport]));
        }

        return $stack->next()->handle($envelope, $stack);
    }
}
Then, don’t forget to add our new custom middleware to the messenger configuration:
framework:
    messenger:
        # ...
        buses:
            messenger.bus.default:
                middleware:
                    - 'App\Messenger\Middleware\PriorityRoutingMiddleware'
Now, when sending a message, I can easily override its priority — no need to create a new message class or refactor everything.
$this->messageBus->dispatch(
    new SendEmailMessage($notificationEmail),
    [new PriorityStamp('medium')],
);
In this case, we’re saying:
“Hey, this email isn’t that urgent — 10 minutes is fine.”
This keeps critical messages flowing fast, without flooding your high-priority queue with lower-stakes noise.
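To avoid scattering priority strings across call sites, the choice can live in one small function. The sketch below is an assumption, not part of Symfony: priorityForEmail() and the purpose keys are hypothetical, and the priorities follow the email examples given earlier (with "rate your purchase" arbitrarily set to low).

```php
<?php

declare(strict_types=1);

/**
 * Maps the purpose of an email to a priority name.
 * The purpose keys are hypothetical; adapt them to your own use cases.
 */
function priorityForEmail(string $purpose): string
{
    return match ($purpose) {
        'password_reset' => 'high',
        'delivery_notification' => 'medium',
        'rate_your_purchase' => 'low',
        default => throw new \InvalidArgumentException(
            sprintf('Unknown email purpose "%s".', $purpose)
        ),
    };
}
```

The result can then be passed straight to the stamp, for example new PriorityStamp(priorityForEmail('delivery_notification')). An unknown purpose fails loudly at dispatch time instead of silently landing in the wrong queue.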
So where are we now?
✅ I can dynamically choose the transport
✅ I can adjust the priority at dispatch time
❌ I can ensure every message is handled within its max allowed time ← Still not there yet.
So… what’s the problem now?
Let’s say I send a message to update the price of every product variant in my catalog — or a large subset.
Not a big deal if I’ve got a few product variants. But if I have 100,000 variants? 500,000 variants? A million? And each one makes a remote API call to fetch the price?
Here’s what happens:
📨 A message enters the queue.
🧠 It starts processing.
⏳ It takes 5, 10, 30… 60 minutes.
🧵 Meanwhile, one PHP worker is stuck.
I could try to batch those API calls. Sure, that helps — but it only reduces the problem. It doesn’t solve it. Not in a way that truly scales.
And that’s a problem, even with all our beautiful prioritization logic — because long-running messages don’t play well in this model.
Last problem: Some messages take ages to be handled
Let’s say my project has this message and its handler for the variant price update process:
readonly class UpdatePrices
{
    public function __construct(
        public array $filters,
    ) {
    }
}

#[AsMessageHandler]
readonly class UpdatePricesHandler
{
    // The repository, price updater and entity manager are injected
    // through the constructor (omitted here for brevity).

    public function __invoke(UpdatePrices $message): void
    {
        // Every matching variant is updated inside this single message
        foreach ($this->productVariantRepository->findAllByRegex($message->filters) as $variant) {
            $this->priceUpdater->updatePriceForVariant($variant);
        }

        $this->em->flush();
    }
}
Let’s be honest: the problem screams at us.
If 200,000 variants match this regex, and each update takes 0.1 second, that’s 20,000 seconds, or about 5.5 hours of processing in one go. That is way beyond our 1-hour limit.
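The same back-of-the-envelope check can be written down so it is easy to redo with your own numbers. This is a minimal sketch: estimateSeconds() is a hypothetical helper, and it assumes a single worker processing items one after another.

```php
<?php

declare(strict_types=1);

/** Estimated time, in seconds, for one worker to process $count items sequentially. */
function estimateSeconds(int $count, float $secondsPerItem): float
{
    return $count * $secondsPerItem;
}

$estimate = estimateSeconds(200_000, 0.1); // about 20,000 seconds, i.e. roughly 5 h 33 min
$budget = 3_600;                           // "low" priority budget: 1 hour

// True here: the job cannot fit in its budget as a single message.
$exceedsBudget = $estimate > $budget;
```

At 0.1 second per variant, a single message would have to stay under 36,000 variants to fit in the 1-hour budget, and that is before counting any time spent waiting in the queue.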
To avoid jamming the queue, we’ll break this job into smaller messages. Let’s go to the extreme: one message = one variant update.
Keep the main message:
readonly class UpdatePrices
{
    public function __construct(
        public array $filters,
    ) {
    }
}
Add a unitary one:
readonly class UpdateVariantPrice
{
    public function __construct(
        public int $variantId,
    ) {
    }
}
Now change the handler to dispatch one message per variant:
public function __invoke(UpdatePrices $message): void
{
    foreach ($this->variantRepository->findByComplexQuery($message->filters) as $variant) {
        // One small message per variant instead of one huge job
        $this->messageBus->dispatch(new UpdateVariantPrice($variant->getId()));
    }
}
Bonus: we can even plug in our dynamic priority logic:
public function __invoke(UpdatePrices $message): void
{
    foreach ($this->variantRepository->findByComplexQuery($message->filters) as $variant) {
        if ($variant->isSoldVeryOften()) {
            $this->messageBus->dispatch(
                new UpdateVariantPrice($variant->getId()),
                [new PriorityStamp('medium')]
            );
        } else {
            $this->messageBus->dispatch(new UpdateVariantPrice($variant->getId()));
        }
    }
}
And the new handler for the unitary message:
public function __invoke(UpdateVariantPrice $message): void
{
    $variant = $this->variantRepository->find($message->variantId);

    if (!$variant instanceof ProductVariant) {
        throw new UnrecoverableMessageHandlingException("Impossible to find the variant");
    }

    $this->priceUpdater->update($variant);
    $this->em->flush();
}
Sure, the total time may increase slightly, but the queue stays fluid. If a higher-priority message comes in, it’s picked up right away.
So what about now?
✅ I can dynamically choose the transport
✅ I can adjust the priority at dispatch time
❌ I can ensure every message is handled within its max allowed time ← Still not there yet.
…Wait. Nothing changed?
Let’s say I’ve sent:
3 price updates
10 exports
Hundreds of state updates
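A few anonymizations
Even in the right order, a single worker still handles all of that one message at a time, so some messages can miss their deadline simply because there is too much to do.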
Do we need to scale?
Probably. But that’s a topic for another article — others are way more qualified than me to go deep on scaling strategies.
Still, here’s my two cents:
If you scale based on the number of pending messages, assign a weight per priority. (E.g. I count 1 high message as 1,200 very low ones.)
It won’t be perfect at first. Monitoring is your friend.
When downscaling, use hysteresis to avoid flapping between too many and too few workers.
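These tips can be sketched in a few lines of plain PHP. Everything here is an assumption for illustration: the function names, the per-worker capacity and the margin are made up, and only the 1,200 ratio between high and very low comes from the text above.

```php
<?php

declare(strict_types=1);

/**
 * Weighted backlog: each pending message counts for the weight of its priority.
 *
 * @param array<string, int> $pending priority => number of pending messages
 * @param array<string, int> $weights priority => weight
 */
function weightedBacklog(array $pending, array $weights): int
{
    $total = 0;
    foreach ($pending as $priority => $count) {
        $total += $count * ($weights[$priority] ?? 1);
    }

    return $total;
}

/**
 * Scale up as soon as the backlog asks for more workers, but scale down only
 * when the target falls below the current count by more than the margin (hysteresis).
 */
function desiredWorkers(int $current, int $backlog, int $backlogPerWorker, int $downscaleMargin): int
{
    $target = max(1, (int) ceil($backlog / $backlogPerWorker));

    if ($target > $current) {
        return $target;
    }

    // Stay put unless the target is lower by more than the margin.
    return ($current - $target) > $downscaleMargin ? $target : $current;
}

// Hypothetical weights; tune them from your own monitoring.
$weights = ['high' => 1_200, 'medium' => 120, 'low' => 20, 'very_low' => 1];

// 2 * 1200 + 600 * 1 = 3000
$backlog = weightedBacklog(['high' => 2, 'very_low' => 600], $weights);
```

With a capacity of 1,000 weighted units per worker and a margin of 1, desiredWorkers(2, $backlog, 1000, 1) asks for 3 workers. If the backlog then drops to 1,800, the count stays at 3 instead of bouncing back to 2, and it only shrinks once the target is at least two workers lower.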
Now we're talking
Now I can finally say it, for real:
I am able to prioritize dynamically and ensure that every message is handled within a given time period.
Prioritization cheat sheet
You want your messages to be processed in time? Prioritize.
Split your messages by priority: Define queues like high, medium, low, and very_low. Consume them in order.
Route dynamically with a stamp: Use TransportNamesStamp or a custom one.
Break big tasks into small ones: Don’t let a single message hog a worker for hours. Split it into smaller ones, and dispatch them.
In conclusion
Prioritizing messages in Symfony Messenger isn’t plug-and-play, but it’s not rocket science either. With a bit of planning, some custom code, and a mindset focused on time-to-handle (not just throughput), you can build a system where important things get done first — without choking the rest.
And once you’ve got that running? You’re not just dispatching messages anymore — you’re orchestrating flow.