libzmq: Problem: msg_t ownership inconsistent, with some leaks

Given that there is no msg_t destructor there are only two options for resource release, the owner (creator) of the message must free based on result code or the code that generates and/or propagates the error must handle it.

A casual look at the code left me with the impression the the intent is the owner cleans up in the case of a send failure (but the sender does in the case of a success), and the reader cleans up in the case of a read failure (but the owner does in the case of success). It will take a little time to pin it down, since there are 52 message constructions, passed through various deep call stacks. But the first path I traced let to this:

bool zmq::pipe_t::read (msg_t *msg_)
{
    if (unlikely (!in_active))
        return false;
    if (unlikely (state != active && state != waiting_for_delimiter))
        return false;

read_message:
    if (!inpipe->read (msg_)) {
        in_active = false;
        return false;
    }

    //  If this is a credential, save a copy and receive next message.
    if (unlikely (msg_->is_credential ())) {
        const unsigned char *data = static_cast <const unsigned char *> (msg_->data ());
        credential = blob_t (data, msg_->size ());
        const int rc = msg_->close ();
        zmq_assert (rc == 0);
        goto read_message;
    }

    //  If delimiter was read, start termination process of the pipe.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

    if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
        msgs_read++;

    if (lwm > 0 && msgs_read % lwm == 0)
        send_activate_write (peer, msgs_read);

    return true;
}

Notice that this section returns false with a message that has been read:

    //  If delimiter was read, start termination process of the pipe.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

Popping back up the stack we are here:

int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (zap_pipe == NULL) {
        errno = ENOTCONN;
        return -1;
    }

    if (!zap_pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

Notice that this read failure causes the return of a -1 (fail) code. Popping up again we are here:

int zmq::curve_server_t::receive_and_process_zap_reply ()
{
    int rc = 0;
    msg_t msg [7];  //  ZAP reply consists of 7 frames

    //  Initialize all reply frames
    for (int i = 0; i < 7; i++) {
        rc = msg [i].init ();
        errno_assert (rc == 0);
    }

    for (int i = 0; i < 7; i++) {
        rc = session->read_zap_msg (&msg [i]);
        if (rc == -1)
            break;
        if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) {
            //  Temporary support for security debugging
            ////puts ("CURVE I: ZAP handler sent incomplete reply message");
            errno = EPROTO;
            rc = -1;
            break;
        }
    }

    if (rc != 0)
        goto error;

[...]

error:
    for (int i = 0; i < 7; i++) {
        const int rc2 = msg [i].close ();
        errno_assert (rc2 == 0);
    }

    return rc;
}

So in this case a read call failure requires the owner to close the message that was reported as not read. Yet in the same code path a failure of the read in this line:

    if (!inpipe->read (msg_)) {

does not require that the message be closed by the caller. This then implies that it must always be safe to close an unclosed or a closed message, though this is unclear in zmq_msg_close documentation.

Documentation also states that is not required after any successful zmq_msg_send but is silent about whether it must be called after a failed zmq_msg_read (it appears that it must be), failed send or successful read.

In another section the behavior appears to leak on ceratin read failure:

void zmq::pgm_receiver_t::drop_subscriptions ()
{
    msg_t msg;
    msg.init ();
    while (session->pull_msg (&msg) == 0)
        msg.close ();
}
...
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    if (!pipe || !pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    incomplete_in = msg_->flags () & msg_t::more ? true : false;

    return 0;
}

Notice that, unlike in the previous example, the same pipe->read (msg_) failure in this case does not lead to the caller performing a cleanup. Now maybe certain types of read failures don’t require the caller to clean up and others do, so it’s ok (i.e. not a leak) for the reader to successfully read, not close the message, and yet return a failure (such as for a delimiter message). But that’s not obvious (or commented) in the code.

IMO this C-style resource management (as opposed to RAII) makes the code fragile. By that I mean it’s very hard to see errors as resource management is a side concern of a large amount of code. Are there tests designed to detect leaks in failure conditions, or only in case of success?

I did go through each code path that allocates msg_t. This one appeared to be a potential problem.

int zmq::proxy (
    class socket_base_t *frontend_,
    class socket_base_t *backend_,
    class socket_base_t *capture_,
    class socket_base_t *control_)
{
    msg_t msg;
    int rc = msg.init ();
    if (rc != 0)
        return -1;

    //  The algorithm below assumes ratio of requests and replies processed
    //  under full load to be 1:1.

    int more;
    size_t moresz;
    zmq_pollitem_t items [] = {
        { frontend_, 0, ZMQ_POLLIN, 0 },
        { backend_, 0, ZMQ_POLLIN, 0 },
        { control_, 0, ZMQ_POLLIN, 0 }
    };
    int qt_poll_items = (control_ ? 3 : 2);
    zmq_pollitem_t itemsout [] = {
        { frontend_, 0, ZMQ_POLLOUT, 0 },
        { backend_, 0, ZMQ_POLLOUT, 0 }
    };

    //  Proxy can be in these three states
    enum {
        active,
        paused,
        terminated
    } state = active;

    while (state != terminated) {
        //  Wait while there are either requests or replies to process.
        rc = zmq_poll (&items [0], qt_poll_items, -1);
        if (unlikely (rc < 0))
            return -1;

        //  Get the pollout separately because when combining this with pollin it maxes the CPU
        //  because pollout shall most of the time return directly.
        //  POLLOUT is only checked when frontend and backend sockets are not the same.
        if (frontend_ != backend_) {
            rc = zmq_poll (&itemsout [0], 2, 0);
            if (unlikely (rc < 0)) {
                return -1;
            }
        }

        //  Process a control command if any
        if (control_ && items [2].revents & ZMQ_POLLIN) {
            rc = control_->recv (&msg, 0);
            if (unlikely (rc < 0))
                return -1;

            moresz = sizeof more;
            rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
            if (unlikely (rc < 0) || more)
                return -1;

            //  Copy message to capture socket if any
            rc = capture(capture_, msg);
            if (unlikely (rc < 0))
                return -1;
[...]
        }
        //  Process a request
        if (state == active
        &&  items [0].revents & ZMQ_POLLIN
        &&  (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
            rc = forward(frontend_, backend_, capture_,msg);
            if (unlikely (rc < 0))
                return -1;
        }
        //  Process a reply
        if (state == active
        &&  frontend_ != backend_
        &&  items [1].revents & ZMQ_POLLIN
        &&  itemsout [0].revents & ZMQ_POLLOUT) {
            rc = forward(backend_, frontend_, capture_,msg);
            if (unlikely (rc < 0))
                return -1;
        }
    }
    return 0;
}

After a message is read successfully there are four conditions in which it will not be closed upon return.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 35 (35 by maintainers)

Most upvoted comments

I agree that this is fragile. I don’t think we can use ctor and dtor to solve it. You are welcome to send a pull request.