Helper functions for processing data streams in libvirtd

Defines the extensions to the remote protocol for generic
data streams. Adds a bunch of helper code to the libvirtd
daemon for working with data streams.

* daemon/Makefile.am: Add stream.c/stream.h to build
* daemon/stream.c, qemud/stream.h: Generic helper functions for
  creating new streams, associating streams with clients, finding
  existing streams for a client and removing/deleting streams.
* src/remote/remote_protocol.x: Add a new 'REMOTE_STREAM' constant
  for the 'enum remote_message_type' for encoding stream data
  in wire messages. Add a new 'REMOTE_CONTINUE' constant to
  'enum remote_message_status' to indicate further data stream
  messsages are expected to follow.  Document how the
  remote_message_header is used to encode data streams
* src/remote/remote_protocol.h: Regenerate
* daemon/dispatch.c: Remove assumption that a error message
  sent to client is always type=REMOTE_REPLY. It may now
  also be type=REMOTE_STREAM. Add convenient method for
  sending outgoing stream data packets. Log and ignore
  non-filtered incoming stream packets. Add a method for
  serializing a stream error message
* daemon/dispatch.h:  Add API for serializing stream errors
  and sending stream data packets
* daemon/qemud.h: Add struct qemud_client_stream for tracking
  active data streams for clients. Tweak filter function
  operation so that it accepts a client object too.
* daemon/qemud.c: Refactor code for free'ing message objects
  which have been fully transmitted into separate method.
  Release all active streams when client shuts down. Change
  filter function to be responsible for queueing the message
This commit is contained in:
Daniel P. Berrange
2009-07-10 13:06:36 +01:00
parent 182eba1bc6
commit 11573f3ec1
9 changed files with 509 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ DAEMON_SOURCES = \
libvirtd.c libvirtd.h \
remote.c remote.h \
dispatch.c dispatch.h \
stream.c stream.h \
remote_dispatch_prototypes.h \
remote_dispatch_table.h \
remote_dispatch_args.h \

View File

