In this example, we use#
Create DomainEvent Entity#
First, create an abstract class that all domain events will extend. It lives in the shared domain because it is used across different contexts and, in this example, in the write layer because we implement CQRS.
src/Shared/Domain/Write/Event/DomainEvent.php
```php
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Write\Event;

use DateTimeImmutable;

abstract class DomainEvent
{
    private const DATE_FORMAT = 'Y-m-d H:i:s';

    public function __construct(private ?string $occurredOn = null)
    {
        $this->occurredOn = $occurredOn ?? (new DateTimeImmutable())->format(self::DATE_FORMAT);
    }

    public function occurredOn(): string
    {
        return $this->occurredOn;
    }
}
```
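As a quick illustration of the optional `$occurredOn` argument, here is a minimal standalone sketch. It copies the class without its namespace so it runs on its own, and `SomethingHappened` is a hypothetical event that is not part of the tutorial.

```php
<?php

declare(strict_types=1);

// Standalone copy of the abstract event (namespace dropped for brevity).
abstract class DomainEvent
{
    private const DATE_FORMAT = 'Y-m-d H:i:s';

    public function __construct(private ?string $occurredOn = null)
    {
        $this->occurredOn = $occurredOn ?? (new DateTimeImmutable())->format(self::DATE_FORMAT);
    }

    public function occurredOn(): string
    {
        return $this->occurredOn;
    }
}

// Hypothetical event, used only for this illustration.
final class SomethingHappened extends DomainEvent
{
}

// No argument: the timestamp is generated when the event is created.
$fresh = new SomethingHappened();

// Explicit argument: useful when rebuilding an event whose timestamp is already known.
$rebuilt = new SomethingHappened('2024-01-01 10:00:00');

echo $fresh->occurredOn(), PHP_EOL;
echo $rebuilt->occurredOn(), PHP_EOL; // 2024-01-01 10:00:00
```

Accepting the timestamp as an optional argument is presumably what lets an event be recreated later with its original time instead of the current one.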
Create AggregateRoot Entity#
Create an abstract class called AggregateRoot that all root entities will extend.
The AggregateRoot marks the main entity of an aggregate. It keeps the domain events recorded by the entity in memory until they are pulled to be published.
src/Shared/Domain/Write/Aggregate/AggregateRoot.php
```php
<?php

declare(strict_types=1);

namespace App\Shared\Domain\Write\Aggregate;

use App\Shared\Domain\Write\Event\DomainEvent;

abstract class AggregateRoot
{
    private array $domainEvents = [];

    final protected function recordEvent(DomainEvent $domainEvent): void
    {
        $this->domainEvents[] = $domainEvent;
    }

    final public function domainEventsEmpty(): bool
    {
        return empty($this->domainEvents);
    }

    final public function pullDomainEvents(): array
    {
        $recordedEvents = $this->domainEvents;
        $this->domainEvents = [];

        return $recordedEvents;
    }
}
```
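The record-then-pull behaviour can be seen in a small standalone sketch. The `Counter` aggregate and `CounterWasIncremented` event are hypothetical names invented for this illustration, and `DomainEvent` is reduced to an empty stand-in so the snippet runs without the rest of the project.

```php
<?php

declare(strict_types=1);

// Empty stand-in for the shared DomainEvent class.
abstract class DomainEvent
{
}

// Standalone copy of the aggregate root (namespace dropped for brevity).
abstract class AggregateRoot
{
    private array $domainEvents = [];

    final protected function recordEvent(DomainEvent $domainEvent): void
    {
        $this->domainEvents[] = $domainEvent;
    }

    final public function domainEventsEmpty(): bool
    {
        return empty($this->domainEvents);
    }

    final public function pullDomainEvents(): array
    {
        $recordedEvents = $this->domainEvents;
        $this->domainEvents = [];

        return $recordedEvents;
    }
}

// Hypothetical event and aggregate, used only for this illustration.
final class CounterWasIncremented extends DomainEvent
{
    public function __construct(public readonly int $value)
    {
    }
}

final class Counter extends AggregateRoot
{
    private int $value = 0;

    public function increment(): void
    {
        ++$this->value;
        $this->recordEvent(new CounterWasIncremented($this->value));
    }
}

$counter = new Counter();
$counter->increment();
$counter->increment();

// Pulling returns everything recorded so far and empties the internal list.
$firstPull = $counter->pullDomainEvents();
$secondPull = $counter->pullDomainEvents();

echo count($firstPull), PHP_EOL;  // 2
echo count($secondPull), PHP_EOL; // 0
```

Because pulling empties the list, the same event cannot be handed out twice by the aggregate.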
Next, create a simple root entity that extends the class above and records an event when it is constructed.
First, create a simple DTO-like event called FooWasCreated.
src/Context/Foo/Domain/Write/Event/FooWasCreated.php
```php
<?php

declare(strict_types=1);

namespace App\Context\Foo\Domain\Write\Event;

use App\Shared\Domain\Write\Event\DomainEvent;

final class FooWasCreated extends DomainEvent
{
    public function __construct(
        public readonly string $id,
        public readonly string $name,
        public readonly string $createdAt,
        ?string $occurredOn = null
    ) {
        parent::__construct($occurredOn);
    }
}
```
Then create the root entity, which records the event and keeps it in memory when the entity is constructed.
src/Context/Foo/Domain/Write/Foo.php
```php
<?php

declare(strict_types=1);

namespace App\Context\Foo\Domain\Write;

use App\Context\Foo\Domain\Write\Event\FooWasCreated;
use App\Shared\Domain\Write\Aggregate\AggregateRoot;
use DateTimeImmutable;

final class Foo extends AggregateRoot
{
    public function __construct(
        private string $id,
        private string $name,
        private DateTimeImmutable $createdAt
    ) {
        $this->recordEvent(
            new FooWasCreated($id, $name, $createdAt->format('Y-m-d H:i:s'))
        );
    }

    public function id(): string
    {
        return $this->id;
    }

    public function name(): string
    {
        return $this->name;
    }

    public function createdAt(): DateTimeImmutable
    {
        return $this->createdAt;
    }
}
```
NOTE: This entity must be mapped with Doctrine, because the events are published when the entity is persisted and flushed. This example does not define the Doctrine infrastructure layer needed to persist the entity. You can find plenty of tutorials about that, and I don't want to make this one longer.
Define bus for events with messenger#
Define the interface in the shared domain, in the write layer.
src/Shared/Domain/Write/Bus/EventBus.php
```php
<?php

declare(strict_types=1);

// The namespace matches the file path and the import used by MessengerEventBus.
namespace App\Shared\Domain\Write\Bus;

use App\Shared\Domain\Write\Event\DomainEvent;

interface EventBus
{
    public function publish(DomainEvent ...$domainEvents): void;
}
```
Implement the interface with Symfony Messenger in the infrastructure layer. This bus is quite simple: its single method, publish, is responsible for dispatching the messages on the MessageBus.
src/Shared/Infrastructure/Bus/MessengerEventBus.php
```php
<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Bus;

use App\Shared\Domain\Write\Bus\EventBus;
use App\Shared\Domain\Write\Event\DomainEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

final readonly class MessengerEventBus implements EventBus
{
    public function __construct(private MessageBusInterface $messageBus)
    {
    }

    public function publish(DomainEvent ...$domainEvents): void
    {
        foreach ($domainEvents as $currentEvent) {
            $this->messageBus->dispatch(
                (new Envelope($currentEvent))->with(new DispatchAfterCurrentBusStamp())
            );
        }
    }
}
```
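Since the domain only depends on the EventBus interface, the Messenger implementation can be swapped for another one. As a rough sketch, an in-memory implementation for unit tests might look like the following. `InMemoryEventBus` and its `published()` method are hypothetical names that do not appear in the tutorial, and the event classes are reduced to stand-ins so the snippet runs on its own.

```php
<?php

declare(strict_types=1);

// Empty stand-in for the shared DomainEvent class.
abstract class DomainEvent
{
}

// Standalone copy of the interface (namespace dropped for brevity).
interface EventBus
{
    public function publish(DomainEvent ...$domainEvents): void;
}

// Hypothetical test double: it collects events instead of dispatching them.
final class InMemoryEventBus implements EventBus
{
    /** @var list<DomainEvent> */
    private array $published = [];

    public function publish(DomainEvent ...$domainEvents): void
    {
        foreach ($domainEvents as $domainEvent) {
            $this->published[] = $domainEvent;
        }
    }

    /** @return list<DomainEvent> */
    public function published(): array
    {
        return $this->published;
    }
}

// Simplified stand-in for FooWasCreated (fewer fields than the real event).
final class FooWasCreated extends DomainEvent
{
    public function __construct(public readonly string $id, public readonly string $name)
    {
    }
}

$bus = new InMemoryEventBus();

// The variadic signature accepts any number of events, including none.
$bus->publish(new FooWasCreated('1', 'first'), new FooWasCreated('2', 'second'));
$bus->publish();

echo count($bus->published()), PHP_EOL; // 2
```

A test can then assert on `published()` without needing RabbitMQ or the Messenger component.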
Define the Messenger bus in messenger.yaml. This example defines an async bus that publishes events to RabbitMQ through the amqp transport.
We use amqp here because Symfony provides a first-party transport for it (the symfony/amqp-messenger package, which relies on the PHP AMQP extension), but you can define other transports, such as Kafka.
NOTE: You can find more information on: https://symfony.com/doc/current/messenger.html#transports-async-queued-messages
.env
```dotenv
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@rabbitmq:5672/%2f/messag
```
config/packages/messenger.yaml
To keep the example simple, I define only the basic configuration of the event bus, but you should also define the other buses, such as the query and command buses, and a suitable retry policy.
```yaml
framework:
    messenger:
        transports:
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        buses:
            async.event.bus:
                default_middleware: allow_no_handlers
        routing:
            'App\Shared\Domain\Write\Event\DomainEvent': amqp
```
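The routing rule above sends every DomainEvent to the transport, so nothing handles the event in the process that published it. On the consuming side, a handler might look like the sketch below. `LogFooWasCreated` is a hypothetical class that the tutorial does not define, and `FooWasCreated` is reduced to a stand-in so the snippet runs on its own. The `AsMessageHandler` attribute is only read by Symfony, so calling the handler directly works without the framework.

```php
<?php

declare(strict_types=1);

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Simplified stand-in for the FooWasCreated event (fewer fields than the real one).
final class FooWasCreated
{
    public function __construct(public readonly string $id, public readonly string $name)
    {
    }
}

// Hypothetical handler: it only remembers a log line for each event it receives.
#[AsMessageHandler]
final class LogFooWasCreated
{
    /** @var list<string> */
    public array $lines = [];

    public function __invoke(FooWasCreated $event): void
    {
        $this->lines[] = sprintf('Foo %s (%s) was created', $event->name, $event->id);
    }
}

// Calling the handler directly, as a unit test might do.
$handler = new LogFooWasCreated();
$handler(new FooWasCreated('42', 'demo'));

echo $handler->lines[0], PHP_EOL; // Foo demo (42) was created
```

In a real application, such a handler would run in a worker started with `bin/console messenger:consume` followed by the transport name, not in the request that flushed the entity.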
config/services.yaml
To keep this example simple, I define only the minimum service definitions it needs. Your application will need its own additional services.
```yaml
parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    App\:
        resource: '../src/'

    App\Shared\Infrastructure\Bus\MessengerEventBus:
        arguments:
            - '@async.event.bus'
```
Create Doctrine Listener to publish DomainEvents when flush#
The idea is quite simple: on flush, the listener gets the entities scheduled for insertion, update, or deletion from the Doctrine UnitOfWork, along with the entities in scheduled collection changes, and publishes their domain events, if they have any.
Once the events are published, Symfony Messenger takes care of sending them to the queue system.
src/Shared/Infrastructure/Listener/DoctrinePublishDomainEventsOnFlushListener.php
```php
<?php

declare(strict_types=1);

namespace App\Shared\Infrastructure\Listener;

use App\Shared\Domain\Write\Aggregate\AggregateRoot;
// Import path matches the EventBus file location and the one used by MessengerEventBus.
use App\Shared\Domain\Write\Bus\EventBus;
use Doctrine\ORM\Event\OnFlushEventArgs;

final readonly class DoctrinePublishDomainEventsOnFlushListener
{
    public function __construct(private EventBus $eventBus)
    {
    }

    public function onFlush(OnFlushEventArgs $eventArgs): void
    {
        $unitOfWork = $eventArgs->getObjectManager()->getUnitOfWork();

        foreach ($unitOfWork->getScheduledEntityInsertions() as $entity) {
            $this->publishDomainEvent($entity);
        }

        foreach ($unitOfWork->getScheduledEntityUpdates() as $entity) {
            $this->publishDomainEvent($entity);
        }

        foreach ($unitOfWork->getScheduledEntityDeletions() as $entity) {
            $this->publishDomainEvent($entity);
        }

        foreach ($unitOfWork->getScheduledCollectionDeletions() as $collection) {
            foreach ($collection as $entity) {
                $this->publishDomainEvent($entity);
            }
        }

        foreach ($unitOfWork->getScheduledCollectionUpdates() as $collection) {
            foreach ($collection as $entity) {
                $this->publishDomainEvent($entity);
            }
        }
    }

    // Only aggregate roots with pending events are published; pulling empties their event list.
    private function publishDomainEvent(object $entity): void
    {
        if ($entity instanceof AggregateRoot && !$entity->domainEventsEmpty()) {
            $this->eventBus->publish(...$entity->pullDomainEvents());
        }
    }
}
```
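The same entity can appear in more than one of the scheduled lists, for example as an insertion and inside an updated collection. The sketch below mimics the check in publishDomainEvent without Doctrine to show why that does not publish an event twice. `publishPending`, `Notebook`, and `NoteWasAdded` are hypothetical names invented for this illustration, and a closure stands in for the event bus.

```php
<?php

declare(strict_types=1);

// Empty stand-in for the shared DomainEvent class.
abstract class DomainEvent
{
}

// Standalone copy of the aggregate root (namespace dropped for brevity).
abstract class AggregateRoot
{
    private array $domainEvents = [];

    final protected function recordEvent(DomainEvent $domainEvent): void
    {
        $this->domainEvents[] = $domainEvent;
    }

    final public function domainEventsEmpty(): bool
    {
        return empty($this->domainEvents);
    }

    final public function pullDomainEvents(): array
    {
        $recordedEvents = $this->domainEvents;
        $this->domainEvents = [];

        return $recordedEvents;
    }
}

// Hypothetical event and aggregate, used only for this illustration.
final class NoteWasAdded extends DomainEvent
{
}

final class Notebook extends AggregateRoot
{
    public function addNote(): void
    {
        $this->recordEvent(new NoteWasAdded());
    }
}

// Hypothetical helper applying the same check as publishDomainEvent() to a list of objects.
function publishPending(iterable $entities, callable $publish): void
{
    foreach ($entities as $entity) {
        if ($entity instanceof AggregateRoot && !$entity->domainEventsEmpty()) {
            $publish(...$entity->pullDomainEvents());
        }
    }
}

$notebook = new Notebook();
$notebook->addNote();

// A closure collecting events plays the role of the event bus.
$published = [];
$publish = function (DomainEvent ...$events) use (&$published): void {
    array_push($published, ...$events);
};

// The same aggregate is listed twice, next to an object that is not an aggregate.
publishPending([$notebook, new stdClass(), $notebook], $publish);

echo count($published), PHP_EOL; // 1
```

The second visit finds the aggregate's event list already empty, and the plain object is skipped by the instanceof check.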
Then register the listener in the service definitions.
config/services.yaml
```yaml
parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    App\:
        resource: '../src/'

    App\Shared\Infrastructure\Bus\MessengerEventBus:
        arguments:
            - '@async.event.bus'

    App\Shared\Infrastructure\Listener\DoctrinePublishDomainEventsOnFlushListener:
        tags:
            - { name: doctrine.event_listener, event: onFlush }
```
NOTE: You can find more Doctrine events at: https://www.doctrine-project.org/projects/doctrine-orm/en/2.15/reference/events.html#events-overview
Conclusion#
In this example, you have seen how to implement an EventBus with Symfony Messenger and a Doctrine listener that publishes domain events to RabbitMQ on flush, following clean architecture.
You can read the article on Medium