ccxt: How to restart a coroutine after a websocket stream stops receiving data? (1006 error code)
Hello,
I’m writing an asyncio application to monitor prices and trade/order events, but for an unknown reason some streams stop receiving data after a few hours. I’m not familiar with the asyncio package, and I would appreciate help finding a solution.
Basically, the code below establishes websocket connections with Binance to listen to the streams of six symbols (ETH/USD, BTC/USD, BNB/USD for both spot and future) and to trade events from two accounts (user1, user2). The application uses the ccxtpro library. The public method watch_ohlcv gets price streams, while the private methods watchMyTrades and watchOrders get new order and trade events at the account level.
The problem is that one or several streams are interrupted after a few hours, and the response object becomes empty or None. I would like to detect and restart these streams after they stop working. How can I do that?
```python
# tasks.py
from .methods import ws_loops  # assumes methods.py sits next to tasks.py

# `app` is the project's Celery application, defined elsewhere
@app.task(bind=True, name='Start websocket loops')
def start_ws_loops(self):
    ws_loops()


# methods.py
import asyncio
import logging
from asyncio import gather

# Exchange (a Django model) and do_stuff() are defined elsewhere in the project
log = logging.getLogger(__name__)


def ws_loops():
    async def method_loop(client, exid, wallet, method, private, args):
        exchange = Exchange.objects.get(exid=exid)
        if private:
            account = args['account']
        else:
            symbol = args['symbol']
        while True:
            try:
                if private:
                    response = await getattr(client, method)()
                    if method == 'watchMyTrades':
                        do_stuff(response)
                    elif method == 'watchOrders':
                        do_stuff(response)
                else:
                    response = await getattr(client, method)(**args)  # <-------- Empty object after some time!?
                    if method == 'watch_ohlcv':
                        do_stuff(response)
                # await asyncio.sleep(3)
            except Exception as e:
                print(str(e))
                break  # leaves the loop on the first error; the stream is not resubscribed
        await client.close()

    async def clients_loop(loop, dic):
        # one ccxt.pro client and one method_loop per stream description
        exid = dic['exid']
        wallet = dic['wallet']
        method = dic['method']
        private = dic['private']
        args = dic['args']
        exchange = Exchange.objects.get(exid=exid)
        parameters = {'enableRateLimit': True, 'asyncio_loop': loop, 'newUpdates': True}
        if private:
            log.info('Initialize private instance')
            account = args['account']
            client = exchange.get_ccxt_client_pro(parameters, wallet=wallet, account=account)
        else:
            log.info('Initialize public instance')
            client = exchange.get_ccxt_client_pro(parameters, wallet=wallet)
        mloop = method_loop(client, exid, wallet, method, private, args)
        await gather(mloop)
        await client.close()

    async def main(loop):
        lst = []
        private = ['watchMyTrades', 'watchOrders']
        public = ['watch_ohlcv']
        for exid in ['binance']:
            for wallet in ['spot', 'future']:
                # Private
                for method in private:
                    for account in ['user1', 'user2']:
                        lst.append(dict(exid=exid,
                                        wallet=wallet,
                                        method=method,
                                        private=True,
                                        args=dict(account=account)
                                        ))
                # Public
                for method in public:
                    for symbol in ['ETH/USD', 'BTC/USD', 'BNB/USD']:
                        lst.append(dict(exid=exid,
                                        wallet=wallet,
                                        method=method,
                                        private=False,
                                        args=dict(symbol=symbol,
                                                  timeframe='5m',
                                                  limit=1
                                                  )
                                        ))
        loops = [clients_loop(loop, dic) for dic in lst]
        await gather(*loops)

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main(loop))
```
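A minimal sketch of one way to detect a silent stream and restart it using only asyncio: each watch call is wrapped in `asyncio.wait_for`, and on a timeout, an exception, or an empty/None payload the client is closed and rebuilt. `watch_with_restart`, `make_client`, `stall_timeout`, `backoff` and `max_restarts` are hypothetical names, not ccxt API; the only client calls assumed are the watch method itself and `close()`, both already used in the code above. Treating an empty payload as a failure follows the symptom described in the question and may need adjusting. A stall timeout only makes sense for streams that update regularly, such as `watch_ohlcv`; `watchOrders` and `watchMyTrades` can legitimately stay silent for hours when an account is idle.

```python
import asyncio
from typing import Any, Callable, Optional


async def watch_with_restart(
    make_client: Callable[[], Any],
    method: str,
    args: dict,
    handle: Callable[[Any], None],
    stall_timeout: float = 60.0,
    backoff: float = 1.0,
    max_restarts: Optional[int] = None,
) -> int:
    """Await client.<method>(**args) in a loop and pass each update to handle().

    If no update arrives within stall_timeout seconds, the call raises, or the
    payload is empty/None, close the client, build a fresh one and resubscribe.
    Runs forever when max_restarts is None; otherwise returns the number of
    restarts once that limit is exceeded.
    """
    restarts = 0
    while True:
        client = make_client()  # hypothetical factory returning a new ccxt.pro client
        try:
            while True:
                # wait_for cancels the pending watch call and raises
                # asyncio.TimeoutError if the stream stays silent too long
                response = await asyncio.wait_for(
                    getattr(client, method)(**args), timeout=stall_timeout
                )
                if not response:
                    raise RuntimeError(f"{method} returned an empty response")
                handle(response)
        except Exception as e:  # task cancellation (CancelledError) is not caught on Python 3.8+
            print(f"{method} {args}: {e!r}; restarting")
        finally:
            await client.close()  # always release the old websocket connection
        restarts += 1
        if max_restarts is not None and restarts > max_restarts:
            return restarts
        await asyncio.sleep(backoff)  # short pause before reconnecting
```

In the code above this could take the place of `method_loop` for the public streams, for example `watch_with_restart(lambda: exchange.get_ccxt_client_pro(parameters, wallet=wallet), 'watch_ohlcv', args, do_stuff, stall_timeout=120)`. Creating a new client per restart, rather than reusing the old one, avoids relying on any particular reconnection behaviour inside ccxt.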
About this issue
- State: closed
- Created 2 years ago
- Comments: 15 (1 by maintainers)
Just to let you know, I have had zero disconnections since I implemented the periodic ping a few days ago, so I assume it’s a working solution. I think this ticket can be closed now.
https://github.com/ccxt/ccxt/issues/14086#issuecomment-1172070291
I created a Celery task (see the linked comment above) in my Django application and scheduled it every 5 seconds with beat. It seems to solve the issue.
Hi @carlosmiei, do you think it could be implemented natively in ccxt? Or maybe the ping mechanism could be improved for Binance and some other exchanges?
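The Celery beat schedule runs the ping outside the event loop. As an alternative (not what was used in this thread), the same periodic job could run as a background task inside the asyncio loop itself. This is a sketch under assumptions: `run_periodically` is a hypothetical helper, and `ping` stands in for whatever the linked Celery task actually does, which is not shown here.

```python
import asyncio
from typing import Awaitable, Callable


async def run_periodically(
    interval: float,
    action: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
) -> int:
    """Await action() every `interval` seconds until `stop` is set.

    A failure in one run is printed and does not end the loop.
    Returns how many times action() was called.
    """
    runs = 0
    while not stop.is_set():
        try:
            await action()
        except Exception as e:
            print(f"periodic action failed: {e!r}")
        runs += 1
        try:
            # sleep for `interval`, but wake up immediately if stop is set
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return runs
```

In the question's `main`, this could be started with `asyncio.create_task(run_periodically(5, ping, stop))` before the final `gather`. Waiting on the event instead of plain `asyncio.sleep` lets shutdown happen without waiting out a full interval.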
Hi @Kinzowa, let me share with you my experience since I am dealing with the same problem.
Unfortunately, practice and theory differ, and error 1006 happens quite often. I am using Binance, OKX, Bitmex and BTSE (BTSE is not supported by CCXT), and my code runs on an AWS server, so I should not have any connection issues. Binance and OKX are the worst as far as error 1006 is concerned. Honestly, after researching it on Google, I have only understood that 1006 is a NetworkError, and I know CCXT tries to resubscribe to the channel automatically. None of the other explanations I found online convinced me. If someone from the CCXT staff could give us more info about this error, I think the community would appreciate it.
In any case, every time an exception is raised, I put it in an `exception_list` as a dictionary containing info like time in ms, method, exchange, description, etc. The `exception_list` is then passed to a `handle_exception` method. In this case, if the list contains two 1006 exceptions within X time, `handle_exception` reports that we are not in sync with market data and trading must stop. I cancel all my limit orders and emit a beep (calling for human intervention).

As for your second question:
remember that you are running tasks concurrently (see “Running Tasks Concurrently” in the asyncio documentation).
Here you can find info about restarting an individual task in a `gather()`.
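One way to restart an individual task inside a `gather()` is to wrap each coroutine in a small supervisor that recreates it when it raises. This is a sketch, not part of asyncio or ccxt: `supervise`, `retries` and `delay` are hypothetical names. Because a coroutine object can only be awaited once, the supervisor takes a factory (for example a lambda) rather than the coroutine. By default, `gather()` propagates the first exception to its caller without cancelling the other awaitables, so wrapping each one keeps a single failure contained.

```python
import asyncio
from typing import Awaitable, Callable


async def supervise(
    name: str,
    factory: Callable[[], Awaitable[None]],
    retries: int = 5,
    delay: float = 1.0,
) -> None:
    """Await factory(); if it raises, build a new coroutine and try again.

    After `retries` failed restarts the last exception is re-raised.
    """
    failures = 0
    while True:
        try:
            await factory()
            return  # the coroutine finished without raising
        except Exception as e:
            failures += 1
            if failures > retries:
                raise
            print(f"{name} failed ({e!r}); restart {failures}/{retries}")
            await asyncio.sleep(delay)
```

Applied to the question's code this might read `await gather(*(supervise(dic['method'], lambda dic=dic: clients_loop(loop, dic)) for dic in lst))`. Note that `method_loop` currently catches the exception and `break`s, so `clients_loop` returns normally; for the supervisor to restart it, `method_loop` would have to re-raise instead.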
In your case, since you are using a single exchange (Binance) and unsubscribe is not implemented, as @carlosmiei pointed out, you will have to close the connection and restart all the tasks. You can still use the above to automate it. If you are using more than one exchange, you can design your code so that you close and restart only the exchange that failed.
Another option for you would be to define the tasks with more granularity in `main`, so that every task relates to a single, well-defined exchange/user/method/symbol and every task subscribes to a single channel. This will result in more verbose and less elegant code, but it will help you catch the exception and, if needed, restart only a specific coroutine.
I am obviously assuming that after error 1006 you can resubscribe to a channel without unsubscribing from it first (does error 1006 unsubscribe a channel?).
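The "one task per exchange/user/method/symbol" idea can be sketched as a registry that keeps one `asyncio.Task` per key, so a single stream can be cancelled and recreated without touching the others. `TaskRegistry` is a hypothetical helper; the key tuples simply mirror the dicts built in the question's `main`, and the coroutine factories would be whatever opens that one subscription.

```python
import asyncio
from typing import Any, Callable, Coroutine, Dict, Hashable

Factory = Callable[[], Coroutine[Any, Any, None]]


class TaskRegistry:
    """One asyncio.Task per key, e.g. (exid, wallet, method, symbol)."""

    def __init__(self) -> None:
        self._factories: Dict[Hashable, Factory] = {}
        self._tasks: Dict[Hashable, asyncio.Task] = {}

    def start(self, key: Hashable, factory: Factory) -> None:
        """Create the task for `key` and remember how to recreate it."""
        self._factories[key] = factory
        self._tasks[key] = asyncio.create_task(factory())

    async def restart(self, key: Hashable) -> None:
        """Cancel the task for `key` (if still running) and start a fresh one."""
        old = self._tasks[key]
        old.cancel()
        # gather(..., return_exceptions=True) waits for the old task to finish
        # without re-raising its CancelledError or any earlier exception
        await asyncio.gather(old, return_exceptions=True)
        self._tasks[key] = asyncio.create_task(self._factories[key]())

    async def stop_all(self) -> None:
        """Cancel every task and wait for all of them to finish."""
        tasks = list(self._tasks.values())
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self._tasks.clear()
```

Keeping the factory next to the task is what makes a targeted restart possible: the registry can rebuild exactly the subscription that failed.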
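The `exception_list` / `handle_exception` scheme described earlier in this comment (record each exception with a timestamp, stop trading after two 1006 errors within some window) could look roughly like this. The two function names come from the comment, but their signatures are guesses; the window length, the `code` field, and how the close code is extracted from a ccxt exception are assumptions left open here, so the code is passed in explicitly.

```python
import time
from typing import List, Optional


def record_exception(
    exception_list: List[dict],
    method: str,
    exchange: str,
    description: str,
    code: Optional[int] = None,
    now_ms: Optional[int] = None,
) -> None:
    """Append one error as a dict: time in ms, method, exchange, code, description."""
    exception_list.append({
        'time': now_ms if now_ms is not None else int(time.time() * 1000),
        'method': method,
        'exchange': exchange,
        'code': code,
        'description': description,
    })


def handle_exception(
    exception_list: List[dict],
    window_ms: int,
    max_1006: int = 2,
    now_ms: Optional[int] = None,
) -> bool:
    """Return True if still in sync with market data, False if trading should stop.

    Trading should stop when at least `max_1006` errors with code 1006 were
    recorded within the last `window_ms` milliseconds (the "X time" window).
    """
    now = now_ms if now_ms is not None else int(time.time() * 1000)
    recent = [
        e for e in exception_list
        if e['code'] == 1006 and now - e['time'] <= window_ms
    ]
    return len(recent) < max_1006
```

Cancelling open limit orders and alerting a human would happen wherever `handle_exception` returns `False`.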
Final thought:
Professional market makers with a team of engineers working in London do not go to the pub while their algos (usually co-located with the exchange) execute thousands of trades.
I hope this can help you and I would be happy to continue the discussion.
@Kinzowa, CCXT has an integer keep-alive rate in milliseconds. You can see it using … The default for OKX is 20000 ms. You can override it to 5000 so you would have the same effect as with your Celery task. Binance does not have this property. I found `'listenKeyRefreshRate': 1200000, # 20 mins`, but I honestly do not know what it does. You might be on the right track. Let us know if the problem happens again. Thanks.

Hello @carlosmiei, thank you for your response. Please let us know if you find anything in the code that could cause the stream to crash. On my end, I’ll try to catch an exception or something that might give a clue as to what’s going on.