next up previous contents
Next: Purely Synchronous Protocol Up: More Example Protocols Previous: Reassembly

Synchrony, Timeouts, and Blocking

We now show an example of a hybrid protocol, named CHAN, that turns an underlying asynchronous communication service into a synchronous communication service. That is, CHAN supports the xPush/xDemux/xPop from below, and the xCall/xCallDemux/xCallPop from above. CHAN is also interesting because it illustrates how timeouts are scheduled in the x-kernel , and how how a protocol blocks, waiting for a reply message.

CHAN supports request/reply channels between a pair of hosts. The client sends a request message on a channel and blocks waiting for a reply message. The server accepts the request message and responds with a reply message. The protocol defines two key data structures: ChanHdr and ChanState. Both of these data structures are defined in a private .h file (e.g., chan_internal.h), rather than the chan.h file included by other protocols.

The fields in ChanHdr are fairly straightforward. The Type field specifies the type of the message; in this case, the possible types are REQ, REP, ACK and PROBE. The ProtNum field identifies the high-level protocol that depends on CHAN. The CID field uniquely identifies the logical channel to which this message belongs. This is a 16-bit field, meaning that CHAN supports up to 64K concurrent request/reply transactions between any pair of hosts. The MID field uniquely identifies each request/reply pair; the reply message has the same MID as the request. Finally, the BID field gives the boot id for the host. A machine's boot id is a number that is incremented each time the machine reboots; this number is read from disk, incremented, and written back to disk during the machine's startup procedure. This number is then put in every message sent by that host.

The fields in ChanState will be explained by the code that follows. The one thing to note at this point is that ChanState includes a hdr_template field, which is a copy of the CHAN header. Many of the fields in the CHAN header remain the same for all messages sent out over this channel. These fields are filled in when the channel is created (not shown); only the fields that change are modified before a given message is transmitted.

typedef struct {
    u_short Type;       /* message type: REQ, REP, ACK, PROBE */
    u_short CID;        /* unique channel id */
    int     MID;        /* unique message id */
    int     BID;        /* unique boot id */
    int     Length;     /* length of message */
    int     ProtNum;    /* high-level protocol number */
} ChanHdr;

typedef struct {
    u_char    type;             /* type of session: CLIENT or SERVER */
    u_char    status;           /* status of session: BUSY or IDLE */
    Event     event;            /* place to save timeout event */
    int       retries;          /* number of times retransmitted */
    int       timeout;          /* timeout value */
    XkReturn  ret_val;          /* place to save return value */
    Msg       *request;         /* place to save request message */
    Msg       *reply;           /* place to save reply message */
    Semaphore reply_sem;        /* semaphore the client blocks on */
    int       mid;              /* message id for this channel */
    int       bid;              /* boot id for this channel */
    ChanHdr   hdr_template;     /* header template for this channel */
} ChanState;

The CHAN-specific implementation of xCall is given by the following routine, named chanCall. The first thing to notice is that ChanState includes a field named status that indicates whether or not this channel is being used. If the channel is currently in use, then chanCall returns failure.

The next thing to notice about chanCall is that after filling out the message header and transmitting the request message, the calling process is blocked on a semaphore ( reply_sem); semWait is the x-kernel semaphore operation introduced in Section 2. When the reply message eventually arrives, it is processed by CHAN's xPop routine (see below), which copies the reply message into state variable reply, and signals this blocked process. The process then returns. Should the reply message not arrive, then timeout routine retransmit is called (see below). This event is scheduled in the body of chanCall.

static XkReturn
chanCall(Sessn self, Msg *msg, Msg *rmsg)
{
    ChanState *state = (ChanState *)self->state;
    ChanHdr   *hdr;
    char      *buf;

    /* ensure only one transaction per channel */
    if (state->status != IDLE)
        return XK_FAILURE;
    state->status = BUSY;

    /* save a copy of request msg and pointer to reply msg*/
    msgConstructCopy(&state->request, msg);
    state->reply = rmsg;

    /* fill out header fields */
    hdr = state->hdr_template;
    hdr->Length = msgLength(msg);
    if (state->mid == MAX_MID)
        state->mid = 0;
    hdr->MID = ++state->mid;

    /* attach header to msg and send it */
    buf = msgPush(msg, HDR_LEN);
    chan_hdr_store(hdr, buf, HDR_LEN);
    xPush(xGetDown(self, 0), msg);

    /* schedule first timeout event */
    state->retries = 1;
    state->event   = evSchedule(retransmit, self, state->timeout);

    /* wait for the reply msg */
    semWait(&state->reply_sem);

    /* clean up state and return */
    flush_msg(state->request);
    state->status = IDLE;
    return state->ret_val;
}

