enqueue-dev: Sending message problem with replyNeed argument through Amqp and Redis transports

Suppose we have two application that have installed php-enqueue. When I try to send a message from one to other application I have to run a send action twice. The steps what I did to reproduce the problem:

  1. Run ./app/console enqueue:consume --setup-broker -vvv command for every application
  2. Run sending action for the first time and look at console of application that was used to send a message. It returned a description of sent message, but there is nothing in console of application that must receive the message and send the answer. I havent got reply message, but got hanging request in my browser or console. After timeout browser or console writes “Rpc call timeout is reached without receiving a reply message”
  3. Run the same action second time and get nothing in first application console (where I got a message description at the first time), but get message description in second application console (where I can see tha message was recieved and answer was sent). And eventualy I get result in thr browser or console. Important! I this step, after sending request second time I get message, that was sent in previous step,

Next I’ll show all configurations, producers and processors of both applications. I used Enqueue Symfony bundle for my applications with next configs of both applications

enqueue:
    transport:
        default: "amqp"
        amqp:
            driver: ext
            host: "%env(RABBIT_HOST)%"
            port: "%env(RABBIT_PORT)%"
            user: "%env(RABBIT_USER)%"
            pass: "%env(RABBIT_PASSWORD)%"
            vhost: "%env(RABBIT_DEFAULT_VHOST)%"
            receive_method: basic_consume
    client:
        app_name: "message-bus"

Producer in sending application is

<?php
namespace AcmeBundle\Service\Producer;

use Enqueue\Client\Message;

class RequestProducer {
    /** @var Producer */
    protected $producer;

    /**
     * RequestProducer constructor.
     * @param ProducerInterface $producer
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Sent request and get response from a service
     *
     * @param $content
     * @return object
     */
    public function send($content)
    {
        $message = new Message($content);
        $response = $this->producer->sendCommand('currentAppName', $message, true)->receive(5000)->getBody();

        return $response;
    }
}

Processor in receiving application

<?php
namespace AcmeBundle\Service\Processor;

use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Enqueue\Consumption\Result;
use Enqueue\Client\CommandSubscriberInterface;

class RequestProcessor implements PsrProcessor, CommandSubscriberInterface
{
    /**
     * {@inheritdoc}
     */
    public function process(PsrMessage $message, PsrContext $context)
    {
        return Result::reply(
            $context->createMessage(
                'This is an answer'
            )
        );
    }

    /**
     * {@inheritdoc}
     */
    public static function getSubscribedCommand()
    {
        return [
            'processorName' => 'currentAppName',
            'queueName' =>  'currentAppName',
            'queueNameHardcoded' => true,
            'exclusive' => true,
        ];
    }
}

When I put both processor and producer in one application they work perfect without wasting time and all request get response. If I put they in different applications I’ll get situation described above

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 23 (14 by maintainers)

Most upvoted comments

@makasim Thanks! I’ll try it and, I promise, I’ll write a report 😃