A robust Laravel package for integrating with RabbitMQ, providing support for multiple connections, exchanges, queues, and consumers. This package simplifies the process of working with RabbitMQ in your Laravel applications.
Features:
- Multiple RabbitMQ connections support
- SSL/TLS connection support
- Easy configuration of exchanges and queues
- Flexible consumer system
- Built-in retry mechanism
- Command-line worker
- Event-driven architecture
Requirements:
- PHP 8.3 or higher
- Laravel 12.0 or higher
- RabbitMQ server
- php-amqplib/php-amqplib ^3.7
You can install the package via composer:
composer require juniorfontenele/laravel-rabbitmq
After installing, publish the configuration file:
php artisan rabbitmq:install
Add the following variables to your .env file:
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
RABBITMQ_SSL=false
For SSL connections, you can also configure:
RABBITMQ_SSL=true
RABBITMQ_SSL_CAFILE=/path/to/ca.pem
RABBITMQ_SSL_CERTFILE=/path/to/cert.pem
RABBITMQ_SSL_KEYFILE=/path/to/key.pem
RABBITMQ_SSL_VERIFY_PEER=true
The published configuration file (config/rabbitmq.php) contains sections for connections, exchanges, queues, and the worker.
Define your RabbitMQ server connections:
'connections' => [
    'default' => [
        'host' => env('RABBITMQ_HOST', 'localhost'),
        'port' => env('RABBITMQ_PORT', 5672),
        'user' => env('RABBITMQ_USER', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost' => env('RABBITMQ_VHOST', '/'),
        'ssl' => [
            'enabled' => env('RABBITMQ_SSL', false),
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),
            'local_cert' => env('RABBITMQ_SSL_CERTFILE', null),
            'local_key' => env('RABBITMQ_SSL_KEYFILE', null),
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
        ],
    ],
    // Add more connections as needed
]
Configure your RabbitMQ exchanges:
'exchanges' => [
    'default' => [
        'connection' => 'default',   // Connection to use
        'name' => 'app.default',     // Exchange name
        'type' => 'direct',          // Type: direct, topic, fanout, headers
        'passive' => false,          // If true, do not create; error if it does not exist
        'durable' => true,           // Survive broker restart
        'auto_delete' => false,      // If true, delete when no queues remain bound
        'internal' => false,         // If true, publishers cannot publish to it directly
        'arguments' => [],           // Additional arguments
    ],
]
Configure your RabbitMQ queues:
'queues' => [
    'default' => [
        'exchange' => 'default',            // Exchange to bind to
        'name' => 'default_queue',          // Queue name
        'routing_key' => 'default_queue',   // Routing key
        'consumer_tag' => 'consumer_tag',   // Consumer identifier
        'passive' => false,                 // If true, do not create; error if it does not exist
        'durable' => true,                  // Survive broker restart
        'exclusive' => false,               // If true, only the declaring connection can use it
        'auto_delete' => false,             // If true, delete when the last consumer disconnects
        'arguments' => [],                  // Additional arguments
        'prefetch' => [
            'count' => 1,                   // Messages to prefetch
            'size' => 0,                    // Prefetch window in bytes (0 = no limit)
        ],
        'retry' => [
            'enabled' => true,              // Enable retry mechanism
            'max_attempts' => 3,            // Maximum retry attempts
            'delay' => 60000,               // Delay between retries (ms; 60000 = 60 s)
        ],
    ],
]
Configure worker behavior:
'worker' => [
    'memory_limit' => 128,   // Memory limit in MB
    'timeout' => 60,         // Wait timeout in seconds
    'sleep' => 3,            // Sleep when no message (seconds)
    'max_jobs' => 0,         // Max jobs (0 = unlimited)
    'tries' => 1,            // Default retry attempts
],
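To make the worker options concrete, here is a minimal sketch of how a worker loop might decide when to stop. The function `shouldStopWorker` and its parameters are hypothetical and not part of the package; only the option names and their meanings (`memory_limit` in MB, `max_jobs` with 0 meaning unlimited) come from the configuration above.

```php
<?php

/**
 * Hypothetical helper (not part of the package): decide whether a worker
 * loop should stop, based on the 'memory_limit' and 'max_jobs' options.
 *
 * @param array{memory_limit: int, max_jobs: int} $options
 */
function shouldStopWorker(array $options, int $jobsProcessed, int $memoryBytes): bool
{
    // 'memory_limit' is expressed in MB, so convert the byte count first.
    $memoryMb = $memoryBytes / 1024 / 1024;
    if ($memoryMb >= $options['memory_limit']) {
        return true;
    }

    // A 'max_jobs' of 0 means unlimited, so only a positive value can stop the loop.
    return $options['max_jobs'] > 0 && $jobsProcessed >= $options['max_jobs'];
}

$options = ['memory_limit' => 128, 'max_jobs' => 0];

// With max_jobs = 0, the job count alone never stops the worker.
var_dump(shouldStopWorker($options, 5000, memory_get_usage(true)));
```

A long-running worker would typically run a check like this after each message, so that a process supervisor can restart it with a clean memory footprint.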
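You can publish messages using the RabbitMQ facade:
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ;

// Publish an array payload under the configured name 'notifications'
RabbitMQ::publish('notifications', ['test' => 'data']);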
The package provides a standardized EventMessage class that helps with formatting and handling messages:
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ;
use JuniorFontenele\LaravelRabbitMQ\Messages\EventMessage;

// Create a standardized event message
$message = EventMessage::make('user.created', [
    'id' => 123,
    'name' => 'John Doe',
    'email' => 'john@example.com',
])
    ->routingKey('user.events')
    ->messageId(uniqid())
    ->correlationId('request-123');

// Publish the event message
RabbitMQ::publish('default', $message);
The EventMessage automatically includes:
- Timestamp
- Application name
- Hostname
- Event name
- Payload data
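The exact wire format is not documented here. As a rough illustration only, an envelope carrying the fields listed above could be assembled as follows. The function `buildEventEnvelope` and every key name in it are assumptions made for this sketch, not the package's actual schema.

```php
<?php

/**
 * Hypothetical illustration of an event envelope. The key names are
 * assumptions; inspect a real EventMessage to see the package's format.
 */
function buildEventEnvelope(string $event, array $payload, string $appName): array
{
    return [
        'timestamp' => date(DATE_ATOM), // when the message was created
        'app'       => $appName,        // application name
        'hostname'  => gethostname(),   // host that produced the message
        'event'     => $event,          // event name, e.g. "user.created"
        'payload'   => $payload,        // caller-supplied data
    ];
}

$envelope = buildEventEnvelope('user.created', ['id' => 123], 'my-app');

// Print the envelope as it might appear in a JSON message body.
echo json_encode($envelope, JSON_PRETTY_PRINT), PHP_EOL;
```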
Create a custom consumer by extending the base Consumer class:
<?php

namespace App\Consumers;

use JuniorFontenele\LaravelRabbitMQ\Consumer;
use JuniorFontenele\LaravelRabbitMQ\Messages\EventMessage;
use PhpAmqpLib\Message\AMQPMessage;

class NotificationsConsumer extends Consumer
{
    /**
     * Process the message.
     *
     * @param AMQPMessage $message
     * @return void
     */
    public function consume(AMQPMessage $message): void
    {
        try {
            // Parse message as a standard EventMessage
            $eventMessage = EventMessage::tryFrom($message);

            // Access standardized event data
            $event = $eventMessage->getEvent();
            $payload = $eventMessage->getPayload();
            $messageId = $eventMessage->getMessageId();
            $correlationId = $eventMessage->getCorrelationId();

            // Process based on event type
            match ($event) {
                'user.created' => $this->handleUserCreated($payload),
                'user.updated' => $this->handleUserUpdated($payload),
                default => $this->handleUnknownEvent($event, $payload),
            };

            // Or process as raw data
            $data = json_decode($message->getBody(), true);

            // Process the message
            // ...
        } catch (\Throwable $e) {
            // Handle parsing or processing errors. Catching \Throwable (rather
            // than \Exception) also covers PHP Errors, and matches the type
            // accepted by failed().
            $this->failed($message, $e);

            return;
        }

        // Acknowledge the message after successful processing
        $message->ack();
    }

    protected function handleUserCreated(array $payload): void
    {
        // Handle user created event
    }

    protected function handleUserUpdated(array $payload): void
    {
        // Handle user updated event
    }

    protected function handleUnknownEvent(string $event, array $payload): void
    {
        // Handle unknown event
    }
}
Register your consumer in a service provider:
<?php

namespace App\Providers;

use App\Consumers\NotificationsConsumer;
use Illuminate\Support\ServiceProvider;
use JuniorFontenele\LaravelRabbitMQ\RabbitMQManager;

class AppServiceProvider extends ServiceProvider
{
    public function boot(RabbitMQManager $manager): void
    {
        // Register consumer for notifications queue
        $manager->registerConsumer('notifications', NotificationsConsumer::class);
    }
}
You can also auto-register consumers by adding them to the App\Consumers folder. Each class must extend the base JuniorFontenele\LaravelRabbitMQ\Consumer class and be named after the queue in StudlyCase followed by Consumer, e.g. NotificationsConsumer for the notifications queue. The package discovers and registers these classes automatically.
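The naming convention can be sketched as a small function. This is only an illustration: `consumerClassFor` is a hypothetical helper, not a package API, and the handling of multi-word queue names (splitting on `-`, `_`, `.` and whitespace) is an assumption, since the text above only confirms the single-word case.

```php
<?php

/**
 * Hypothetical helper (not part of the package): derive the consumer class
 * name that the naming convention implies for a queue name.
 */
function consumerClassFor(string $queue, string $namespace = 'App\\Consumers'): string
{
    // StudlyCase: split on separators, capitalize each word, join them.
    $words = preg_split('/[-_.\s]+/', $queue, -1, PREG_SPLIT_NO_EMPTY);
    $studly = implode('', array_map('ucfirst', $words));

    return $namespace . '\\' . $studly . 'Consumer';
}

echo consumerClassFor('notifications'), PHP_EOL; // App\Consumers\NotificationsConsumer
```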
You can start a worker via command line:
# Basic usage
php artisan rabbitmq:work default

# With options
php artisan rabbitmq:work notifications \
    --memory=256 \
    --timeout=120 \
    --sleep=5 \
    --max-jobs=1000 \
    --tries=3 \
    --once
Or programmatically:
use JuniorFontenele\LaravelRabbitMQ\Worker;

$worker = app(Worker::class);
$worker->work('notifications', [
    'memory_limit' => 256,
    'timeout' => 120,
    'max_jobs' => 1000,
    'verbose' => true,
]);
The package dispatches the following events:
- rabbitmq.processing: Before processing a message
- rabbitmq.processed: After successful processing
- rabbitmq.failed: When processing fails
You can listen for these events in your EventServiceProvider:
protected $listen = [
    'rabbitmq.processing' => [
        \App\Listeners\LogProcessingMessage::class,
    ],
    'rabbitmq.processed' => [
        \App\Listeners\LogProcessedMessage::class,
    ],
    'rabbitmq.failed' => [
        \App\Listeners\LogFailedMessage::class,
    ],
];
Or using the event facade:
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;

Event::listen('rabbitmq.failed', function ($message, $queue, $exception) {
    Log::error('Failed to process message', [
        'queue' => $queue,
        'error' => $exception->getMessage(),
    ]);
});
The package includes a built-in retry mechanism:
- When a consumer throws an exception, the failed method is called
- The message retry count is checked against the configured maximum attempts
- If retries are available, the message is requeued
- If max retries are reached, the message is rejected (not requeued)
- A rabbitmq.failed event is dispatched
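The requeue-or-reject decision described in the steps above can be sketched in a few lines. This is not the package's code: `retryAction` is a hypothetical function, and the sketch assumes the retry count starts at 0 for a message that has not been retried yet, which the text does not specify.

```php
<?php

/**
 * Hypothetical sketch (not the package's code) of the retry decision:
 * requeue while attempts remain, otherwise reject without requeueing.
 *
 * @param array{enabled: bool, max_attempts: int, delay: int} $retry
 */
function retryAction(array $retry, int $retryCount): string
{
    // With retries disabled, a failed message is rejected immediately.
    if (! $retry['enabled']) {
        return 'reject';
    }

    return $retryCount < $retry['max_attempts'] ? 'requeue' : 'reject';
}

$retry = ['enabled' => true, 'max_attempts' => 3, 'delay' => 60000];

foreach ([0, 1, 2, 3] as $retryCount) {
    echo $retryCount, ' => ', retryAction($retry, $retryCount), PHP_EOL;
}
// 0 => requeue
// 1 => requeue
// 2 => requeue
// 3 => reject
```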
You can customize this behavior by overriding the failed method in your consumer:
// Requires these imports at the top of the consumer file:
//   use Illuminate\Support\Facades\Log;
//   use Throwable;
public function failed(AMQPMessage $message, Throwable $exception): void
{
    // Custom failure handling
    Log::error('Custom failure handling', [
        'message' => $message->getBody(),
        'exception' => $exception->getMessage(),
    ]);

    // Call parent implementation or handle completely custom
    parent::failed($message, $exception);
}
Best practices:
- Use durable exchanges and queues for important messages
- Configure prefetch count appropriately for your workload
- Implement proper error handling in consumers
- Use meaningful routing keys for better message routing
- Configure retry policies based on your use case
- Monitor memory usage and set appropriate limits
- Use correlation IDs to track related messages
- Implement dead letter exchanges for failed messages
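As an illustration of the last point, a queue entry could request dead-lettering through its `arguments`. This sketch assumes the package passes `arguments` through to the queue declaration unchanged, which is not confirmed here. The names `app.dead_letter` and `default_queue.failed` are made up for the example, and the dead letter exchange itself would also need to be declared.

```php
<?php

// Hypothetical queue entry for config/rabbitmq.php. The dead-letter names
// ('app.dead_letter', 'default_queue.failed') are made up for illustration.
$queues = [
    'default' => [
        'exchange'    => 'default',
        'name'        => 'default_queue',
        'routing_key' => 'default_queue',
        'durable'     => true,
        'arguments'   => [
            // Standard RabbitMQ queue arguments for dead-lettering:
            // rejected (not requeued) messages are republished to this exchange.
            'x-dead-letter-exchange'    => 'app.dead_letter',
            'x-dead-letter-routing-key' => 'default_queue.failed',
        ],
    ],
];
```

Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so adding these to a queue that already exists usually means deleting and recreating it, or applying a policy on the broker instead.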
The package includes tests that you can run:
composer test
To test your own implementation, you can mock the RabbitMQManager in your tests:
use JuniorFontenele\LaravelRabbitMQ\RabbitMQManager;
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ;

// Mock the RabbitMQManager
$this->mock(RabbitMQManager::class, function ($mock) {
    $mock->shouldReceive('publish')
        ->once()
        // Mockery compares the full argument list, so expect exactly
        // the two arguments passed in the call below
        ->with('notifications', ['test' => 'data'])
        ->andReturn(null);
});

// Call your code that uses the RabbitMQ facade
RabbitMQ::publish('notifications', ['test' => 'data']);
Please see CHANGELOG for more information on what has changed recently.
Please see CONTRIBUTING for details.
Please review our security policy on how to report security vulnerabilities.
The MIT License (MIT). Please see License File for more information.