autokeras: Got 'NotImplementedError' on macOS

Bug Description

Traceback (most recent call last):
  File "test.py", line 29, in <module>
    clf.fit(x_train, y_train, time_limit=60 * 60)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/autokeras/image/image_supervised.py", line 114, in fit
    super().fit(x, y, time_limit)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/autokeras/supervised.py", line 129, in fit
    self.cnn.fit(self.get_n_output_node(), x_train.shape, train_data, test_data, time_limit)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/autokeras/net_module.py", line 65, in fit
    self.searcher.search(train_data, test_data, int(time_remain))
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/autokeras/search.py", line 200, in search
    generated_other_info, generated_graph = self.generate(remaining_time, q)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/autokeras/search.py", line 251, in generate
    remaining_time, multiprocessing_queue)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/autokeras/bayesian.py", line 350, in generate
    if multiprocessing_queue.qsize() != 0:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/queues.py", line 117, in qsize
    return self._maxsize - self._sem._semlock._get_value()
NotImplementedError

Reproducing Steps

Just run the ‘Data with numpy array (.npy) format.’ example.

Setup Details

Include the details about the versions of:

  • OS type and version: macOS 10.14.2
  • Python: 3.6

Additional context

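This seems to be the cause: Queue.qsize() may raise NotImplementedError on platforms like macOS where sem_getvalue() is not implemented. See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.qsize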
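
As a stopgap on the caller's side, the qsize() call can be guarded. This is only a minimal sketch: safe_qsize is a hypothetical helper name, not part of autokeras or the standard library, and returning a default when the size is unknown is an assumption about what the caller can tolerate.

```python
import queue


def safe_qsize(q, default=None):
    """Return q.qsize(), or `default` where the platform cannot report it.

    Hypothetical helper: multiprocessing.Queue.qsize() raises
    NotImplementedError on platforms such as macOS, so the caller gets
    `default` instead and must decide how to treat "size unknown".
    """
    try:
        return q.qsize()
    except NotImplementedError:
        return default


if __name__ == "__main__":
    # A thread-safe queue.Queue always supports qsize(); it stands in here
    # for a multiprocessing.Queue on a platform where qsize() works.
    q = queue.Queue()
    q.put("item")
    print(safe_qsize(q))
```

A caller that previously wrote `if q.qsize() != 0:` would then have to handle the `None` case explicitly, for example by falling back to a non-blocking get.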

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 22 (7 by maintainers)

Most upvoted comments

I’m getting this error on macOS Big Sur with Python 3.8 when calling multiprocessing.queues.Queue.qsize(). Why is the issue closed? It feels odd that no one has tried to fix this. Even the queues.py module has a comment marking the line as problematic, yet nothing has changed over time.

def qsize(self):
    # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
    return self._maxsize - self._sem._semlock._get_value()

Since this is an old problem, I looked around and found a possible solution that another project (lemon) has adopted: subclassing the Queue class to make it portable. This is the code (it comes from https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f):

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

    """

    def __init__(self, n = 0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    """

    def __init__(self, *args, **kwargs):
        super(Queue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(Queue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        self.size.increment(-1)
        return super(Queue, self).get(*args, **kwargs)

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

@nanshihui It is multiprocessing.queues, not multiprocessing.Queues. Just tried on Python3.7 and Python2.7.

EDIT: Oh no, scratch that. It works if you import it in the Python shell, but referencing multiprocessing.queues.Queue directly, as the code above does, doesn’t work.

EDIT: Do this instead (Python 2):

import multiprocessing
from multiprocessing.queues import Queue as mp_queue

class Queue(mp_queue):
  # ...

EDIT: in Python3, this code has a problem:

  File "...[redacted].../shared_queue.py", line 53, in __init__
    super(Queue, self).__init__(*args, **kwargs)
TypeError: __init__() missing 1 required keyword-only argument: 'ctx'

Solution: https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue
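
Putting the pieces of this thread together, a Python 3 version might look like the sketch below. It is an adaptation of the lemon code above, not the original: the class name PortableQueue is hypothetical, ctx is filled in with multiprocessing.get_context() when the caller does not pass one, and __getstate__/__setstate__ are overridden on the assumption that the counter must travel with the queue when it is pickled for a child process (as happens with the "spawn" start method).

```python
import multiprocessing
import multiprocessing.queues


class SharedCounter:
    """A synchronized shared counter (same idea as in the code above)."""

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        # The lock makes the read-modify-write atomic across processes.
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value


class PortableQueue(multiprocessing.queues.Queue):
    """Queue whose qsize() does not depend on sem_getvalue()."""

    def __init__(self, maxsize=0, *, ctx=None):
        # Python 3 requires the keyword-only 'ctx' argument.
        if ctx is None:
            ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        self.size = SharedCounter(0)

    def __getstate__(self):
        # The parent class pickles a fixed tuple, so carry the counter along.
        return (super().__getstate__(), self.size)

    def __setstate__(self, state):
        parent_state, self.size = state
        super().__setstate__(parent_state)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        try:
            super().put(*args, **kwargs)
        except Exception:
            # Undo the count if the item was never queued (e.g. queue.Full).
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        # Decrement only after an item was actually retrieved.
        self.size.increment(-1)
        return item

    def qsize(self):
        return self.size.value

    def empty(self):
        return not self.qsize()


if __name__ == "__main__":
    q = PortableQueue()
    q.put("a")
    q.put("b")
    print(q.qsize())
    print(q.get(), q.get(), q.empty())
```

Unlike the original, this version decrements the counter only after get() has returned an item and rolls the count back if put() fails, so a timed-out call does not leave the counter skewed. The count can still briefly run ahead of what a consumer can actually read, since put() hands items to a background feeder thread.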