/* Linr: a lightweight parallel computation server.
   Copyright (C) 2011 Stéphane Gimenez.
   Redistribution, modification, and use are permitted.
   As of now, this software comes with no guaranties. */

#line 9 "headers.c"
#define _XOPEN_SOURCE 500
#define _GNU_SOURCE

#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <limits.h>
#include <poll.h>
#include <semaphore.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>

/* Diagnostics on stderr: err() terminates the process with status 1,
   wrn() and inf() only print. */
#define err(fmt, args...) fprintf(stderr, "error: " fmt, ##args), exit(1)
#define wrn(fmt, args...) fprintf(stderr, "warning: " fmt, ##args)
#define inf(fmt, args...) fprintf(stderr, fmt, ##args)

#ifdef DEBUG
#include <time.h>
int debug_level = 2;
FILE *dbgfile = NULL;  // log file of this process, opened by dbgf()
char id[16] = "main";  // tag printed on every log line of this process
struct timespec dbgtime;
/* Open "log/<fmt>" as this process' log and use <fmt> as its tag.
   snprintf keeps both strings within their buffers. */
#define dbgf(fmt, args...) do { \
        char path[255]; \
        snprintf(id, sizeof id, fmt, ##args); \
        snprintf(path, sizeof path, "log/" fmt, ##args); \
        dbgfile = fopen(path, "w+"); \
    } while (0)
/* Close the log. Clearing the pointer keeps later dbg() calls (the signal
   handler logs too) away from a closed stream. */
#define dbgc() do { \
        if (dbgfile) fclose(dbgfile); \
        dbgfile = NULL; \
    } while (0)
/* Log a timestamped line when level i is enabled. The do/while(0) wrapper
   makes the macro a single statement, safe in an unbraced if/else. */
#define dbg(i, fmt, args...) do { \
        if ((i) <= debug_level && dbgfile) { \
            clock_gettime(CLOCK_MONOTONIC, &dbgtime); \
            fprintf(dbgfile, "%ld.%09ld %s: " fmt, \
                    (long)dbgtime.tv_sec, dbgtime.tv_nsec, id, ##args); \
        } \
    } while (0)
#else
#define dbgf(fmt, args...) do {} while (0)
#define dbgc() do {} while (0)
#define dbg(i, fmt, args...) do {} while (0)
#endif

#line 2 "mem.c"
/* Address width A of the shared graph memory (16, 32 or 64 bits).
   An address a designates the byte offset a << ASHIFT from mem_ptr. */
#ifndef A
#define A __WORDSIZE
#endif
#if A > __WORDSIZE || !(A == 16 || A == 32 || A == 64)
#error "address size not supported"
#undef A
#define A __WORDSIZE
#endif
#if A == 16
typedef uint16_t maddr;
typedef uint_fast16_t addr;
#define ASHIFT 3 // 3 is max
#define PADDR PRIxFAST16
#elif A == 32
#define ASHIFT 4 // 4 is max
typedef uint32_t maddr;
typedef uint_fast32_t addr;
#define PADDR PRIxFAST32
#elif A == 64
#define ASHIFT 0 // 5 is max, but unnecessary
typedef uint64_t maddr;
typedef uint_fast64_t addr;
#define PADDR PRIxFAST64
#endif
#define SSIZE (4 * sizeof(maddr)) // slot size in bytes: four words
#define SSHIFT (SSIZE >> ASHIFT)  // slot size in address units
#define PA "%06" PADDR
#define PT "%04" PADDR

const int mem_maxbits = 8 * sizeof(maddr) + ASHIFT;
uintptr_t mem_ptr; // absolute mapped memory base address
uintptr_t mem_size;

/* Bookkeeping shared by all processes, placed at the start of the mapping. */
struct global {
    maddr slots[4];   // list header of the shared free-slot list (glb_slots)
    maddr cuts[4];    // list header of the shared pending-cut list (glb_cuts)
    sem_t sem_procs;  // posted once per cut pushed on the shared cut list
    sem_t sems[4];    // list locks, selected by word 1 of a list header
    maddr slot_base;
    maddr slot_spawn; // next never-used slot address
    maddr slot_limit; // end of the mapping, in address units
    //stats
#ifdef DEBUG
    volatile int stat_redex[3];
    volatile int stat_alloc[3];
    volatile int stat_client[3];
    volatile int stat_bytes_in;
    volatile int stat_bytes_out;
#endif
};

struct global *glb;
const addr glb_slots = 0 * SSHIFT; // address of glb->slots
const addr glb_cuts = 1 * SSHIFT;  // address of glb->cuts

/* Run flag, cleared from the SIGTERM/SIGINT/SIGQUIT handler. An object
   written by a signal handler must be volatile sig_atomic_t. */
volatile sig_atomic_t up;

#ifdef DEBUG
/* Statistics triple: [0] total, [1] maximum, [2] current. Returns the new
   current value. The maximum is raised with a CAS loop so that concurrent
   processes cannot overwrite a larger maximum with a smaller one. */
int stat_incr(volatile int *stat)
{
    __sync_add_and_fetch(&stat[0], 1);
    int count = __sync_add_and_fetch(&stat[2], 1);
    int max = stat[1];
    while (count > max) {
        int seen = __sync_val_compare_and_swap(&stat[1], max, count);
        if (seen == max)
            break;
        max = seen;
    }
    return count;
}