@@ -104,7 +104,7 @@ void remoteDispatchOOMError (remote_error *rerr)
{
remoteDispatchStringError(rerr,
VIR_ERR_NO_MEMORY,
NULL);
"out of memory");
}
@@ -136,6 +136,10 @@ remoteSerializeError(struct qemud_client *client,
unsigned int len;
struct qemud_client_message *msg = NULL;
DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d, msg=%s",
program, version, procedure, type, serial,
rerr->message ? *rerr->message : "(none)");
if (VIR_ALLOC(msg) < 0)
goto fatal_error;
@@ -206,19 +210,38 @@ fatal_error:
*
* Returns 0 if the error was sent, -1 upon fatal error
*/
static int
int
remoteSerializeReplyError(struct qemud_client *client,
remote_error *rerr,
remote_message_header *req) {
/*
* For data streams, errors are sent back as data streams
* For method calls, errors are sent back as method replies
*/
return remoteSerializeError(client,
rerr,
req->prog,
req->vers,
req->proc,
REMOTE_REPLY,
req->type == REMOTE_STREAM ? REMOTE_STREAM : REMOTE_REPLY,
req->serial);
}
int
remoteSerializeStreamError(struct qemud_client *client,
remote_error *rerr,
int proc,
int serial)
{
return remoteSerializeError(client,
rerr,
REMOTE_PROGRAM,
REMOTE_PROTOCOL_VERSION,
proc,
REMOTE_STREAM,
serial);
}
/*
* @msg: the complete incoming message, whose header to decode
*
@@ -338,6 +361,10 @@ remoteDispatchClientRequest (struct qemud_server *server,
{
remote_error rerr;
DEBUG("prog=%d ver=%d type=%d satus=%d serial=%d proc=%d",
msg->hdr.prog, msg->hdr.vers, msg->hdr.type,
msg->hdr.status, msg->hdr.serial, msg->hdr.proc);
memset(&rerr, 0, sizeof rerr);
/* Check version, etc. */
@@ -358,11 +385,24 @@ remoteDispatchClientRequest (struct qemud_server *server,
case REMOTE_CALL:
return remoteDispatchClientCall(server, client, msg);
case REMOTE_STREAM:
/* Since stream data is non-acked, async, we may continue to received
* stream packets after we closed down a stream. Just drop & ignore
* these.
*/
VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d",
msg->hdr.serial, msg->hdr.proc, msg->hdr.status);
qemudClientMessageRelease(client, msg);
break;
default:
remoteDispatchFormatError (&rerr, _("type (%d) != REMOTE_CALL"),
(int) msg->hdr.type);
goto error;
}
return 0;
error:
return remoteSerializeReplyError(client, &rerr, &msg->hdr);
}
@@ -532,3 +572,84 @@ xdr_error:
fatal_error:
return -1;
}
int
remoteSendStreamData(struct qemud_client *client,
struct qemud_client_stream *stream,
const char *data,
size_t len)
{
struct qemud_client_message *msg;
XDR xdr;
DEBUG("client=%p stream=%p data=%p len=%d", client, stream, data, len);
if (VIR_ALLOC(msg) < 0) {
return -1;
}
/* Return header. We're re-using same message object, so
* only need to tweak type/status fields */
msg->hdr.prog = REMOTE_PROGRAM;
msg->hdr.vers = REMOTE_PROTOCOL_VERSION;
msg->hdr.proc = stream->procedure;
msg->hdr.type = REMOTE_STREAM;
msg->hdr.serial = stream->serial;
/*
* NB
* data != NULL + len > 0 => REMOTE_CONTINUE (Sending back data)
* data != NULL + len == 0 => REMOTE_CONTINUE (Sending read EOF)
* data == NULL => REMOTE_OK (Sending finish handshake confirmation)
*/
msg->hdr.status = data ? REMOTE_CONTINUE : REMOTE_OK;
if (remoteEncodeClientMessageHeader(msg) < 0)
goto fatal_error;
if (data && len) {
if ((msg->bufferLength - msg->bufferOffset) < len)
goto fatal_error;
/* Now for the payload */
xdrmem_create (&xdr,
msg->buffer,
msg->bufferLength,
XDR_ENCODE);
/* Skip over existing header already written */
if (xdr_setpos(&xdr, msg->bufferOffset) == 0)
goto xdr_error;
memcpy(msg->buffer + msg->bufferOffset, data, len);
msg->bufferOffset += len;
/* Update the length word. */
len = msg->bufferOffset;
if (xdr_setpos (&xdr, 0) == 0)
goto xdr_error;
if (!xdr_u_int (&xdr, &len))
goto xdr_error;
xdr_destroy (&xdr);
DEBUG("Total %d", msg->bufferOffset);
}
/* Reset ready for I/O */
msg->bufferLength = msg->bufferOffset;
msg->bufferOffset = 0;
/* Put reply on end of tx queue to send out */
qemudClientMessageQueuePush(&client->tx, msg);
qemudUpdateClientEvent(client);
return 0;
xdr_error:
xdr_destroy (&xdr);
fatal_error:
VIR_FREE(msg);
return -1;
}

View File

@@ -49,6 +49,17 @@ void remoteDispatchOOMError (remote_error *rerr);
void remoteDispatchConnError (remote_error *rerr,
virConnectPtr conn);
int
remoteSerializeReplyError(struct qemud_client *client,
remote_error *rerr,
remote_message_header *req);
int
remoteSerializeStreamError(struct qemud_client *client,
remote_error *rerr,
int proc,
int serial);
/* Having this here is dubious. It should be in remote.h
* but qemud.c shouldn't depend on that header directly.
* Refactor this later to deal with this properly.
@@ -60,4 +71,10 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
void *opaque);
int
remoteSendStreamData(struct qemud_client *client,
struct qemud_client_stream *stream,
const char *data,
size_t len);
#endif /* __LIBVIRTD_DISPATCH_H__ */

View File

