diff --git a/lib/multithreading.c b/lib/multithreading.c index fd3a5a5..2449f80 100644 --- a/lib/multithreading.c +++ b/lib/multithreading.c @@ -139,6 +139,9 @@ int iscsi_mt_sem_wait(libiscsi_sem_t* sem) #elif defined(HAVE_PTHREAD) /* WIN32 */ +#include +#include +#include #include #include @@ -156,13 +159,20 @@ iscsi_tid_t iscsi_mt_get_tid(void) #endif } +static void on_sigusr1(int _unused) +{ +} + static void *iscsi_mt_service_thread(void *arg) { struct iscsi_context *iscsi = (struct iscsi_context *)arg; struct pollfd pfd; int revents; int ret; - + + /* set signal to break poll when we need to send more data */ + signal(SIGUSR1, on_sigusr1); + iscsi->multithreading_enabled = 1; /* TODO: add timeout scanning */ @@ -172,12 +182,21 @@ static void *iscsi_mt_service_thread(void *arg) pfd.revents = 0; ret = poll(&pfd, 1, iscsi->poll_timeout); - if (ret < 0) { + if (ret < 0 && errno == EINTR) { + /* + * Got a signal. Assume it is because we need to start writing new PDUs + * to the socket. + */ + revents = POLLOUT; + goto call_service; + } + if (ret < 0) { iscsi_set_error(iscsi, "Poll failed"); revents = -1; } else { revents = pfd.revents; } + call_service: if (iscsi_service(iscsi, revents) < 0) { if (revents != -1) iscsi_set_error(iscsi, "iscsi_service failed"); diff --git a/lib/socket.c b/lib/socket.c index f1a1e15..21f4ae3 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -62,6 +62,10 @@ #include #endif +#ifdef HAVE_PTHREAD +#include +#endif + #include #include #include @@ -88,7 +92,7 @@ union socket_address { void iscsi_add_to_outqueue(struct iscsi_context *iscsi, struct iscsi_pdu *pdu) { - struct iscsi_pdu *current = iscsi->outqueue; + struct iscsi_pdu *current; struct iscsi_pdu *last = NULL; if (iscsi->scsi_timeout > 0) { @@ -97,12 +101,15 @@ iscsi_add_to_outqueue(struct iscsi_context *iscsi, struct iscsi_pdu *pdu) pdu->scsi_timeout = 0; } - if (iscsi->outqueue == NULL) { + iscsi_mt_mutex_lock(&iscsi->iscsi_mutex); + + current = iscsi->outqueue; + if (iscsi->outqueue == NULL) { iscsi->outqueue = pdu; pdu->next = NULL; - return; + goto finished; } - + /* queue pdus in ascending order of CmdSN. * ensure that pakets with the same CmdSN are kept in FIFO order. * immediate PDUs are queued in front of queue with the CmdSN @@ -131,7 +138,7 @@ iscsi_add_to_outqueue(struct iscsi_context *iscsi, struct iscsi_pdu *pdu) iscsi->outqueue=pdu; } pdu->next = current; - return; + goto finished; } last=current; current=current->next; @@ -139,9 +146,24 @@ iscsi_add_to_outqueue(struct iscsi_context *iscsi, struct iscsi_pdu *pdu) last->next = pdu; pdu->next = NULL; + + finished: + iscsi_mt_mutex_unlock(&iscsi->iscsi_mutex); + + /* TODO QQQ need to immediately send for the non multithreading case too + * and for the Windows API too */ +#if defined(HAVE_MULTITHREADING) && defined(HAVE_PTHREAD) + if (current == NULL && pdu == iscsi->outqueue) { + if(iscsi->multithreading_enabled) { + pthread_kill(iscsi->service_thread, SIGUSR1); + } + } +#endif + return; } void iscsi_decrement_iface_rr() { + /* TODO QQQ use an atomic here */ iface_rr--; }