/* Decrement the current value of a statistics triple; returns it. */
int stat_decr(volatile int *stat)
{
    return __sync_add_and_fetch(&stat[2], -1);
}

void stats()
{
    inf("\nstats:\n");
    inf(" redexes: total=%-6d max=%-6d cur=%-6d\n",
        glb->stat_redex[0], glb->stat_redex[1], glb->stat_redex[2]);
    inf(" allocations: total=%-6d max=%-6d cur=%-6d\n",
        glb->stat_alloc[0], glb->stat_alloc[1], glb->stat_alloc[2]);
    inf(" clients: total=%-6d max=%-6d cur=%-6d\n",
        glb->stat_client[0], glb->stat_client[1], glb->stat_client[2]);
    inf(" bytes: in=%-6d out=%-6d\n",
        glb->stat_bytes_in, glb->stat_bytes_out);
}
#endif

/* Accessors for word o of the slot at address a. They are static inline:
   a plain C99 `inline` definition provides no external symbol, so any
   call the compiler chose not to inline (e.g. at -O0) failed to link. */
static inline maddr *ptr(addr a, int o)
{
    return (maddr *)(mem_ptr + (uintptr_t)((a << ASHIFT) + o * sizeof(maddr)));
}

static inline void assign(addr a, int o, addr p)
{
    dbg(7, "asg a="PA" o=%d p="PA"\n", a, o, p);
    *ptr(a, o) = p;
}

static inline addr deref(addr a, int o)
{
    addr p = *ptr(a, o);
    dbg(7, "drf a="PA" o=%d p="PA"\n", a, o, p);
    return p;
}

/* Atomic primitives on a slot word; each returns the previous value. */
static inline addr lock_compare_and_swap(addr a, int o, addr old, addr p)
{
    return __sync_val_compare_and_swap(ptr(a, o), (maddr)old, (maddr)p);
}

static inline addr lock_test_and_set(addr a, int o, addr p)
{
    return __sync_lock_test_and_set(ptr(a, o), (maddr)p);
}

static inline void lock_release(addr a, int o)
{
    __sync_lock_release(ptr(a, o));
}

#line 2 "list.c"
/* Shared LIFO lists. A list header slot a holds the first element in word
   0 and the index of its lock in glb->sems in word 1. Elements are chained
   through their own word 0; 0 terminates the list. */
static void list_init(addr a, int sem)
{
    assign(a, 0, 0);
    assign(a, 1, sem);
}

static void list_destroy(addr a)
{
    (void)a; // nothing to release
}

static void list_push(addr a, addr s)
{
    sem_t *sem = &glb->sems[deref(a, 1)];
    dbg(6, "lock %d\n", (int)deref(a, 1));
    while (sem_wait(sem))
        ; // retry when interrupted by a signal
    addr p = deref(a, 0);
    assign(s, 0, p);
    assign(a, 0, s);
    sem_post(sem);
}

/* Remove and return the first element of list a, or 0 when it is empty. */
static addr list_pop(addr a)
{
    sem_t *sem = &glb->sems[deref(a, 1)];
    dbg(6, "lock %d\n", (int)deref(a, 1));
    while (sem_wait(sem))
        ; // retry when interrupted by a signal
    addr s = deref(a, 0);
    if (s)
        assign(a, 0, deref(s, 0));
    sem_post(sem);
    return s;
}

#line 2 "slot.c"
/* Per-process stack of free slots (plain globals, so each forked process
   has its own copy). It is refilled from the shared free list or from
   never-used memory, and overflows back to the shared list in packets of
   pslots slots chained through word 1. */
addr slotb[16];        // slot buffer
int nslots = 0;        // slot quantity
const int mslots = 16; // max quantity (capacity of slotb)
const int pslots = 8;  // slot packet size

/* Carve a packet of pslots never-used slots out of the mapping. */
void slot_spawn()
{
    addr a = __sync_fetch_and_add(&glb->slot_spawn, pslots * SSHIFT);
    if (a + pslots * SSHIFT > glb->slot_limit)
        err("out of memory slots\n");
    for (int i = 0; i < pslots; i++)
        slotb[nslots++] = a + i * SSHIFT;
    dbg(6, "slot_spawn a="PA"\n", a);
}

/* Refill the (empty) buffer with one packet from the shared free list,
   or with fresh slots when that list is empty. */
void slot_fill()
{
    addr p = list_pop(glb_slots);
    if (!p) {
        slot_spawn();
        return;
    }
    while (p) {
        slotb[nslots++] = p;
        p = deref(p, 1);
    }
    dbg(5, "slot_fill buf=%d\n", nslots);
}

/* Move the top pslots buffered slots, chained through word 1, to the
   shared free list. */
void slot_flush()
{
    assign(slotb[--nslots], 1, 0);
    for (int i = 1; i < pslots; i++) {
        assign(slotb[nslots - 1], 1, slotb[nslots]);
        nslots--;
    }
    list_push(glb_slots, slotb[nslots]);
    dbg(5, "slot_flush buf=%d\n", nslots);
}

/* Allocate one slot. Its contents are left as they were. */
addr slot_draw()
{
    if (nslots == 0)
        slot_fill();
    addr p = slotb[--nslots];
#ifdef DEBUG
    int count = stat_incr(glb->stat_alloc);
    dbg(5, "slot_draw a="PA" count=%d\n", p, count);
#endif
    return p;
}

