From 4bcb38051362aa48c7406690196fe62b0d0b3cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Pillevesse?= Date: Thu, 16 Jan 2025 16:42:21 +0100 Subject: [PATCH] [Messenger] [AMQP] Add TransportMessageIdStamp logic for AMQP --- .../Amqp/Tests/Transport/AmqpReceiverTest.php | 72 ++++++++++++++++++- .../Amqp/Tests/Transport/AmqpSenderTest.php | 44 ++++++++++++ .../Bridge/Amqp/Transport/AmqpReceiver.php | 7 ++ .../Bridge/Amqp/Transport/AmqpSender.php | 5 ++ 4 files changed, 127 insertions(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php index 9dd86dcd07b42..53089084a2476 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php @@ -18,10 +18,13 @@ use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; +use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; /** @@ -74,13 +77,80 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage() $receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')])); } - private function createAMQPEnvelope(): \AMQPEnvelope + public function testTransportMessageIdStampIsCreatedWhenMessageIdIsSet() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $id = '01946fcb-4bcb-7aa7-9727-dac1c0374443'; + $amqpEnvelope = $this->createAMQPEnvelope($id); + + $connection = $this->createMock(Connection::class); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('get')->with('queueName')->willReturn($amqpEnvelope); + + $receiver = new AmqpReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); + + /** @var AmqpReceivedStamp $amqpReceivedStamp */ + $amqpReceivedStamp = $actualEnvelope->last(AmqpReceivedStamp::class); + $this->assertNotNull($amqpReceivedStamp); + $this->assertSame($amqpEnvelope->getBody(), $amqpReceivedStamp->getAmqpEnvelope()->getBody()); + $this->assertSame($amqpEnvelope->getHeaders(), $amqpReceivedStamp->getAmqpEnvelope()->getHeaders()); + $this->assertSame($amqpEnvelope->getMessageId(), $amqpReceivedStamp->getAmqpEnvelope()->getMessageId()); + + /** @var TransportMessageIdStamp $transportMessageIdStamp */ + $transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class); + $this->assertNotNull($transportMessageIdStamp); + $this->assertSame($id, $transportMessageIdStamp->getId()); + } + + public function testTransportMessageIdStampIsNotCreatedWhenMessageIdIsNotSet() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelope(); + + $connection = $this->createMock(Connection::class); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('get')->with('queueName')->willReturn($amqpEnvelope); + + $receiver = new AmqpReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); + + /** @var AmqpReceivedStamp $amqpReceivedStamp */ + $amqpReceivedStamp = $actualEnvelope->last(AmqpReceivedStamp::class); + $this->assertNotNull($amqpReceivedStamp); + $this->assertSame($amqpEnvelope->getBody(), $amqpReceivedStamp->getAmqpEnvelope()->getBody()); + $this->assertSame($amqpEnvelope->getHeaders(), $amqpReceivedStamp->getAmqpEnvelope()->getHeaders()); + $this->assertSame($amqpEnvelope->getMessageId(), $amqpReceivedStamp->getAmqpEnvelope()->getMessageId()); + + /** @var TransportMessageIdStamp $transportMessageIdStamp */ + $transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class); + $this->assertNull($transportMessageIdStamp); + } + + private function createAMQPEnvelope(?string $messageId = null): \AMQPEnvelope { $envelope = $this->createMock(\AMQPEnvelope::class); $envelope->method('getBody')->willReturn('{"message": "Hi"}'); $envelope->method('getHeaders')->willReturn([ 'type' => DummyMessage::class, ]); + $envelope->method('getMessageId')->willReturn($messageId); return $envelope; } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php index b1dda969fb49b..74529eda1fa15 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php @@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; /** @@ -118,4 +119,47 @@ public function testItThrowsATransportExceptionIfItCannotSendTheMessage() $sender = new AmqpSender($connection, $serializer); $sender->send($envelope); } + + public function testTransportMessageIdStampIsCreatedIfMessageIdIsSet() + { + $id = '01946fcb-4bcb-7aa7-9727-dac1c0374443'; + $stamp = new AmqpStamp(null, \AMQP_NOPARAM, ['message_id' => $id]); + + $envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); + + $connection = $this->createMock(Connection::class); + + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp); + + $sender = new AmqpSender($connection, $serializer); + $returnedEnvelope = $sender->send($envelope); + + $transportMessageIdStamp = $returnedEnvelope->last(TransportMessageIdStamp::class); + $this->assertSame($id, $transportMessageIdStamp->getId()); + } + + public function testTransportMessageIdStampIsNotCreatedIfMessageIdIsNotSet() + { + $stamp = new AmqpStamp(null, \AMQP_NOPARAM, []); + + $envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); + + $connection = $this->createMock(Connection::class); + + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp); + + $sender = new AmqpSender($connection, $serializer); + $returnedEnvelope = $sender->send($envelope); + + $transportMessageIdStamp = $returnedEnvelope->last(TransportMessageIdStamp::class); + $this->assertNull($transportMessageIdStamp); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index 3c855e9ee46ce..1030223d625fb 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -84,6 +85,12 @@ private function getEnvelope(string $queueName): iterable throw $exception; } + if (null !== $amqpEnvelope->getMessageId()) { + $envelope = $envelope + ->withoutAll(TransportMessageIdStamp::class) + ->with(new TransportMessageIdStamp($amqpEnvelope->getMessageId())); + } + yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)); } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php index 0e57671c662b1..bcf28a861bd5d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -54,6 +55,10 @@ public function send(Envelope $envelope): Envelope } } + if ($amqpStamp instanceof AmqpStamp && isset($amqpStamp->getAttributes()['message_id'])) { + $envelope = $envelope->with(new TransportMessageIdStamp($amqpStamp->getAttributes()['message_id'])); + } + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); if ($amqpReceivedStamp instanceof AmqpReceivedStamp) { $amqpStamp = AmqpStamp::createFromAmqpEnvelope(