X-Git-Url: http://nsz.repo.hu/git/?a=blobdiff_plain;f=src%2Fthread%2Fsynccall.c;h=ba2f258e8161962a63a4f53a82ff3be04b0669e7;hb=f66022dda8d18e6732626c7806f6c4d32023d574;hp=2cd25e4bb2b84315ca4adb9c562d4592eebcb55a;hpb=407d933052c310ebc5541dae2ecd8c4bd8f55fb9;p=musl

diff --git a/src/thread/synccall.c b/src/thread/synccall.c
index 2cd25e4b..ba2f258e 100644
--- a/src/thread/synccall.c
+++ b/src/thread/synccall.c
@@ -1,117 +1,178 @@
 #include "pthread_impl.h"
 #include <semaphore.h>
+#include 
+#include 
+#include 
+#include 
+#include "futex.h"
+#include "atomic.h"
+#include "../dirent/__dirent.h"
 
 static struct chain {
 	struct chain *next;
-	sem_t sem, sem2;
-} *head, *cur;
+	int tid;
+	sem_t target_sem, caller_sem;
+} *volatile head;
+static volatile int synccall_lock[1];
+static volatile int target_tid;
 static void (*callback)(void *), *context;
-static int chainlen;
-static sem_t chainlock, chaindone;
-static pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
+static volatile int dummy = 0;
+weak_alias(dummy, __block_new_threads);
 
-static void handler(int sig, siginfo_t *si, void *ctx)
+static void handler(int sig)
 {
 	struct chain ch;
-	pthread_t self = __pthread_self();
 	int old_errno = errno;
 
-	if (chainlen == libc.threads_minus_1) return;
+	sem_init(&ch.target_sem, 0, 0);
+	sem_init(&ch.caller_sem, 0, 0);
 
-	sigqueue(self->pid, SIGSYNCCALL, (union sigval){0});
+	ch.tid = __syscall(SYS_gettid);
 
-	/* Threads which have already decremented themselves from the
-	 * thread count must not act. Block further receipt of signals
-	 * and return. */
-	if (self->dead) {
-		memset(&((ucontext_t *)ctx)->uc_sigmask, -1, 8);
-		errno = old_errno;
-		return;
-	}
-
-	sem_init(&ch.sem, 0, 0);
-	sem_init(&ch.sem2, 0, 0);
+	do ch.next = head;
+	while (a_cas_p(&head, ch.next, &ch) != ch.next);
 
-	while (sem_wait(&chainlock));
-	ch.next = head;
-	head = &ch;
-	if (++chainlen == libc.threads_minus_1) sem_post(&chaindone);
-	sem_post(&chainlock);
+	if (a_cas(&target_tid, ch.tid, 0) == (ch.tid | 0x80000000))
+		__syscall(SYS_futex, &target_tid, FUTEX_UNLOCK_PI|FUTEX_PRIVATE);
 
-	while (sem_wait(&ch.sem));
+	sem_wait(&ch.target_sem);
 	callback(context);
-	sem_post(&ch.sem2);
-	while (sem_wait(&ch.sem));
+	sem_post(&ch.caller_sem);
+	sem_wait(&ch.target_sem);
 
 	errno = old_errno;
 }
 
-void __synccall_wait()
-{
-	struct chain *ch = cur;
-	sem_post(&ch->sem2);
-	while (sem_wait(&ch->sem));
-	sem_post(&ch->sem);
-}
-
 void __synccall(void (*func)(void *), void *ctx)
 {
-	pthread_t self;
-	struct sigaction sa;
-	struct chain *next;
-	uint64_t oldmask;
-
-	if (!libc.threads_minus_1) {
-		func(ctx);
-		return;
-	}
-
-	pthread_rwlock_wrlock(&lock);
-
-	__syscall(SYS_rt_sigprocmask, SIG_BLOCK, (uint64_t[]){-1}, &oldmask, 8);
+	sigset_t oldmask;
+	int cs, i, r, pid, self;;
+	DIR dir = {0};
+	struct dirent *de;
+	struct sigaction sa = { .sa_flags = SA_RESTART, .sa_handler = handler };
+	struct chain *cp, *next;
+	struct timespec ts;
+
+	/* Blocking signals in two steps, first only app-level signals
+	 * before taking the lock, then all signals after taking the lock,
+	 * is necessary to achieve AS-safety. Blocking them all first would
+	 * deadlock if multiple threads called __synccall. Waiting to block
+	 * any until after the lock would allow re-entry in the same thread
+	 * with the lock already held. */
+	__block_app_sigs(&oldmask);
+	LOCK(synccall_lock);
+	__block_all_sigs(0);
+	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cs);
+
+	head = 0;
+
+	if (!libc.threaded) goto single_threaded;
 
-	sem_init(&chaindone, 0, 0);
-	sem_init(&chainlock, 0, 1);
-	chainlen = 0;
 	callback = func;
 	context = ctx;
 
-	sa.sa_flags = SA_SIGINFO | SA_RESTART;
-	sa.sa_sigaction = handler;
-	sigfillset(&sa.sa_mask);
-	__libc_sigaction(SIGSYNCCALL, &sa, 0);
+	/* This atomic store ensures that any signaled threads will see the
+	 * above stores, and prevents more than a bounded number of threads,
+	 * those already in pthread_create, from creating new threads until
+	 * the value is cleared to zero again. */
+	a_store(&__block_new_threads, 1);
 
-	self = __pthread_self();
-	sigqueue(self->pid, SIGSYNCCALL, (union sigval){0});
-	while (sem_wait(&chaindone));
+	/* Block even implementation-internal signals, so that nothing
+	 * interrupts the SIGSYNCCALL handlers. The main possible source
+	 * of trouble is asynchronous cancellation. */
+	memset(&sa.sa_mask, -1, sizeof sa.sa_mask);
+	__libc_sigaction(SIGSYNCCALL, &sa, 0);
 
-	for (cur=head; cur; cur=cur->next) {
-		sem_post(&cur->sem);
-		while (sem_wait(&cur->sem2));
+	pid = __syscall(SYS_getpid);
+	self = __syscall(SYS_gettid);
+
+	/* Since opendir is not AS-safe, the DIR needs to be setup manually
+	 * in automatic storage. Thankfully this is easy. */
+	dir.fd = open("/proc/self/task", O_RDONLY|O_DIRECTORY|O_CLOEXEC);
+	if (dir.fd < 0) goto out;
+
+	/* Initially send one signal per counted thread. But since we can't
+	 * synchronize with thread creation/exit here, there could be too
+	 * few signals. This initial signaling is just an optimization, not
+	 * part of the logic. */
+	for (i=libc.threads_minus_1; i; i--)
+		__syscall(SYS_kill, pid, SIGSYNCCALL);
+
+	/* Loop scanning the kernel-provided thread list until it shows no
+	 * threads that have not already replied to the signal. */
+	for (;;) {
+		int miss_cnt = 0;
+		while ((de = readdir(&dir))) {
+			if (!isdigit(de->d_name[0])) continue;
+			int tid = atoi(de->d_name);
+			if (tid == self || !tid) continue;
+
+			/* Set the target thread as the PI futex owner before
+			 * checking if it's in the list of caught threads. If it
+			 * adds itself to the list after we check for it, then
+			 * it will see its own tid in the PI futex and perform
+			 * the unlock operation. */
+			a_store(&target_tid, tid);
+
+			/* Thread-already-caught is a success condition. */
+			for (cp = head; cp && cp->tid != tid; cp=cp->next);
+			if (cp) continue;
+
+			r = -__syscall(SYS_tgkill, pid, tid, SIGSYNCCALL);
+
+			/* Target thread exit is a success condition. */
+			if (r == ESRCH) continue;
+
+			/* The FUTEX_LOCK_PI operation is used to loan priority
+			 * to the target thread, which otherwise may be unable
+			 * to run. Timeout is necessary because there is a race
+			 * condition where the tid may be reused by a different
+			 * process. */
+			clock_gettime(CLOCK_REALTIME, &ts);
+			ts.tv_nsec += 10000000;
+			if (ts.tv_nsec >= 1000000000) {
+				ts.tv_sec++;
+				ts.tv_nsec -= 1000000000;
+			}
+			r = -__syscall(SYS_futex, &target_tid,
+				FUTEX_LOCK_PI|FUTEX_PRIVATE, 0, &ts);
+
+			/* Obtaining the lock means the thread responded. ESRCH
+			 * means the target thread exited, which is okay too. */
+			if (!r || r == ESRCH) continue;
+
+			miss_cnt++;
+		}
+		if (!miss_cnt) break;
+		rewinddir(&dir);
 	}
-	func(ctx);
+	close(dir.fd);
 
-	for (cur=head; cur; cur=next) {
-		next = cur->next;
-		sem_post(&cur->sem);
+	/* Serialize execution of callback in caught threads. */
+	for (cp=head; cp; cp=cp->next) {
+		sem_post(&cp->target_sem);
+		sem_wait(&cp->caller_sem);
 	}
-	sa.sa_flags = 0;
 	sa.sa_handler = SIG_IGN;
 	__libc_sigaction(SIGSYNCCALL, &sa, 0);
-	__syscall(SYS_rt_sigprocmask, SIG_SETMASK, &oldmask, 0, 8);
+single_threaded:
+	func(ctx);
 
-	pthread_rwlock_unlock(&lock);
-}
+	/* Only release the caught threads once all threads, including the
+	 * caller, have returned from the callback function. */
+	for (cp=head; cp; cp=next) {
+		next = cp->next;
+		sem_post(&cp->target_sem);
+	}
-
-void __synccall_lock()
-{
-	pthread_rwlock_rdlock(&lock);
-}
+out:
+	a_store(&__block_new_threads, 0);
+	__wake(&__block_new_threads, -1, 1);
 
-void __synccall_unlock()
-{
-	pthread_rwlock_unlock(&lock);
+	pthread_setcancelstate(cs, 0);
+	UNLOCK(synccall_lock);
+	__restore_sigs(&oldmask);
 }
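
The new signal handler registers each caught thread by pushing a stack-allocated chain node onto the global list with a compare-and-swap retry loop, so no lock is needed in async-signal context. The following standalone sketch is not part of the patch; the node/push names are illustrative, and C11 atomics stand in for musl's internal a_cas_p:

#include <stdatomic.h>

struct node {
	struct node *next;
	int tid;
};

/* Global intrusive stack of caught threads, playing the role of 'head'. */
static _Atomic(struct node *) list_head;

static void push(struct node *n)
{
	/* Link n to the currently observed head and retry the CAS until it
	 * wins; on failure, n->next is reloaded with the new head value. */
	n->next = atomic_load(&list_head);
	while (!atomic_compare_exchange_weak(&list_head, &n->next, n))
		;
}

Because the node lives on the handler's stack, the caller must not let any captured thread return from its handler until every use of the node is finished, which is exactly what the final sem_post loop in the patch enforces.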
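The target_sem/caller_sem pair gives the caller a two-phase handshake: it wakes one caught thread at a time, waits for that thread to finish the callback, and only posts target_sem a second time, after all threads including the caller have run the callback, so the handlers may return. A minimal standalone demonstration of that handshake, under the assumption of one ordinary pthread rather than a signal handler and with hypothetical names:

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

static sem_t target_sem, caller_sem;

static void *captured(void *arg)
{
	sem_wait(&target_sem);   /* phase 1: told to run the callback */
	puts("captured: running callback");
	sem_post(&caller_sem);   /* report completion to the caller */
	sem_wait(&target_sem);   /* phase 2: wait for the final release */
	return 0;
}

int main(void)
{
	pthread_t t;
	sem_init(&target_sem, 0, 0);
	sem_init(&caller_sem, 0, 0);
	pthread_create(&t, 0, captured, 0);

	sem_post(&target_sem);   /* serialize the callback in the captured thread */
	sem_wait(&caller_sem);   /* ...and wait until it has finished */
	puts("caller: running callback");
	sem_post(&target_sem);   /* final release; the captured thread may return */

	pthread_join(t, 0);
	return 0;
}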