@@ -61,6 +61,7 @@
#include "conf.h"
#include "event.h"
#include "memory.h"
#include "stream.h"
#ifdef HAVE_AVAHI
#include "mdns.h"
#endif
@@ -1723,10 +1724,15 @@ readmore:
/* Check if any filters match this message */
filter = client->filters;
while (filter) {
if ((filter->query)(msg, filter->opaque)) {
qemudClientMessageQueuePush(&filter->dx, msg);
int ret;
ret = (filter->query)(client, msg, filter->opaque);
if (ret == 1) {
msg = NULL;
break;
} else if (ret == -1) {
VIR_FREE(msg);
qemudDispatchClientFailure(client);
return;
}
filter = filter->next;
}
@@ -1888,6 +1894,29 @@ static ssize_t qemudClientWrite(struct qemud_client *client) {
}
void
qemudClientMessageRelease(struct qemud_client *client,
struct qemud_client_message *msg)
{
if (!msg->async)
client->nrequests--;
/* See if the recv queue is currently throttled */
if (!client->rx &&
client->nrequests < max_client_requests) {
/* Reset message record for next RX attempt */
memset(msg, 0, sizeof(*msg));
client->rx = msg;
/* Get ready to receive next message */
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
} else {
VIR_FREE(msg);
}
qemudUpdateClientEvent(client);
}
/*
* Process all queued client->tx messages until
* we would block on I/O
@@ -1911,26 +1940,10 @@ qemudDispatchClientWrite(struct qemud_client *client) {
/* Get finished reply from head of tx queue */
reply = qemudClientMessageQueueServe(&client->tx);
/* If its not an async message, then we have
* just completed an RPC request */
if (!reply->async)
client->nrequests--;
/* Move record to end of 'rx' ist */
if (!client->rx &&
client->nrequests < max_client_requests) {
/* Reset message record for next RX attempt */
client->rx = reply;
client->rx->bufferOffset = 0;
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
} else {
VIR_FREE(reply);
}
qemudClientMessageRelease(client, reply);
if (client->closing)
qemudDispatchClientFailure(client);
else
qemudUpdateClientEvent(client);
}
}
}
@@ -2142,6 +2155,9 @@ static void qemudFreeClient(struct qemud_client *client) {
VIR_FREE(msg);
}
while (client->streams)
remoteRemoveClientStream(client, client->streams);
if (client->conn)
virConnectClose(client->conn);
virMutexDestroy(&client->lock);

View File