The next routine ( retransmit) is called whenever the retransmit timer fires. It is scheduled for the first time in chanCall, but each time it is called, it reschedules itself. Once the request message has been retransmitted four times, CHAN gives up: it sets the return value to XK_FAILURE and wakes up the blocked client process. Finally, the reason retransmit first checks to see if the event was cancelled is that there is a potential race condition between when evCancel is invoked and when the event actually executes. Note that each time retransmit executes and sends another copy of the request message, it needs to re-save the message in state variable request. This is because each time a protocol calls the xPush operation on a message, it loses its reference to the message.

static void
retransmit(Event ev, int *arg)
{
    Sessn     s = (Sessn)arg;
    ChanState *state = (ChanState *)s->state;
    Msg       tmp;

    /* see if event was cancelled */
    if (evIsCancelled(ev))
        return;

    /* unblock the client process if we have retried 4 times */
    if (++state->retries > 4) {
       state->ret_val = XK_FAILURE;
       semSignal(state->rep_sem);
       return;
    }

    /* retransmit request message */
    msgConstructCopy(&tmp, &state->request);
    xPush(xGetDown(s, 0), &tmp);

    /* reschedule event with exponential backoff */
    evDetach(state->event);
    state->timeout = 2*state->timeout;
    state->event = evSchedule(retransmit, s, state->timeout);
}

CHAN's chanPop routine is very simple. This is because CHAN is an asymmetric protocol: the code that implements CHAN on the client machine is completely distinct from the code that implements CHAN on the server machine. In fact, any given CHAN session will always be a purely client session or a purely server session, and this fact is stored in a session state variable ( type). Thus, all chanPop does is check to see whether it is a server session (one that expects REQ messages), or a client session (one that expects REP messages), and calls the appropriate client- or server-specific routine. In this case, we show only the client-specific routine.

static XkReturn
chanPop(Sessn self, Sessn lls, Msg *msg, void *inHdr)
{
    /* see if this is a CLIENT or SERVER session */
    if (self->state->type == SERVER)
        return(chanServerPop(self, lls, msg, inHdr));
    else
        return(chanClientPop(self, lls, msg, inHdr));
}

The client-specific pop routine ( chanClientPop) is given below. This routine first checks to see if it has received the expected message, for example, that it has the right MID, the right BID, and is of type REP or ACK. This check is made in subroutine clnt_msg_ok (not shown). If it is a valid acknowledgment message, then chanClientPop cancels the retransmit timer and schedules the probe timer. The probe timer is not shown, but would be similar to the retransmit timer given above. If the message is a valid reply, then chanClientPop cancels the retransmit timer, saves a copy of the reply message in state variable reply, and wakes up the blocked client process. It is this client process that actually returns the reply message to the high-level protocol; the process that called chanClientPop simply returns back down the protocol stack.

static XkReturn
chanClientPop(Sessn self, Sessn lls, Msg *msg, void *inHdr)
{
    ChanState *state = (ChanState *)self->state;
    ChanHdr   *hdr = (ChanHdr *)inHdr;

    /* verify correctness of msg header */
    if (!clnt_msg_ok(state, hdr))
        return XK_FAILURE;

    /* cancel retransmit timeout event */
    evCancel(state->event);

    /* if this is an ACK, then schedule PROBE timer and exit*/
    if (hdr->Type == ACK) {
       state->event = evSchedule(probe, s, PROBE);
       return XK_SUCCESS;
    }

    /* msg must be a REP, so save it and signal blocked client process */
    msgAssign(state->reply, msg);
    state->ret_val = XK_SUCCESS;
    semSignal(&state->reply_sem);

    return XK_SUCCESS;
}



next up previous contents
Next: Purely Synchronous Protocol Up: More Example Protocols Previous: Reassembly



Larry Peterson
Wed Feb 21 13:58:06 MST 1996