requests: Requests seems to get stuck when used with futures.ThreadPoolExecutor

Hello,

I’m using Python 2.7.9 with futures (3.0.3) and requests (2.7.0) on Debian (also tested on Windows 8 with the same results).

The problem is that Requests doesn’t time out and gets stuck, so my threads never finish their jobs and stop processing the queue.

I’m trying to build a multi-threaded web crawler: I fetch to-be-crawled URLs from a frontier (which returns a JSON list of domains) and populate a queue with them, roughly as sketched below.
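For context, a minimal sketch of that population step, assuming Python 2 and a hypothetical frontier endpoint (the issue doesn’t show this code):

import Queue  # the module is named `queue` on Python 3

import requests

url_queue = Queue.Queue()
FRONTIER_URL = "http://frontier.example/next"  # hypothetical endpoint

def populate_queue():
    # The frontier is assumed to return a JSON list of domains.
    response = requests.get(FRONTIER_URL, timeout=3)
    for domain in response.json():
        url_queue.put(domain)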

After this, I’m populating the thread pool with the code below:

while not url_queue.empty():
    queue_data = url_queue.get()  # take the next domain off the queue
    task_pool.submit(processItem, queue_data)  # hand it to a worker thread

In the processItem() function, I fetch the URL with get_data() and mark the queue item done with task_done().
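processItem() itself isn’t shown in the issue; a minimal sketch consistent with the description might be:

def processItem(fqdn):
    # get_data() is the call that can block (see below); task_done()
    # lets url_queue.join() return once every item is processed.
    get_data(fqdn)
    url_queue.task_done()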

My get_data() function is as follows:

def get_data(fqdn):
    # requests is imported and `headers` is defined elsewhere in the script.
    try:
        response = requests.get("http://" + fqdn, headers=headers,
                                allow_redirects=True, timeout=3)

        if response.status_code == requests.codes.ok:
            result = response.text
        else:
            result = ""

    except requests.exceptions.RequestException as e:
        print "ERROR OCCURRED:"
        print fqdn
        print e
        result = ""

    return result

If I comment out the get_data() call in processItem(), all threads and the queue work fine. If I uncomment it, it works fine for most requests but gets stuck on some, and that stalls the whole queue and script, because queue.join() waits for the threads to finish their requests. I suspect it’s a bug in the requests module, since everything works fine without calling get_data() and requests doesn’t time out the GET request.

Any help will be greatly appreciated… Thank you very much…

About this issue

  • Original URL
  • State: closed
  • Created 9 years ago
  • Reactions: 2
  • Comments: 24 (11 by maintainers)

Most upvoted comments

@metrue to maintain a thread-safe/multiprocess-safe queue, you can use the standard library’s Queue implementation. If you’re on Python 2:

import Queue

task_queue = Queue.Queue()

If you’re on Python 3:

import queue

task_queue = queue.Queue()

If you are using a process pool executor, you must not use a Session that is shared across those processes; see the sketch below.
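A minimal sketch of that advice, with each worker building its own Session (the function and URLs here are illustrative, not from the issue):

from concurrent.futures import ProcessPoolExecutor

import requests

def fetch(url):
    # Create the Session inside the worker so every process gets its
    # own connection pool instead of sharing one across processes.
    session = requests.Session()
    response = session.get(url, timeout=3)
    return response.status_code

if __name__ == '__main__':  # required for process pools on Windows
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(fetch, ['http://example.com'] * 4)))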

Same issue here.

Also developing a web crawler intended to process a continuous stream of URLs.

My code behaviour is something like the following:

from concurrent.futures import ThreadPoolExecutor
import logging
import random
import time

import requests

NTHREADS = 2
DELAY_SECONDS = 0.5
URLS = ['https://google.com', 'http://yahoo.com', 'http://github.com', 'https://bing.com']

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

def callback():
    response = requests.get(random.choice(URLS), timeout=120)
    logging.info('status_code=%d ok=%s', response.status_code, response.ok)

with ThreadPoolExecutor(NTHREADS) as executor:
    while True:
        time.sleep(DELAY_SECONDS) # do not hit the site too hard
        queued_works = executor._work_queue.qsize()  # note: _work_queue is a private attribute
        logging.info('queued works: %s', queued_works)
        if queued_works < 10: # do not flood executor's queue
            executor.submit(callback)

I wasn’t able to reproduce this exact error with this small list of URLs, but in my production environment (after some time running, say 2~3 hours) the log messages start looking like this:

2015-10-01 16:51:41,488 : INFO : queued works: 10
2015-10-01 16:51:41,489 : INFO : queued works: 10
2015-10-01 16:51:41,489 : INFO : queued works: 10
2015-10-01 16:51:41,489 : INFO : queued works: 10
2015-10-01 16:51:41,490 : INFO : queued works: 10
2015-10-01 16:51:41,490 : INFO : queued works: 10
2015-10-01 16:51:41,491 : INFO : queued works: 10
2015-10-01 16:51:41,491 : INFO : queued works: 10
2015-10-01 16:51:41,492 : INFO : queued works: 10
2015-10-01 16:51:41,492 : INFO : queued works: 10
2015-10-01 16:51:41,492 : INFO : queued works: 10
2015-10-01 16:51:41,493 : INFO : queued works: 10
.... (and it goes on like this forever; it wouldn’t stop even after a few days)

I checked ThreadPoolExecutor’s implementation and I’m pretty convinced the problem is NOT related to it. The code just seems to get stuck on line 55 of that implementation:

result = self.fn(*self.args, **self.kwargs)

edit: by “the issue is not related to ThreadPoolExecutor” I mean: it doesn’t matter whether callback() raises an exception or not; the executor is supposed to handle that just fine. The real problem is that the _WorkItem.run() method never returns.

edit 2: python 2.7
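For what it’s worth, a plausible explanation: requests’ timeout bounds individual socket operations (the connect and each read), not the whole request, so a server that keeps trickling bytes can hold a worker thread far longer than the timeout suggests. A minimal sketch of a whole-request deadline using streaming (the deadline value and function name are illustrative):

import time

import requests

OVERALL_DEADLINE = 120  # seconds; illustrative cap on total request time

def fetch_with_deadline(url):
    # timeout=10 bounds each connect/read; the wall-clock check below
    # bounds the request as a whole.
    start = time.time()
    response = requests.get(url, stream=True, timeout=10)
    chunks = []
    for chunk in response.iter_content(chunk_size=8192):
        if time.time() - start > OVERALL_DEADLINE:
            response.close()
            raise requests.exceptions.Timeout('overall deadline exceeded')
        chunks.append(chunk)
    return b''.join(chunks)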