added lpp
[libfirm] / ir / lpp / lpp_server.c
1 /**
2  * @file   lpp_server.c
3  * @date   19.07.2005
4  * @author Sebastian Hack
5  *
6  * lpp_solving server.
7  *
8  * Copyright (C) 2005 Universitaet Karlsruhe
9  * Released under the GPL
10  */
11
12 #ifdef _WIN32
13 #error Sorry, lpp_server is for UNIX only.
14 #endif
15
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <sys/time.h>
19 #include <sys/resource.h>
20 #include <sys/wait.h>
21 #include <sys/ipc.h>
22 #include <sys/sem.h>
23
24 #include <netinet/in.h>
25 #include <netdb.h>
26
27 #include <unistd.h>
28 #include <pthread.h>
29
30 #include <time.h>
31 #include <time.h>
32 #include <limits.h>
33 #include <errno.h>
34 #include <signal.h>
35 #include <stdlib.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <stdarg.h>
39
40 #include "debug.h"
41 #include "list.h"
42 #include "irtools.h"
43 #include "util.h"
44
45 #include "lpp_t.h"
46 #include "lpp_comm.h"
47 #include "lpp_cplex.h"
48
49 #define MAX_JOBS 128
50
51 /* stack size of the solver thread. */
52 static size_t solver_stack_size = PTHREAD_STACK_MIN;
53
54 /* master listening socket. */
55 static int msock;
56
57 /* master semaphore */
58 static int sem;
59
60 /* title of the current process */
61 static char *title;
62 static int title_length;
63
64 static int n_children = 0;
65
66 extern char** environ;
67
68 typedef void (solver_func_t)(lpp_t *lpp);
69
70 typedef struct _job_t {
71         struct list_head list;
72         int id;
73         pthread_t solver;
74         pthread_t session;
75         lpp_comm_t *comm;
76         lpp_t *lpp;
77         solver_func_t *solver_func;
78         time_t received;
79         int csock;
80 } job_t;
81
82 #define set_solver_stack_size(size) solver_stack_size = MAX(PTHREAD_STACK_MIN, (size))
83
84 #define setproctitle(name, args...) snprintf(title,title_length,(name),##args)
85
86 static void initproctitle(int argc, char **argv) {
87         int i;
88         char **envp = environ;
89
90         /*
91          * Move the environment so we can reuse the memory.
92          * (Code borrowed from sendmail.)
93          * WARNING: ugly assumptions on memory layout here;
94          *          if this ever causes problems, #undef DO_PS_FIDDLING
95          */
96         for (i = 0; envp[i] != NULL; i++)
97                 continue;
98         environ = (char **) malloc(sizeof(char *) * (i + 1));
99         if (environ == NULL)
100                 return;
101         for (i = 0; envp[i] != NULL; i++)
102                 if ((environ[i] = strdup(envp[i])) == NULL)
103                         return;
104         environ[i] = NULL;
105
106         title = argv[0];
107         if (i > 0)
108                 title_length = envp[i-1] + strlen(envp[i-1]) - argv[0];
109         else
110                 title_length = argv[argc-1] + strlen(argv[argc-1]) - argv[0];
111
112         argv[1] = NULL;
113         memset(title, 0, title_length);
114         --title_length;
115 }
116
117 static void job_init(job_t *job, lpp_comm_t *comm, lpp_t *lpp, solver_func_t *solver_func)
118 {
119   /* TODO MAXJOBS */
120   memset(job, 0, sizeof(job[0]));
121   job->lpp         = lpp;
122   job->solver_func = solver_func;
123   job->comm        = comm;
124   job->csock       = lpp_comm_fileno(comm);
125   job->received    = time(NULL);
126   job->session     = pthread_self();
127   job->id          = getpid();
128 }
129
130 static firm_dbg_module_t *dbg = NULL;
131
/**
 * Set up a listening TCP socket bound to all local IPv4 addresses.
 *
 * Any failure to create, bind or listen on the socket is reported with
 * perror() and terminates the process with exit status 1. A failure to set
 * SO_REUSEADDR is only reported.
 *
 * @param port       the port to listen on (host byte order)
 * @param queue_len  backlog passed to listen()
 * @return           the file descriptor of the listening socket
 */