@@ -129,26 +129,43 @@ struct qemud_client_message {
unsigned int bufferLength;
unsigned int bufferOffset;
int async : 1;
unsigned int async : 1;
remote_message_header hdr;
struct qemud_client_message *next;
};
struct qemud_client;
/* Allow for filtering of incoming messages to a custom
* dispatch processing queue, instead of client->dx.
*/
typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque);
typedef int (*qemud_client_filter_func)(struct qemud_client *client,
struct qemud_client_message *msg, void *opaque);
struct qemud_client_filter {
qemud_client_filter_func query;
void *opaque;
struct qemud_client_message *dx;
struct qemud_client_filter *next;
};
struct qemud_client_stream {
virStreamPtr st;
int procedure;
int serial;
unsigned int recvEOF : 1;
unsigned int closed : 1;
struct qemud_client_filter filter;
struct qemud_client_message *rx;
int tx;
struct qemud_client_stream *next;
};
/* Stores the per-client connection state */
struct qemud_client {
virMutex lock;
@@ -197,6 +214,10 @@ struct qemud_client {
* end up on the 'dx' queue */
struct qemud_client_filter *filters;
/* Data streams */
struct qemud_client_stream *streams;
/* This is only valid if a remote open call has been made on this
* connection, otherwise it will be NULL. Also if remote close is
* called, it will be set back to NULL if that succeeds.
@@ -275,6 +296,9 @@ qemudClientMessageQueuePush(struct qemud_client_message **queue,
struct qemud_client_message *
qemudClientMessageQueueServe(struct qemud_client_message **queue);
void
qemudClientMessageRelease(struct qemud_client *client,
struct qemud_client_message *msg);
#if HAVE_POLKIT

210
daemon/stream.c Normal file
View File

@@ -0,0 +1,210 @@
/*
* stream.c: APIs for managing client streams
*
* Copyright (C) 2009 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Daniel P. Berrange <berrange@redhat.com>
*/
#include <config.h>
#include "stream.h"
#include "memory.h"
#include "dispatch.h"
#include "logging.h"
/*
* @client: a locked client object
*
* Invoked by the main loop when filtering incoming messages.
*
* Returns 1 if the message was processed, 0 if skipped,
* -1 on fatal client error
*/
static int
remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED,
struct qemud_client_message *msg ATTRIBUTE_UNUSED,
void *opaque ATTRIBUTE_UNUSED)
{
return 0;
}
/*
* @conn: a connection object to associate the stream with
* @hdr: the method call to associate with the stram
*
* Creates a new stream for this conn
*
* Returns a new stream object, or NULL upon OOM
*/
struct qemud_client_stream *
remoteCreateClientStream(virConnectPtr conn,
remote_message_header *hdr)
{
struct qemud_client_stream *stream;
DEBUG("proc=%d serial=%d", hdr->proc, hdr->serial);
if (VIR_ALLOC(stream) < 0)
return NULL;
stream->procedure = hdr->proc;
stream->serial = hdr->serial;
stream->st = virStreamNew(conn, VIR_STREAM_NONBLOCK);
if (!stream->st) {
VIR_FREE(stream);
return NULL;
}
stream->filter.query = remoteStreamFilter;
stream->filter.opaque = stream;
return stream;
}
/*
* @stream: an unused client stream
*
* Frees the memory associated with this inactive client
* stream
*/
void remoteFreeClientStream(struct qemud_client *client,
struct qemud_client_stream *stream)
{
struct qemud_client_message *msg;
if (!stream)
return;
DEBUG("proc=%d serial=%d", stream->procedure, stream->serial);
msg = stream->rx;
while (msg) {
struct qemud_client_message *tmp = msg->next;
qemudClientMessageRelease(client, msg);
msg = tmp;
}
virStreamFree(stream->st);
VIR_FREE(stream);
}
/*
* @client: a locked client to add the stream to
* @stream: a stream to add
*/
int remoteAddClientStream(struct qemud_client *client,
struct qemud_client_stream *stream)
{
struct qemud_client_stream *tmp = client->streams;
DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
if (tmp) {
while (tmp->next)
tmp = tmp->next;
tmp->next = stream;
} else {
client->streams = stream;
}
stream->filter.next = client->filters;
client->filters = &stream->filter;
stream->tx = 1;
return 0;
}
/*
* @client: a locked client object
* @procedure: procedure associated with the stream
* @serial: serial number associated with the stream
*
* Finds a existing active stream
*
* Returns a stream object matching the procedure+serial number, or NULL
*/
struct qemud_client_stream *
remoteFindClientStream(struct qemud_client *client,
virStreamPtr st)
{
struct qemud_client_stream *stream = client->streams;
while (stream) {
if (stream->st == st)
return stream;
stream = stream->next;
}
return NULL;
}
/*
* @client: a locked client object
* @stream: an inactive, closed stream object
*
* Removes a stream from the list of active streams for the client
*
* Returns 0 if the stream was removd, -1 if it doesn't exist
*/
int
remoteRemoveClientStream(struct qemud_client *client,
struct qemud_client_stream *stream)
{
DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
struct qemud_client_stream *curr = client->streams;
struct qemud_client_stream *prev = NULL;
struct qemud_client_filter *filter = NULL;
if (client->filters == &stream->filter) {
client->filters = client->filters->next;
} else {
filter = client->filters;
while (filter) {
if (filter->next == &stream->filter) {
filter->next = filter->next->next;
break;
}
}
}
if (!stream->closed)
virStreamAbort(stream->st);
while (curr) {
if (curr == stream) {
if (prev)
prev->next = curr->next;
else
client->streams = curr->next;
remoteFreeClientStream(client, stream);
return 0;
}
prev = curr;
curr = curr->next;
}
return -1;
}

49
daemon/stream.h Normal file
View File

@@ -0,0 +1,49 @@
/*
* stream.h: APIs for managing client streams
*
* Copyright (C) 2009 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Daniel P. Berrange <berrange@redhat.com>
*/
#ifndef __LIBVIRTD_STREAM_H__
#define __LIBVIRTD_STREAM_H__
#include "libvirtd.h"
struct qemud_client_stream *
remoteCreateClientStream(virConnectPtr conn,
remote_message_header *hdr);
void remoteFreeClientStream(struct qemud_client *client,
struct qemud_client_stream *stream);
int remoteAddClientStream(struct qemud_client *client,
struct qemud_client_stream *stream);
struct qemud_client_stream *
remoteFindClientStream(struct qemud_client *client,
virStreamPtr stream);
int
remoteRemoveClientStream(struct qemud_client *client,
struct qemud_client_stream *stream);
#endif /* __LIBVIRTD_STREAM_H__ */