google-cloud-python: PubSub: Errno 24: too many open files with multiple publishers

Ubuntu 16.04
Python 3.5.2
google-cloud-pubsub==0.35.4
google-auth-oauthlib==0.2.0

I’m using multiprocessing together with Pub/Sub, and after some amount of time I get the stack trace shown below. Even stepping down to just one process still causes the failure. When I pull the publisher client out of the publish_messages function and reuse it, the error stops being thrown, but throughput plummets. My suspicion is that the publisher doesn’t properly close its gRPC channels, so each newly created publisher leaves file descriptors open until the process hits its limit. There doesn’t seem to be a good way to pass a channel into the publisher so that I can close it manually. Is there a way to make sure publishers are shut down cleanly?
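
If anyone wants to confirm the descriptor growth, a quick check is to poll /proc/self/fd between client creations. This is only a diagnostic sketch: it assumes Linux (Ubuntu 16.04, as above) and that application-default credentials are configured as in the snippet below; depending on when gRPC actually opens its sockets, you may need to publish once from each client before the count moves.

import os
from google.cloud import pubsub_v1

def open_fd_count():
    # Number of file descriptors currently open in this process (Linux only).
    return len(os.listdir('/proc/self/fd'))

print("fds before:", open_fd_count())
clients = [pubsub_v1.PublisherClient() for _ in range(10)]
print("fds after creating 10 publishers:", open_fd_count())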

ERROR:root:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f304d6eb780>" raised exception!
Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/ssl_.py", line 336, in ssl_wrap_socket
OSError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 600, in urlopen
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 343, in _make_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 849, in _validate_conn
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connection.py", line 356, in connect
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/ssl_.py", line 338, in ssl_wrap_socket
urllib3.exceptions.SSLError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/adapters.py", line 445, in send
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 638, in urlopen
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/retry.py", line 398, in increment
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/requests.py", line 120, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/sessions.py", line 512, in request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/sessions.py", line 622, in send
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/adapters.py", line 511, in send
requests.exceptions.SSLError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/grpc/_plugin_wrapping.py", line 77, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/grpc.py", line 77, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/grpc.py", line 65, in _get_authorization_headers
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/credentials.py", line 122, in before_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/service_account.py", line 322, in refresh
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/_client.py", line 145, in jwt_grant
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/_client.py", line 106, in _token_endpoint_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/requests.py", line 124, in __call__
  File "<string>", line 3, in raise_from
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

Code snippet:

import json
from google.cloud import pubsub_v1
import google.auth
import time
from multiprocessing import Process, Queue
import os


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/ryan/Downloads/Multicoin Alpha-0b2b4e33e22d.json"
creds, project = google.auth.default()

def publish_messages(topic_name, messages, project="elegant-device-154517"):
    """Publish a list of messages to a Pub/Sub topic."""

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1e7,  # ten megabytes (pub/sub max)
        max_latency=0.33,  # in seconds
        max_messages=1000
    )
    # A new client (and therefore a new gRPC channel) is created on every call;
    # retry construction until it succeeds.
    publisher_defined = False
    while not publisher_defined:
        try:
            publisher = pubsub_v1.PublisherClient(batch_settings, credentials=creds)
            publisher_defined = True
        except Exception:
            time.sleep(0.5)

    topic_path = publisher.topic_path(project, topic_name)

    def publish(data):
        return publisher.publish(topic_path, data=data)

    encoded_messages = []
    for m in messages:
        data = json.dumps(m)
        # Data must be a bytestring
        data = data.encode('utf-8')
        encoded_messages.append(data)

    results = []
    for m in encoded_messages:
        results.append(publish(m))

    return results


class Example(object):
    def __init__(self, num):
        self.data = [{"test":"test1"} for n in range(1000)]
        self.messages = []
        self.successful_messages = []
        self.failed_messages = []

    def push_data(self):
        self.messages.extend(publish_messages("topic_name", self.data))

    def check_messages(self):
        # Iterate over a copy so that removing finished futures does not skip items.
        for m in list(self.messages):
            if m.done():
                try:
                    self.successful_messages.append(int(m.result()))
                except Exception:
                    self.failed_messages.append(m)
                self.messages.remove(m)
        return len(self.messages) == 0

    def push_and_check(self):
        self.push_data()
        total_messages = 0
        total_errors = 0
        complete = False
        while not complete: # while the messages have not all been verified to be sent correctly
            while not self.check_messages(): # while the messages haven't finished attempting to push
                time.sleep(.1)
            total_messages += len(self.failed_messages) + len(self.successful_messages)
            if len(self.failed_messages) == 0:
                complete = True
            else:
                total_errors += len(self.failed_messages)
                self.messages = []
                self.successful_messages = []
                self.failed_messages = []
                self.push_data()
        return total_messages

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = job(func, args)
        output.put(result)


def job(func, args):
    result = func(*args)
    return result


import_queue, done_queue = Queue(), Queue()

test_num = 100000
print("building queue")
for num in range(test_num):
    import_queue.put((Example(num).push_and_check, []))

numprocesses = 5
print("starting processes")
p_count = 0
for i in range(numprocesses):
    p_count += 1
    Process(target=worker, args=(import_queue, done_queue)).start()
    print("started process #", p_count)

count = 0
for i in range(test_num):
    done_queue.get()
    count += 1
    if count % 10 == 0:
        print(count)
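
For completeness, this is roughly what "pulling the publisher out of the publish message function" looks like in my setup: one lazily created client per worker process, reused for every call in that process. It reuses json, pubsub_v1, and creds from the snippet above; get_publisher is just an illustrative helper name. It avoids the descriptor leak but, as noted at the top, this is where I see throughput drop.

# Sketch: one PublisherClient per worker process, created lazily after the fork
# and reused for every publish_messages() call in that process.
_publisher = None

def get_publisher():
    global _publisher
    if _publisher is None:
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1e7,
            max_latency=0.33,
            max_messages=1000,
        )
        _publisher = pubsub_v1.PublisherClient(batch_settings, credentials=creds)
    return _publisher

def publish_messages(topic_name, messages, project="elegant-device-154517"):
    publisher = get_publisher()
    topic_path = publisher.topic_path(project, topic_name)
    return [publisher.publish(topic_path, data=json.dumps(m).encode('utf-8'))
            for m in messages]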

Most upvoted comments

Note for others reading this: I ran into the same problem with other clients, and the solution above is useful there too. For example, ImageAnnotatorClient does not close its connections either, but it can be forced to through client.transports.channel.close(), just like above.
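
Roughly what that looks like; the attribute path is internal, version-dependent layout rather than a public API (some releases expose it as client.transport instead of client.transports), so treat this as a sketch:

from google.cloud import vision

client = vision.ImageAnnotatorClient()
# ... run whatever annotate requests you need ...

# Reach into the client and close the underlying gRPC channel explicitly.
# Internal, version-dependent layout, not a public API.
client.transports.channel.close()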

Hmm, we no longer expose PublisherClient.channel – instead, the channel is hidden away inside the stub of the transport. I think we need to provide an explicit mechanism for shutting down the transport / channel.
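
In the meantime, reaching the channel through those internals looks roughly like this for the publisher. The api / transport / channel chain below is an assumption about the 0.35.x layout (hand-written client wrapping a GAPIC client whose transport owns the channel), and 'my-project' / 'my-topic' are placeholders, not anything from this issue:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')
try:
    future = publisher.publish(topic_path, data=b'payload')
    future.result()  # wait for the batch to flush before closing the channel
finally:
    # Assumed internal layout: hand-written client -> GAPIC client (.api)
    # -> transport -> gRPC channel. Not a supported public interface.
    publisher.api.transport.channel.close()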