libzmq: Problem: msg_t ownership inconsistent, with some leaks
Given that there is no msg_t destructor, there are only two options for resource release: either the owner (creator) of the message frees it based on the result code, or the code that generates and/or propagates the error handles the release.
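To make the two options concrete, here is a minimal sketch against the public zmq_msg_t API; produce_msg is a hypothetical stand-in for any call that fills a message and can fail, not libzmq code:

#include <zmq.h>

//  Hypothetical stand-in for any internal call that fills a message and can
//  fail; not part of libzmq.
int produce_msg (zmq_msg_t *msg);

//  Option (a): the owner (creator) frees based on the result code; the callee
//  never closes the message it was handed.
void owner_frees ()
{
    zmq_msg_t msg;
    if (zmq_msg_init (&msg) != 0)
        return;
    produce_msg (&msg);
    //  ... use the payload on success ...
    zmq_msg_close (&msg);       //  owner closes on success and on failure
}

//  Option (b): the code that generates or propagates the error releases the
//  message, so the owner closes only after success.
void error_path_frees ()
{
    zmq_msg_t msg;
    if (zmq_msg_init (&msg) != 0)
        return;
    if (produce_msg (&msg) == 0) {
        //  ... use the payload ...
        zmq_msg_close (&msg);
    }
    //  On failure the message is assumed to have been closed downstream.
}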
A casual look at the code left me with the impression that the intent is for the owner to clean up in the case of a send failure (while the send path does in the case of success), and for the reading code to clean up in the case of a read failure (while the owner does in the case of success). It will take a little time to pin this down, since there are 52 message constructions passed through various deep call stacks, but the first path I traced led to this:
bool zmq::pipe_t::read (msg_t *msg_)
{
    if (unlikely (!in_active))
        return false;
    if (unlikely (state != active && state != waiting_for_delimiter))
        return false;

read_message:
    if (!inpipe->read (msg_)) {
        in_active = false;
        return false;
    }

    // If this is a credential, save a copy and receive next message.
    if (unlikely (msg_->is_credential ())) {
        const unsigned char *data = static_cast <const unsigned char *> (msg_->data ());
        credential = blob_t (data, msg_->size ());
        const int rc = msg_->close ();
        zmq_assert (rc == 0);
        goto read_message;
    }

    // If delimiter was read, start termination process of the pipe.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

    if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
        msgs_read++;

    if (lwm > 0 && msgs_read % lwm == 0)
        send_activate_write (peer, msgs_read);

    return true;
}
Notice that this section returns false even though a message (the delimiter) has been read:
    // If delimiter was read, start termination process of the pipe.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }
Popping back up the stack we are here:
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (zap_pipe == NULL) {
        errno = ENOTCONN;
        return -1;
    }

    if (!zap_pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}
Notice that this read failure causes the return of a -1 (fail) code. Popping up again we are here:
int zmq::curve_server_t::receive_and_process_zap_reply ()
{
    int rc = 0;
    msg_t msg [7];  // ZAP reply consists of 7 frames

    // Initialize all reply frames
    for (int i = 0; i < 7; i++) {
        rc = msg [i].init ();
        errno_assert (rc == 0);
    }

    for (int i = 0; i < 7; i++) {
        rc = session->read_zap_msg (&msg [i]);
        if (rc == -1)
            break;
        if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) {
            // Temporary support for security debugging
            ////puts ("CURVE I: ZAP handler sent incomplete reply message");
            errno = EPROTO;
            rc = -1;
            break;
        }
    }

    if (rc != 0)
        goto error;

    [...]

error:
    for (int i = 0; i < 7; i++) {
        const int rc2 = msg [i].close ();
        errno_assert (rc2 == 0);
    }
    return rc;
}
So in this case a read call failure requires the owner to close the message that was reported as not read. Yet in the same code path a failure of the read in this line:
if (!inpipe->read (msg_)) {
does not require that the message be closed by the caller. This implies that it must always be safe to close a message whether or not it has already been closed, though the zmq_msg_close documentation does not make this clear.
The documentation also states that closing is not required after a successful zmq_msg_send, but it is silent about whether it must be called after a failed receive (it appears that it must be), a failed send, or a successful receive.
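Concretely, the pattern a caller seems to need today is to close the message even when the call reports failure; here is a minimal sketch against the public API, assuming (as the curve_server_t path implies but the documentation does not state) that this is always valid:

#include <zmq.h>

//  Sketch of the defensive pattern, not library code; the payload is
//  discarded just to keep the example short.
int recv_once (void *socket)
{
    zmq_msg_t msg;
    if (zmq_msg_init (&msg) != 0)
        return -1;

    const int rc = zmq_msg_recv (&msg, socket, 0);
    //  Close whether or not the receive succeeded, relying on the apparently
    //  undocumented assumption that closing after a failed receive is safe.
    zmq_msg_close (&msg);
    return rc == -1 ? -1 : 0;
}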
In another section the behavior appears to leak on certain read failures:
void zmq::pgm_receiver_t::drop_subscriptions ()
{
    msg_t msg;
    msg.init ();
    while (session->pull_msg (&msg) == 0)
        msg.close ();
}
...
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    if (!pipe || !pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    incomplete_in = msg_->flags () & msg_t::more ? true : false;
    return 0;
}
Notice that, unlike in the previous example, the same pipe->read (msg_) failure in this case does not lead to the caller performing a cleanup. Perhaps certain kinds of read failure require the caller to clean up and others do not, so it is fine (i.e. not a leak) for the reader to read a message successfully, not close it, and still report a failure (as with a delimiter message), but that is neither obvious from nor commented in the code.
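For what it is worth, a version of drop_subscriptions that would not depend on the answer might look like the sketch below. This is only an illustration, not the project's actual fix, and it assumes that closing the message after a failed pull_msg is harmless:

void zmq::pgm_receiver_t::drop_subscriptions ()
{
    msg_t msg;
    msg.init ();
    while (session->pull_msg (&msg) == 0)
        msg.close ();
    //  Close the final, unconsumed message as well. The return value is
    //  ignored on purpose: depending on where the loop stopped, msg may
    //  already have been closed, in which case this does nothing useful
    //  but also does no harm.
    msg.close ();
}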
IMO this C-style resource management (as opposed to RAII) makes the code fragile. By that I mean it is very hard to spot errors, because resource management is a side concern spread across a large amount of code. Are there tests designed to detect leaks under failure conditions, or only in the success case?
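To illustrate what RAII-style ownership could look like (purely a sketch: scoped_msg_t is a hypothetical name, not libzmq code, and this is not a claim that msg_t itself could simply grow a constructor and destructor):

//  Hypothetical sketch only; not part of libzmq.
#include "msg.hpp"
#include "err.hpp"

class scoped_msg_t
{
public:
    scoped_msg_t () { const int rc = _msg.init (); errno_assert (rc == 0); }
    //  The destructor ignores close () errors: the message may already have
    //  been closed by whoever consumed it.
    ~scoped_msg_t () { _msg.close (); }
    zmq::msg_t *get () { return &_msg; }

private:
    zmq::msg_t _msg;

    //  Non-copyable, since copying would lead to a double close.
    scoped_msg_t (const scoped_msg_t &);
    const scoped_msg_t &operator= (const scoped_msg_t &);
};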
I did go through each code path that allocates msg_t. This one appeared to be a potential problem.
int zmq::proxy (
    class socket_base_t *frontend_,
    class socket_base_t *backend_,
    class socket_base_t *capture_,
    class socket_base_t *control_)
{
    msg_t msg;
    int rc = msg.init ();
    if (rc != 0)
        return -1;

    // The algorithm below assumes ratio of requests and replies processed
    // under full load to be 1:1.

    int more;
    size_t moresz;
    zmq_pollitem_t items [] = {
        { frontend_, 0, ZMQ_POLLIN, 0 },
        { backend_, 0, ZMQ_POLLIN, 0 },
        { control_, 0, ZMQ_POLLIN, 0 }
    };
    int qt_poll_items = (control_ ? 3 : 2);
    zmq_pollitem_t itemsout [] = {
        { frontend_, 0, ZMQ_POLLOUT, 0 },
        { backend_, 0, ZMQ_POLLOUT, 0 }
    };

    // Proxy can be in these three states
    enum {
        active,
        paused,
        terminated
    } state = active;

    while (state != terminated) {
        // Wait while there are either requests or replies to process.
        rc = zmq_poll (&items [0], qt_poll_items, -1);
        if (unlikely (rc < 0))
            return -1;

        // Get the pollout separately because when combining this with pollin it maxes the CPU
        // because pollout shall most of the time return directly.
        // POLLOUT is only checked when frontend and backend sockets are not the same.
        if (frontend_ != backend_) {
            rc = zmq_poll (&itemsout [0], 2, 0);
            if (unlikely (rc < 0)) {
                return -1;
            }
        }

        // Process a control command if any
        if (control_ && items [2].revents & ZMQ_POLLIN) {
            rc = control_->recv (&msg, 0);
            if (unlikely (rc < 0))
                return -1;

            moresz = sizeof more;
            rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
            if (unlikely (rc < 0) || more)
                return -1;

            // Copy message to capture socket if any
            rc = capture (capture_, msg);
            if (unlikely (rc < 0))
                return -1;

            [...]
        }

        // Process a request
        if (state == active
            && items [0].revents & ZMQ_POLLIN
            && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
            rc = forward (frontend_, backend_, capture_, msg);
            if (unlikely (rc < 0))
                return -1;
        }

        // Process a reply
        if (state == active
            && frontend_ != backend_
            && items [1].revents & ZMQ_POLLIN
            && itemsout [0].revents & ZMQ_POLLOUT) {
            rc = forward (backend_, frontend_, capture_, msg);
            if (unlikely (rc < 0))
                return -1;
        }
    }
    return 0;
}
After a message is read successfully, there are four conditions under which the function returns without closing it.
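One way to cover those paths, sketched here only to illustrate the single-exit-point pattern and not as the actual upstream fix, is to route every error return through one cleanup label that closes the message. The process_control helper below is hypothetical and uses the public API just to keep the sketch self-contained:

#include <zmq.h>

//  Hypothetical helper, not libzmq code: the single-exit-point shape that
//  zmq::proxy could use so that msg is closed on every error path, not just
//  the successful one. Declarations sit above the first goto so that no
//  initialization is jumped over.
static int process_control (void *control)
{
    zmq_msg_t msg;
    int more = 0;
    size_t moresz = sizeof (more);
    int ret = 0;

    if (zmq_msg_init (&msg) != 0)
        return -1;                      //  nothing to clean up yet

    if (zmq_msg_recv (&msg, control, 0) == -1) {
        ret = -1;
        goto cleanup;                   //  jump instead of returning directly
    }

    if (zmq_getsockopt (control, ZMQ_RCVMORE, &more, &moresz) != 0 || more) {
        ret = -1;
        goto cleanup;                   //  ditto for every later error check
    }

cleanup:
    zmq_msg_close (&msg);               //  runs on success and on every error path
    return ret;
}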
I agree that this is fragile. I don’t think we can use ctor and dtor to solve it. You are welcome to send a pull request.