enqueue-dev: Sending message problem with replyNeed argument through Amqp and Redis transports
Suppose we have two application that have installed php-enqueue. When I try to send a message from one to other application I have to run a send action twice. The steps what I did to reproduce the problem:
- Run ./app/console enqueue:consume --setup-broker -vvv command for every application
- Run sending action for the first time and look at console of application that was used to send a message. It returned a description of sent message, but there is nothing in console of application that must receive the message and send the answer. I havent got reply message, but got hanging request in my browser or console. After timeout browser or console writes “Rpc call timeout is reached without receiving a reply message”
- Run the same action second time and get nothing in first application console (where I got a message description at the first time), but get message description in second application console (where I can see tha message was recieved and answer was sent). And eventualy I get result in thr browser or console. Important! I this step, after sending request second time I get message, that was sent in previous step,
Next I’ll show all configurations, producers and processors of both applications. I used Enqueue Symfony bundle for my applications with next configs of both applications
enqueue:
transport:
default: "amqp"
amqp:
driver: ext
host: "%env(RABBIT_HOST)%"
port: "%env(RABBIT_PORT)%"
user: "%env(RABBIT_USER)%"
pass: "%env(RABBIT_PASSWORD)%"
vhost: "%env(RABBIT_DEFAULT_VHOST)%"
receive_method: basic_consume
client:
app_name: "message-bus"
Producer in sending application is
<?php
namespace AcmeBundle\Service\Producer;
use Enqueue\Client\Message;
class RequestProducer {
/** @var Producer */
protected $producer;
/**
* RequestProducer constructor.
* @param ProducerInterface $producer
*/
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
/**
* Sent request and get response from a service
*
* @param $content
* @return object
*/
public function send($content)
{
$message = new Message($content);
$response = $this->producer->sendCommand('currentAppName', $message, true)->receive(5000)->getBody();
return $response;
}
}
Processor in receiving application
<?php
namespace AcmeBundle\Service\Processor;
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Enqueue\Consumption\Result;
use Enqueue\Client\CommandSubscriberInterface;
class RequestProcessor implements PsrProcessor, CommandSubscriberInterface
{
/**
* {@inheritdoc}
*/
public function process(PsrMessage $message, PsrContext $context)
{
return Result::reply(
$context->createMessage(
'This is an answer'
)
);
}
/**
* {@inheritdoc}
*/
public static function getSubscribedCommand()
{
return [
'processorName' => 'currentAppName',
'queueName' => 'currentAppName',
'queueNameHardcoded' => true,
'exclusive' => true,
];
}
}
When I put both processor and producer in one application they work perfect without wasting time and all request get response. If I put they in different applications I’ll get situation described above
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Comments: 23 (14 by maintainers)
@makasim Thanks! I’ll try it and, I promise, I’ll write a report 😃