/*
 * call-seq:
 *    conn.wait_for_notify( [ timeout ] ) -> String
 *    conn.wait_for_notify( [ timeout ] ) { |event, pid| block }
 *
 * Blocks while waiting for notification(s), or until the optional
 * _timeout_ is reached, whichever comes first.  _timeout_ is
 * measured in seconds and can be fractional.
 *
 * Returns +nil+ if _timeout_ is reached, the name of the NOTIFY
 * event otherwise.  If used in block form, passes the name of the
 * NOTIFY +event+ and the generating +pid+ into the block.
 * 
 */
static VALUE
pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
{
    PGconn *conn = get_pgconn(self);
    PGnotify *notify;
    int sd = PQsocket(conn);
    int ret;
    struct timeval timeout;
    struct timeval *ptimeout = NULL;
    VALUE timeout_in, relname = Qnil, be_pid = Qnil;
    double timeout_sec;
    fd_set sd_rset;

        if (sd < 0)
                rb_bug("PQsocket(conn): couldn't fetch the connection's socket!");

    if (rb_scan_args(argc, argv, "01", &timeout_in) == 1) {
        timeout_sec = NUM2DBL(timeout_in);
        timeout.tv_sec = (long)timeout_sec;
        timeout.tv_usec = (long)((timeout_sec - (long)timeout_sec) * 1e6);
        ptimeout = &timeout;
    }

    FD_ZERO(&sd_rset);
    FD_SET(sd, &sd_rset);
    ret = rb_thread_select(sd+1, &sd_rset, NULL, NULL, ptimeout);
    if (ret == 0) {
                return Qnil;
        } else if (ret < 0) {
                rb_sys_fail(0);
        }

    if ( (ret = PQconsumeInput(conn)) != 1 ) {
                rb_raise(rb_ePGError, "PQconsumeInput == %d: %s", ret, PQerrorMessage(conn));
        }

    while ((notify = PQnotifies(conn)) != NULL) {
        relname = rb_tainted_str_new2(notify->relname);
        be_pid = INT2NUM(notify->be_pid);
        PQfreemem(notify);
    }

    if (rb_block_given_p()) rb_yield( rb_ary_new3(2, relname, be_pid) );

    return relname;
}