tornado: getting many HTTP 599 errors for valid urls

I’m using Tornado’s AsyncHTTPClient with the following code; I call the scrape function with a list of roughly 10K urls (built from a generator). I expect at most 50 concurrent requests at any time, but that doesn’t seem to be what happens, since the entire process finishes in about 2 minutes.

I got ~200 valid responses and ~9000 HTTP 599 errors. I checked many of the urls that threw this error and they do load in less than 10 seconds; I’m able to reach most of them with urllib2/requests using an even smaller timeout (5 seconds).

All requests go to different servers. I’m running on Ubuntu with Python 2.7.3 and Tornado 4.1.

I suspect that something is wrong as I can fetch most urls using other (blocking) libraries.

import tornado.ioloop
import tornado.httpclient

class Fetcher(object):
    def __init__(self, ioloop):
        self.ioloop = ioloop
        self.client = tornado.httpclient.AsyncHTTPClient(io_loop=ioloop, max_clients=50)
        self.client.configure(None, defaults=dict(user_agent="Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.101 Safari/537.36",
                                                  connect_timeout=20, request_timeout=20, validate_cert=False))

    def fetch(self, url):
        self.client.fetch(url, self.handle_response)

    @property
    def active(self):
        """True if there are active fetching happening"""
        return len(self.client.active) != 0

    def handle_response(self, response):
        if response.error:
            print "Error: %s, time: %s, url: %s" % (response.error, response.time_info, response.effective_url)
        else:
           # print "clients %s" % self.client.active
            print "Got %d bytes" % (len(response.body))

        if not self.active:
            self.ioloop.stop()

def scrape(urls):
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(scrapeEverything, *urls)
    ioloop.start()

def scrapeEverything(*urls):
    fetcher = Fetcher(tornado.ioloop.IOLoop.instance())

    for url in urls:
        fetcher.fetch(url)

if __name__ == '__main__':
    scrape(urls)  # urls: the ~10K-item url list described above (omitted here)

About this issue

  • State: closed
  • Created 9 years ago
  • Comments: 20 (3 by maintainers)

Most upvoted comments

I just tested it and it works great. Here’s @Dalloriam’s example, wrapped in my full working example for posterity. I’ve moved over to this variation because I like self-contained classes. 😉

import collections
import tornado.httpclient


class BacklogClient(object):
    MAX_CONCURRENT_REQUESTS = 20

    def __init__(self, ioloop):
        self.ioloop = ioloop
        self.client = tornado.httpclient.AsyncHTTPClient(max_clients=self.MAX_CONCURRENT_REQUESTS)
        self.client.configure(None, defaults=dict(connect_timeout=20, request_timeout=30))
        self.backlog = collections.deque()
        self.concurrent_requests = 0

    def __get_callback(self, function):
        def wrapped(*args, **kwargs):
            self.concurrent_requests -= 1
            self.try_run_request()
            return function(*args, **kwargs)

        return wrapped

    def try_run_request(self):
        while self.backlog and self.concurrent_requests < self.MAX_CONCURRENT_REQUESTS:
            request, callback = self.backlog.popleft()
            self.client.fetch(request, callback=callback)
            self.concurrent_requests += 1

    def fetch(self, request, callback=None):
        wrapped = self.__get_callback(callback)

        self.backlog.append((request, wrapped))
        self.try_run_request()


import time
from tornado import ioloop, httpclient


class TornadoBacklog(object):
    def __init__(self):

        self.queue = 0
        self.debug = 1
        self.toProcess = [
            'http://google.com',
            'http://yahoo.com'
        ]

    def handle_request(self, response):

        # DO YOUR STUFF HERE

        if not self.backlog.backlog and self.backlog.concurrent_requests == 0:
            ioloop.IOLoop.instance().stop()

            # END STUFF, SUM UP EVERYTHING, FINAL THINGS FOR YOU

    def launch(self):

        self.ioloop = ioloop.IOLoop.current()
        self.backlog = BacklogClient(self.ioloop)

        for item in self.toProcess:
            self.backlog.fetch(
                httpclient.HTTPRequest(
                    item,
                    method='GET',
                    headers=None,
                ),
                self.handle_request
            )

        self.ioloop.start()


def main():
    start_time = time.time()

    scraper = TornadoBacklog()
    scraper.launch()

    elapsed_time = time.time() - start_time
    print('Process took %f seconds processed %d items.' % (elapsed_time, len(scraper.toProcess)))


if __name__ == "__main__":
    main()

@Dalloriam I like the idea, though I implemented the flush version. Care to finish yours off and get run_request() in there to remove the duplication?

For those interested in the flush alternative, here’s a full working example:

import collections
import tornado.httpclient

class BacklogClient(object):
    MAX_CONCURRENT_REQUESTS = 20

    def __init__(self, ioloop):
        self.ioloop = ioloop
        self.client = tornado.httpclient.AsyncHTTPClient(max_clients=self.MAX_CONCURRENT_REQUESTS)
        self.client.configure(None, defaults=dict(connect_timeout=20, request_timeout=30))
        self.backlog = collections.deque()
        self.concurrent_requests = 0
        self.callback = {}

    def __get_callback(self, function):
        def wrapped(*args, **kwargs):
            self.concurrent_requests -= 1
            return function(*args, **kwargs)

        return wrapped

    def flush(self):
        while self.backlog and self.concurrent_requests < self.MAX_CONCURRENT_REQUESTS:
            request, callback = self.backlog.popleft()
            self.client.fetch(request, callback=callback)
            self.concurrent_requests += 1

    def fetch(self, request, callback=None):
        wrapped = self.__get_callback(callback)
        self.backlog.append((request, wrapped))
        self.flush()



import time
from tornado import ioloop, httpclient


class TornadoBacklog(object):

    def __init__(self):

        self.queue = 0
        self.debug = 1
        self.toProcess = [
            'http://google.com',
            'http://yahoo.com'
        ]

    def handle_request(self, response):

        # DO YOUR STUFF HERE

        if self.backlog.backlog:
            self.backlog.flush()
        elif self.backlog.concurrent_requests == 0:
            # only stop once nothing is queued and nothing is still in flight
            ioloop.IOLoop.instance().stop()

            # END STUFF, SUM UP EVERYTHING, FINAL THINGS FOR YOU

    def launch(self):

        self.ioloop = ioloop.IOLoop.current()
        self.backlog = BacklogClient(self.ioloop)

        for item in self.toProcess:
            self.backlog.fetch(
                httpclient.HTTPRequest(
                    item,
                    method='GET',
                    headers=None,
                ),
                self.handle_request
            )

        self.ioloop.start()


def main():
    start_time = time.time()

    scraper = TornadoBacklog()
    scraper.launch()

    elapsed_time = time.time() - start_time
    print('Process took %f seconds processed %d items.' % (elapsed_time, len(scraper.toProcess)))


if __name__ == "__main__":
    main()

As @akellehe pointed out to me, using the flush method keeps the callbacks asynchronous, making sure you’re never bound by a single response and that the queue is always filled to maximum capacity. That said, I’m very interested in the solution @Dalloriam has.

@akellehe I just stumbled on your BacklogClient implementation (great idea, by the way), and for the sake of completeness I suggest daisy-chaining requests when the callback is executed instead of implementing a flush() method, as this guarantees the queue will be emptied. Something like this:

class BacklogClient(object):
  MAX_CONCURRENT_REQUESTS = 20

  def __init__(self, ioloop):
    self.ioloop = ioloop
    self.client = httpclient.AsyncHTTPClient(max_clients=self.MAX_CONCURRENT_REQUESTS)
    self.client.configure(None, defaults=dict(connect_timeout=20, request_timeout=30))
    self.backlog = collections.deque()
    self.concurrent_requests = 0

  def __get_callback(self, function):
    def wrapped(*args, **kwargs):
      self.concurrent_requests -= 1
      if self.backlog:
        request, callback = self.backlog.popleft()
        self.client.fetch(request, callback=callback)
        self.concurrent_requests += 1
      return function(*args, **kwargs)
    return wrapped

  def fetch(self, request, callback=None):
    wrapped = self.__get_callback(callback)

    self.backlog.append((request, wrapped))

    while self.backlog and self.concurrent_requests < self.MAX_CONCURRENT_REQUESTS:
      request, callback = self.backlog.popleft()
      self.client.fetch(request, callback=callback)
      self.concurrent_requests += 1

You could, of course, add an additional run_request() method to get rid of the duplication between the fetch() and __get_callback() methods.
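For illustration, here’s a rough, untested sketch of what that consolidation could look like; the run_request() body and its comments are my own take on the suggestion, not code posted in this thread:

import collections
from tornado import httpclient


class BacklogClient(object):
    MAX_CONCURRENT_REQUESTS = 20

    def __init__(self, ioloop):
        self.ioloop = ioloop
        self.client = httpclient.AsyncHTTPClient(max_clients=self.MAX_CONCURRENT_REQUESTS)
        self.client.configure(None, defaults=dict(connect_timeout=20, request_timeout=30))
        self.backlog = collections.deque()
        self.concurrent_requests = 0

    def run_request(self):
        # Start the next queued request, but only if a slot is free.
        if self.backlog and self.concurrent_requests < self.MAX_CONCURRENT_REQUESTS:
            request, callback = self.backlog.popleft()
            self.client.fetch(request, callback=callback)
            self.concurrent_requests += 1

    def __get_callback(self, function):
        def wrapped(*args, **kwargs):
            self.concurrent_requests -= 1
            self.run_request()  # the freed slot immediately picks up the next request
            return function(*args, **kwargs)
        return wrapped

    def fetch(self, request, callback=None):
        self.backlog.append((request, self.__get_callback(callback)))
        self.run_request()  # may start right away if we're under the limit

fetch() then only appends and calls run_request(), and the wrapped callback does the same, so the start-a-request logic lives in exactly one place.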

I realize this might be useful to others experiencing the same problem. With this approach you only see those 599s when there is a genuine timeout on the server/network side, not, for example, when the client becomes CPU bound. Here’s an example:

import collections
import tornado.httpclient


class BacklogClient(object):
    MAX_CONCURRENT_REQUESTS = 20

    def __init__(self, ioloop):
        self.ioloop = ioloop
        self.client = tornado.httpclient.AsyncHTTPClient(max_clients=self.MAX_CONCURRENT_REQUESTS)
        self.client.configure(None, defaults=dict(connect_timeout=20, request_timeout=30))
        self.backlog = collections.deque()
        self.concurrent_requests = 0

    def __get_callback(self, function):
        def wrapped(*args, **kwargs):
            self.concurrent_requests -= 1
            return function(*args, **kwargs)
        return wrapped

    def fetch(self, request, callback=None):
        wrapped = self.__get_callback(callback)

        self.backlog.append((request, wrapped))

        while self.backlog and self.concurrent_requests < self.MAX_CONCURRENT_REQUESTS:
            request, callback = self.backlog.popleft()
            self.client.fetch(request, callback=callback)
            self.concurrent_requests += 1

Thanks, guys, this is a helpful discussion. I didn’t realize that time spent in the client’s queue counted against the request_timeout. With that in mind I created a separate queue to manage a backlog of requests, as you guys mentioned, and problem solved 🚀 🚀

Updated the examples with if not self.backlog.backlog and self.backlog.concurrent_requests == 0: to ensure the last in-flight request has completed before stopping the ioloop.

@dovy Here is my implementation without flush():

class BacklogClient(object):
  MAX_CONCURRENT_REQUESTS = 20

  def __init__(self, ioloop):
    self.ioloop = ioloop
    self.client = httpclient.AsyncHTTPClient(max_clients=self.MAX_CONCURRENT_REQUESTS)
    self.client.configure(None, defaults=dict(connect_timeout=20, request_timeout=30))
    self.backlog = collections.deque()
    self.concurrent_requests = 0

  def __get_callback(self, function):
    def wrapped(*args, **kwargs):
      self.concurrent_requests -= 1
      self.try_run_request()
      return function(*args, **kwargs)
    return wrapped

  def try_run_request(self):
    while self.backlog and self.concurrent_requests < self.MAX_CONCURRENT_REQUESTS:
      request, callback = self.backlog.popleft()
      self.client.fetch(request, callback=callback)
      self.concurrent_requests += 1

  def fetch(self, request, callback=None):
    wrapped = self.__get_callback(callback)

    self.backlog.append((request, wrapped))
    self.try_run_request()

One could adapt your example by replacing

if self.backlog.backlog:
  self.backlog.flush()
else:
  ioloop.IOLoop.instance().stop()

with

if not self.backlog.backlog:
  ioloop.IOLoop.instance().stop()

(although I haven’t tested it). Cheers!

Experimented with and used the BacklogClient with success. 👍

You’re starting all the fetches at once, but telling AsyncHTTPClient to give up and return a 599 timeout if it can’t complete a request within 20 seconds (the request_timeout option), and time spent waiting in the client’s internal queue counts against that limit. You need to either increase request_timeout to the total amount of time you’re willing to wait for a response (including time spent waiting in the queue), or maintain your own queue and feed urls into AsyncHTTPClient gradually. The queue and semaphore classes being introduced in the upcoming Tornado 4.2 can help with the latter; until then you can use Toro: http://toro.readthedocs.org/en/stable/examples/web_spider_example.html
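For anyone reading this after Tornado 4.2 shipped, here is a rough, untested sketch of the “feed urls in gradually” approach using tornado.queues.Queue, loosely modeled on the web-spider pattern linked above; the crawl/worker names, the concurrency of 50, and the 20-second timeout are my own choices, not something prescribed by Tornado:

from tornado import gen, httpclient, ioloop, queues


@gen.coroutine
def crawl(urls, concurrency=50):
    q = queues.Queue()
    client = httpclient.AsyncHTTPClient(max_clients=concurrency)

    @gen.coroutine
    def worker():
        while True:
            url = yield q.get()          # wait for the next url
            try:
                response = yield client.fetch(url, request_timeout=20)
                print('%s -> %d bytes' % (url, len(response.body)))
            except httpclient.HTTPError as e:
                print('%s -> %s' % (url, e))
            finally:
                q.task_done()            # mark this url as processed

    for url in urls:
        q.put(url)                       # unbounded queue, so this never blocks

    for _ in range(concurrency):
        worker()                         # start the worker coroutines

    yield q.join()                       # resolves once every url has been handled


if __name__ == '__main__':
    ioloop.IOLoop.current().run_sync(
        lambda: crawl(['http://google.com', 'http://yahoo.com']))

Because only the 50 workers ever call fetch(), AsyncHTTPClient’s internal queue never builds up, so the 20-second request_timeout only starts counting once a request is actually handed to the client.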