static int passive_tcp(uint16_t port, int queue_len)
{
  int s;
  int one = 1;
  int proto;
  struct protoent    *ppe;
  struct sockaddr_in sin;

  memset(&sin, 0, sizeof(sin));
  sin.sin_family       = AF_INET;
  sin.sin_addr.s_addr  = htonl(INADDR_ANY);
  sin.sin_port         = htons(port);

  /* the protocol database may be unavailable; fall back to the constant. */
  ppe   = getprotobyname("tcp");
  proto = ppe != NULL ? ppe->p_proto : IPPROTO_TCP;

  s = socket(PF_INET, SOCK_STREAM, proto);
  if(s < 0) {
          perror("create server socket");
          exit(1);
  }

  if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
          perror("setsockopt server socket");

  if(bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) {
          perror("bind server socket");
          exit(1);
  }

  if(listen(s, queue_len) < 0) {
          perror("listen server socket");
          exit(1);
  }

  return s;
}
163
164 static void dummy_solver(lpp_t *lpp)
165 {
166   int i;
167
168   for(i = 0; i < lpp->var_next; ++i) {
169     lpp->vars[i]->value = i;
170     lpp->vars[i]->value_kind = lpp_value_solution;
171   }
172
173   if(lpp->log)
174           fprintf(lpp->log, "dummy solver exiting now.\n");
175
176   sleep(1);
177   lpp->sol_time = 0.0;
178   lpp->iterations = 0;
179   lpp->sol_state = lpp_optimal;
180 }
181
182 static void segv_solver(lpp_t *lpp)
183 {
184   int i;
185
186   for(i = 0; i < lpp->var_next; ++i) {
187     lpp->vars[i]->value = i;
188     lpp->vars[i]->value_kind = lpp_value_solution;
189   }
190
191   if(lpp->log)
192           fprintf(lpp->log, "segv dummy solver exiting now.\n");
193
194   sleep(1);
195   *((int *) 0) = 1;
196 }
197
198 #define DEFAULT_SOLVER lpp_solve_cplex
199
200 struct {
201   solver_func_t *solver;
202   const char    *name;
203   int           n_instances;
204 } solvers[] = {
205   { lpp_solve_cplex,   "cplex",   1 },
206   { dummy_solver,      "dummy",   2 },
207   { segv_solver,       "segv",    2 },
208 };
209
210
211 static solver_func_t *find_solver(const char *name)
212 {
213   unsigned i;
214
215   for(i = 0; i < sizeof(solvers) / sizeof(solvers[0]); ++i)
216     if(strcmp(solvers[i].name, name) == 0)
217       return solvers[i].solver;
218
219   return NULL;
220 }
221
222 static void *solver_thread(void * data)
223 {
224         job_t *job = data;
225         DBG((dbg, LEVEL_1, "starting solver thread pid %d tid %d\n", getpid(), pthread_self()));
226         setproctitle("lpp_server [problem solving: %s]", job->lpp->name);
227
228         /* I may be cancelled at every time. */
229         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
230
231         /* solve */
232         job->solver_func(job->lpp);
233
234         /* notify the session thread that we are finished. */
235         fclose(job->lpp->log);
236
237         return NULL;
238 }
239
240 static int solve(lpp_comm_t *comm, job_t *job)
241 {
242         char buf[1024];
243         struct timeval tv;
244         fd_set rfds;
245         int fds[2];
246         FILE *rd, *log;
247         int res = 0;
248         int retval, fd_rd, fd_comm;
249         pthread_attr_t solver_thr_attr;
250
251         struct sembuf semops;
252         /* try to acquire a lock for the solver resource. */
253         semops.sem_num = 0;
254         semops.sem_op = -1;
255         semops.sem_flg = SEM_UNDO;
256         retval = semop(sem, &semops, 1);
257         if(retval < 0) {
258                 perror("free semaphore");
259         }
260         DBG((dbg, LEVEL_1, "job %d: solving problem %s\n", job->id, job->lpp->name));
261
262         /* set the log file of the lpp to the pipe. */
263         pipe(fds);
264         DBG((dbg, LEVEL_4, "pipe read %d write %d\n", fds[0], fds[1]));
265
266         fd_comm = lpp_comm_fileno(comm);
267         fd_rd   = fds[0];
268         rd      = fdopen(fd_rd, "r");
269         log     = fdopen(fds[1], "w");
270
271         lpp_set_log(job->lpp, log);
272
273         /* also set the stack size of the solver thread to a considerable amount */
274         pthread_attr_init(&solver_thr_attr);
275         pthread_attr_setstacksize(&solver_thr_attr, solver_stack_size);
276         pthread_create(&job->solver, &solver_thr_attr, solver_thread, job);
277
278         while(1) {
279                 /* set select timeout to 10 seconds */
280                 tv.tv_sec  = 10;
281                 tv.tv_usec = 0;
282
283                 /* set the file descriptors. */
284                 FD_ZERO(&rfds);
285                 FD_SET(fd_rd, &rfds);
286                 FD_SET(fd_comm, &rfds);
287                 DBG((dbg, LEVEL_4, "select %d %d\n", fd_rd, fd_comm));
288                 retval = select(MAX(fd_rd, fd_comm) + 1, &rfds, NULL, NULL, &tv);
289                 DBG((dbg, LEVEL_4, "retval %d\n", retval));
290
291                 /* an event on one of the descriptors arrived. */
292                 if(retval > 0) {
293                         /* A log message arrived. */
294                         if(FD_ISSET(fd_rd, &rfds)) {
295
296                                 /* if there is nothing more to read, the child died; we go home now. */
297                                 if(feof(rd))
298                                         break;
299
300                                 if(fgets(buf, sizeof(buf), rd)) {
301                                         DBG((dbg, LEVEL_4, "receiving log message %s\n", buf));
302                                         /* send the message over the net. */
303                                         lpp_writel(comm, LPP_CMD_INFO);
304                                         lpp_writes(comm, buf);
305                                         lpp_flush(comm);
306                                 }
307                         }
308
309                         /* A network message arrived. */
310                         if(FD_ISSET(fd_comm, &rfds)) {
311                                 int cmd;
312                                 int retval;
313
314                                 retval = read(fd_comm, &cmd, sizeof(cmd));
315                                 if(retval == 0) {
316                                         DBG((dbg, LEVEL_2, "cancelling solver thread tid %d\n", job->solver));
317                                         //pthread_cancel(job->solver);
318                                         exit(1);
319                                         //res = 1;
320                                         //break;
321                                 }
322
323                                 switch(cmd) {
324                                         /* eat senseless data. */
325                                         default:
326                                                 while(read(fd_comm, &cmd, sizeof(cmd)) > 0);
327                                 }
328                                 res = 1;
329                         }
330                 }
331         }
332
333         pthread_join(job->solver, NULL);
334         semops.sem_num = 0;
335         semops.sem_op = 1;
336         semops.sem_flg = SEM_UNDO;
337         retval = semop(sem, &semops, 1);
338         if(retval < 0) {
339                 perror("free semaphore");
340         }
341
342         fclose(rd);
343         return res;
344 }
345
346 static void *session(int fd)
347 {
348         solver_func_t *solver = DEFAULT_SOLVER;
349         lpp_comm_t *comm      = lpp_comm_new(fd, LPP_BUFSIZE);
350
351         DBG((dbg, LEVEL_1, "starting session thread pid %d tid %d\n", getpid(), pthread_self()));
352         setproctitle("lpp_server [child]");
353
354         /* install the signal handler which gets triggered when the child dies. */
355         DBG((dbg, LEVEL_1, "new thread\n"));
356         for(;;) {
357                 char buf[64];
358                 int cmd = lpp_readl(comm);
359
360                 DBG((dbg, LEVEL_2, "command: %s(%d)\n", lpp_get_cmd_name(cmd), cmd));
361                 setproctitle("lpp_server [command %s(%d)]", lpp_get_cmd_name(cmd), cmd);
362                 switch(cmd) {
363                         /* we could not read from the socket, so the connection died. bail out. */
364                         case -1:
365                         case LPP_CMD_BAD:
366                                 goto end;
367
368                         case LPP_CMD_PROBLEM:
369                                 {
370                                         job_t job;
371                                         lpp_t *lpp;
372
373                                         lpp = lpp_deserialize(comm);
374                                         lpp_deserialize_values(comm, lpp, lpp_value_start);
375                                         DBG((dbg, LEVEL_3, "problem %s received\n", lpp->name));
376                                         job_init(&job, comm, lpp, solver);
377
378                                         DBG((dbg, LEVEL_3, "starting job for problem %s\n", lpp->name));
379                                         setproctitle("lpp_server [problem waiting: %s]", lpp->name);
380
381                                         solve(comm, &job);
382                                         lpp_writel(comm, LPP_CMD_SOLUTION);
383                                         lpp_serialize_stats(comm, lpp);
384                                         lpp_serialize_values(comm, lpp, lpp_value_solution);
385                                         lpp_flush(comm);
386                                         DBG((dbg, LEVEL_1, "job %d: finished with problem %s\n", job.id, lpp->name));
387                                         setproctitle("lpp_server [problem finished: %s]", lpp->name);
388
389                                         free_lpp(lpp);
390                                 }
391
392                                 break;
393
394                         case LPP_CMD_SOLVER:
395                                 lpp_readbuf(comm, buf, sizeof(buf));
396                                 solver = find_solver(buf);
397                                 DBG((dbg, LEVEL_2, "setting solver to: %s\n", buf));
398                                 //lpp_send_res(comm, solver != NULL, "could not find solver: %s", buf);
399                                 break;
400
401                         case LPP_CMD_SOLVERS:
402                                 {
403                                         int i, n = ARRAY_SIZE(solvers);
404                                         lpp_writel(comm, n);
405                                         for(i = 0; i < n; ++i)
406                                                 lpp_writes(comm, solvers[i].name);
407                                         lpp_flush(comm);
408                                 }
409                                 break;
410
411                         case LPP_CMD_SET_DEBUG:
412                                 {
413                                         int mask = lpp_readl(comm);
414                                         firm_dbg_set_mask(dbg, mask);
415                                 }
416                                 break;
417
418                         case LPP_CMD_BYE:
419                                 goto end;
420
421                         default:
422                                 fprintf(stderr, "illegal command %d. exiting\n", cmd);
423                                 goto end;
424                 }
425         }
426
427 end:
428         /* signal the queue, bail out and we are free now. */
429         lpp_comm_free(comm);
430         close(fd);
431
432         exit(0);
433 }
434
435 static void child_handler(int sig)
436 {
437   pid_t pid;
438   int status;
439
440   (void) sig;
441
442   pid = waitpid(-1, &status, WNOHANG);
443   do {
444           if(WIFEXITED(status)) {
445                   DBG((dbg, LEVEL_1, "child %d died normally with return value %d\n", pid, WEXITSTATUS(status)));
446                   --n_children;
447                   if(n_children != 0)
448                           setproctitle("lpp_server [main (%d %s)]", n_children, (n_children>1)?"children":"child");
449                   else
450                           setproctitle("lpp_server [main]");
451           } else if(WIFSIGNALED(status)) {
452                   DBG((dbg, LEVEL_1, "child %d died by signal %d\n", pid, WTERMSIG(status)));
453                   --n_children;
454                   if(n_children != 0)
455                           setproctitle("lpp_server [main (%d %s)]", n_children, (n_children>1)?"children":"child");
456                   else
457                           setproctitle("lpp_server [main]");
458           } else
459                   DBG((dbg, LEVEL_1, "child %d did something unexpected\n", pid));
460
461           pid = waitpid(-1, &status, WNOHANG);
462   } while(pid > 0);
463 }
464
465 static void main_loop(void)
466 {
467         int csock;
468
469         DBG((dbg, LEVEL_1, "master pid %d\n", getpid()));
470
471         msock = passive_tcp(LPP_PORT, 10);
472
473         for(;;) {
474                 struct sockaddr_in fsin;
475                 socklen_t len = sizeof(fsin);
476                 pid_t child;
477
478                 csock = accept(msock, (struct sockaddr *) &fsin, &len);
479                 if(csock < 0) {
480                         if(errno == EINTR)
481                                 continue;
482                         else
483                                 fprintf(stderr, "could not accept: %s\n", strerror(errno));
484                 }
485
486                 child = fork();
487                 switch(child) {
488                         case 0: /* we're in the new child, start the session handler */
489                                 close(msock);
490                                 session(csock);
491                                 break;
492                         case -1: /* error, die! */
493                                 perror("fork");
494                                 exit(1);
495                         default: /* if we're in the parent just continue */
496                                 ++n_children;
497                                 setproctitle("lpp_server [main (%d %s)]", n_children, (n_children>1)?"children":"child");
498                                 close(csock);
499                                 break;
500                 }
501         }
502 }
503
504 static void toggle_dbg(int num)
505 {
506   static int mask = 0;
507   (void) num;
508   mask = ~mask;
509   firm_dbg_set_mask(dbg, mask);
510 }
511
512 #define SEMKEYPATH "/tmp/lppkey"
513 #define SEMKEYID 42
514
/**
 * Write a short description of the command line options to stderr.
 */
