client: Error DataTransferException code 65

Hello everyone,

In my project, I run into this error up to once every 24 hours:

[2022-05-04 22:58:49] production.ERROR: [65] Transferring data over socket failed: Sending data over the socket failed. Has it been closed? {"exception":"[object] (PhpMqtt\\Client\\Exceptions\\DataTransferException(code: 65): [65] Transferring data over socket failed: Sending data over the socket failed. Has it been closed? at /home/forge/livai.testsclaralockers.com/vendor/php-mqtt/client/src/MqttClient.php:1030)
[stacktrace]
#0 /home/forge/livai.testsclaralockers.com/vendor/php-mqtt/client/src/MqttClient.php(994): PhpMqtt\\Client\\MqttClient->writeToSocket()
#1 /home/forge/livai.testsclaralockers.com/vendor/php-mqtt/client/src/MqttClient.php(603): PhpMqtt\\Client\\MqttClient->ping()
#2 /home/forge/livai.testsclaralockers.com/app/Services/Mqtt/Mqtt.php(66): PhpMqtt\\Client\\MqttClient->loop()
#3 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Support/Facades/Facade.php(261): App\\Services\\Mqtt\\Mqtt->loop()
#4 /home/forge/livai.testsclaralockers.com/routes/console.php(36): Illuminate\\Support\\Facades\\Facade::__callStatic()
#5 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): Illuminate\\Foundation\\Console\\ClosureCommand->{closure}()
#6 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Container/Util.php(40): Illuminate\\Container\\BoundMethod::Illuminate\\Container\\{closure}()
#7 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(81): Illuminate\\Container\\Util::unwrapIfClosure()
#8 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(37): Illuminate\\Container\\BoundMethod::callBoundMethod()
#9 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Container/Container.php(651): Illuminate\\Container\\BoundMethod::call()
#10 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Foundation/Console/ClosureCommand.php(55): Illuminate\\Container\\Container->call()
#11 /home/forge/livai.testsclaralockers.com/vendor/symfony/console/Command/Command.php(299): Illuminate\\Foundation\\Console\\ClosureCommand->execute()
#12 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Console/Command.php(121): Symfony\\Component\\Console\\Command\\Command->run()
#13 /home/forge/livai.testsclaralockers.com/vendor/symfony/console/Application.php(978): Illuminate\\Console\\Command->run()
#14 /home/forge/livai.testsclaralockers.com/vendor/symfony/console/Application.php(295): Symfony\\Component\\Console\\Application->doRunCommand()
#15 /home/forge/livai.testsclaralockers.com/vendor/symfony/console/Application.php(167): Symfony\\Component\\Console\\Application->doRun()
#16 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Console/Application.php(92): Symfony\\Component\\Console\\Application->run()
#17 /home/forge/livai.testsclaralockers.com/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(129): Illuminate\\Console\\Application->run()
#18 /home/forge/livai.testsclaralockers.com/artisan(37): Illuminate\\Foundation\\Console\\Kernel->handle()
#19 {main}
"} 

Normally there is no reason for the connection to be cut. My MQTT broker is hosted on Scaleway IoT and the code runs on a DigitalOcean server. I’m starting to run out of ideas; do you know where this could come from?

console.php

include_once "toolsConsole.php";

Artisan::command('listen', function () {
    $subscriptionOpen = [];

    MqttListen::subscribe('livai/listen', function (string $topic, string $message) use (&$subscriptionOpen) {
        if ($message === 'restart') {
            foreach ($subscriptionOpen as $subscription) {
                MqttListen::unsubscribe($subscription);
            }
            $subscriptionOpen = subscribe();
        }
    });

    $subscriptionOpen = subscribe();

    MqttListen::loop();
});

toolsConsole.php

const CHANNELS = [
    '/locker/status',
    '/locker/status/module',
    '/locker/number/module',
    '/box/unlock',
    '/box/status'
];

function subscribe(): array
{
    $subscriptionOpen = [];

    foreach (Locker::all() as $locker) {
        if ($locker->mqttClientId) {
            foreach (CHANNELS as $channel) {
                MqttListen::subscribe('lockers/' . $locker->mqttClientId . '/return' . $channel, function (string $topic, string $message) {
                    $message = json_decode($message, true) ?: [];
                    if (!isset($message['requestId']) || !$request = Request::get($message['requestId'])) {
                        Log::error('Listen - No requestId or not found', [
                            'Class' => 'routes/console.php',
                            'Code' => 0,
                            'Time' => Carbon::now()->toISOString()
                        ]);
                    } else {
                        $request->update([
                            'receiveChannel' => $topic,
                            'return' => json_encode($message)
                        ]);
                    }
                });

                $subscriptionOpen[] = 'lockers/' . $locker->mqttClientId . '/return' . $channel;
            }

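            MqttListen::subscribe('lockers/' . $locker->mqttClientId . '/webhook', function (string $topic, string $message) use ($locker) {
                // Decode once; a malformed payload yields null instead of a warning.
                $body = json_decode($message, true)['body'] ?? null;
                foreach ($locker->consumers as $consumer) {
                    dispatch(new Webhook($consumer, [
                        'lockerId' => $locker->id,
                        'type' => 'LOCK',
                        'body' => $body
                    ]));
                }
            });

            // Track the webhook topic too, so a 'restart' unsubscribes it like the others.
            $subscriptionOpen[] = 'lockers/' . $locker->mqttClientId . '/webhook';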

            MqttListen::subscribe('lockers/' . $locker->mqttClientId . '/log', function (string $topic, string $message) {
                $message = json_decode($message, true) ?: [];
                if (!isset($message['requestId']) || !$request = Request::get($message['requestId'])) {
                    Log::error('Listen - No requestId or not found', [
                        'Class' => 'routes/console.php',
                        'Code' => 0,
                        'Time' => Carbon::now()->toISOString()
                    ]);
                } else {
                    $request->update([
                        'log' => json_encode($message)
                    ]);
                }
            });

            $subscriptionOpen[] = 'lockers/' . $locker->mqttClientId . '/log';
        }
    }

    return $subscriptionOpen;
}

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 17

Most upvoted comments

Are you publishing within the callback of a subscription (using QoS > 0)? If so, you don’t have to (and shouldn’t) start a second loop: other messages received for your subscriptions are delivered in the same context, so nested loops can recurse through callbacks without bound and end in a stack overflow or memory exhaustion.
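
To illustrate the point about not starting a second loop, here is a minimal sketch using a hypothetical in-memory stand-in (InMemoryClient is invented for this example and is not the php-mqtt/client API). The callback only publishes; the single outer loop delivers the reply, so the call depth stays at one.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for an MQTT client (illustration only).
 */
final class InMemoryClient
{
    /** @var array<string, callable> */
    private array $callbacks = [];

    /** @var array<int, array{0: string, 1: string}> */
    private array $queue = [];

    private int $depth = 0;

    /** Deepest nesting of loop() calls observed so far. */
    public int $maxDepth = 0;

    public function subscribe(string $topic, callable $callback): void
    {
        $this->callbacks[$topic] = $callback;
    }

    // Publishing only enqueues the message; delivery happens in loop().
    public function publish(string $topic, string $message): void
    {
        $this->queue[] = [$topic, $message];
    }

    // Delivers queued messages until the queue is empty.
    public function loop(): void
    {
        $this->depth++;
        $this->maxDepth = max($this->maxDepth, $this->depth);

        while ($this->queue !== []) {
            [$topic, $message] = array_shift($this->queue);
            if (isset($this->callbacks[$topic])) {
                ($this->callbacks[$topic])($topic, $message);
            }
        }

        $this->depth--;
    }
}

$client = new InMemoryClient();
$received = [];

$client->subscribe('request', function (string $topic, string $message) use ($client): void {
    // Publish the reply, but do NOT call $client->loop() here:
    // the outer loop below picks the reply up on its next iteration.
    $client->publish('reply', 'ack:' . $message);
});

$client->subscribe('reply', function (string $topic, string $message) use (&$received): void {
    $received[] = $message;
});

$client->publish('request', '1');
$client->publish('request', '2');
$client->loop();
```

If the 'request' callback called loop() itself, every incoming message could open another nested loop, which is the unbounded recursion described above.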

Otherwise, the code looks fine. The only thing I noticed: if you are using it within Laravel, you should not call env() directly but go through config(), to avoid issues when the config is cached.
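
As a rough sketch of the env()/config() advice, a Laravel config file could wrap the environment variables once. The file name config/mqtt.php, the keys, and the variable names MQTT_HOST and MQTT_PORT are assumptions made for this example; the fragment is meant to live inside a Laravel application.

```php
<?php

// config/mqtt.php (hypothetical file name, keys, and env variable names)
return [
    // env() is only called here, so the values survive `php artisan config:cache`.
    'host' => env('MQTT_HOST', 'localhost'),
    'port' => (int) env('MQTT_PORT', 1883),
];
```

Application code would then read config('mqtt.host') and config('mqtt.port') instead of calling env() itself.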

Ok, thank you, I’ll take it into account 😉

@domainfun Thank you for your feedback. I’ll check on my side whether this error comes back, depending on how stable my connection is. I agree, a method for establishing a new connection would be a good improvement.
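
Until the library offers such a method, one possible workaround is to wrap the listener in a retry loop on the application side. The sketch below is only an assumption of how that could look: runWithReconnect and its parameters are hypothetical names, and it catches the generic Exception class, whereas real code would probably narrow this to PhpMqtt\Client\Exceptions\DataTransferException from the stack trace.

```php
<?php

declare(strict_types=1);

/**
 * Calls $connect and then $loop, retrying with exponential backoff
 * when either of them throws. Returns the attempt number that succeeded.
 *
 * Hypothetical helper, not part of php-mqtt/client.
 */
function runWithReconnect(
    callable $connect,
    callable $loop,
    int $maxAttempts = 5,
    int $baseDelaySeconds = 1,
    ?callable $sleep = null
): int {
    if ($maxAttempts < 1) {
        throw new InvalidArgumentException('maxAttempts must be at least 1');
    }

    // The sleep function is injectable so the helper can be tested without waiting.
    $sleep = $sleep ?? 'sleep';

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $connect(); // open the connection and re-register subscriptions
            $loop();    // blocks until the loop ends or the socket fails

            return $attempt;
        } catch (Exception $e) {
            if ($attempt === $maxAttempts) {
                throw $e; // give up and surface the last error
            }

            // Exponential backoff: 1s, 2s, 4s, ... capped at 60s.
            $sleep(min(60, $baseDelaySeconds * 2 ** ($attempt - 1)));
        }
    }

    throw new LogicException('Unreachable: the loop always returns or throws.');
}
```

In the listen command, $connect would open a fresh connection and call subscribe() again, and $loop would call MqttListen::loop(). How the connection is reopened depends on the MqttListen service, which is not shown here. Note that the attempt counter in this sketch never resets, so a listener that fails about once a day would want to reset it after each long successful run.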

I’ve given it a try, although it is not up to my standard regarding test coverage yet. Feel free to test it out and give feedback though. I’m also open to alternative implementations if you have one in mind!

Thank you for your responsiveness! It’s really cool that the project is so well maintained!
