| plan 9 kernel history: overview | file list | diff list |
1990/0321/port/sturp.c (diff list | history)
| port/sturp.c on 1990/0312 | ||
| 1990/0312 | #include "u.h" #include "lib.h" #include "mem.h" #include "dat.h" #include "fns.h" #include "io.h" #include "errno.h" | |
| 1990/0227 | enum { Nurp= 32, MSrexmit= 1000, | |
| 1990/0312 | Nmask= 0x7, | |
| 1990/0227 | }; | |
| 1990/0312 | #define DPRINT if(0) | |
| 1990/0227 | typedef struct Urp Urp; | |
| 1990/0312 | #define NOW (MACHP(0)->ticks*MS2HZ) | |
| 1990/0227 | /* * URP status */ struct urpstat { ulong input; /* bytes read from urp */ ulong output; /* bytes output to urp */ ulong rxmit; /* retransmit rejected urp msg */ ulong rjtrs; /* reject, trailer size */ ulong rjpks; /* reject, packet size */ ulong rjseq; /* reject, sequence number */ ulong levelb; /* unknown level b */ ulong enqsx; /* enqs sent */ ulong enqsr; /* enqs rcved */ } urpstat; struct Urp { | |
| 1990/0312 | QLock; | |
| 1990/0227 | short state; /* flags */ | |
| 1990/0312 | Rendez r; /* process waiting for close */ | |
| 1990/0227 | /* input */ | |
| 1990/0321 | QLock ack; /* ack lock */ | |
| 1990/0312 | Queue *rq; /* input queue */ | |
| 1990/0227 | uchar iseq; /* last good input sequence number */ uchar lastecho; /* last echo/rej sent */ uchar trbuf[3]; /* trailer being collected */ short trx; /* # bytes in trailer being collected */ | |
| 1990/0321 | int blocks; | |
| 1990/0227 | /* output */ | |
| 1990/0312 | QLock xmit; /* output lock, only one process at a time */ Queue *wq; /* output queue */ int maxout; /* maximum outstanding unacked blocks */ | |
| 1990/0227 | int maxblock; /* max block size */ | |
| 1990/0312 | int next; /* next block to send */ int unechoed; /* first unechoed block */ int unacked; /* first unacked block */ int nxb; /* next xb to use */ | |
| 1990/0227 | Block *xb[8]; /* the xmit window buffer */ | |
| 1990/0312 | QLock xl[8]; ulong timer; /* timeout for xmit */ int kstarted; | |
| 1990/0227 | }; | |
| 1990/0312 | #define WINDOW(u) ((u->unechoed + u->maxout - u->next)%8) #define IN(x, f, n) (f<=n ? x>=f && x<n : x<n || x>=f) #define NEXT(x) (((x)+1)&Nmask) | |
| 1990/0227 | /* * Protocol control bytes */ #define SEQ 0010 /* sequence number, ends trailers */ #undef ECHO #define ECHO 0020 /* echos, data given to next queue */ #define REJ 0030 /* rejections, transmission error */ #define ACK 0040 /* acknowledgments */ #define BOT 0050 /* beginning of trailer */ #define BOTM 0051 /* beginning of trailer, more data follows */ #define BOTS 0052 /* seq update algorithm on this trailer */ #define SOU 0053 /* start of unsequenced trailer */ #define EOU 0054 /* end of unsequenced trailer */ #define ENQ 0055 /* xmitter requests flow/error status */ #define CHECK 0056 /* xmitter requests error status */ #define INITREQ 0057 /* request initialization */ #define INIT0 0060 /* disable trailer processing */ #define INIT1 0061 /* enable trailer procesing */ #define AINIT 0062 /* response to INIT0/INIT1 */ #undef DELAY #define DELAY 0100 /* real-time printing delay */ #define BREAK 0110 /* Send/receive break (new style) */ | |
| 1990/0312 | #define REJECTING 0x1 #define INITING 0x2 #define HUNGUP 0x4 #define OPEN 0x8 #define CLOSING 0x10 | |
| 1990/0227 | Urp urp[Nurp]; /* * predeclared */ | |
| 1990/0312 | static void urpciput(Queue*, Block*); static void urpiput(Queue*, Block*); static void urpoput(Queue*, Block*); | |
| 1990/0227 | static void urpopen(Queue*, Stream*); static void urpclose(Queue *); | |
| 1990/0312 | static void output(Urp*); static void sendblock(Urp*, int); | |
| 1990/0227 | static void rcvack(Urp*, int); static void flushinput(Urp*); | |
| 1990/0312 | static void sendctl(Urp*, int); | |
| 1990/0321 | static void sendack(Urp*); static void sendrej(Urp*); | |
| 1990/0227 | static void initoutput(Urp*, int); static void initinput(Urp*, int); | |
| 1990/0312 | static void urpkproc(void *arg); | |
| 1990/0227 | ||
| 1990/0312 | Qinfo urpinfo = { urpciput, urpoput, urpopen, urpclose, "urp" }; | |
| 1990/0227 | ||
| 1990/0312 | static void | |
| 1990/0227 | urpopen(Queue *q, Stream *s) { Urp *up; int i; | |
| 1990/0312 | char name[128]; | |
| 1990/0227 | /* * find a free urp structure */ for(up = urp; up < &urp[Nurp]; up++){ qlock(up); if(up->state == 0) break; qunlock(up); } if(up == &urp[Nurp]) error(0, Egreg); | |
| 1990/0312 | q->ptr = q->other->ptr = up; | |
| 1990/0227 | up->rq = q; | |
| 1990/0312 | up->wq = q->other; up->state = OPEN; | |
| 1990/0227 | qunlock(up); initinput(up, 0); initoutput(up, 0); | |
| 1990/0312 | /* * start the ack/(re)xmit process */ if(up->kstarted == 0){ up->kstarted = 1; sprint(name, "**urp%d**", up - urp); kproc(name, urpkproc, up); } | |
| 1990/0227 | } /* | |
| 1990/0312 | * Shut down the connection and kill off the kernel process | |
| 1990/0227 | */ static int | |
| 1990/0312 | isflushed(void *a) | |
| 1990/0227 | { Urp *up; up = (Urp *)a; | |
| 1990/0312 | return (up->state&HUNGUP) || (up->unechoed==up->next && up->wq->len==0); | |
| 1990/0227 | } | |
| 1990/0312 | static int isdead(void *a) { Urp *up; up = (Urp *)a; return up->kstarted == 0; } static void | |
| 1990/0227 | urpclose(Queue *q) { Block *bp; Urp *up; int i; up = (Urp *)q->ptr; /* | |
| 1990/0312 | * wait for all outstanding messages to drain, tell kernel * process we're closing. | |
| 1990/0227 | */ | |
| 1990/0312 | up->state |= CLOSING; sleep(&up->r, isflushed, up); /* * ack all outstanding messages */ qlock(&up->xmit); up->state |= HUNGUP; i = up->next - 1; if(i < 0) i = 7; rcvack(up, ECHO+i); qunlock(&up->xmit); /* * kill off the kernel process */ wakeup(&up->rq->r); | |
| 1990/0227 | } /* * upstream control messages */ static void urpctliput(Urp *up, Queue *q, Block *bp) { switch(bp->type){ case M_HANGUP: | |
| 1990/0312 | up->state |= HUNGUP; wakeup(&up->r); wakeup(&up->rq->r); break; | |
| 1990/0227 | } PUTNEXT(q, bp); } /* | |
| 1990/0312 | * character mode input. * * the first byte in every message is a ctl byte (which belongs at the end). | |
| 1990/0227 | */ void | |
| 1990/0312 | urpciput(Queue *q, Block *bp) | |
| 1990/0227 | { Urp *up; | |
| 1990/0312 | int i; int ctl; | |
| 1990/0227 | up = (Urp *)q->ptr; if(bp->type != M_DATA){ urpctliput(up, q, bp); return; } /* * get the control character */ | |
| 1990/0312 | ctl = *bp->rptr++; if(ctl < 0) return; | |
| 1990/0227 | /* | |
| 1990/0312 | * take care of any data | |
| 1990/0227 | */ | |
| 1990/0312 | if(BLEN(bp)>0 && q->next->len<Streamhi) | |
| 1990/0227 | PUTNEXT(q, bp); else freeb(bp); /* * handle the control character */ switch(ctl){ case 0: break; case ENQ: urpstat.enqsr++; | |
| 1990/0312 | sendctl(up, up->lastecho); sendctl(up, ACK+up->iseq); | |
| 1990/0227 | break; case CHECK: | |
| 1990/0312 | sendctl(up, ACK+up->iseq); | |
| 1990/0227 | break; case AINIT: up->state &= ~INITING; flushinput(up); | |
| 1990/0312 | wakeup(&up->rq->r); | |
| 1990/0227 | break; case INIT0: case INIT1: | |
| 1990/0312 | sendctl(up, AINIT); if(ctl == INIT1) q->put = urpiput; | |
| 1990/0227 | initinput(up, 0); break; case INITREQ: initoutput(up, 0); break; case BREAK: break; | |
| 1990/0312 | case REJ+0: case REJ+1: case REJ+2: case REJ+3: case REJ+4: case REJ+5: case REJ+6: case REJ+7: rcvack(up, ctl); break; | |
| 1990/0227 | case ACK+0: case ACK+1: case ACK+2: case ACK+3: case ACK+4: case ACK+5: case ACK+6: case ACK+7: case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3: case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7: rcvack(up, ctl); break; case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3: case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7: | |
| 1990/0321 | qlock(&up->ack); | |
| 1990/0312 | i = ctl & Nmask; if(q->next->len < Streamhi) sendctl(up, up->lastecho = ECHO+i); up->iseq = i; | |
| 1990/0321 | qunlock(&up->ack); | |
| 1990/0227 | break; } } /* | |
| 1990/0312 | * block mode input. * * the first byte in every message is a ctl byte (which belongs at the end). * * Simplifying assumption: one put == one message && the control byte * is in the first block. If this isn't true, strange bytes will be * used as control bytes. | |
| 1990/0227 | */ void | |
| 1990/0312 | urpiput(Queue *q, Block *bp) | |
| 1990/0227 | { Urp *up; | |
| 1990/0312 | int i; int ctl; | |
| 1990/0227 | up = (Urp *)q->ptr; if(bp->type != M_DATA){ urpctliput(up, q, bp); return; } /* * get the control character */ | |
| 1990/0312 | ctl = *bp->rptr++; | |
| 1990/0227 | /* * take care of any block count(trx) */ | |
| 1990/0312 | while(up->trx){ if(BLEN(bp)<=0) break; | |
| 1990/0227 | switch (up->trx) { case 1: case 2: up->trbuf[up->trx++] = *bp->rptr++; continue; default: up->trx = 0; break; } } /* | |
| 1990/0312 | * queue the block(s) | |
| 1990/0227 | */ | |
| 1990/0312 | if(BLEN(bp) > 0){ putq(q, bp); q->last->flags &= ~S_DELIM; if(q->len > 4*1024){ | |
| 1990/0227 | flushinput(up); return; } } else freeb(bp); /* * handle the control character */ switch(ctl){ case 0: break; case ENQ: | |
| 1990/0321 | DPRINT("rENQ %d %uo %uo\n", up->blocks, up->lastecho, ACK+up->iseq); up->blocks = 0; | |
| 1990/0227 | urpstat.enqsr++; | |
| 1990/0312 | sendctl(up, up->lastecho); sendctl(up, ACK+up->iseq); | |
| 1990/0227 | flushinput(up); break; case CHECK: | |
| 1990/0312 | sendctl(up, ACK+up->iseq); | |
| 1990/0227 | break; case AINIT: up->state &= ~INITING; flushinput(up); | |
| 1990/0312 | wakeup(&up->rq->r); | |
| 1990/0227 | break; case INIT0: case INIT1: | |
| 1990/0312 | sendctl(up, AINIT); if(ctl == INIT0) | |
| 1990/0227 | q->put = urpciput; initinput(up, 0); break; case INITREQ: initoutput(up, 0); break; case BREAK: break; case BOT: case BOTS: case BOTM: up->trx = 1; up->trbuf[0] = ctl; break; case REJ+0: case REJ+1: case REJ+2: case REJ+3: case REJ+4: case REJ+5: case REJ+6: case REJ+7: | |
| 1990/0321 | DPRINT("rREJ\n"); | |
| 1990/0227 | rcvack(up, ctl); break; case ACK+0: case ACK+1: case ACK+2: case ACK+3: case ACK+4: case ACK+5: case ACK+6: case ACK+7: case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3: case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7: rcvack(up, ctl); break; /* | |
| 1990/0312 | * if the seuence number is the next expected * and te trailer length == 3 * and the block count matches the bytes received * then send the bytes upstream. | |
| 1990/0227 | */ case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3: case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7: | |
| 1990/0312 | i = ctl & Nmask; | |
| 1990/0227 | if(up->trx != 3){ urpstat.rjtrs++; | |
| 1990/0321 | sendrej(up); | |
| 1990/0227 | break; } else if(q->len != up->trbuf[1] + (up->trbuf[2]<<8)){ urpstat.rjpks++; | |
| 1990/0321 | sendrej(up); | |
| 1990/0227 | break; | |
| 1990/0312 | } else if(i != ((up->iseq+1)&Nmask)) { | |
| 1990/0227 | urpstat.rjseq++; | |
| 1990/0321 | sendrej(up); | |
| 1990/0227 | break; } /* * send data upstream */ if(q->first) { | |
| 1990/0312 | if(up->trbuf[0] != BOTM) q->last->flags |= S_DELIM; | |
| 1990/0227 | while(bp = getq(q)) | |
| 1990/0312 | PUTNEXT(q, bp); | |
| 1990/0227 | } else { bp = allocb(0); | |
| 1990/0312 | bp->flags |= S_DELIM; | |
| 1990/0227 | PUTNEXT(q, bp); | |
| 1990/0312 | } | |
| 1990/0227 | up->trx = 0; /* * acknowledge receipt */ | |
| 1990/0321 | qlock(&up->ack); | |
| 1990/0312 | up->iseq = i; | |
| 1990/0321 | if(q->next->len < Streamhi) sendctl(up, up->lastecho = ECHO|i); qunlock(&up->ack); | |
| 1990/0227 | break; } } /* * downstream control */ static void urpctloput(Urp *up, Queue *q, Block *bp) { | |
| 1990/0312 | char *fields[2]; | |
| 1990/0227 | int n; int inwin=0, outwin=0; switch(bp->type){ case M_CTL: if(streamparse("init", bp)){ | |
| 1990/0312 | switch(getfields((char *)bp->rptr, fields, 2, ' ')){ | |
| 1990/0227 | case 2: inwin = strtoul(fields[1], 0, 0); case 1: outwin = strtoul(fields[0], 0, 0); } | |
| 1990/0312 | /* initinput(up, inwin); */ | |
| 1990/0227 | initoutput(up, outwin); freeb(bp); return; } } PUTNEXT(q, bp); } /* | |
| 1990/0312 | * accept data from a writer | |
| 1990/0227 | */ | |
| 1990/0312 | static void | |
| 1990/0227 | urpoput(Queue *q, Block *bp) { Urp *up; up = (Urp *)q->ptr; if(bp->type != M_DATA){ urpctloput(up, q, bp); return; } | |
| 1990/0312 | urpstat.output += BLEN(bp); putq(q, bp); output(up); | |
| 1990/0227 | } /* | |
| 1990/0312 | * start output | |
| 1990/0227 | */ | |
| 1990/0312 | static void output(Urp *up) | |
| 1990/0227 | { | |
| 1990/0312 | Block *bp, *nbp; ulong now; Queue *q; int n; | |
| 1990/0227 | ||
| 1990/0312 | if(!canqlock(&up->xmit)) return; | |
| 1990/0227 | ||
| 1990/0312 | if(waserror()){ print("urp output error\n"); qunlock(&up->xmit); nexterror(); } | |
| 1990/0227 | ||
| 1990/0312 | /* * if still initing and it's time to rexmit, send an INIT1 */ now = NOW; if(up->state & INITING){ if(now > up->timer){ sendctl(up, INIT1); up->timer = now + MSrexmit; | |
| 1990/0227 | } | |
| 1990/0312 | qunlock(&up->xmit); poperror(); return; | |
| 1990/0227 | } /* | |
| 1990/0312 | * fill the transmit buffers | |
| 1990/0227 | */ | |
| 1990/0312 | q = up->wq; for(bp = getq(q); bp && up->xb[up->nxb]==0; up->nxb = NEXT(up->nxb)){ if(BLEN(bp) > up->maxblock){ nbp = up->xb[up->nxb] = allocb(0); nbp->rptr = bp->rptr; nbp->wptr = bp->rptr = bp->rptr + up->maxblock; } else { up->xb[up->nxb] = bp; bp = getq(q); | |
| 1990/0227 | } } | |
| 1990/0312 | if(bp) putbq(q, bp); /* print("output w(%d) up->xb[%d](%ux) up->nxb(%d) up->state(%ux)\n", WINDOW(up), up->next, up->xb[up->next], up->nxb, up->state); /**/ /* * if a retransmit time has elapsed since a transmit, send an ENQ */ if(up->unechoed != up->next && NOW > up->timer){ | |
| 1990/0321 | DPRINT("sENQ\n"); | |
| 1990/0312 | up->timer = NOW + MSrexmit; up->state &= ~REJECTING; sendctl(up, ENQ); qunlock(&up->xmit); poperror(); return; } /* * if there's a window open, push some blocks out */ while(WINDOW(up)>0 && up->xb[up->next]!=0 && canqlock(&up->xl[up->next])){ if(up->xb[up->next]) sendblock(up, up->next); qunlock(&up->xl[up->next]); up->next = NEXT(up->next); } qunlock(&up->xmit); poperror(); | |
| 1990/0227 | } /* | |
| 1990/0312 | * send a control byte, put the byte at the end of the allocated * space in case a lower layer needs header room. | |
| 1990/0227 | */ | |
| 1990/0312 | static void sendctl(Urp *up, int ctl) | |
| 1990/0227 | { | |
| 1990/0312 | Block *bp; | |
| 1990/0227 | ||
| 1990/0312 | if(up->wq->next->len > Streamhi) | |
| 1990/0227 | return; | |
| 1990/0312 | bp = allocb(1); bp->wptr = bp->lim; bp->rptr = bp->lim-1; *bp->rptr = ctl; bp->flags |= S_DELIM; PUTNEXT(up->wq, bp); | |
| 1990/0227 | } | |
| 1990/0312 | /* | |
| 1990/0321 | * send a reject */ static void sendrej(Urp *up) { flushinput(up); qlock(&up->ack); if((up->lastecho&~Nmask) == ECHO){ DPRINT("REJ %d\n", up->iseq); sendctl(up, up->lastecho = REJ|up->iseq); } qunlock(&up->ack); } /* * send an acknowledge */ static void sendack(Urp *up) { Block *bp; /* * check the precondition for acking */ if(up->rq->next->len>=Streamhi || (up->lastecho&Nmask)==up->iseq) return; if(!canqlock(&up->ack)) return; /* * check again now that we've locked */ if(up->rq->next->len>=Streamhi || (up->lastecho&Nmask)==up->iseq){ qunlock(&up->ack); return; } /* * send the ack */ sendctl(up, up->lastecho = ECHO|up->iseq); qunlock(&up->ack); } /* | |
| 1990/0312 | * send a block. */ static void sendblock(Urp *up, int bn) | |
| 1990/0227 | { | |
| 1990/0312 | Block *bp, *m, *nbp; int n; | |
| 1990/0227 | ||
| 1990/0312 | up->timer = NOW + MSrexmit; if(up->wq->next->len > Streamhi) return; /* * message 1, the BOT and the data */ bp = up->xb[bn]; m = allocb(1); m->rptr = m->lim - 1; m->wptr = m->lim; *m->rptr = (bp->flags & S_DELIM) ? BOT : BOTM; nbp = m->next = allocb(0); nbp->rptr = bp->rptr; nbp->wptr = bp->wptr; nbp->flags |= S_DELIM; PUTNEXT(up->wq, m); /* * message 2, the block length and the SEQ */ m = allocb(3); m->rptr = m->lim - 3; m->wptr = m->lim; n = BLEN(bp); m->rptr[0] = SEQ | bn; m->rptr[1] = n; m->rptr[2] = n<<8; m->flags |= S_DELIM; PUTNEXT(up->wq, m); | |
| 1990/0227 | } /* * receive an acknowledgement */ static void rcvack(Urp *up, int msg) { int seqno; int next; | |
| 1990/0312 | seqno = msg&Nmask; next = NEXT(seqno); | |
| 1990/0227 | ||
| 1990/0312 | /* * release any acknowledged blocks */ if(IN(seqno, up->unacked, up->next)){ for(; up->unacked != next; up->unacked = NEXT(up->unacked)){ qlock(&up->xl[up->unacked]); if(up->xb[up->unacked]) freeb(up->xb[up->unacked]); up->xb[up->unacked] = 0; qunlock(&up->xl[up->unacked]); } } | |
| 1990/0227 | switch(msg & 0370){ case ECHO: | |
| 1990/0312 | if(IN(seqno, up->unechoed, up->next)) { up->unechoed = next; | |
| 1990/0227 | } | |
| 1990/0312 | /* * the next reject at the start of a window starts a * retransmission. */ up->state &= ~REJECTING; | |
| 1990/0227 | break; case REJ: | |
| 1990/0312 | if(IN(seqno, up->unechoed, up->next)) up->unechoed = next; /* * ... FALL THROUGH ... */ | |
| 1990/0227 | case ACK: | |
| 1990/0312 | /* * start a retransmission if we aren't retransmitting * and this is the start of a window. */ if(up->unechoed==next && !(up->state & REJECTING)){ up->state |= REJECTING; up->next = next; | |
| 1990/0227 | } break; } | |
| 1990/0312 | wakeup(&up->rq->r); | |
| 1990/0227 | } /* * throw away any partially collected input */ static void flushinput(Urp *up) { Block *bp; while (bp = getq(up->rq)) freeb(bp); up->trx = 0; } /* * initialize output */ static void initoutput(Urp *up, int window) { | |
| 1990/0312 | int i; | |
| 1990/0227 | /* * set output window */ up->maxblock = window/4; if(up->maxblock < 64) up->maxblock = 64; if(up->maxblock > Streamhi/4) up->maxblock = Streamhi/4; | |
| 1990/0312 | up->maxblock -= 4; up->maxout = 3; | |
| 1990/0227 | /* * set sequence varialbles */ | |
| 1990/0312 | up->unechoed = 1; up->unacked = 1; up->next = 1; up->nxb = 1; | |
| 1990/0227 | /* | |
| 1990/0312 | * free any outstanding blocks */ for(i = 0; i < 8; i++){ qlock(&up->xl[i]); if(up->xb[i]) freeb(up->xb[i]); | |
| 1990/0315 | up->xb[i] = 0; | |
| 1990/0312 | qunlock(&up->xl[i]); } /* | |
| 1990/0227 | * tell the other side we've inited */ | |
| 1990/0312 | up->state |= INITING; up->timer = NOW + MSrexmit; sendctl(up, INIT1); | |
| 1990/0227 | } /* * initialize input */ static void initinput(Urp *up, int window) { /* * restart all sequence parameters */ | |
| 1990/0321 | up->blocks = 0; | |
| 1990/0227 | up->trx = 0; up->iseq = 0; up->lastecho = ECHO+0; flushinput(up); | |
| 1990/0312 | } /* * do retransmissions etc */ static int todo(void *arg) { Urp *up; up = (Urp *)arg; return (WINDOW(up)>0 && up->wq->len>0 && !(up->state&INITING)); } static void urpkproc(void *arg) { Urp *up; up = (Urp *)arg; for(;;){ if(up->state & (HUNGUP|CLOSING)){ if(isflushed(up)) wakeup(&up->r); if(up->state & HUNGUP) break; } | |
| 1990/0321 | sendack(up); | |
| 1990/0312 | output(up); tsleep(&up->rq->r, todo, up, MSrexmit/2); } up->kstarted = 0; up->state = 0; | |
| 1990/0227 | } | |