event-loop: Memory leak if callbacks suspend

Steps to reproduce:

  1. Create script.php with the following content:
<?php

declare(strict_types=1);

use Revolt\EventLoop;

require __DIR__ . '/../../vendor/autoload.php';

$write_line = static fn(string $m, ...$args) => printf($m . "\n", ...$args);

$write_line('Server is listening on http://localhost:3030');

// Error reporting suppressed since stream_socket_server() emits an E_WARNING on failure (checked below).
$server = @stream_socket_server('tcp://localhost:3030', $errno, $_, flags: STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, context: stream_context_create([
    'socket' => [
        'ipv6_v6only' => true,
        'so_reuseaddr' => false,
        'so_reuseport' => false,
        'so_broadcast' => false,
        'tcp_nodelay' => false,
    ]
]));
if (!$server || $errno) {
    throw new RuntimeException('Failed to listen localhost 3030.', $errno);
}

$watcher = null;
EventLoop::unreference(EventLoop::onSignal(SIGINT, static function () use ($server, &$watcher) {
    EventLoop::cancel((string) $watcher);
    fclose($server);
}));

$watcher = EventLoop::onReadable($server, function ($watcher, $resource) {
    $stream = @stream_socket_accept($resource, timeout: 0.0);
    if (false === $stream) {
        EventLoop::cancel($watcher);

        return;
    }

    stream_set_read_buffer($stream, 0);
    stream_set_blocking($stream, false);
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onReadable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    $request = stream_get_contents($stream);
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, "HTTP/1.1 200 OK\n");
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, "Server: TCP Server\n");
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, "Connection: close\n");
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, "Content-Type: text/html; charset=utf-8\n\n");
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, "<h3>Hello, World!</h3>");
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, "<pre><code>" . htmlentities($request) . "</code></pre>");
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, sprintf('memory usage: %dMiB<br />', round(memory_get_usage() / 1024 / 1024, 1)));
    $suspension = EventLoop::createSuspension();
    $watcher = EventLoop::onWritable($stream, function () use ($suspension) {
        $suspension->resume();
    });
    $suspension->suspend();
    EventLoop::cancel($watcher);
    fwrite($stream, sprintf('peak memory usage: %dMiB<br />', round(\memory_get_peak_usage() / 1024 / 1024, 1)));
    @fclose($stream);
});

EventLoop::run();

$write_line('');
$write_line('Goodbye 👋');
  2. Run php script.php

Memory usage keeps growing with each request (noticeable after a batch of about 10k requests).
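A batch like this can be generated with a small client script. The following is a minimal sketch, not the script used for the original measurements: it assumes the server from script.php is listening on localhost:3030, and the names send_request and extract_memory_mib are made up for illustration. The number of requests is taken from the first command-line argument.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: sends one minimal HTTP request and returns the raw response.
function send_request(string $address): string
{
    $client = @stream_socket_client($address, $errno, $errstr, 5.0);
    if ($client === false) {
        throw new RuntimeException("Connection failed: {$errstr}", (int) $errno);
    }

    fwrite($client, "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
    // script.php closes the connection after responding, so read until EOF.
    $response = stream_get_contents($client);
    fclose($client);

    return $response === false ? '' : $response;
}

// Hypothetical helper: extracts the (non-peak) "memory usage: <n>MiB" figure
// that script.php writes into the response body.
function extract_memory_mib(string $response): ?int
{
    $found = preg_match('/(?<!peak )memory usage: (\d+)MiB/', $response, $matches);

    return $found === 1 ? (int) $matches[1] : null;
}

// The request count comes from the command line; with no argument nothing is sent.
$count = (int) ($argv[1] ?? 0);
for ($i = 1; $i <= $count; $i++) {
    $memory = extract_memory_mib(send_request('tcp://localhost:3030'));
    // Report every 1000th request to keep the output short.
    if ($i % 1000 === 0) {
        printf("after %d requests: %s MiB\n", $i, $memory ?? 'n/a');
    }
}
```

Run it as php client.php 10000 (client.php is an arbitrary file name) while script.php is running, then compare the first and last reported values.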

Could it be that suspensions are not being destroyed properly and stay in memory?
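One way to test a suspicion like this is to hold a WeakReference to an object and check whether it is released once the last strong reference is dropped. The sketch below uses a made-up Tracked class rather than a Revolt suspension, so it only illustrates the technique and says nothing about the actual cause in Revolt: an object caught in a reference cycle with a closure stays alive until PHP's cycle collector runs.

```php
<?php

declare(strict_types=1);

// Stand-in for any object whose lifetime is in question
// (hypothetical class, not part of Revolt).
final class Tracked
{
    public ?Closure $callback = null;
}

// Returns true once the weakly referenced object has been destroyed.
function is_released(WeakReference $ref): bool
{
    return $ref->get() === null;
}

// Case 1: no cycle, so the object is destroyed as soon as the last reference goes.
$plain = new Tracked();
$plainRef = WeakReference::create($plain);
unset($plain);
var_dump(is_released($plainRef)); // bool(true)

// Case 2: the closure captures the object and the object holds the closure,
// forming a cycle that reference counting alone cannot free.
$cyclic = new Tracked();
$cyclic->callback = function () use ($cyclic) {
    return $cyclic;
};
$cyclicRef = WeakReference::create($cyclic);
unset($cyclic);
var_dump(is_released($cyclicRef)); // bool(false): still held by the cycle

// Forcing a collection run breaks the cycle and frees the object.
gc_collect_cycles();
var_dump(is_released($cyclicRef)); // bool(true)
```

If an object is still alive after gc_collect_cycles(), something reachable is holding a strong reference to it, which would point to a real leak rather than a delayed collection.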

I'm not really sure what is happening. I first noticed the leak in https://github.com/azjezz/hack-php-async-io/blob/main/src/server.php and kept simplifying the code to isolate it until I reached this version, where no PSL code is involved. That means the leak is happening in Revolt, or I'm doing something wrong.

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 16 (15 by maintainers)

Most upvoted comments

The memory leak in PSL is unrelated; this issue seems to be fixed.

Thank you @trowski and @kelunik 🎉

I have the same issue on Linux/Ubuntu. If you comment out the readHeaders() call, it does not leak. Here is my example:

<?php

use Revolt\EventLoop;

require __DIR__ . '/vendor/autoload.php';


function readHttpLine($stream): string
{
    $stop = false;
    $chars = '';
    $suspension = EventLoop::getSuspension();
    $watcher = EventLoop::onReadable($stream, fn () => $suspension->resume());
    while(!feof($stream) && !$stop) {
        do {
            $byte = fread($stream, 1);
            $chars .= $byte;
        } while($byte && $byte != "\n");
        $stop = (strlen($chars) > 1 && $chars[-1] == "\n");
        if (!$stop) {
            $suspension->suspend();
        }
    }
    EventLoop::cancel($watcher);
    return $chars;
}

function readHeaders($stream)
{
    $stop = false;
    while(!$stop) {
        $line = readHttpLine($stream);
        echo trim($line) . PHP_EOL;
        // end-of-headers is one line with \r\n
        $stop = (strlen($line) == 2 && $line[0] == "\r" && $line[1] == "\n");
    }
}

function run()
{
    // start TCP/IP server on 0.0.0.0:8000
    // for illustration purposes only, should use a socket abstraction instead
    $server = \stream_socket_server('tcp://0.0.0.0:8000');
    if (!$server) {
        exit(1);
    }
    \stream_set_blocking($server, false);

    echo "Visit http://localhost:8000/ in your browser." . PHP_EOL;

    // wait for incoming connections on server socket
    EventLoop::onReadable($server, function ($watcher, $server) {
        $conn = \stream_socket_accept($server);
        \stream_set_blocking($conn, false);
        readHeaders($conn);
        $data = "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 3\r\n\r\nHi\n";
        EventLoop::onWritable($conn, function ($watcher, $conn) use (&$data) {
            $written = \fwrite($conn, $data);
            if ($written === \strlen($data)) {
                \fclose($conn);
                EventLoop::cancel($watcher);
            } else {
                $data = \substr($data, $written);
            }
        });
    });

    EventLoop::repeat(2, function () {
        $memory = \memory_get_usage() / 1024;
        $formatted = \number_format($memory) . ' KiB';
        echo "Current memory usage: {$formatted}\n";
    });

    EventLoop::run();
}

run();