/* Release slot a to the local buffer, flushing a packet when it is full. */
void slot_drop(addr a)
{
    if (nslots >= mslots)
        slot_flush();
    slotb[nslots++] = a;
#ifdef DEBUG
    int count = stat_decr(glb->stat_alloc);
    dbg(5, "slot_drop a="PA" count=%d\n", a, count);
#endif
}

#line 2 "cut.c"
// todo: fifos
/* Per-process buffer of pending cuts. Cuts handed over to other processes
   go on the shared list glb_cuts; glb->sem_procs is posted once per cut
   put there and waited on before taking one. */
addr cutsb[16];       // cut buffer
int ncuts = 0;        // cut quantity
const int mcuts = 16; // max quantity

/* Queue cut s on output list a (a client's list) for the server. */
void out_push(addr a, addr s)
{
    list_push(a, s);
#ifdef DEBUG
    int count = stat_incr(glb->stat_redex);
    dbg(3, "out_push a="PA" s="PA" count=%d\n", a, s, count);
#endif
}

/* Take one cut from output list a, or 0 when none is pending. */
addr out_pop(addr a)
{
    addr s = list_pop(a);
    if (s) {
#ifdef DEBUG
        int count = stat_decr(glb->stat_redex);
        dbg(3, "out_pop a="PA" s="PA" count=%d\n", a, s, count);
#endif
    }
    return s;
}

/* Block until a shared cut is announced, then move it to the local buffer.
   Returns with the buffer unchanged when the wait is interrupted, or when
   the list is unexpectedly empty (a null cut is never buffered). */
void cut_fill()
{
    dbg(3, "---\n");
    if (sem_wait(&glb->sem_procs) == 0) {
        dbg(3, "+++\n");
        addr s = list_pop(glb_cuts);
        if (!s) {
            dbg(1, "internal error: no cut\n");
            return;
        }
        cutsb[ncuts++] = s;
        dbg(5, "cut_fill buf=%d\n", ncuts);
    }
}

/* Hand the most recent buffered cut over to the shared list. */
void cut_flush()
{
    list_push(glb_cuts, cutsb[--ncuts]);
    sem_post(&glb->sem_procs);
    dbg(5, "cut_flush buf=%d\n", ncuts);
}

/* Buffer cut s locally. One buffered cut is shared first when the buffer
   is full, or when fewer cuts are announced globally than are buffered
   here (presumably so that idle workers get work). */
void cut_push(addr s)
{
    int tasks;
    sem_getvalue(&glb->sem_procs, &tasks);
    if (ncuts >= mcuts || (ncuts > 0 && tasks < ncuts))
        cut_flush();
    cutsb[ncuts++] = s;
#ifdef DEBUG
    int count = stat_incr(glb->stat_redex);
    dbg(3, "cut_push s="PA" count=%d\n", s, count);
#endif
}

/* Next cut to reduce, or 0 when none could be obtained. */
addr cut_pop()
{
    if (ncuts == 0)
        cut_fill();
    if (ncuts == 0)
        return 0;
    addr s = cutsb[--ncuts];
#ifdef DEBUG
    int count = stat_decr(glb->stat_redex);
    dbg(3, "cut_pop s="PA" count=%d\n", s, count);
#endif
    return s;
}

#line 2 "cell.c"
// pos0: sort, pos1: aux1, pos2: aux2, pos3: aux3 or lvl

/* Create an empty connector slot and store it in word o1 of c1 and word
   o2 of c2, wiring those two ports together. */
static void cell_link(addr c1, int o1, addr c2, int o2)
{
    dbg(4, "cell_link c1="PA" o1=%d c2="PA" o2=%d\n", c1, o1, c2, o2);
    addr s = slot_draw();
    assign(s, 0, 0);
    assign(c1, o1, s);
    assign(c2, o2, s);
}

