parallel: Error using $output from Symfony Command

Hi, thanks for your effort and sharing this library.

I have optimized some of our Symfony CLI apps with this parallels extension. So far all works, but I can’t use the Symfony Output anymore.

Simple example for a task:

<?php

use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;

class MyTask implements Task {

  private $output;

  public function __construct($output) {
    $this->output = $output;
    $this->output->writeln("OUTPUT INSIDE CONSTRUCT WORKS");
  }

  public function run(Environment $environment) {
    echo "THIS WORKS\n";
    $this->output->writeln("OUTPUT INSIDE RUN BREAKS");
    $sleep = rand(1, 3);
    sleep($sleep); // Blocking call in thread.
    return 0
  }

}

And my Symfony Command looks like (I tried to simplify as much as possible):

<?php

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Formatter\OutputFormatterStyle;

use Amp\Loop;
use Amp\Parallel\Worker\DefaultPool;

use MyTask;

class MyCommand extends Command {

  protected function configure(){
    $this->setName("Process:Amp")
         ->setDescription("This will run some parallel processes.")
         ->addArgument('count', InputArgument::REQUIRED, 'Number of concurrent pool processes.');
  }

  protected function execute(InputInterface $input, OutputInterface $output){

    $output->writeln('START COMMAND ...');

    // We can first define tasks and then run them
    $tasks = [
      new MyTask($output),
      new MyTask($output),
      new MyTask($output),
      new MyTask($output),
      new MyTask($output),
    ];

    try {

      // get parameters
      $cnt_parallels = $input->getArgument('count');

      $pool = new DefaultPool($cnt_parallels);

      foreach ($tasks as $task) {
        $pool->enqueue($task);
      }

      // Event loop for parallel tasks
      Loop::run(function () use (&$pool) {
        return $pool->shutdown();
      });

    }
    finally {

      $output->writeln('WELL, COMMAND Finally reached!');

    }

  }

}

The above will immediately stop but not shown any exception.

If changing the loop code with Coroutines then following exception is shown and formatted via Symfony:

In TaskFailure.php line 55:

Uncaught Symfony\Component\Console\Exception\RuntimeException in worker with message “Unable to write output.” and code “0”

Thanks for some hints or solution in advance Cheers, Tom

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 15 (6 by maintainers)

Most upvoted comments

Hi Niklas @kelunik

When I add

$this->output = new \Symfony\Component\Console\Output\StreamOutput(STDERR);

into function run() from MyTask it will work!

It does not work if I create that StreamOutput object outside or in __constructor()

Do you know why?

$this->stream will be emptied to int(0) when creating and setting the $output outside of the MyTask run function?