#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
+#include <sys/auxv.h>
#include "syscall.h"
#include "atomic.h"
-#include "libc.h"
#include "pthread_impl.h"
/* The following is a threads-based implementation of AIO with minimal
* blocked permanently.
*/
-struct aio_args {
- struct aiocb *cb;
- int op;
- int err;
- sem_t sem;
-};
-
struct aio_thread {
pthread_t td;
struct aiocb *cb;
struct aio_thread *next, *prev;
struct aio_queue *q;
- int running, err, op;
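+ /* running is flipped with a_swap and used as a futex in the completion
+ * path, so it may be read concurrently; mark it volatile. */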
+ volatile int running;
+ int err, op;
ssize_t ret;
};
struct aio_thread *head;
};
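+/* Arguments handed from submit() to its io thread. The queue is looked up
+ * and referenced by the submitting thread so that fd validation errors can
+ * be reported synchronously; the semaphore lets the io thread signal that it
+ * has consumed the arguments before submit() returns and this stack-allocated
+ * structure goes out of scope. */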
+struct aio_args {
+ struct aiocb *cb;
+ struct aio_queue *q;
+ int op;
+ sem_t sem;
+};
+
static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
static struct aio_queue *****map;
static volatile int aio_fd_cnt;
* Types 1-3 are notified via atomics/futexes, mainly for AS-safety
* considerations. Type 4 is notified later via a cond var. */
- a_store(&cb->__ret, at->ret);
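+ /* __ret is a ssize_t, so it cannot go through the int-only atomics;
+ * the a_swap on __err below provides the barrier that publishes it. */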
+ cb->__ret = at->ret;
if (a_swap(&at->running, 0) < 0)
__wake(&at->running, -1, 1);
if (a_swap(&cb->__err, at->err) != EINPROGRESS)
size_t len = cb->aio_nbytes;
off_t off = cb->aio_offset;
- struct aio_queue *q = __aio_get_queue(fd, 1);
+ struct aio_queue *q = args->q;
ssize_t ret;
- args->err = q ? 0 : EAGAIN;
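+ /* Take the queue lock before releasing the submitter, so that once
+ * submit() returns, cancel/close cannot scan the queue ahead of this
+ * operation being linked into it. */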
+ pthread_mutex_lock(&q->lock);
sem_post(&args->sem);
- if (!q) return 0;
at.op = op;
at.running = 1;
at.prev = 0;
if ((at.next = q->head)) at.next->prev = &at;
q->head = &at;
- q->ref++;
if (!q->init) {
int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
return 0;
}
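+/* io threads get a minimal stack, but it must still have room for a signal
+ * frame (cancellation is signal-based). AT_MINSIGSTKSZ reports the kernel's
+ * real requirement, which can exceed MINSIGSTKSZ on archs with large
+ * register state, so size the stack from it once, with a small margin. */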
+static size_t io_thread_stack_size = MINSIGSTKSZ+2048;
+static pthread_once_t init_stack_size_once;
+
+static void init_stack_size()
+{
+ unsigned long val = __getauxval(AT_MINSIGSTKSZ);
+ if (val > MINSIGSTKSZ) io_thread_stack_size = val + 512;
+}
+
static int submit(struct aiocb *cb, int op)
{
int ret = 0;
pthread_attr_t a;
sigset_t allmask, origmask;
pthread_t td;
- struct aio_args args = { .cb = cb, .op = op };
+ struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
+ struct aio_args args = { .cb = cb, .op = op, .q = q };
sem_init(&args.sem, 0, 0);
+ if (!q) {
+ if (cb->aio_fildes < 0) errno = EBADF;
+ else errno = EAGAIN;
+ return -1;
+ }
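+ /* The queue is returned locked by __aio_get_queue; take the reference
+ * under that lock so the queue cannot be destroyed before the io thread
+ * links itself in, then drop the lock before creating the thread. */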
+ q->ref++;
+ pthread_mutex_unlock(&q->lock);
+
if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
if (cb->aio_sigevent.sigev_notify_attributes)
a = *cb->aio_sigevent.sigev_notify_attributes;
else
pthread_attr_init(&a);
} else {
+ pthread_once(&init_stack_size_once, init_stack_size);
pthread_attr_init(&a);
- pthread_attr_setstacksize(&a, PTHREAD_STACK_MIN);
+ pthread_attr_setstacksize(&a, io_thread_stack_size);
pthread_attr_setguardsize(&a, 0);
}
pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
cb->__err = EINPROGRESS;
if (pthread_create(&td, &a, io_thread_func, &args)) {
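+ /* Thread creation failed: re-lock the queue and drop the reference
+ * taken above. */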
+ pthread_mutex_lock(&q->lock);
+ __aio_unref_queue(q);
errno = EAGAIN;
ret = -1;
}
if (!ret) {
while (sem_wait(&args.sem));
- if (args.err) {
- errno = args.err;
- ret = -1;
- }
}
return ret;
return fd;
}
-LFS64(aio_cancel);
-LFS64(aio_error);
-LFS64(aio_fsync);
-LFS64(aio_read);
-LFS64(aio_write);
-LFS64(aio_return);
+weak_alias(aio_cancel, aio_cancel64);
+weak_alias(aio_error, aio_error64);
+weak_alias(aio_fsync, aio_fsync64);
+weak_alias(aio_read, aio_read64);
+weak_alias(aio_write, aio_write64);
+weak_alias(aio_return, aio_return64);
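
For reference, weak_alias here comes in via pthread_impl.h (which includes the
internal libc.h) and expands, roughly, to:

	#define weak_alias(old, new) \
		extern __typeof(old) new __attribute__((__weak__, __alias__(#old)))

so the aio_*64 names stay weak symbols bound to the plain functions even with
the LFS64() helper macro gone.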