imagezmq: [PUB/SUB] Subscriber slow motion video (queue keeps growing)

Hi there,

Thanks for your work on ImageZMQ, it’s been very useful for me!

I’m using a project setup which has:

  • A server that sends processed OpenCV frames via ImageZMQ to a receiver
  • A Raspberry Pi that shows the received frames on screen

I’ve successfully implemented the REQ/REP pattern in my project and it works well. The only issue is that REQ/REP keeps the server from processing as many images as it can, because it waits for the receiver’s OK reply after every frame.

This is when I started trying the PUB/SUB pattern. For the server this works great. However, with PUB/SUB the video plays in slow motion on the receiver. By slow motion I mean that the receiver queues every frame it gets, but probably isn’t fast enough to display them at the rate the server sends them. This creates an ever-growing queue of images. I’ve also tried it on a stronger machine (a MacBook), with the same result.
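To make the growing queue concrete, here is a rough back-of-the-envelope sketch. It is only a model, not a measurement: the function names and the 30 fps / 10 fps rates are made up for illustration, the rates are assumed constant, and any queue limit ZeroMQ may apply is ignored.

```python
def backlog_after(seconds, publish_fps, display_fps):
    """Frames still waiting in the subscriber queue after `seconds` of streaming.

    Hypothetical model: constant rates and an unbounded queue.
    """
    received = int(seconds * publish_fps)
    # The receiver cannot display more frames than were published.
    displayed = int(seconds * min(publish_fps, display_fps))
    return received - displayed


def latency_seconds(seconds, publish_fps, display_fps):
    """How far behind live the picture is: time needed to drain the backlog."""
    return backlog_after(seconds, publish_fps, display_fps) / display_fps


if __name__ == "__main__":
    # Publisher at 30 fps, receiver able to display only 10 fps (assumed numbers).
    for elapsed in (1, 10, 60):
        print(elapsed, backlog_after(elapsed, 30, 10), latency_seconds(elapsed, 30, 10))
```

With these assumed rates the receiver falls two seconds further behind for every second of streaming, which would look exactly like slow-motion playback.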

Any tips or ideas on how I could solve my issue? Any help is much appreciated!

Edit: I’ve changed my code a bit by re-instantiating the ImageHub object on every loop iteration (instead of just once before the while loop), and it seems to get rid of the queue problem. It doesn’t play in slow motion anymore! However, I wonder whether this is really the best solution, since re-instantiating ImageHub on every iteration doesn’t seem like the most efficient approach.

Before (queue/latency growing):

imageHub = imagezmq.ImageHub(open_port='tcp://{}:5555'.format(args["server"]), REQ_REP=False)
while True:
    rpiName, frame = imageHub.recv_image()
    cv2.imshow("Window", frame)
    cv2.waitKey(1)  # let OpenCV process GUI events so the window redraws

After (steady latency):

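while True:
    # A fresh ImageHub (and SUB socket) on every iteration starts with an empty queue.
    imageHub = imagezmq.ImageHub(open_port='tcp://{}:5555'.format(args["server"]), REQ_REP=False)
    rpiName, frame = imageHub.recv_image()
    cv2.imshow("Window", frame)
    cv2.waitKey(1)  # let OpenCV process GUI events so the window redraws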

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 36 (20 by maintainers)

Most upvoted comments

Hello @BrandonYuen, @oggyjack, @hjinlee88, @zarar7576, @DaveXanatos,

I just merged @philipp-schmidt’s example programs and documentation into the Master branch. They work well even with deliberately inserted receiver / subscriber delays. There is no growing ZMQ queue or other slowdowns. His examples use an elegant threaded class that fixes the “slow subscriber” problem discussed in this issue.

His example programs are in the /examples folder. His documentation file is “Advanced PUB/SUB example with multithreaded fast subscribers for realtime processing”. The main README.rst has the link to the documentation file, also.

I tested his example programs on multiple combinations of RPi’s and Macs with both webcams and PiCameras. They worked well in all combinations I tried. Try it out and provide updates in this thread if it fixes your problem. Or let us know if it doesn’t.

Thanks, (and thanks to @philipp-schmidt!)
Jeff

Hey,

I put together a minimal example and documentation in pull request #34.

Let me know what you think.

Best, Philipp

This should do the trick:

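import threading

import zmq
from imagezmq import SerializingContext  # assumes imagezmq exports this class at package level


class VideoStreamSubscriber:

    def __init__(self, hostname, port):
        self.hostname = hostname
        self.port = port
        self._stop = False
        self._data_ready = threading.Event()
        # Receive in a background daemon thread so the ZMQ queue is drained continuously.
        self._thread = threading.Thread(target=self._run, args=())
        self._thread.daemon = True
        self._thread.start()

    def receive(self, timeout=15.0):
        # Block until the receiver thread has stored a frame that has not been read yet.
        flag = self._data_ready.wait(timeout=timeout)
        if not flag:
            raise TimeoutError(
                f"Timeout while reading from subscriber tcp://{self.hostname}:{self.port}")
        self._data_ready.clear()
        return self._data

    def _run(self):
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.SUB)
        self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: subscribe to everything
        self.zmq_socket.connect(f"tcp://{self.hostname}:{self.port}")
        while not self._stop:
            # Each new frame overwrites the previous one, so only the latest is kept.
            self._data = self.zmq_socket.recv_array()
            self._data_ready.set()
        self.zmq_socket.close()

    def close(self):
        # Takes effect once the blocking recv_array() call returns with the next frame.
        self._stop = True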

As @jeffbass correctly pointed out though:

I have learned that multiprocessing may be a better choice, since the RPi has 4 cores and with Python threading, only 1 core is used for all the threads.

So this might not be the ultimate solution for a Pi, but maybe it is worth a try? The Pi will still receive all frames, but only the most recent one is returned by receive(). The event makes sure a frame cannot be read twice.
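The "keep only the most recent frame" idea can be tried without any network code. The following is a minimal stdlib-only sketch of that pattern, not the class from the pull request: `LatestValue` is a hypothetical name, integers stand in for frames, and it adds a lock around the stored value, which the class above does not use.

```python
import threading


class LatestValue:
    """Holds only the newest value; unread older values are overwritten."""

    def __init__(self):
        self._lock = threading.Lock()
        self._ready = threading.Event()
        self._value = None

    def put(self, value):
        # Called by the fast receiver thread for every incoming frame.
        with self._lock:
            self._value = value
            self._ready.set()

    def get(self, timeout=None):
        # Called by the slow consumer; blocks until an unread value exists.
        if not self._ready.wait(timeout=timeout):
            raise TimeoutError("no new value arrived in time")
        with self._lock:
            # Clearing the event ensures the same value is not returned twice.
            self._ready.clear()
            return self._value


if __name__ == "__main__":
    slot = LatestValue()
    # Stand-in for the receiver thread: "receives" 100 frames as fast as it can.
    producer = threading.Thread(target=lambda: [slot.put(i) for i in range(100)])
    producer.start()
    producer.join()
    print(slot.get(timeout=1.0))  # only the newest value survives
```

Because the consumer only ever sees the newest value, its latency stays roughly constant no matter how many frames arrive between two reads. The cost is that skipped frames are lost, which is usually acceptable for live display.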

Maybe ZMQ_CONFLATE could solve the issue, somewhere or somehow?