| plan 9 kernel history: overview | file list | diff list |
1993/1227/port/qio.c (diff list | history)
| port/qio.c on 1993/0526 | ||
| 1993/0526 | #include "u.h" #include "../port/lib.h" #include "mem.h" #include "dat.h" #include "fns.h" #include "../port/error.h" | |
| 1993/0530 | /* * interrupt level memory allocation */ | |
typedef struct Chunk	Chunk;
typedef struct Chunkl	Chunkl;
typedef struct Arena	Arena;

/* chunk sizes handled by the arena run from 1<<Minpow to 1<<Maxpow bytes */
enum
{
	Minpow=		7,
	Maxpow=		12,
};

/* a free chunk; the link is stored in the chunk's own first bytes */
struct Chunk
{
	Chunk	*next;
};

/* a locked list of free chunks, all of one size */
struct Chunkl
{
	Lock;
	Chunk	*first;		/* head of the free list */
	int	have;		/* chunks currently on the list */
	int	goal;		/* number of chunks the kproc tries to keep */
	int	hist;		/* shift register: 1 bit per pass with have >= goal */
	int	wanted;		/* allocations refused because the list was empty */
};

struct Arena
{
	Chunkl	alloc[Maxpow+1];	/* indexed by log2 of the chunk size */
	Chunkl	freed;			/* chunks released by ifree, not yet free()d */
	Rendez	r;			/* the ialloc kproc sleeps here */
};

static Arena arena;

/*
 *  IO queues
 */
typedef struct Block	Block;
typedef struct Queue	Queue;

struct Block
{
	Block	*next;
	uchar	*rp;		/* first unconsumed byte */
	uchar	*wp;		/* first empty byte */
	uchar	*lim;		/* 1 past the end of the buffer */
	uchar	*base;		/* start of the buffer */
	uchar	flag;
};
#define BLEN(b)		((b)->wp - (b)->rp)

struct Queue
{
	Lock;

	Block	*bfirst;	/* buffer */
	Block	*blast;

	int	len;		/* bytes in queue */
	int	limit;		/* max bytes in queue */
	int	state;
	int	eof;		/* number of eofs read by user */

	void	(*kick)(void*);	/* restart output */
	void	*arg;		/* argument to kick */

	QLock	rlock;		/* mutex for reading processes */
	Rendez	rr;		/* process waiting to read */
	QLock	wlock;		/* mutex for writing processes */
	Rendez	wr;		/* process waiting to write */
};

enum
{
	/* Block.flag */
	Bfilled=	1,		/* block filled */

