iceoryx: Possibility to block the publisher when subscriber queue is full

Brief feature description

Today our default is to use an “overflowing queue” for the subscribers. If the subscriber does not consume fast enough, we start losing samples. It would be nice to have an option to block the publisher in this case, to ensure that no samples are lost.

Detailed information

The overflowing queue drops the oldest sample in case of an overflow, so technically it behaves like a ring buffer. In many use cases this is fine as we want to have a “provide the last X samples” contract. E.g. if a subscriber is only interested in the latest data, it can set the queue size to 1 and we don’t waste memory chunks on samples that are not interesting for the subscriber. We often also do not want any interference from a subscriber back to a publisher. So if the subscriber is not fast enough to consume all samples, possible solutions are:

  1. increase the frequency of the subscribing application if it operates in polling mode
  2. increase the queue size for the subscriber
  3. decrease the runtime for the subscribing application

But there might also be use cases where it is fine to slow down the publisher to ensure that no data is lost in our system. The solution would be to block the publish() call when we detect a queue overflow, until the subscriber has popped samples and there is a free slot in the queue again. Sure, this has an influence on the publishing application and also on other subscribers that are connected to this publisher. This is comparable to the DDS history QoS KeepAll. The normal behavior with our overflowing queue is comparable to the DDS history QoS KeepLastX.
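To make the difference between the two behaviors concrete, here is a minimal sketch of a bounded queue that either discards the oldest sample or blocks the producer when it is full. It is only an illustration under assumptions: `BoundedQueue` and `OverflowPolicy` are hypothetical names, and the mutex and condition variable stand in for whatever lock-free mechanism the real subscriber queue uses.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical illustration, NOT the iceoryx queue implementation.
enum class OverflowPolicy { DiscardOldest, BlockProducer };

template <typename T>
class BoundedQueue {
public:
    BoundedQueue(std::size_t capacity, OverflowPolicy policy)
        : m_capacity(capacity), m_policy(policy) {
        assert(capacity > 0 && "capacity must be at least 1");
    }

    // Pushes a sample and returns how many old samples were dropped (0 or 1).
    std::size_t push(T value) {
        std::unique_lock<std::mutex> lock(m_mutex);
        std::size_t dropped = 0;
        if (m_policy == OverflowPolicy::BlockProducer) {
            // "KeepAll"-like behavior: wait until the consumer frees a slot.
            m_notFull.wait(lock, [this] { return m_data.size() < m_capacity; });
        } else if (m_data.size() == m_capacity) {
            // "KeepLast"-like behavior: overwrite the oldest sample.
            m_data.pop_front();
            dropped = 1;
        }
        m_data.push_back(std::move(value));
        return dropped;
    }

    // Pops the oldest sample into 'out'; returns false if the queue is empty.
    bool pop(T& out) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_data.empty()) {
            return false;
        }
        out = std::move(m_data.front());
        m_data.pop_front();
        m_notFull.notify_one();  // wake a producer blocked in push()
        return true;
    }

private:
    const std::size_t m_capacity;
    const OverflowPolicy m_policy;
    std::mutex m_mutex;
    std::condition_variable m_notFull;
    std::deque<T> m_data;
};
```

With `DiscardOldest` and a capacity of 2, pushing 1, 2, 3 leaves 2 and 3 in the queue. With `BlockProducer`, the third push would not return until another thread calls `pop()`, which is exactly the back-pressure on the publisher discussed above.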

ToDo

Once the feature is implemented, add the following integration tests:

  • Modified icedelivery where the subscriber acquires no samples until the publisher blocks. When the publisher blocks, press Ctrl+C (for both sides)
  • Unsubscribe subscriber when publisher is in blocking push.
  • Destroy subscriber object when publisher is in blocking push.
  • Subscribe new subscriber when publisher is in blocking push.
  • Unsubscribe different subscriber when publisher is in blocking push with another subscriber.
  • Optimization in ChunkDistributor https://github.com/eclipse-iceoryx/iceoryx/pull/663#discussion_r606655415
  • Fix TriggerQueue https://github.com/eclipse-iceoryx/iceoryx/pull/663#discussion_r606653889
  • Ctrl+C on an application with a publisher blocked by a slow subscriber doesn’t shut the application down when a signal handler is installed; this is due to the while (!remainingQueues.empty()) loop in ChunkDistributor::deliverToAllStoredQueues, which is not stopped when SIGTERM has a custom signal handler
    • a Runtime::unblockShutdown could be implemented
    • this could call stop offer on all publishers
    • it must be carefully checked what the stop offer call does since only a limited number of functions are allowed to be called in the signal handler (https://man7.org/linux/man-pages/man7/signal-safety.7.html)
  • An application with a blocked publisher slows down the RouDi shutdown due to the 45s processKillDelay in the RouDi::shutdown method
    • after m_prcMgr->requestShutdownOfAllProcesses(); RouDi has to make all publishers stop offering so that the discovery loop can remove the subscriber queues from the ChunkDistributor of the publisher
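The blocked delivery loop described in the list above could, in principle, be made interruptible with a stop flag. The following is a hedged sketch of that idea only: `FakeQueue`, `deliverToAll` and `stopRequested` are hypothetical names, and this is not the actual code of ChunkDistributor::deliverToAllStoredQueues. It assumes the flag is a lock-free atomic that something outside the loop (for example a shutdown path) sets.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a subscriber queue; tryPush fails when full.
struct FakeQueue {
    explicit FakeQueue(std::size_t cap, std::size_t used = 0)
        : capacity(cap), size(used) {}
    bool tryPush() {
        if (size >= capacity) {
            return false;
        }
        ++size;
        return true;
    }
    std::size_t capacity;
    std::size_t size;
};

// Tries to deliver one sample to every queue. Queues that are full are
// retried until they accept the sample or 'stopRequested' becomes true.
// Returns the number of queues that did not receive the sample.
std::size_t deliverToAll(std::vector<FakeQueue*> remaining,
                         const std::atomic<bool>& stopRequested) {
    for (;;) {
        // One delivery pass over all queues that still need the sample.
        for (auto it = remaining.begin(); it != remaining.end();) {
            if ((*it)->tryPush()) {
                it = remaining.erase(it);
            } else {
                ++it;
            }
        }
        // Leave the loop when done, or when shutdown was requested.
        if (remaining.empty() || stopRequested.load(std::memory_order_relaxed)) {
            break;
        }
        std::this_thread::yield();
    }
    return remaining.size();
}
```

Because at least one pass always runs, queues with free slots still get the sample even when the flag is already set; only the retry on full queues is cut short.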

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 17 (16 by maintainers)

Most upvoted comments

I would suggest the following approach.

  1. adjust the trigger queue
    • add a template parameter to select the queue type (SoFi, FiFo etc.)
    • add a template policy to select the waiting strategy (e.g. semaphore, busy wait, condition variable)
    • implement the easiest policy for the trigger queue (more sophisticated waiting policies can come later)
    • integrate the trigger queue into the variant queue
    • adjust the trigger queue interface (add blockingPush, timedPush, tryPush as counterparts to blockingPop, timedPop, tryPop)
  2. use the trigger queue flavor in subscriber when option is set to I_WANT_IT_ALL
  3. use blocking push in publisher when option is not DONT_STOP_ME_NOW or is set to THE_SHOW_MUST_GO_ON
  4. Think about the fact that Freddie Mercury seems to know more about lock-free programming than we do?!
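The interface proposed in step 1 could look roughly like the sketch below. Everything here is an assumption for illustration: `TriggerQueueSketch` and `YieldWait` are hypothetical names, the underlying container is fixed to a mutex-guarded std::deque instead of a selectable SoFi/FiFo, and the waiting policy is the simplest one imaginable (yield and retry).

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical waiting policy: the easiest option, yield and retry.
// A semaphore or condition variable policy could be swapped in later.
struct YieldWait {
    void wait() { std::this_thread::yield(); }
};

// Hypothetical sketch, NOT the iceoryx TriggerQueue.
template <typename T, std::size_t Capacity, typename WaitPolicy = YieldWait>
class TriggerQueueSketch {
public:
    // Non-blocking: fails immediately when the queue is full.
    bool tryPush(const T& value) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_data.size() >= Capacity) {
            return false;
        }
        m_data.push_back(value);
        return true;
    }

    // Retries until the value fits or the timeout expires.
    bool timedPush(const T& value, std::chrono::milliseconds timeout) {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        while (!tryPush(value)) {
            if (std::chrono::steady_clock::now() >= deadline) {
                return false;
            }
            m_waitPolicy.wait();
        }
        return true;
    }

    // Retries until the value fits; may block forever without a consumer.
    void blockingPush(const T& value) {
        while (!tryPush(value)) {
            m_waitPolicy.wait();
        }
    }

    // Non-blocking counterpart on the consumer side.
    bool tryPop(T& out) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_data.empty()) {
            return false;
        }
        out = m_data.front();
        m_data.pop_front();
        return true;
    }

private:
    std::mutex m_mutex;
    std::deque<T> m_data;
    WaitPolicy m_waitPolicy;
};
```

Building blockingPush and timedPush on top of tryPush keeps the waiting strategy in one place, so a more sophisticated policy only needs to replace `WaitPolicy::wait()`.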

@budrus with an impossible task

@elBoberido @mossmaurice. I would also vote for option 1. I fear that ugly things could happen if we add another bookkeeping. Having a runtime.shutdown() that sends a command over UDS and ends up doing the things on the RouDi side that you made to solve the challenge there is maybe the best for now. So we have a bit of reuse. Setting something in an individual publisher to release it feels even more ugly, and I do not have more ideas.

@budrus regarding the issue with stopping an application with a blocked publisher. I think there are two options

* option 1
  
  * use a flag in the runtime set by the signal handler
  * use that flag in the keep alive thread to send an IPC message to RouDi
  * RouDi disconnects the publisher ports
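A rough sketch of how option 1 could fit together is shown here. It is not the iceoryx runtime: `FakeIpcChannel`, `keepAliveIteration` and the message strings "KEEPALIVE" and "UNBLOCK_PUBLISHERS" are hypothetical placeholders. The only part taken from the steps above is the shape: the signal handler merely sets a flag (which is async-signal-safe), and the keep alive thread does the actual IPC outside of the handler.

```cpp
#include <cassert>
#include <csignal>
#include <string>
#include <vector>

namespace {
// Written by the signal handler, read by the keep alive thread.
volatile std::sig_atomic_t g_shutdownRequested = 0;
}

// Signal handler: only sets a flag, no IPC and no allocation in here.
extern "C" void onSignal(int) {
    g_shutdownRequested = 1;
}

// Hypothetical stand-in for the IPC channel to RouDi; it just records
// the messages so that the flow can be inspected.
struct FakeIpcChannel {
    std::vector<std::string> sent;
    void send(const std::string& message) { sent.push_back(message); }
};

// One iteration of a hypothetical keep alive loop. Returns false once the
// shutdown request was forwarded and the loop should end.
bool keepAliveIteration(FakeIpcChannel& channel) {
    if (g_shutdownRequested != 0) {
        // Hypothetical message name: asks RouDi to disconnect the
        // publisher ports so that a blocked publish() call can return.
        channel.send("UNBLOCK_PUBLISHERS");
        return false;
    }
    channel.send("KEEPALIVE");  // hypothetical message name
    return true;
}
```

In an application the handler would be registered with `std::signal(SIGINT, onSignal)` or `sigaction`, and `keepAliveIteration` would be called periodically from the keep alive thread.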

I think I prefer option 1 as we should avoid duplicating the publisher list. However, I have mixed feelings about this topic. It feels very hacky in a way. Would it be possible to only do this fix on the release_1.0 branch and solve it on master altogether in #611 with our new concept for object creation in shared memory?

Yes, it’s kind of hacky, but I wouldn’t do it only in the release_1.0 branch, in order to keep the branches in sync for as long as possible and also to not keep this regression in master.

This is the make it work -> make it beautiful -> make it fast cycle 😉

@ithier at least you realized that I got the hardest job