Input processing:
Input processing used to keep all data in one single input buffer, which makes it hard to handle nested events as well as reading directly from the socket into the application buffer without an extra copy. Create a new iscsi_in_pdu structure where we store the header, and any data for the recevied pdu and store them in a proper input queue. Change the signature for all processing functions to tahe a iscsi_in_pdu structure for the received pdu instead of just a pointer to a buffer.
This commit is contained in:
150
lib/socket.c
150
lib/socket.c
@@ -167,81 +167,85 @@ iscsi_which_events(struct iscsi_context *iscsi)
|
||||
static int
|
||||
iscsi_read_from_socket(struct iscsi_context *iscsi)
|
||||
{
|
||||
int available;
|
||||
int size;
|
||||
unsigned char *buf;
|
||||
ssize_t count;
|
||||
struct iscsi_in_pdu *in;
|
||||
ssize_t data_size, count;
|
||||
|
||||
if (ioctl(iscsi->fd, FIONREAD, &available) != 0) {
|
||||
iscsi_set_error(iscsi, "ioctl FIONREAD returned error : "
|
||||
"%d", errno);
|
||||
return -1;
|
||||
}
|
||||
if (available == 0) {
|
||||
iscsi_set_error(iscsi, "no data readable in socket, "
|
||||
"socket is closed");
|
||||
return -1;
|
||||
}
|
||||
size = iscsi->insize - iscsi->inpos + available;
|
||||
buf = malloc(size);
|
||||
if (buf == NULL) {
|
||||
iscsi_set_error(iscsi, "failed to allocate %d bytes for "
|
||||
"input buffer", size);
|
||||
return -1;
|
||||
}
|
||||
if (iscsi->insize > iscsi->inpos) {
|
||||
memcpy(buf, iscsi->inbuf + iscsi->inpos,
|
||||
iscsi->insize - iscsi->inpos);
|
||||
iscsi->insize -= iscsi->inpos;
|
||||
iscsi->inpos = 0;
|
||||
}
|
||||
|
||||
count = read(iscsi->fd, buf + iscsi->insize, available);
|
||||
if (count == -1) {
|
||||
if (errno == EINTR) {
|
||||
free(buf);
|
||||
buf = NULL;
|
||||
return 0;
|
||||
if (iscsi->incoming == NULL) {
|
||||
iscsi->incoming = malloc(sizeof(struct iscsi_in_pdu));
|
||||
if (iscsi->incoming == NULL) {
|
||||
iscsi_set_error(iscsi, "Out-of-memory: failed to malloc iscsi_in_pdu");
|
||||
return -1;
|
||||
}
|
||||
iscsi_set_error(iscsi, "read from socket failed, "
|
||||
bzero(iscsi->incoming, sizeof(struct iscsi_in_pdu));
|
||||
}
|
||||
in = iscsi->incoming;
|
||||
|
||||
/* first we must read the header, including any digests */
|
||||
if (in->hdr_pos < ISCSI_HEADER_SIZE) {
|
||||
count = read(iscsi->fd, &in->hdr[in->hdr_pos], ISCSI_HEADER_SIZE - in->hdr_pos);
|
||||
if (count < 0) {
|
||||
if (errno == EINTR) {
|
||||
return 0;
|
||||
}
|
||||
iscsi_set_error(iscsi, "read from socket failed, "
|
||||
"errno:%d", errno);
|
||||
free(buf);
|
||||
buf = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
free(iscsi->inbuf);
|
||||
|
||||
iscsi->inbuf = buf;
|
||||
iscsi->insize += count;
|
||||
|
||||
while (1) {
|
||||
if (iscsi->insize - iscsi->inpos < ISCSI_RAW_HEADER_SIZE) {
|
||||
return 0;
|
||||
}
|
||||
count = iscsi_get_pdu_size(iscsi,
|
||||
iscsi->inbuf + iscsi->inpos);
|
||||
if (iscsi->insize + iscsi->inpos < count) {
|
||||
return 0;
|
||||
}
|
||||
if (iscsi_process_pdu(iscsi, iscsi->inbuf + iscsi->inpos,
|
||||
count) != 0) {
|
||||
iscsi->inpos += count;
|
||||
return -1;
|
||||
}
|
||||
iscsi->inpos += count;
|
||||
if (iscsi->inpos == iscsi->insize) {
|
||||
free(iscsi->inbuf);
|
||||
iscsi->inbuf = NULL;
|
||||
iscsi->insize = 0;
|
||||
iscsi->inpos = 0;
|
||||
if (count == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (iscsi->inpos > iscsi->insize) {
|
||||
iscsi_set_error(iscsi, "inpos > insize. bug!");
|
||||
in->hdr_pos += count;
|
||||
}
|
||||
|
||||
if (in->hdr_pos < ISCSI_HEADER_SIZE) {
|
||||
/* we dont have the full header yet, so return */
|
||||
return 0;
|
||||
}
|
||||
|
||||
data_size = iscsi_get_pdu_data_size(&in->hdr[0]);
|
||||
if (data_size != 0) {
|
||||
if (in->data == NULL) {
|
||||
in->data = malloc(data_size);
|
||||
if (in->data == NULL) {
|
||||
iscsi_set_error(iscsi, "Out-of-memory: failed to malloc iscsi_in_pdu->data(%d)", (int)data_size);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
count = read(iscsi->fd, &in->data[in->data_pos], data_size - in->data_pos);
|
||||
if (count < 0) {
|
||||
if (errno == EINTR) {
|
||||
return 0;
|
||||
}
|
||||
iscsi_set_error(iscsi, "read from socket failed, "
|
||||
"errno:%d", errno);
|
||||
return -1;
|
||||
}
|
||||
if (count == 0) {
|
||||
return 0;
|
||||
}
|
||||
in->data_pos += count;
|
||||
}
|
||||
|
||||
if (in->data_pos < data_size) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SLIST_ADD_END(&iscsi->inqueue, in);
|
||||
iscsi->incoming = NULL;
|
||||
|
||||
|
||||
while (iscsi->inqueue != NULL) {
|
||||
struct iscsi_in_pdu *in = iscsi->inqueue;
|
||||
|
||||
if (iscsi_process_pdu(iscsi, in) != 0) {
|
||||
return -1;
|
||||
}
|
||||
SLIST_REMOVE(&iscsi->inqueue, in);
|
||||
iscsi_free_iscsi_in_pdu(in);
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -353,3 +357,19 @@ iscsi_queue_pdu(struct iscsi_context *iscsi, struct iscsi_pdu *pdu)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
iscsi_free_iscsi_in_pdu(struct iscsi_in_pdu *in)
|
||||
{
|
||||
free(in->data);
|
||||
free(in);
|
||||
}
|
||||
|
||||
void
|
||||
iscsi_free_iscsi_inqueue(struct iscsi_in_pdu *inqueue)
|
||||
{
|
||||
while (inqueue != NULL) {
|
||||
struct iscsi_in_pdu *next = inqueue->next;
|
||||
iscsi_free_iscsi_in_pdu(inqueue);
|
||||
inqueue = next;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user