python-binance: Queue overflow error, change MAX_QUEUE_SIZE value

My code:

from binance import ThreadedWebsocketManager

# callback invoked for every trade message
def process_message_price(msg):
    print(msg)

# websocket manager
bm = ThreadedWebsocketManager()
bm.start()
# listOfPairings: all pairs with USDT (over 250 items in list)
for pairing in listOfPairings:
    conn_key = bm.start_trade_socket(callback=process_message_price, symbol=pairing)

bm.join()

However, after a short time I get the following error: ‘e’: ‘error’, ‘m’: ‘Queue overflow. Message not filled’

This is caused by MAX_QUEUE_SIZE in streams.py being too small for my program.

How can I change this value from outside the streams.py file?

Thx

About this issue

  • State: open
  • Created 3 years ago
  • Reactions: 1
  • Comments: 21

Most upvoted comments

I was just playing around with the documentation code and got this same error when I was manually using the asynchronous context manager.

What sorted it out for me: once you open the connection and receive the message, you should close the context manager. Here is my code:

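import asyncio

# socket, create_frame and engine are not shown here
async def run_listener():
    while True:
        # open the connection, read one message, then close it again
        await socket.__aenter__()
        msg = await socket.recv()
        await socket.__aexit__(None, None, None)
        try:
            frame = create_frame(msg)
            frame.to_sql('BTCUSDT', engine, if_exists='append', index=False)
            print(frame)
        except Exception:
            # error payloads carry their text under the 'm' key
            print(f'Error: {msg["m"]}')

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_listener())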

It's the line await socket.__aexit__(None, None, None) that sorted out the issue for me.
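For readers unfamiliar with the manual calls above, here is a minimal sketch of how `__aenter__` and `__aexit__` relate to an `async with` block. `FakeSocket` is a hypothetical stand-in written for this illustration, not the python-binance socket class; it only records the order in which its methods are called.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a socket object that supports `async with`."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("enter")  # a real socket would connect here
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.events.append("exit")  # a real socket would disconnect here

    async def recv(self):
        self.events.append("recv")
        return {"e": "trade", "s": "BTCUSDT"}


async def read_once(socket):
    # Same sequence as calling __aenter__(), recv(), __aexit__() by hand,
    # except that __aexit__ also runs if recv() raises.
    async with socket:
        return await socket.recv()


socket = FakeSocket()
msg = asyncio.run(read_once(socket))
print(socket.events)
```

Closing the connection after every message presumably means that anything arriving between the close and the next open is not seen, so this pattern trades completeness for an empty queue.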

So, is there a way to avoid this error? What should I change?

import threading
import time

from binance import ThreadedWebsocketManager

class Stream():

    def start(self):
        self.bm = ThreadedWebsocketManager()
        self.bm.start()
        self.stream_error = False
        self.multiplex_list = list()

        # listOfPairings: all pairs with USDT (over 250 items in list)
        for pairing in listOfPairings:
            self.multiplex_list.append(pairing.lower() + '@trade')
        self.multiplex = self.bm.start_multiplex_socket(callback = self.realtime, streams = self.multiplex_list)

        # monitoring the error
        stop_trades = threading.Thread(target = self.restart_stream, daemon = True)
        stop_trades.start()

    def realtime(self, msg):
        if 'data' in msg:
            pass  # Your code
        else:
            # a message without 'data' (e.g. the queue overflow error) flags a restart
            self.stream_error = True

    def restart_stream(self):
        while True:
            time.sleep(1)
            if self.stream_error:
                self.bm.stop_socket(self.multiplex)
                time.sleep(5)
                self.stream_error = False
                self.multiplex = self.bm.start_multiplex_socket(callback = self.realtime, streams = self.multiplex_list)

stream = Stream()
stream.start()
stream.bm.join()

Also set MAX_QUEUE_SIZE = 10000. There is no other way to fix this error. I get no more than 10-15 websocket restarts per day across all USDT pairs (each restart happens within 5-7 seconds of the error).

Restarting the stream from an except block did not work for me either; it only worked from a function running in a separate thread.
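On the original question of changing MAX_QUEUE_SIZE without editing streams.py: if the installed version defines it as a class attribute that the read loop looks up at run time, it can probably be overridden once at startup. The sketch below uses a hypothetical stand-in class to show the mechanism; the attribute location is an assumption to check against your installed version. Where the limit is hard-coded instead (another comment in this thread quotes `qsize() < 100` for v1.0.13), an override like this would have no effect.

```python
class ReconnectingWebsocket:
    """Hypothetical stand-in for the class in binance/streams.py."""

    MAX_QUEUE_SIZE = 100  # assumed to be a class-level constant

    def has_room(self, queued: int) -> bool:
        # Instances read the limit from the class each time they check it
        return queued < self.MAX_QUEUE_SIZE


# Override the limit once, before any socket is created.
# With the real library this would be, under the same assumption:
#   from binance.streams import ReconnectingWebsocket
ReconnectingWebsocket.MAX_QUEUE_SIZE = 10000

ws = ReconnectingWebsocket()
print(ws.has_room(5000))
```

Because the override changes the class itself, it applies to every socket created afterwards in the same process.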

Hello,

FYI, I have tested this solution and it does not really work. In v1.0.13 the part below simply has no “else” statement:

if res and self._queue.qsize() < 100:
    await self._queue.put(res)

but the queue still overflows and we still lose some messages.

My solution: I increased the limit to 10000 and, in threaded_stream.py, changed the wait_for timeout from 3 to 7:

while self._socket_running[path]:
    try:
        msg = await asyncio.wait_for(s.recv(), 7)

I tested for 4 hours straight with an added “else” statement that logs when the queue overflows, and it never happened again. Everything looks good to me.

Best regards
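The queue check quoted in the comment above, together with an added "else" branch, can be mimicked in a toy simulation. This is only a sketch of the mechanism, not the library's code: the names, the tiny limit and the sleep times are all made up for illustration. A producer enqueues messages only while there is room and records a drop otherwise, while a consumer handles each message more slowly than it arrives.

```python
import asyncio

MAX_QUEUE_SIZE = 5  # deliberately tiny so the overflow shows up quickly


async def producer(queue, total, dropped):
    """Mimics the read loop: enqueue if there is room, otherwise record a drop."""
    for i in range(total):
        if queue.qsize() < MAX_QUEUE_SIZE:
            await queue.put(i)
        else:
            dropped.append(i)  # the "else" branch, used here for counting
        await asyncio.sleep(0)  # yield to the consumer
    await queue.put(None)  # sentinel: no more messages


async def consumer(queue, handled):
    """Mimics a slow callback: handling takes longer than arrival."""
    while True:
        msg = await queue.get()
        if msg is None:
            break
        await asyncio.sleep(0.001)  # simulated processing time
        handled.append(msg)


async def simulate(total=200):
    queue = asyncio.Queue()
    dropped, handled = [], []
    await asyncio.gather(producer(queue, total, dropped), consumer(queue, handled))
    return handled, dropped


handled, dropped = asyncio.run(simulate())
print(f"handled={len(handled)} dropped={len(dropped)}")
```

Raising the limit in this toy only delays the first drop; as long as the consumer stays slower than the producer, the backlog keeps growing.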

What fixed this for me was using the streams.py script from v1.0.13.

could you please specify what you mean by this? Sharing a code snippet would be highly appreciated

You have to download the python-binance package from source and use it in your code https://github.com/sammchardy/python-binance/tree/master/binance

then replace the streams.py with an outdated version (v1.0.13) of the file from source https://github.com/sammchardy/python-binance/tree/v1.0.13/binance

I think there is still some issue/bug in ThreadedWebsocketManager but I did not have the time yet to investigate. I also assumed first that the issue is my callback being too slow, so I’ve implemented a reconnect/reset whenever the error appears, however it only appears once at the very beginning, and afterwards it runs fine without reconnect/reset.

Maybe I’m wrong, so if somebody has the knowledge and time please have a look at my solution:

#!/usr/bin/env python3.9

import logging
from time import sleep
from binance import ThreadedWebsocketManager

CH = logging.StreamHandler()
CH.setFormatter(logging.Formatter('%(asctime)s | %(levelname)s | %(message)s'))

SYMBOLS = ['XRP/BNB', 'ETH/BTC', 'LTC/BTC', 'XRP/BUSD', 
            'XRP/BTC', 'LTC/USDT', 'BNB/BUSD', 'XRP/USDT', 
            'ETH/USDT', 'LTC/BNB', 'LTC/BUSD', 'BNB/USDT', 
            'BTC/USDT', 'BTC/BUSD', 'BNB/BTC', 'ETH/BUSD']

class QueueManager():
    _log = logging.getLogger(__name__)
    _log.setLevel(logging.WARNING)
    _log.addHandler(CH)

    def __init__(self, symbols:list=[]) -> None:
        self._twm = ThreadedWebsocketManager()
        self._streams = list(map(lambda s: f"{s.replace('/','').lower()}@bookTicker", symbols))
        self._twm.start()
        self._log.warning(f"Start listening to {len(self._streams)} streams")
        self._listener:str = self._twm.start_multiplex_socket(callback=self._handle_socket_message, streams=self._streams)

    def _handle_socket_message(self, message):
        if ('e' in message):
          if (message['m']=='Queue overflow. Message not filled'):
            self._log.warning("Socket queue full. Resetting connection.")
            self.reset_socket()
            return
          else:
            self._log.error(f"Stream error: {message['m']}")
            exit(1)

        # bookTicker payload: s = symbol, b = best bid price, a = best ask price
        data = message['data']
        s, b, a = data['s'], data['b'], data['a']
        # do something with the message
        self._log.debug(f"{s}: bid - {b}, ask - {a}")

        if False:
          # in case your internal logic invalidates the items in the queue
          # (e.g. your business logic ran too long and items in queue became "too old")
          self.reset_socket()

    def reset_socket(self):
        self._twm.stop_socket(self._listener)
        self._listener = self._twm.start_multiplex_socket(callback=self._handle_socket_message, streams=self._streams)
        if (self._log.isEnabledFor(logging.DEBUG)):
          self._log.debug("Reconnecting. Waiting for 5 seconds...")
          sleep(5)

    def join(self):
        self._twm.join()

def main():
    manager = QueueManager(SYMBOLS)
    manager.join()

if __name__ == "__main__":
    main()

This issue is a feature rather than a bug (it prevents a memory leak). It only means the message handler cannot consume messages as fast as the receiver delivers them. Without MAX_QUEUE_SIZE, memory usage would grow without bound. The solutions proposed in other comments go the wrong way.

The solution is obvious: improve the efficiency of the message handler, or split the pairs across multiple processes.
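One way to read "improve the efficiency of the message handler" is to keep the websocket callback trivial and move the slow work to a separate thread. The sketch below uses only the standard library; `fast_callback`, `worker` and `chunked` are hypothetical names, and the messages are fed in by hand rather than by a real socket. `chunked` shows how a long symbol list could be split into groups, for example one group per socket or process.

```python
import queue
import threading

work_queue = queue.Queue()  # unbounded here; a real system may want a cap
results = []


def fast_callback(msg):
    """Websocket callback: only enqueue, so it returns almost immediately."""
    work_queue.put(msg)


def worker():
    """Runs in its own thread and does the slow part (parsing, DB writes, ...)."""
    while True:
        msg = work_queue.get()
        if msg is None:  # sentinel used to stop the worker
            break
        results.append(msg["s"])  # placeholder for the real processing


def chunked(items, size):
    """Split a symbol list into groups of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for symbol in ["BTCUSDT", "ETHUSDT", "BNBUSDT"]:
    fast_callback({"s": symbol})  # stands in for messages from the socket

work_queue.put(None)
thread.join()

print(results)
print(chunked(["a", "b", "c", "d", "e"], 2))
```

If the worker is still slower on average than the feed, the backlog simply moves into `work_queue`, so this helps with bursts and slow I/O rather than with a handler that can never keep up.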

Saved my day!! Thanks.

But why are we getting this error? Is it because of the asynchronous function we used?

I tried using the websocket directly, and it seems to work fine for me:

ws = websocket.WebSocketApp("wss://fstream.binance.com/ws/!bookTicker", on_message=on_message, on_error=on_error)
ws.run_forever()

Even if you make MAX_QUEUE_SIZE equal to 10000 (or more, it doesn’t matter), in the end you will still get this error or the stream will stop working.