	/* Queue.state */
	Qstarve=	(1<<0),		/* consumer starved */
	Qmsg=		(1<<1),		/* message stream */
	Qclosed=	(1<<2),
	Qflow=		(1<<3),
};
| 1993/0811 | void poison(Block *b) { b->next = (void*)0xdeadbabe; b->rp = (void*)0xdeadbabe; b->wp = (void*)0xdeadbabe; b->lim = (void*)0xdeadbabe; b->base = (void*)0xdeadbabe; } | |
| 1993/0530 | /* | |
| 1993/0526 | * Manage interrupt level memory allocation. */ static void iallockproc(void *arg) { Chunk *p, *first, **l; Chunkl *cl; int pow, x, i; USED(arg); for(;;){ tsleep(&arena.r, return0, 0, 500); /* really free what was freed at interrupt level */ cl = &arena.freed; if(cl->first){ x = splhi(); lock(cl); first = cl->first; cl->first = 0; unlock(cl); splx(x); | |
| 1993/0804 | while(first != 0) { | |
| 1993/0526 | p = first->next; free(first); | |
| 1993/0804 | first = p; | |
| 1993/0526 | } } /* make sure we have blocks available for interrupt level */ for(pow = Minpow; pow <= Maxpow; pow++){ cl = &arena.alloc[pow]; /* * if we've been ahead of the game for a while * start giving blocks back to the general pool */ if(cl->have >= cl->goal){ | |
| 1993/0819 | cl->hist = ((cl->hist<<1) | 1) & 0xffff; if(cl->hist == 0xffff && cl->goal > 32) | |
| 1993/0526 | cl->goal--; continue; } else cl->hist <<= 1; /* | |
| 1993/0819 | * increase goal if we've been drained. | |
| 1993/0526 | */ | |
| 1993/0819 | if(cl->have == 0){ | |
| 1993/1102 | i = cl->goal>>1; | |
| 1993/0819 | if(cl->wanted > i) | |
| 1993/1102 | cl->goal += 2*cl->wanted; | |
| 1993/0819 | else cl->goal += i; cl->wanted = 0; } | |
| 1993/0526 | first = 0; l = &first; | |
| 1993/0804 | i = cl->goal - cl->have; for(x = i; x > 0; x--){ | |
| 1993/0526 | p = malloc(1<<pow); if(p == 0) break; | |
| 1993/0804 | ||
| 1993/0526 | *l = p; l = &p->next; } | |
| 1993/0819 | i -= x; | |
| 1993/0526 | if(first){ x = splhi(); lock(cl); *l = cl->first; cl->first = first; cl->have += i; unlock(cl); splx(x); } } } } void iallocinit(void) { int pow; Chunkl *cl; for(pow = Minpow; pow <= Maxpow; pow++){ cl = &arena.alloc[pow]; | |
| 1993/1102 | cl->goal = Maxpow-pow + 16; | |
| 1993/0526 | } /* start garbage collector */ | |
| 1993/0725 | kproc("ialloc", iallockproc, 0); | |
| 1993/0526 | } | |
| 1993/0819 | void ixsummary(void) { int pow; Chunkl *cl; print("size have/goal\n"); for(pow = Minpow; pow <= Maxpow; pow++){ cl = &arena.alloc[pow]; print("%d %d/%d\n", 1<<pow, cl->have, cl->goal); } print("\n"); } | |
| 1993/0526 | Block* iallocb(int size) { int pow; Chunkl *cl; Chunk *p; Block *b; size += sizeof(Block); | |
| 1993/0804 | for(pow = Minpow; pow <= Maxpow; pow++){ | |
| 1993/0526 | if(size <= (1<<pow)){ cl = &arena.alloc[pow]; lock(cl); p = cl->first; | |
| 1993/0527 | if(p == 0){ | |
| 1993/0819 | cl->wanted++; | |
| 1993/0527 | unlock(cl); | |
| 1993/0819 | wakeup(&arena.r); | |
| 1993/0527 | return 0; | |
| 1993/0526 | } | |
| 1993/0527 | cl->have--; cl->first = p->next; | |
| 1993/0526 | unlock(cl); b = (Block *)p; | |
| 1993/0528 | memset(b, 0, sizeof(Block)); | |
| 1993/0526 | b->base = (uchar*)(b+1); | |
| 1993/0804 | b->wp = b->base; b->rp = b->base; | |
| 1993/0526 | b->lim = b->base + (1<<pow) - sizeof(Block); return b; } | |
| 1993/0804 | } | |
| 1993/0526 | panic("iallocb %d\n", size); return 0; /* not reached */ } void ifree(void *a) { Chunk *p; Chunkl *cl; cl = &arena.freed; p = a; lock(cl); p->next = cl->first; cl->first = p; unlock(cl); } /* * allocate queues and blocks */ Block* allocb(int size) { Block *b; b = malloc(sizeof(Block) + size); if(b == 0) exhausted("Blocks"); b->base = (uchar*)(b+1); | |
| 1993/0804 | b->rp = b->base; b->wp = b->base; | |
| 1993/0526 | b->lim = b->base + size; | |
| 1993/0527 | b->flag = 0; | |
| 1993/0526 | return b; } /* * Interrupt level copy out of a queue, return # bytes copied. If drop is * set, any bytes left in a block afer a consume are discarded. */ int | |
| 1993/0601 | qconsume(Queue *q, void *vp, int len) | |
| 1993/0526 | { Block *b; | |
| 1993/0528 | int n, dowakeup; | |
| 1993/0601 | uchar *p = vp; | |
| 1993/0526 | ||
| 1993/0527 | /* sync with qwrite */ | |
| 1993/0526 | lock(q); | |
| 1993/0527 | ||
| 1993/0526 | b = q->bfirst; if(b == 0){ q->state |= Qstarve; unlock(q); return -1; } | |
| 1993/0528 | ||
| 1993/0526 | n = BLEN(b); if(n < len) len = n; memmove(p, b->rp, len); | |
| 1993/0527 | if((q->state & Qmsg) || len == n) | |
| 1993/0526 | q->bfirst = b->next; else b->rp += len; q->len -= len; | |
| 1993/0528 | /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; | |
| 1993/0526 | unlock(q); | |
| 1993/0528 | if(dowakeup) wakeup(&q->wr); /* discard the block if we're done with it */ | |
| 1993/0811 | if((q->state & Qmsg) || len == n) { poison(b); | |
| 1993/0526 | ifree(b); | |
| 1993/0811 | } | |
| 1993/0526 | return len; } | |
| 1993/0528 | int | |
| 1993/0601 | qproduce(Queue *q, void *vp, int len) | |
| 1993/0526 | { Block *b; | |
| 1993/0528 | int dowakeup; | |
| 1993/0601 | uchar *p = vp; | |
| 1993/0526 | ||
| 1993/0527 | /* sync with qread */ | |
| 1993/0526 | lock(q); | |
| 1993/0527 | ||
| 1993/0526 | /* no waiting receivers, room in buffer? */ if(q->len >= q->limit){ unlock(q); return -1; } /* save in buffer */ b = q->bfirst; | |
| 1993/0527 | if((q->state & Qmsg) == 0 && b && b->lim - b->wp <= len){ | |
| 1993/0526 | memmove(b->wp, p, len); b->wp += len; | |
| 1993/0527 | b->flag |= Bfilled; | |
| 1993/0526 | } else { b = iallocb(len); if(b == 0){ unlock(q); | |
| 1993/1103 | return -2; | |
| 1993/0526 | } | |
| 1993/0527 | memmove(b->wp, p, len); | |
| 1993/0526 | b->wp += len; if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; } q->len += len; | |
| 1993/0528 | if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } else dowakeup = 0; | |
| 1993/0526 | unlock(q); | |
| 1993/0527 | ||
| 1993/0601 | if(dowakeup){ if(q->kick) (*q->kick)(q->arg); | |
| 1993/0528 | wakeup(&q->rr); | |
| 1993/0601 | } | |
| 1993/0528 | ||
| 1993/0526 | return len; } /* * called by non-interrupt code */ Queue* | |
| 1993/0530 | qopen(int limit, int msg, void (*kick)(void*), void *arg) | |
| 1993/0526 | { Queue *q; q = malloc(sizeof(Queue)); if(q == 0) | |
| 1993/0528 | return 0; memset(q, 0, sizeof(Queue)); | |
| 1993/0526 | q->limit = limit; q->kick = kick; q->arg = arg; | |
| 1993/0530 | q->state = msg ? Qmsg : 0; | |
| 1993/0601 | q->state |= Qstarve; | |
| 1993/0908 | q->eof = 0; | |
| 1993/0526 | return q; } static int | |
| 1993/0528 | notempty(void *a) | |
| 1993/0526 | { | |
| 1993/0528 | Queue *q = a; | |
| 1993/0526 | ||
| 1993/0528 | return q->bfirst != 0; | |
| 1993/0526 | } | |
| 1993/0528 | /* * read a queue. if no data is queued, post a Block * and wait on its Rendez. */ | |
| 1993/0526 | long | |
| 1993/0601 | qread(Queue *q, void *vp, int len) | |
| 1993/0526 | { | |
| 1993/0528 | Block *b; int x, n, dowakeup; | |
| 1993/0601 | uchar *p = vp; | |
| 1993/0526 | qlock(&q->rlock); | |
| 1993/0527 | if(waserror()){ qunlock(&q->rlock); nexterror(); } | |
| 1993/0526 | ||
| 1993/0528 | /* wait for data */ for(;;){ /* sync with qwrite/qproduce */ x = splhi(); lock(q); | |
| 1993/0526 | ||
| 1993/0728 | b = q->bfirst; if(b) break; | |
| 1993/0528 | if(q->state & Qclosed){ | |
| 1993/0527 | unlock(q); splx(x); | |
| 1993/0908 | poperror(); qunlock(&q->rlock); if(++q->eof > 3) error(Ehungup); | |
| 1993/0528 | return 0; | |
| 1993/0527 | } | |
| 1993/0528 | q->state |= Qstarve; | |
| 1993/0526 | unlock(q); splx(x); | |
| 1993/0528 | sleep(&q->rr, notempty, q); | |
| 1993/0526 | } | |
| 1993/0528 | /* remove a buffered block */ q->bfirst = b->next; n = BLEN(b); | |
| 1993/0526 | q->len -= n; | |
| 1993/0528 | /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; | |
| 1993/0526 | unlock(q); splx(x); | |
| 1993/0527 | /* do this outside of the lock(q)! */ | |
| 1993/0528 | if(n > len) n = len; memmove(p, b->rp, n); b->rp += n; | |
| 1993/0526 | ||
| 1993/0725 | /* free it or put what's left on the queue */ | |
| 1993/0811 | if(b->rp >= b->wp || (q->state&Qmsg)) { poison(b); | |
| 1993/0528 | free(b); | |
| 1993/0811 | } | |
| 1993/0526 | else { x = splhi(); lock(q); | |
| 1993/0528 | b->next = q->bfirst; q->bfirst = b; q->len += BLEN(b); | |
| 1993/0526 | unlock(q); splx(x); } | |
| 1993/0527 | ||
| 1993/0528 | /* wakeup flow controlled writers (with a bit of histeresis) */ if(dowakeup) wakeup(&q->wr); | |
| 1993/0527 | poperror(); | |
| 1993/0526 | qunlock(&q->rlock); return n; } | |
| 1993/0528 | static int qnotfull(void *a) | |
| 1993/0526 | { | |
| 1993/0528 | Queue *q = a; | |
| 1993/0526 | ||
| 1993/0528 | return q->len < q->limit; } | |
| 1993/0527 | ||
| 1993/0528 | /* * write to a queue. if no reader blocks are posted * queue the data. */ long | |
| 1993/0601 | qwrite(Queue *q, void *vp, int len, int nowait) | |
| 1993/0528 | { int x, dowakeup; Block *b; | |
| 1993/0601 | uchar *p = vp; | |
| 1993/0526 | ||
| 1993/0528 | b = allocb(len); memmove(b->wp, p, len); b->wp += len; | |
| 1993/0526 | ||
| 1993/0528 | /* flow control */ while(!qnotfull(q)){ | |
| 1993/0601 | if(nowait) return len; | |
| 1993/0528 | qlock(&q->wlock); | |
| 1993/1227 | if(waserror()) { qunlock(&q->wlock); nexterror(); } | |
| 1993/0528 | q->state |= Qflow; sleep(&q->wr, qnotfull, q); qunlock(&q->wlock); | |
| 1993/1227 | poperror(); | |
| 1993/0526 | } | |
| 1993/0528 | x = splhi(); lock(q); if(q->state & Qclosed){ | |
| 1993/0527 | unlock(q); splx(x); | |
| 1993/0528 | error(Ehungup); | |
| 1993/0527 | } | |
| 1993/0526 | ||
| 1993/0601 | if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; | |
| 1993/0526 | q->len += len; | |
| 1993/0528 | if(q->state & Qstarve){ | |
| 1993/0526 | q->state &= ~Qstarve; | |
| 1993/0528 | dowakeup = 1; } else dowakeup = 0; | |
| 1993/0526 | unlock(q); splx(x); | |
| 1993/0601 | if(dowakeup){ if(q->kick) (*q->kick)(q->arg); | |
| 1993/0528 | wakeup(&q->rr); | |
| 1993/0601 | } | |
| 1993/0528 | ||
| 1993/0526 | return len; } | |
| 1993/0528 | /* * Mark a queue as closed. No further IO is permitted. * All blocks are released. */ void qclose(Queue *q) | |
| 1993/0527 | { | |
| 1993/0528 | int x; Block *b, *bfirst; | |
| 1993/0527 | ||
| 1993/0528 | /* mark it */ x = splhi(); lock(q); q->state |= Qclosed; bfirst = q->bfirst; q->bfirst = 0; unlock(q); splx(x); | |
| 1993/0527 | ||
| 1993/0528 | /* free queued blocks */ | |
| 1993/0804 | while(bfirst){ b = bfirst->next; | |
| 1993/0811 | poison(bfirst); | |
| 1993/0804 | free(bfirst); bfirst = b; | |
| 1993/0526 | } | |
| 1993/0528 | /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr); } | |
| 1993/0527 | ||
/*
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
 *  blocks.
 */
void
qhangup(Queue *q)
{
	int x;

	/* mark it */
	x = splhi();
	lock(q);
	q->state |= Qclosed;
	unlock(q);
	splx(x);

	/* wake up readers/writers */
	wakeup(&q->rr);
	wakeup(&q->wr);
}

/*
 *  mark a queue as no longer hung up.  The state word is also
 *  changed by the interrupt level routines, so it is updated under
 *  the queue lock like everywhere else.
 */
void
qreopen(Queue *q)
{
	int x;

	x = splhi();
	lock(q);
	q->state &= ~Qclosed;
	q->state |= Qstarve;
	q->eof = 0;
	unlock(q);
	splx(x);
}

/*
 *  return bytes queued
 */
int
qlen(Queue *q)
{
	return q->len;
}

/*
 *  return true if we can read without blocking
 */
int
qcanread(Queue *q)
{
	return q->bfirst!=0;
}