static void print_syntax(void) {
        static const char usage[] =
                "lpp_server [-g <dbg level>] [-s <thread stack size>]\n";

        fputs(usage, stderr);
}
518
519 int main(int argc, char *argv[])
520 {
521         key_t semkey;
522         int ret;
523         int c;
524         struct sigaction sigact1, sigact2;
525
526         dbg = firm_dbg_register("lpp.server");
527         firm_dbg_set_mask(dbg, 1);
528
529         set_solver_stack_size(128 * 1024 * 1024);
530
531         /* parse options. */
532         while((c = getopt(argc, argv, "s:g:")) != -1) {
533                 switch(c) {
534                         case 's':
535                                 set_solver_stack_size(atoi(optarg));
536                                 break;
537                         case 'g':
538                                 firm_dbg_set_mask(dbg, atoi(optarg));
539                                 break;
540                         default:
541                                 print_syntax();
542                                 exit(1);
543                 }
544         }
545
546         initproctitle(argc, argv);
547         setproctitle("lpp_server [main]");
548
549         memset(&sigact1, 0, sizeof(sigact1));
550         sigact1.sa_handler = toggle_dbg;
551         sigaction(SIGUSR1, &sigact1, NULL);
552
553         memset(&sigact2, 0, sizeof(sigact2));
554         sigact2.sa_handler = child_handler;
555         sigact2.sa_flags = SA_NOCLDSTOP;
556         sigaction(SIGCHLD, &sigact2, NULL);
557
558         /* set up semaphore */
559         semkey = ftok(SEMKEYPATH,SEMKEYID);
560         if ( semkey == (key_t)-1 ) {
561                 perror("ftok() for sem failed");
562                 return 1;
563         }
564
565         sem = semget( semkey, 1, 0666 | IPC_CREAT);// | IPC_EXCL );
566         if ( sem == -1 ) {
567                 perror("semget() failed");
568                 return -1;
569         }
570
571         ret = semctl(sem, 0, SETVAL, 1);
572         if ( ret < 0 ) {
573                 perror("semctl() failed");
574                 return -1;
575         }
576
577         main_loop();
578
579         ret = semctl( sem, 0, IPC_RMID );
580         if (ret < 0) {
581                 printf("semctl() remove id failed\n");
582                 return -1;
583         }
584         return 0;
585 }