/* Queue a new cut between cells c1 and c2. */
static void cell_sync(addr c1, addr c2)
{
    addr s = slot_draw();
    assign(s, 1, c1);
    assign(s, 2, c2);
    cut_push(s);
}
static void cell_cast(addr c, addr s) { addr x = lock_test_and_set(s, 0, c); dbg(4, "cell_cast s="PA" c="PA" x="PA"\n", s, c, x); if (x) { assign(s, 1, c); assign(s, 2, x); cut_push(s); } } static void cell_cast_to(addr c, addr s, addr a) { addr x = lock_test_and_set(s, 0, c); dbg(4, "cell_cast s="PA" c="PA" x="PA"\n", s, c, x); if (x) { assign(s, 1, c); assign(s, 2, x); out_push(a, s); } } static void cell_join(addr c1, addr c2) { addr x1 = lock_test_and_set(c1, 0, 0); addr x2 = lock_test_and_set(c2, 0, 0); dbg(4, "cell_join s1="PA" s2="PA" x1="PA" x2="PA"\n", c1, c2, x1, x2); if (x1 && x2) { slot_drop(c1); slot_drop(c2); cell_sync(x1, x2); } else if (x1) { slot_drop(c1); cell_cast(x1, c2); } else if (x2) { slot_drop(c2); cell_cast(x2, c1); } else { // asymetric forward addr p = slot_draw(); assign(p, 0, 2); assign(p, 1, c2); cell_cast(p, c1); } } #line 2 "eng.c" uint64_t engine_formatid = 0x00000000726e696c; // "linr" uint64_t engine_version = 0; struct engine_s { uint64_t formatid; uint64_t version; uint64_t eng_table_offset; uint64_t eng_rules_offset; uint64_t eng_kinds; uint64_t eng_rules; uint64_t eng_shift; uint64_t unused; }; typedef struct engine_s engine_t; engine_t engine; uintptr_t eng_ptr; uintptr_t eng_size; uintptr_t eng_table_ptr; uintptr_t eng_rules_ptr; inline int leveled(addr code) { return (code >> 8) >= 64; } void eng_dforward(addr c1, addr c2) { dbg(5, "eng_rule dforward\n"); addr s1 = deref(c1, 1); addr s2 = deref(c2, 1); slot_drop(c1); slot_drop(c2); cell_join(s1, s2); } void eng_forward(addr c1, addr c2) { dbg(5, "eng_rule forward\n"); addr s = deref(c1, 1); slot_drop(c1); cell_cast(c2, s); } void eng_output(addr co, addr c) { dbg(5, "eng_rule output\n"); addr cs = deref(co, 1); addr s = slot_draw(); assign(s, 1, co); assign(s, 2, c); out_push(deref(cs, 1), s); kill(*(pid_t *)ptr(cs, 2), SIGIO); } void eng_mcross(addr c1, addr c2) { dbg(5, "eng_rule mcross\n"); addr k1 = deref(c1, 0); addr k2 = deref(c2, 0); int arity1 = k1 & 3; int arity2 
= k2 & 3; addr c11 = 0, c12 = 0; addr c21 = 0, c22 = 0, c23 = 0; if (arity1 > 0) c11 = deref(c1, 1); if (arity1 > 1) c12 = deref(c1, 2); if (arity2 > 0) c21 = deref(c2, 1); if (arity2 > 1) c22 = deref(c2, 2); if (arity2 > 2) c23 = deref(c2, 3); int lvl = leveled(k1) ? deref(c1, 3) : -1; slot_drop(c1); slot_drop(c2); addr p11 = 0, p12 = 0; if (arity1 > 0) { p11 = slot_draw(); assign(p11, 0, k2); } if (arity1 > 1) { p12 = slot_draw(); assign(p12, 0, k2); } if (arity2 > 0) { addr p21 = slot_draw(); assign(p21, 0, k1); if (arity1 > 0) cell_link(p11, 1, p21, 1); if (arity1 > 1) cell_link(p12, 1, p21, 2); if (lvl >= 0) assign(p21, 3, lvl + 0); //todo cell_cast(p21, c21); } if (arity2 > 1) { addr p22 = slot_draw(); assign(p22, 0, k1); if (arity1 > 0) cell_link(p11, 2, p22, 1); if (arity1 > 1) cell_link(p12, 2, p22, 2); if (lvl >= 0) assign(p22, 3, lvl + 0); cell_cast(p22, c22); } if (arity2 > 2) { addr p23 = slot_draw(); assign(p23, 0, k1); if (arity1 > 0) cell_link(p11, 3, p23, 1); if (arity1 > 1) cell_link(p12, 3, p23, 2); if (lvl >= 0) assign(p23, 3, lvl + 0); cell_cast(p23, c23); } if (arity1 > 0) cell_cast(p11, c11); if (arity1 > 1) cell_cast(p12, c12); } void eng_mjoin(addr c1, addr c2) { dbg(5, "eng_rule mjoin\n"); addr k1 = deref(c1, 0); int arity = k1 & 3; addr c11 = 0, c12 = 0, c13 = 0; addr c21 = 0, c22 = 0, c23 = 0; if (arity > 0) { c11 = deref(c1, 1); c21 = deref(c2, 1); } if (arity > 1) { c12 = deref(c1, 2); c22 = deref(c2, 2); } if (arity > 2) { c13 = deref(c1, 3); c23 = deref(c2, 3); } slot_drop(c1); slot_drop(c2); if (arity > 0) cell_join(c11, c21); if (arity > 1) cell_join(c12, c22); if (arity > 2) cell_join(c13, c23); } addr get_link(addr *p) { if (*p) return *p; addr s = slot_draw(); assign(s, 0, 0); *p = s; return s; } void eng_table(addr c1, addr c2) { addr k1 = deref(c1, 0); addr k2 = deref(c2, 0); int arity1 = k1 & 3; int arity2 = k2 & 3; int kind1 = k1 >> 8; int kind2 = k2 >> 8; int index = ((kind1 - 1) << engine.eng_shift) + (kind2 - 1); 
uint_fast16_t offset = *((uint16_t *)eng_table_ptr + index); if (offset == (uint16_t)-1) { dbg(1, "reduction error k1="PT" k2="PT"\n", k1, k2); err("reduction error k1="PT" k2="PT"\n", k1, k2); return; } dbg(5, "eng_rule %d-%d\n", kind1, kind2); uint8_t *p = (uint8_t *)(eng_rules_ptr + offset); uint8_t *p_max = p + p[0]; p++; addr linkt[16] = {}; int i = 0; if (arity1 > 0) linkt[i++] = deref(c1, 1); if (arity1 > 1) linkt[i++] = deref(c1, 2); if (arity1 > 2) linkt[i++] = deref(c1, 3); if (arity2 > 0) linkt[i++] = deref(c2, 1); if (arity2 > 1) linkt[i++] = deref(c2, 2); if (arity2 > 2) linkt[i++] = deref(c2, 3); slot_drop(c1); slot_drop(c2); while (p < p_max) { if (p[0] == 0) { // link dbg(5, "eng_link n1=%d n2=%d a1="PA" a2="PA"\n", p[1], p[2], linkt[p[1]], linkt[p[2]]); cell_join(linkt[p[1]], linkt[p[2]]); p += 3; } else { int kind = p[0]; int arity = p[1]; addr code = (kind << 8) + arity; addr c = get_link(linkt + p[2]); dbg(5, "eng_cell k="PT" n=%d a="PA"\n", code, p[2], linkt[p[2]]); addr pc = slot_draw(); assign(pc, 0, code); if (arity > 0) assign(pc, 1, get_link(linkt + p[3])); if (arity > 1) assign(pc, 2, get_link(linkt + p[4])); if (arity > 2) assign(pc, 3, get_link(linkt + p[5])); if (leveled(code)) assign(pc, 3, 0); cell_cast(pc, c); p += arity + 3; } } } void reduce(addr cx, addr cy) { addr kx = deref(cx, 0); addr ky = deref(cy, 0); dbg(3, "eng_redex c1="PA" c2="PA" k1="PT" k2="PT"\n", cx, cy, kx, ky); if (kx == 2 && ky == 2) eng_dforward(cx, cy); else if (kx == 2) eng_forward(cx, cy); else if (ky == 2) eng_forward(cy, cx); else if (kx == 3) eng_output(cx, cy); else if (ky == 3) eng_output(cy, cx); else if (leveled(kx) && leveled(ky)) { addr lvlx = deref(cx, 3), lvly = deref(cy, 3); if (lvlx > lvly) eng_mcross(cx, cy); else if (lvlx < lvly) eng_mcross(cy, cx); else if (kx == ky) eng_mjoin(cx, cy); else { dbg(1, "reduction error\n"); err("reduction error\n"); } } else if (leveled(kx)) eng_mcross(cx, cy); else if (leveled(ky)) eng_mcross(cy, cx); else { // 
casual reduction if (kx >> 8 < ky >> 8) eng_table(cx, cy); else eng_table(cy, cx); } } void compute() { while (up) { addr s = cut_pop(); if (!s) continue; addr cx = deref(s, 1); addr cy = deref(s, 2); slot_drop(s); reduce(cx, cy); } } #line 2 "map.c" // todo: search trees addr map_get(addr a, int o, addr i) { addr p = a; a = deref(p, o); while (a != 0) { if (deref(a, 2) == i) { assign(p, o, deref(a, 3)); dbg(4, "map_out id=%d c="PA"\n", (int)i, a); return a; } p = a; o = 3; a = deref(p, o); } return 0; } addr map_put(addr a, int o, addr i) { addr s = slot_draw(); assign(s, 0, 0); assign(s, 2, i); assign(s, 3, deref(a, o)); assign(a, o, s); dbg(4, "map_in id=%d c="PA"\n", (int)i, s); return s; } addr map_bind(addr a, int o, addr i) { addr c = map_get(a, o, i); if (c) return c; else return map_put(a, o, i); } #line 2 "serve.c" const int server_version = 1; char *server_path = NULL; int server_client_max = 512; int server_listen_queue = 16; int server_buffers_size = 4096; struct client_s { int id; int fd; int r, w; unsigned char *buf_r, *buf_w; int ri, wi; int ic, oc; // inputs and outputs count addr s; // client slot addr cuts; // cut slot addr name_count; }; typedef struct client_s client_t; struct clientp_s { struct clientp_s *next; client_t *this; }; typedef struct clientp_s clientp_t; int sfd = -1; clientp_t *clientp; clientp_t *clientp_free; clientp_t *clientp_first; struct pollfd *ps; addr new_output(client_t *c, addr s) { addr arg = c->name_count; c->name_count += 2; addr p = slot_draw(); assign(p, 0, 3); assign(p, 1, c->s); assign(p, 2, (addr)arg); cell_cast_to(p, s, c->cuts); c->oc += 1; return arg; } void read_bufs(client_t *c) { int i = 0; while (i < c->ri) { unsigned char *p = c->buf_r + i; int length = p[0]; if (length < 2) break; // protocol error if (length > 64) err("unhandled\n"); if (i + length > c->ri) break; // not enough data addr args[5] = {}; int nargs = 0; int shift = 0; for (int x = 1; x < length; x++) { if (nargs >= 5) err("unhandled\n"); 
args[nargs] += (p[x] & 127) << shift; if (p[x] < 128) { nargs++; shift = 0; } else shift += 7; } #ifdef DEBUG char buf[64], *bufp = buf; for (int k = 0; k < nargs; k++) bufp += snprintf(bufp, 63, "%02d:", (int)args[k]); if (bufp > buf) *(bufp - 1) = 0; dbg(2, "c%d read data=%s\n", c->fd, buf); #endif if (args[0] == 0) { if (args[1] == 0) { // link addr s1 = map_bind(c->s, 0, args[2]); addr s2 = map_bind(c->s, 0, args[3]); cell_join(s1, s2); } if (args[1] == 1) { // get addr s = map_bind(c->s, 0, args[3]); addr p = slot_draw(); assign(p, 0, 3); assign(p, 1, c->s); assign(p, 2, args[2]); cell_cast(p, s); c->oc++; } } else { // node int arity = nargs - 2; addr code = (args[0] << 8) + (arity & 3); addr s = map_bind(c->s, 0, args[1]); addr p = slot_draw(); assign(p, 0, code); if (arity > 0) assign(p, 1, map_bind(c->s, 0, args[2])); if (arity > 1) assign(p, 2, map_bind(c->s, 0, args[3])); if (arity > 2) assign(p, 3, map_bind(c->s, 0, args[4])); if (leveled(code)) assign(p, 3, args[nargs - 1]); cell_cast(p, s); } i += length; } c->ri -= i; if (c->ri) memmove(c->buf_r, c->buf_r + i, c->ri); } void write_bufs(client_t *c) { while (c->wi + 64 <= server_buffers_size) { // ensures enough buffer space addr s = out_pop(c->cuts); if (!s) break; addr s1 = deref(s, 1); addr s2 = deref(s, 2); slot_drop(s); addr code = deref(s2, 0); dbg(3, "out_redex c1="PA" c2="PA" k1="PT" k2="PT"\n", s1, s2, deref(s1, 0), code); if (code == 2) { // forward addr p = deref(s2, 1); slot_drop(s2); cell_cast_to(s1, p, c->cuts); continue; } addr args[5] = { 0 }; int nargs; slot_drop(s1); if (code == 3) { // axiom args[0] = 0; args[1] = 0; args[2] = deref(s1, 2); args[3] = deref(s2, 2); if (deref(s2, 1) != c->s) { dbg(1, "internal error: output mismatch\n"); continue; } nargs = 4; c->oc -= 2; } else { // node int arity = code & 3; args[0] = code >> 8; args[1] = deref(s1, 2); if (arity > 0) args[2] = new_output(c, deref(s2, 1)); if (arity > 1) args[3] = new_output(c, deref(s2, 2)); if (arity > 2) args[4] = 
new_output(c, deref(s2, 3)); nargs = 2 + arity; if (leveled(code)) args[nargs++] = deref(s2, 3); c->oc -= 1; } slot_drop(s2); #ifdef DEBUG char buf[64], *bufp = buf; for (int k = 0; k < nargs; k++) bufp += snprintf(bufp, 63, "%02d:", (int)args[k]); if (bufp > buf) *(bufp - 1) = 0; dbg(2, "c%d write data=%s\n", c->fd, buf); #endif unsigned char *p = c->buf_w + c->wi; int i = 1; for (int k = 0; k < nargs; k++) { while (args[k] > 127) { p[i++] = 128 + (args[k] & 127); args[k] = args[k] >> 7; } p[i++] = args[k]; } if (i > 64) err("unhandled\n"); p[0] = i; c->wi += i; } } client_t *add_client(int fd) { dbg(1, "add c%d\n", fd); client_t *c = (client_t *)malloc(sizeof(client_t)); c->fd = fd; c->r = 1; c->w = 1; c->ri = 0; c->wi = 0; c->oc = 0; c->name_count = 1; c->buf_r = (unsigned char *)malloc(server_buffers_size); c->buf_w = (unsigned char *)malloc(server_buffers_size); c->cuts = slot_draw(); list_init(c->cuts, 2); c->s = slot_draw(); assign(c->s, 0, 0); // entries map assign(c->s, 1, c->cuts); // cut slot *(pid_t *)ptr(c->s, 2) = getpid(); // pid #ifdef DEBUG stat_incr(glb->stat_client); #endif return c; } void del_client(client_t *c) { dbg(1, "del c%d\n", c->fd); close(c->fd); free(c->buf_r); free(c->buf_w); list_destroy(c->cuts); slot_drop(c->cuts); slot_drop(c->s); free(c); #ifdef DEBUG stat_decr(glb->stat_client); #endif } void do_recv(client_t *c) { dbg(3, "c%d receiving...\n", c->fd); int ret = recv(c->fd, c->buf_r + c->ri, server_buffers_size - c->ri, 0); if (ret > 0) { c->ri += ret; #ifdef DEBUG glb->stat_bytes_in += ret; dbg(3, "c%d received %d\n", c->fd, ret); #endif } else if (ret == 0) c->r = 0; else if (ret == -1 && errno != EAGAIN) { dbg(1, "c%d recv error %d\n", c->fd, errno); c->r = 0; } } void do_send(client_t *c) { dbg(3, "c%d sending %d...\n", c->fd, c->wi); int ret = send(c->fd, c->buf_w, c->wi, 0); if (ret >= 0) { if (c->wi > server_buffers_size - 64) raise(SIGIO); c->wi -= ret; #ifdef DEBUG glb->stat_bytes_out += ret; dbg(3, "c%d sent %d\n", 
c->fd, ret); #endif if (c->wi) memmove(c->buf_w + ret, c->buf_w, c->wi); } else if (ret == 0) c->w = 0; else if (ret == -1 && errno != EAGAIN) { dbg(1, "c%d send error %d\n", c->fd, errno); c->w = 0; } } void socket_async(int fd) { int flag; if (fcntl(fd, F_SETOWN, getpid()) == -1 || (flag = fcntl(fd, F_GETFL, 0)) == -1 || fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_ASYNC) == -1) err("failed to set socket parameters, errno=%d\n", errno); } void io() { int n = 1; for (clientp_t *p = clientp_first; p; p = p->next) { client_t *c = p->this; ps[n].fd = c->fd; ps[n].events = POLLERR; if (c->r) ps[n].events |= POLLIN; if (c->w && c->wi) ps[n].events |= POLLOUT; n++; } if (poll(ps, n, 0) != -1) { clientp_t **p = &clientp_first; for (int i = 1; *p; i++) { clientp_t *cp = *p; client_t *c = cp->this; dbg(3, "c%d polled outputs=%d\n", c->fd, c->oc); if (ps[i].revents & POLLIN) do_recv(c); if (ps[i].revents & POLLOUT) do_send(c); if (ps[i].revents & POLLERR) { c->w = 0; dbg(1, "c%d error\n", c->fd); } if (c->oc == 0 && (c->w == 0 || (c->r == 0 && c->wi == 0))) { del_client(c); *p = cp->next; cp->next = clientp_free; clientp_free = cp; } else p = &(*p)->next; } if (ps[0].revents & POLLIN) { if (clientp_free) { int s = accept(sfd, NULL, NULL); if (s == -1) wrn("accept failed, errno=%d\n", errno); else { socket_async(s); clientp_t *new = clientp_free; clientp_free = clientp_free->next; new->this = add_client(s); new->this->id = new - clientp; new->next = clientp_first; clientp_first = new; do_recv(new->this); // input may not raise a sigio } } else wrn("maximum connections reached (%d)", server_client_max); } } else if (errno != EINTR && errno != EAGAIN) wrn("poll failed, errno=%d\n", errno); } void io_handler(int sig) { dbg(2, "io signal\n"); } int setup_socket() { struct sockaddr_un addr; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, server_path, sizeof(addr.sun_path) - 1); int sfd = socket(PF_UNIX, SOCK_STREAM, 0); if (sfd == -1) err("failed to allocate server socket, 
errno=%d\n", errno); socket_async(sfd); if (bind(sfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) err("failed to bind %s, errno=%d\n", server_path, errno); ps[0].fd = sfd; ps[0].events = POLLIN; return sfd; } void server() { ps = malloc((server_client_max + 1) * sizeof(struct pollfd)); clientp = malloc(server_client_max * sizeof(clientp_t)); int i; for (i = 0; i < server_client_max - 1; i++) clientp[i].next = &clientp[i+1]; clientp[i].next = NULL; clientp_free = clientp; clientp_first = NULL; sfd = setup_socket(); sigset_t sigempty_set; sigemptyset(&sigempty_set); sigset_t sigio_set; sigemptyset(&sigio_set); sigaddset(&sigio_set, SIGIO); struct sigaction siga; siga.sa_flags = 0; sigemptyset(&siga.sa_mask); siga.sa_handler = io_handler; sigaction(SIGIO, &siga, NULL); sigprocmask(SIG_BLOCK, &sigio_set, NULL); if (listen(sfd, server_listen_queue) == -1) err("cannot listen, errno=%d\n", errno); while (sigsuspend(&sigempty_set), up) { dbg(3, "+++\n"); for (clientp_t *p = clientp_first; p; p = p->next) write_bufs(p->this); dbg(2, "io <<<\n"); io(); dbg(2, "io >>>\n"); for (clientp_t *p = clientp_first; p; p = p->next) read_bufs(p->this); while (ncuts > 0) cut_flush(); dbg(3, "---\n"); } } void server_down() { free(ps); free(clientp); if (sfd >= 0) { close(sfd); if (unlink(server_path) == -1) wrn("failed to unlink server socket, errno=%d\n", errno); } } #line 2 "run.c" char *engine_path = NULL; uint64_t mem_min = 0; uint64_t mem_max = 64 * 1024 * SSIZE; uint64_t mem_pagesize = 0; int procs = 4; void sigh(int sig) { dbg(1, "signal %d\n", sig); #ifdef DEBUG if (dbgfile) fflush(dbgfile); #endif switch (sig) { case SIGTERM: case SIGINT: case SIGQUIT: up = 0; break; case SIGTSTP: raise(SIGSTOP); break; } } void workers() { fflush(stdout); fflush(stderr); for (int k = 1; k <= procs; k++) if (fork() == 0) { dbgf("p%d", k); compute(); dbgc(); exit(0); } } void help(char *bin) { inf("usage: %s engine socket\n" "options:\n" #ifdef DEBUG " -d=%-8d debug level\n" 
#endif " -p=%-8d number of worker processes\n" " -m=%-8"PRIu64" upper memory limit, in KiB\n" " -c=%-8d network connection limit\n" " -b=%-8d network buffers size\n" " -q=%-8d network listen queue size\n", bin, #ifdef DEBUG debug_level, #endif procs, mem_max / 1024, server_client_max, server_buffers_size, server_listen_queue ); } inline char *arg(char *a) { if (a[2] != '=') err("invalid options\n"); return a + 3; } void parse(char **argv) { char **c = argv; while (*(++c) != NULL) { if ((*c)[0] != '-') { if (!engine_path) engine_path = *c; else if (!server_path) server_path = *c; else err("invalid options\n"); } else switch ((*c)[1]) { case 'h': if ((*c)[2] != '\0') err("invalid options\n"); help(argv[0]); exit(0); break; #ifdef DEBUG case 'd': debug_level = atoi(arg(*c)); break; #endif case 'p': procs = atoi(arg(*c)); break; case 'm': mem_max = 1024 * atoll(arg(*c)); break; case 'c': server_client_max = atoi(arg(*c)); break; case 'b': server_buffers_size = atoi(arg(*c)); break; case 'q': server_listen_queue = atoi(arg(*c)); break; default: err("unknown option\n"); } } } uintptr_t map(int prot, int opts, int fd, size_t size) { void *p = mmap(0, size, prot, opts, fd, 0); if (p == MAP_FAILED) { if (errno == ENOMEM) err("mmap failed, not enough memory\n"); err("mmap failed, errno=%d\n", errno); } return (uintptr_t)p; } void init() { mem_pagesize = getpagesize(); mem_min = (1 + sizeof(glb) / mem_pagesize) * mem_pagesize; } void setup() { inf("arch: wordsize=%d pagesize=%d sem_t=%d pid_t=%d\n", __WORDSIZE, (int)mem_pagesize, 8 * (int)sizeof(sem_t), 8 * (int)sizeof(pid_t)); inf("sizes: addr=%d slot=%d quanta=%d\n", 8 * (int)sizeof(maddr), 8 * (int)SSIZE, 8 << ASHIFT); int fd_engine = open(engine_path, O_RDWR); if (fd_engine == -1) err("%s, open failed\n", engine_path); struct stat sb; if (fstat(fd_engine, &sb) < 0) err("%s, stat failed\n", engine_path); if (read(fd_engine, (char *)&engine, sizeof(engine_t)) == - 1) err("%s, read failed\n", engine_path); if 
(engine.formatid != engine_formatid) err("%s, unknown engine format\n", engine_path); if (engine.version != engine_version) err("%s, unknown engine version\n", engine_path); inf("engine: kinds=%d rules=%d (%zuKiB)\n", (int)engine.eng_kinds, (int)engine.eng_rules, sb.st_size / 1024); eng_ptr = map(PROT_READ, MAP_SHARED, fd_engine, sb.st_size); eng_size = sb.st_size; eng_table_ptr = eng_ptr + engine.eng_table_offset; eng_rules_ptr = eng_ptr + engine.eng_rules_offset; if (mem_maxbits < 64 && (mem_max - 1) >> mem_maxbits) err("requested memory cannot be addressed\n"); inf("slots: reserved=%"PRIu64" max=%"PRIu64" (%zuKiB) \n", mem_min / SSIZE, mem_max / SSIZE, mem_max / 1024); int flags = MAP_SHARED | MAP_ANONYMOUS | MAP_NORESERVE; mem_ptr = map(PROT_READ | PROT_WRITE, flags, -1, mem_max); mem_size = mem_max; glb = (struct global *)mem_ptr; glb->slot_base = mem_min >> ASHIFT; glb->slot_spawn = mem_min >> ASHIFT; glb->slot_limit = mem_max >> ASHIFT; if (sem_init(&glb->sem_procs, 1, 0)) err("semaphore initialization, errno=%d\n", errno); for (int i = 0; i < 4; i++) if (sem_init(&glb->sems[i], 1, 1)) err("semaphore initialization, errno=%d\n", errno); list_init(glb_slots, 0); list_init(glb_cuts, 1); close(fd_engine); } void cleanup() { list_destroy(glb_slots); list_destroy(glb_cuts); sem_destroy(&glb->sem_procs); for (int i = 0; i < 4; i++) sem_destroy(&glb->sems[i]); munmap((void *)mem_ptr, mem_size); munmap((void *)eng_ptr, eng_size); } void down() { server_down(); if (up) kill(0, SIGTERM); while (wait(0) != -1); dbg(1, "shutdown\n"); #ifdef DEBUG if (glb->stat_client[0]) stats(); #endif cleanup(); dbgc(); } int main(int argc, char **argv) { init(); parse(argv); if (!engine_path) err("no engine specified\n"); if (!server_path) err("no socket specified\n"); if (procs <= 0 || procs > 1000) err("invalid process quantity\n"); if (mem_max <= mem_min) err("unaffordable memory limit\n"); if (server_buffers_size < 128) err("insufficient buffers size\n"); if (server_listen_queue 
<= 0) err("invalid listen queue size\n"); setup(); inf("server: version=%d limit=%d queue=%d buffers=%d \n", server_version, server_client_max, server_listen_queue, server_buffers_size); inf("workers: procs=%d\n", procs); struct sigaction siga; siga.sa_flags = 0; sigemptyset(&siga.sa_mask); siga.sa_handler = sigh; sigaction(SIGHUP, &siga, NULL); sigaction(SIGTERM, &siga, NULL); sigaction(SIGINT, &siga, NULL); sigaction(SIGQUIT, &siga, NULL); sigaction(SIGTSTP, &siga, NULL); sigaction(SIGCONT, &siga, NULL); sigaction(SIGPIPE, &siga, NULL); up = 1; setsid(); workers(); dbgf("mn"); dbg(1, "startup\n"); atexit(down); server(); return 0; }