mirror of
https://github.com/libvirt/libvirt.git
synced 2025-02-25 18:55:26 -06:00
Revert "rpc: Fix slow volume download (virsh vol-download)"
This reverts commit d9c9e138f2
.
Unfortunately, things are going to be handled differently so this
commit must go.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
This commit is contained in:
parent
0c56d94318
commit
435ee578a0
@ -49,9 +49,9 @@ struct _virNetClientStream {
|
|||||||
* time by stopping consuming any incoming data
|
* time by stopping consuming any incoming data
|
||||||
* off the socket....
|
* off the socket....
|
||||||
*/
|
*/
|
||||||
struct iovec *incomingVec; /* I/O Vector to hold data */
|
char *incoming;
|
||||||
size_t writeVec; /* Vectors produced */
|
size_t incomingOffset;
|
||||||
size_t readVec; /* Vectors consumed */
|
size_t incomingLength;
|
||||||
bool incomingEOF;
|
bool incomingEOF;
|
||||||
|
|
||||||
virNetClientStreamEventCallback cb;
|
virNetClientStreamEventCallback cb;
|
||||||
@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
|
|||||||
if (!st->cb)
|
if (!st->cb)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
|
VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
|
||||||
|
|
||||||
if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
|
if (((st->incomingOffset || st->incomingEOF) &&
|
||||||
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
|
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
|
||||||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
|
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
|
||||||
VIR_DEBUG("Enabling event timer");
|
VIR_DEBUG("Enabling event timer");
|
||||||
@ -110,14 +110,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
|
|||||||
|
|
||||||
if (st->cb &&
|
if (st->cb &&
|
||||||
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
|
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
|
||||||
((st->readVec < st->writeVec) || st->incomingEOF))
|
(st->incomingOffset || st->incomingEOF))
|
||||||
events |= VIR_STREAM_EVENT_READABLE;
|
events |= VIR_STREAM_EVENT_READABLE;
|
||||||
if (st->cb &&
|
if (st->cb &&
|
||||||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
|
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
|
||||||
events |= VIR_STREAM_EVENT_WRITABLE;
|
events |= VIR_STREAM_EVENT_WRITABLE;
|
||||||
|
|
||||||
VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
|
VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
|
||||||
st->readVec, st->writeVec);
|
|
||||||
if (events) {
|
if (events) {
|
||||||
virNetClientStreamEventCallback cb = st->cb;
|
virNetClientStreamEventCallback cb = st->cb;
|
||||||
void *cbOpaque = st->cbOpaque;
|
void *cbOpaque = st->cbOpaque;
|
||||||
@ -162,7 +161,7 @@ void virNetClientStreamDispose(void *obj)
|
|||||||
virNetClientStreamPtr st = obj;
|
virNetClientStreamPtr st = obj;
|
||||||
|
|
||||||
virResetError(&st->err);
|
virResetError(&st->err);
|
||||||
VIR_FREE(st->incomingVec);
|
VIR_FREE(st->incoming);
|
||||||
virObjectUnref(st->prog);
|
virObjectUnref(st->prog);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -266,50 +265,38 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
|
|||||||
virNetMessagePtr msg)
|
virNetMessagePtr msg)
|
||||||
{
|
{
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
struct iovec iov;
|
size_t need;
|
||||||
char *base;
|
|
||||||
size_t piece, pieces, length, offset = 0, size = 1024*1024;
|
|
||||||
|
|
||||||
virObjectLock(st);
|
virObjectLock(st);
|
||||||
|
need = msg->bufferLength - msg->bufferOffset;
|
||||||
|
if (need) {
|
||||||
|
size_t avail = st->incomingLength - st->incomingOffset;
|
||||||
|
if (need > avail) {
|
||||||
|
size_t extra = need - avail;
|
||||||
|
if (VIR_REALLOC_N(st->incoming,
|
||||||
|
st->incomingLength + extra) < 0) {
|
||||||
|
VIR_DEBUG("Out of memory handling stream data");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
st->incomingLength += extra;
|
||||||
|
}
|
||||||
|
|
||||||
length = msg->bufferLength - msg->bufferOffset;
|
memcpy(st->incoming + st->incomingOffset,
|
||||||
|
msg->buffer + msg->bufferOffset,
|
||||||
if (length == 0) {
|
msg->bufferLength - msg->bufferOffset);
|
||||||
|
st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
|
||||||
|
} else {
|
||||||
st->incomingEOF = true;
|
st->incomingEOF = true;
|
||||||
goto end;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pieces = VIR_DIV_UP(length, size);
|
VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
|
||||||
for (piece = 0; piece < pieces; piece++) {
|
st->incomingOffset, st->incomingLength,
|
||||||
if (size > length - offset)
|
st->incomingEOF);
|
||||||
size = length - offset;
|
|
||||||
|
|
||||||
if (VIR_ALLOC_N(base, size)) {
|
|
||||||
VIR_DEBUG("Allocation failed");
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
|
|
||||||
iov.iov_base = base;
|
|
||||||
iov.iov_len = size;
|
|
||||||
offset += size;
|
|
||||||
|
|
||||||
if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
|
|
||||||
VIR_DEBUG("Append failed");
|
|
||||||
VIR_FREE(base);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
|
|
||||||
st->readVec, st->writeVec, size);
|
|
||||||
}
|
|
||||||
|
|
||||||
end:
|
|
||||||
virNetClientStreamEventTimerUpdate(st);
|
virNetClientStreamEventTimerUpdate(st);
|
||||||
|
|
||||||
ret = 0;
|
ret = 0;
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
|
|
||||||
st->readVec, st->writeVec, st->incomingEOF);
|
|
||||||
virObjectUnlock(st);
|
virObjectUnlock(st);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -374,21 +361,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
|
|||||||
size_t nbytes,
|
size_t nbytes,
|
||||||
bool nonblock)
|
bool nonblock)
|
||||||
{
|
{
|
||||||
int ret = -1;
|
int rv = -1;
|
||||||
size_t partial, offset;
|
|
||||||
|
|
||||||
virObjectLock(st);
|
|
||||||
|
|
||||||
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
|
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
|
||||||
st, client, data, nbytes, nonblock);
|
st, client, data, nbytes, nonblock);
|
||||||
|
virObjectLock(st);
|
||||||
if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
|
if (!st->incomingOffset && !st->incomingEOF) {
|
||||||
virNetMessagePtr msg;
|
virNetMessagePtr msg;
|
||||||
int rv;
|
int ret;
|
||||||
|
|
||||||
if (nonblock) {
|
if (nonblock) {
|
||||||
VIR_DEBUG("Non-blocking mode and no data available");
|
VIR_DEBUG("Non-blocking mode and no data available");
|
||||||
ret = -2;
|
rv = -2;
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -404,66 +387,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
|
|||||||
|
|
||||||
VIR_DEBUG("Dummy packet to wait for stream data");
|
VIR_DEBUG("Dummy packet to wait for stream data");
|
||||||
virObjectUnlock(st);
|
virObjectUnlock(st);
|
||||||
rv = virNetClientSendWithReplyStream(client, msg, st);
|
ret = virNetClientSendWithReplyStream(client, msg, st);
|
||||||
virObjectLock(st);
|
virObjectLock(st);
|
||||||
virNetMessageFree(msg);
|
virNetMessageFree(msg);
|
||||||
|
|
||||||
if (rv < 0)
|
if (ret < 0)
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
offset = 0;
|
VIR_DEBUG("After IO %zu", st->incomingOffset);
|
||||||
partial = nbytes;
|
if (st->incomingOffset) {
|
||||||
|
int want = st->incomingOffset;
|
||||||
while (st->incomingVec && (st->readVec < st->writeVec)) {
|
if (want > nbytes)
|
||||||
struct iovec *iov = st->incomingVec + st->readVec;
|
want = nbytes;
|
||||||
|
memcpy(data, st->incoming, want);
|
||||||
if (!iov || !iov->iov_base) {
|
if (want < st->incomingOffset) {
|
||||||
virReportError(VIR_ERR_INTERNAL_ERROR,
|
memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
|
||||||
"%s", _("NULL pointer encountered"));
|
st->incomingOffset -= want;
|
||||||
goto cleanup;
|
} else {
|
||||||
|
VIR_FREE(st->incoming);
|
||||||
|
st->incomingOffset = st->incomingLength = 0;
|
||||||
}
|
}
|
||||||
|
rv = want;
|
||||||
if (partial < iov->iov_len) {
|
} else {
|
||||||
memcpy(data+offset, iov->iov_base, partial);
|
rv = 0;
|
||||||
memmove(iov->iov_base, (char*)iov->iov_base+partial,
|
|
||||||
iov->iov_len-partial);
|
|
||||||
iov->iov_len -= partial;
|
|
||||||
offset += partial;
|
|
||||||
VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(data+offset, iov->iov_base, iov->iov_len);
|
|
||||||
VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
|
|
||||||
partial -= iov->iov_len;
|
|
||||||
offset += iov->iov_len;
|
|
||||||
VIR_FREE(iov->iov_base);
|
|
||||||
iov->iov_len = 0;
|
|
||||||
st->readVec++;
|
|
||||||
|
|
||||||
VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
|
|
||||||
offset, st->readVec, st->writeVec);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Shrink the I/O Vector buffer to free up memory. Do the
|
|
||||||
shrinking only when there is selected amount or more buffers to
|
|
||||||
free so it doesn't constantly memmove() and realloc() buffers.
|
|
||||||
*/
|
|
||||||
if (st->readVec >= 16) {
|
|
||||||
memmove(st->incomingVec, st->incomingVec + st->readVec,
|
|
||||||
sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
|
|
||||||
VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
|
|
||||||
VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
|
|
||||||
st->readVec = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = offset;
|
|
||||||
virNetClientStreamEventTimerUpdate(st);
|
virNetClientStreamEventTimerUpdate(st);
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
virObjectUnlock(st);
|
virObjectUnlock(st);
|
||||||
return ret;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user