diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 9a4a7ff30c..5ce78fe585 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -56,8 +56,6 @@ struct virFDStreamData { virObjectLockable parent; int fd; - int errfd; - virCommandPtr cmd; unsigned long long offset; unsigned long long length; @@ -79,6 +77,11 @@ struct virFDStreamData { virFDStreamInternalCloseCb icbCb; virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque; void *icbOpaque; + + /* Thread data */ + virThreadPtr thread; + int threadErr; + bool threadQuit; }; static virClassPtr virFDStreamDataClass; @@ -264,57 +267,124 @@ virFDStreamAddCallback(virStreamPtr st, return ret; } -static int -virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort) -{ - char buf[1024]; - ssize_t len; - int status; - int ret = -1; - if (!fdst->cmd) +typedef struct _virFDStreamThreadData virFDStreamThreadData; +typedef virFDStreamThreadData *virFDStreamThreadDataPtr; +struct _virFDStreamThreadData { + virStreamPtr st; + size_t length; + int fdin; + char *fdinname; + int fdout; + char *fdoutname; +}; + + +static void +virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) +{ + if (!data) + return; + + virObjectUnref(data->st); + VIR_FREE(data->fdinname); + VIR_FREE(data->fdoutname); + VIR_FREE(data); +} + + +static void +virFDStreamThread(void *opaque) +{ + virFDStreamThreadDataPtr data = opaque; + virStreamPtr st = data->st; + size_t length = data->length; + int fdin = data->fdin; + char *fdinname = data->fdinname; + int fdout = data->fdout; + char *fdoutname = data->fdoutname; + virFDStreamDataPtr fdst = st->privateData; + char *buf = NULL; + size_t buflen = 256 * 1024; + size_t total = 0; + + virObjectRef(fdst); + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + while (1) { + ssize_t got; + + if (length && + (length - total) < buflen) + buflen = length - total; + + if (buflen == 0) + break; /* End of requested data from client */ + + if ((got = saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + if (got == 0) + break; + + total += got; + + if (safewrite(fdout, buf, got) < 0) { + virReportSystemError(errno, + _("Unable to write %s"), + fdoutname); + goto error; + } + } + + cleanup: + if (!virObjectUnref(fdst)) + st->privateData = NULL; + VIR_FORCE_CLOSE(fdin); + VIR_FORCE_CLOSE(fdout); + virFDStreamThreadDataFree(data); + VIR_FREE(buf); + return; + + error: + virObjectLock(fdst); + fdst->threadErr = errno; + virObjectUnlock(fdst); + goto cleanup; +} + + +static int +virFDStreamJoinWorker(virFDStreamDataPtr fdst, + bool streamAbort) +{ + int ret = -1; + if (!fdst->thread) return 0; - if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0) - buf[0] = '\0'; - else - buf[len] = '\0'; + /* Give the thread a chance to lock the FD stream object. */ + virObjectUnlock(fdst); + virThreadJoin(fdst->thread); + virObjectLock(fdst); - virCommandRawStatus(fdst->cmd); - if (virCommandWait(fdst->cmd, &status) < 0) - goto cleanup; - - if (status != 0) { - if (buf[0] != '\0') { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) { - if (streamAbort) { - /* Explicit abort request means the caller doesn't care - if there's data left over, so skip the error */ - goto out; - } - - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("I/O helper exited " - "before all data was processed")); - } else { - char *str = virProcessTranslateStatus(status); - virReportError(VIR_ERR_INTERNAL_ERROR, - _("I/O helper exited with %s"), - NULLSTR(str)); - VIR_FREE(str); - } + if (fdst->threadErr && !streamAbort) { + /* errors are expected on streamAbort */ goto cleanup; } - out: ret = 0; cleanup: - virCommandFree(fdst->cmd); - fdst->cmd = NULL; + VIR_FREE(fdst->thread); return ret; } + static int virFDStreamCloseInt(virStreamPtr st, bool streamAbort) { @@ -359,12 +429,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) /* mutex locked */ ret = VIR_CLOSE(fdst->fd); - if (virFDStreamCloseCommand(fdst, streamAbort) < 0) + if (virFDStreamJoinWorker(fdst, streamAbort) < 0) ret = -1; - if (VIR_CLOSE(fdst->errfd) < 0) - VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd); - st->privateData = NULL; /* call the internal stream closing callback */ @@ -516,14 +583,13 @@ static virStreamDriver virFDStreamDrv = { static int virFDStreamOpenInternal(virStreamPtr st, int fd, - virCommandPtr cmd, - int errfd, + virFDStreamThreadDataPtr threadData, unsigned long long length) { virFDStreamDataPtr fdst; - VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu", - st, fd, cmd, errfd, length); + VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu", + st, fd, threadData, length); if (virFDStreamDataInitialize() < 0) return -1; @@ -538,21 +604,39 @@ static int virFDStreamOpenInternal(virStreamPtr st, return -1; fdst->fd = fd; - fdst->cmd = cmd; - fdst->errfd = errfd; fdst->length = length; st->driver = &virFDStreamDrv; st->privateData = fdst; + if (threadData) { + /* Create the thread after fdst and st were initialized. + * The thread worker expects them to be that way. */ + if (VIR_ALLOC(fdst->thread) < 0) + goto error; + + if (virThreadCreate(fdst->thread, + true, + virFDStreamThread, + threadData) < 0) + goto error; + } + return 0; + + error: + VIR_FREE(fdst->thread); + st->driver = NULL; + st->privateData = NULL; + virObjectUnref(fdst); + return -1; } int virFDStreamOpen(virStreamPtr st, int fd) { - return virFDStreamOpenInternal(st, fd, NULL, -1, 0); + return virFDStreamOpenInternal(st, fd, NULL, 0); } @@ -598,7 +682,7 @@ int virFDStreamConnectUNIX(virStreamPtr st, goto error; } - if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0) + if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0) goto error; return 0; @@ -627,11 +711,10 @@ virFDStreamOpenFileInternal(virStreamPtr st, bool forceIOHelper) { int fd = -1; - int childfd = -1; + int pipefds[2] = { -1, -1 }; + int tmpfd = -1; struct stat sb; - virCommandPtr cmd = NULL; - int errfd = -1; - char *iohelper_path = NULL; + virFDStreamThreadDataPtr threadData = NULL; VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o", st, path, oflags, offset, length, mode); @@ -648,6 +731,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, path); return -1; } + tmpfd = fd; if (fstat(fd, &sb) < 0) { virReportSystemError(errno, @@ -666,13 +750,12 @@ virFDStreamOpenFileInternal(virStreamPtr st, /* Thanks to the POSIX i/o model, we can't reliably get * non-blocking I/O on block devs/regular files. To - * support those we need to fork a helper process to do + * support those we need to create a helper thread to do * the I/O so we just have a fifo. Or use AIO :-( */ if ((st->flags & VIR_STREAM_NONBLOCK) && ((!S_ISCHR(sb.st_mode) && !S_ISFIFO(sb.st_mode)) || forceIOHelper)) { - int fds[2] = { -1, -1 }; if ((oflags & O_ACCMODE) == O_RDWR) { virReportError(VIR_ERR_INTERNAL_ERROR, @@ -681,58 +764,47 @@ virFDStreamOpenFileInternal(virStreamPtr st, goto error; } - if (pipe(fds) < 0) { + if (pipe(pipefds) < 0) { virReportSystemError(errno, "%s", _("Unable to create pipe")); goto error; } - if (!(iohelper_path = virFileFindResource("libvirt_iohelper", - abs_topbuilddir "/src", - LIBEXECDIR))) + if (VIR_ALLOC(threadData) < 0) goto error; - cmd = virCommandNewArgList(iohelper_path, - path, - NULL); - - VIR_FREE(iohelper_path); - - virCommandAddArgFormat(cmd, "%llu", length); - virCommandPassFD(cmd, fd, - VIR_COMMAND_PASS_FD_CLOSE_PARENT); - virCommandAddArgFormat(cmd, "%d", fd); + threadData->st = virObjectRef(st); + threadData->length = length; if ((oflags & O_ACCMODE) == O_RDONLY) { - childfd = fds[1]; - fd = fds[0]; - virCommandSetOutputFD(cmd, &childfd); + threadData->fdin = fd; + threadData->fdout = pipefds[1]; + if (VIR_STRDUP(threadData->fdinname, path) < 0 || + VIR_STRDUP(threadData->fdoutname, "pipe") < 0) + goto error; + tmpfd = pipefds[0]; } else { - childfd = fds[0]; - fd = fds[1]; - virCommandSetInputFD(cmd, childfd); + threadData->fdin = pipefds[0]; + threadData->fdout = fd; + if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 || + VIR_STRDUP(threadData->fdoutname, path) < 0) + goto error; + tmpfd = pipefds[1]; } - virCommandSetErrorFD(cmd, &errfd); - - if (virCommandRunAsync(cmd, NULL) < 0) - goto error; - - VIR_FORCE_CLOSE(childfd); } - if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0) + if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0) goto error; return 0; error: - virCommandFree(cmd); VIR_FORCE_CLOSE(fd); - VIR_FORCE_CLOSE(childfd); - VIR_FORCE_CLOSE(errfd); - VIR_FREE(iohelper_path); + VIR_FORCE_CLOSE(pipefds[0]); + VIR_FORCE_CLOSE(pipefds[1]); if (oflags & O_CREAT) unlink(path); + virFDStreamThreadDataFree(threadData); return -1; } diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h index 32a741e269..34c4c3fc68 100644 --- a/src/util/virfdstream.h +++ b/src/util/virfdstream.h @@ -24,7 +24,6 @@ # define __VIR_FDSTREAM_H_ # include "internal.h" -# include "vircommand.h" /* internal callback, the generic one is used up by daemon stream driver */ /* the close callback is called with fdstream private data locked */