virtinst: connection: Absorb all scattered URI helpers

This commit is contained in:
Cole Robinson 2013-07-05 20:36:28 -04:00
parent 57e86259d8
commit ec734191fc
15 changed files with 207 additions and 367 deletions

View File

@ -34,7 +34,6 @@ import virtinst
import virtinst.CapabilitiesParser
import virtinst.cli as cli
import virtinst.util as util
from virtinst import uriutil
from virtinst import VirtualCharDevice
from virtinst.cli import fail, print_stdout, print_stderr
@ -310,7 +309,7 @@ def get_virt_type(conn, options):
if (not req_virt_type and
not req_hv_type and
req_accel and
uriutil.is_qemu(conn) and
conn.is_qemu() and
capsguest.arch in ["i686", "x86_64"] and
not capsdomain.is_accelerated()):
logging.warn("KVM acceleration not available, using '%s'",
@ -454,7 +453,7 @@ def check_option_collisions(options, guest):
fail(_("Paravirtualized guests cannot install off cdrom media."))
if (options.location and
guest.is_remote() and not
guest.conn.is_remote() and not
guest.installer.support_remote_url_install()):
fail(_("Libvirt version does not support remote --location installs"))

View File

@ -32,8 +32,6 @@ import traceback
import libvirt
import virtinst
import virtinst.cli
from virtinst import uriutil
from virtManager import util
from virtManager import connectauth
@ -270,9 +268,6 @@ class vmmConnection(vmmGObject):
# This can throw an exception, so beware when calling!
return socket.gethostbyaddr(socket.gethostname())[0]
def get_uri_hostname(self):
return uriutil.get_uri_hostname(self.get_uri())
def get_short_hostname(self):
hostname = self.get_hostname()
offset = hostname.find(".")
@ -287,68 +282,23 @@ class vmmConnection(vmmGObject):
except:
return self.get_uri_hostname()
def get_transport(self):
return uriutil.get_uri_transport(self.get_uri())
get_uri_hostname = property(lambda s:
getattr(s, "_backend").get_uri_hostname)
get_transport = property(lambda s:
getattr(s, "_backend").get_uri_transport)
get_driver = property(lambda s: getattr(s, "_backend").get_uri_driver)
is_container = property(lambda s: getattr(s, "_backend").is_container)
is_lxc = property(lambda s: getattr(s, "_backend").is_lxc)
is_openvz = property(lambda s: getattr(s, "_backend").is_openvz)
is_xen = property(lambda s: getattr(s, "_backend").is_xen)
is_remote = property(lambda s: getattr(s, "_backend").is_remote)
is_qemu = property(lambda s: getattr(s, "_backend").is_qemu)
is_qemu_system = property(lambda s: getattr(s, "_backend").is_qemu_system)
is_qemu_session = property(lambda s:
getattr(s, "_backend").is_qemu_session)
is_test_conn = property(lambda s: getattr(s, "_backend").is_test)
is_session_uri = property(lambda s: getattr(s, "_backend").is_session_uri)
def get_driver(self):
return uriutil.get_uri_driver(self.get_uri())
def is_local(self):
return bool(self.get_uri_hostname() == "localhost")
def is_container(self):
return self.is_lxc() or self.is_openvz()
def is_lxc(self):
if self._backend.is_virtinst_test_uri:
self.get_uri().count(",lxc")
return uriutil.uri_split(self.get_uri())[0].startswith("lxc")
def is_openvz(self):
return uriutil.uri_split(self.get_uri())[0].startswith("openvz")
def is_xen(self):
if self._backend.is_virtinst_test_uri:
return self.get_uri().count(",xen")
scheme = uriutil.uri_split(self.get_uri())[0]
return scheme.startswith("xen")
def is_qemu(self):
if self._backend.is_virtinst_test_uri:
return self.get_uri().count(",qemu")
scheme = uriutil.uri_split(self.get_uri())[0]
return scheme.startswith("qemu")
def is_remote(self):
return uriutil.is_uri_remote(self.get_uri())
def is_qemu_system(self):
(scheme, ignore, ignore,
path, ignore, ignore) = uriutil.uri_split(self.get_uri())
if path == "/system" and scheme.startswith("qemu"):
return True
return False
def is_qemu_session(self):
(scheme, ignore, ignore,
path, ignore, ignore) = uriutil.uri_split(self.get_uri())
if path == "/session" and scheme.startswith("qemu"):
return True
return False
def is_test_conn(self):
(scheme, ignore, ignore,
ignore, ignore, ignore) = uriutil.uri_split(self.get_uri())
if scheme.startswith("test"):
return True
return False
def is_session_uri(self):
path = uriutil.uri_split(self.get_uri())[3]
return path == "/session"
# Connection capabilities debug helpers
def rhel6_defaults(self, emulator):
@ -392,7 +342,7 @@ class vmmConnection(vmmGObject):
return match_whole_string(orig, "[0-9.]+")
(scheme, username, hostname,
path, ignore, ignore) = uriutil.uri_split(self.get_uri())
path, ignore, ignore) = virtinst.util.uri_split(self.get_uri())
hv = ""
rest = ""

View File

@ -29,7 +29,7 @@ from gi.repository import Gtk
# pylint: enable=E0611
import libvirt
from virtinst import uriutil
from virtinst import util as virtinstutil
from virtManager import util
from virtManager.baseclass import vmmGObjectUI
@ -275,7 +275,7 @@ class vmmMigrateDialog(vmmGObjectUI):
return self.edit_uri(srcuri, desthost, None)
def edit_uri(self, uri, hostname, port):
split = list(uriutil.uri_split(uri))
split = list(virtinstutil.uri_split(uri))
hostname = hostname or split[2]
if port:
@ -303,7 +303,7 @@ class vmmMigrateDialog(vmmGObjectUI):
# For secure migration, we need to make sure we aren't migrating
# to the local connection, because libvirt will pull try to use
# 'qemu:///system' as the migrate URI which will deadlock
if destconn.is_local():
if destconn.get_uri_hostname() == "localhost":
uri = self.build_localhost_uri(destconn, srcuri)
else:
uri = destconn.get_uri()

View File

@ -301,7 +301,7 @@ class Cloner(object):
# we have permissions to do so. This validation check
# caused a few bug reports in a short period of time,
# so must be a common case.
if (clone_disk.is_remote() or
if (self.conn.is_remote() or
clone_disk.type != clone_disk.TYPE_BLOCK or
not orig_disk.path or
not os.access(orig_disk.path, os.R_OK) or

View File

@ -194,7 +194,7 @@ class DistroInstaller(Installer.Installer):
is_tuple = False
validated = True
self._location_is_path = True
is_local = not self.is_remote()
is_local = not self.conn.is_remote()
# Basic validation
if type(val) is not str and (type(val) is not tuple and len(val) != 2):
@ -326,8 +326,9 @@ class DistroInstaller(Installer.Installer):
conn = guest.conn
system_scratchdir = self._get_system_scratchdir()
if (not guest.is_remote() and
(self.is_session_uri() or self.scratchdir == system_scratchdir)):
if (not self.conn.is_remote() and
(self.conn.is_session_uri() or
self.scratchdir == system_scratchdir)):
# We have access to system scratchdir, don't jump through hoops
logging.debug("Have access to preferred scratchdir so"
" nothing to upload")

View File

@ -236,7 +236,7 @@ class Installer(XMLBuilderDomain.XMLBuilderDomain):
def _get_scratchdir(self):
scratch = None
if not self.is_session_uri():
if not self.conn.is_session_uri():
scratch = self._get_system_scratchdir()
if (not scratch or

View File

@ -31,7 +31,6 @@ import urlgrabber.progress as progress
import libvirt
import virtinst
from virtinst import uriutil
from virtinst import util
from virtinst import Storage
@ -190,7 +189,7 @@ def _check_if_path_managed(conn, path):
pool = trypool
if not vol and not pool:
if not uriutil.is_uri_remote(conn.getURI(), conn=conn):
if not conn.is_remote():
# Building local disk
return None, None, False
@ -340,7 +339,6 @@ class VirtualDisk(VirtualDevice):
"""
Check if path exists. If we can't determine, return False
"""
is_remote = uriutil.is_uri_remote(conn.getURI(), conn=conn)
try:
vol = None
path_is_pool = False
@ -352,7 +350,7 @@ class VirtualDisk(VirtualDevice):
if vol or path_is_pool:
return True
if not is_remote:
if not conn.is_remote():
return os.path.exists(path)
except:
pass
@ -370,7 +368,7 @@ class VirtualDisk(VirtualDevice):
"""
if path is None:
return []
if uriutil.is_uri_remote(conn.getURI(), conn=conn):
if conn.is_remote():
return []
try:
@ -1131,8 +1129,7 @@ class VirtualDisk(VirtualDevice):
drvname = self._driverName
drvtype = self._driverType
is_qemu = self.is_qemu()
if is_qemu and not drvname:
if self.conn.is_qemu() and not drvname:
drvname = self.DRIVER_QEMU
if self.format:
@ -1181,7 +1178,7 @@ class VirtualDisk(VirtualDevice):
return False
return True
if (not self.is_remote() and
if (not self.conn.is_remote() and
self.path and
os.path.exists(self.path)):
return False
@ -1217,7 +1214,7 @@ class VirtualDisk(VirtualDevice):
storage_capable = util.is_storage_capable(self.conn)
if self.is_remote():
if self.conn.is_remote():
if not storage_capable:
raise ValueError(_("Connection doesn't support remote "
"storage."))
@ -1442,13 +1439,13 @@ class VirtualDisk(VirtualDevice):
if virtinst.enable_rhel6_defaults:
# Enable cache=none for non-CDROM devs
if (self.is_qemu() and
if (self.conn.is_qemu() and
not cache and
self.device != self.DEVICE_CDROM):
cache = self.CACHE_MODE_NONE
# Enable AIO native for block devices
if (self.is_qemu() and
if (self.conn.is_qemu() and
not iomode and
self.device == self.DEVICE_DISK and
self.type == self.TYPE_BLOCK):
@ -1466,7 +1463,7 @@ class VirtualDisk(VirtualDevice):
drvxml += " io='%s'" % iomode
if drvxml and self.driver_name is None:
if self.is_qemu():
if self.conn.is_qemu():
self.driver_name = "qemu"
if not self.driver_name is None:

View File

@ -154,12 +154,10 @@ class VirtualFilesystem(VirtualDevice):
def _get_target(self):
return self._target
def _set_target(self, val):
is_qemu = self.is_qemu()
# In case of qemu for default fs type (mount) target is not
# actually a directory, it is merely a arbitrary string tag
# that is exported to the guest as a hint for where to mount
if (is_qemu and
if (self.conn.is_qemu() and
(self.type == self.TYPE_DEFAULT or
self.type == self.TYPE_MOUNT)):
pass

View File

@ -98,7 +98,7 @@ class VirtualHostDevice(VirtualDevice):
return
self.managed = True
if self.is_xen():
if self.conn.is_xen():
self.managed = False

View File

@ -25,7 +25,6 @@ import threading
import libxml2
from virtinst import CapabilitiesParser
from virtinst import uriutil
from virtinst import util
_xml_refs_lock = threading.Lock()
@ -402,8 +401,6 @@ class XMLBuilderDomain(object):
@param caps: Capabilities() instance
"""
self._conn = conn
self._conn_uri = self._conn.getURI()
self.__remote = uriutil.is_uri_remote(self._conn_uri, conn=self._conn)
self.__caps = None
self._xml_node = None
@ -450,25 +447,11 @@ class XMLBuilderDomain(object):
return self._conn
conn = property(_get_conn)
def get_uri(self):
return self._conn_uri
def _get_caps(self):
if not self.__caps:
self.__caps = CapabilitiesParser.parse(self.conn.getCapabilities())
return self.__caps
def is_remote(self):
return bool(self.__remote)
def is_qemu(self):
return uriutil.is_qemu(self.conn, self.get_uri())
def is_qemu_system(self):
return uriutil.is_qemu_system(self.conn, self.get_uri())
def is_session_uri(self):
return uriutil.is_session_uri(self.conn, self.get_uri())
def is_xen(self):
return uriutil.is_xen(self.conn, self.get_uri())
def _check_bool(self, val, name):
if val not in [True, False]:
raise ValueError(_("'%s' must be True or False" % name))

View File

@ -34,7 +34,6 @@ import libvirt
from virtcli import cliconfig
import virtinst
from virtinst import uriutil
from virtinst import util
from virtinst.util import listify
@ -238,7 +237,7 @@ def getConnection(uri):
logging.debug("Requesting libvirt URI %s", (uri or "default"))
conn = virtinst.VirtualConnection(uri)
conn.open(_do_creds_authname)
logging.debug("Received libvirt URI %s", conn.getURI())
logging.debug("Received libvirt URI %s", conn.uri)
return conn
@ -300,7 +299,7 @@ def nice_exit():
def virsh_start_cmd(guest):
return ("virsh --connect %s start %s" % (guest.get_uri(), guest.name))
return ("virsh --connect %s start %s" % (guest.conn.uri, guest.name))
def install_fail(guest):
@ -700,8 +699,7 @@ def get_cpuset(guest, cpuset, memory):
def _default_network_opts(guest):
opts = ""
if (uriutil.is_qemu_session(guest.conn) or
uriutil.is_test(guest.conn)):
if (guest.conn.is_qemu_session() or guest.conn.is_test()):
opts = "user"
else:
net = util.default_network(guest.conn)

View File

@ -23,6 +23,7 @@ import re
import libvirt
from virtinst.cli import parse_optstr
from virtinst.util import uri_split
_virtinst_uri_magic = "__virtinst_test__"
@ -35,10 +36,22 @@ class VirtualConnection(object):
- simplified API wrappers that handle new and old ways of doing things
"""
def __init__(self, uri):
self._uri = uri
self._libvirtconn = None
self._initial_uri = uri or ""
self.is_virtinst_test_uri = uri and uri.startswith(_virtinst_uri_magic)
# virtinst unit test URI handling
if self._initial_uri.startswith(_virtinst_uri_magic):
uri = self._initial_uri.replace(_virtinst_uri_magic, "")
ret = uri.split(",", 1)
self._open_uri = ret[0]
self._test_opts = parse_optstr(len(ret) > 1 and ret[1] or "")
self._uri = self._virtinst_uri_make_fake()
else:
self._open_uri = self._initial_uri
self._uri = self._initial_uri
self._test_opts = {}
self._libvirtconn = None
self._urisplits = uri_split(self._uri)
# Just proxy virConnect access for now
@ -53,7 +66,10 @@ class VirtualConnection(object):
# Properties #
##############
uri = property(lambda self: getattr(self, "_uri"))
def _get_uri(self):
return self._uri or self._open_uri
uri = property(_get_uri)
libvirtconn = property(lambda self: getattr(self, "_libvirtconn"))
@ -63,6 +79,7 @@ class VirtualConnection(object):
def close(self):
self._libvirtconn = None
self._uri = None
def is_open(self):
return bool(self._libvirtconn)
@ -74,24 +91,63 @@ class VirtualConnection(object):
authcb = self._auth_cb
authcb_data = passwordcb
testopts = []
uri = self.uri
if self.is_virtinst_test_uri:
uri = uri.replace(_virtinst_uri_magic, "")
ret = uri.split(",", 1)
uri = ret[0]
testopts = parse_optstr(len(ret) > 1 and ret[1] or "")
conn = libvirt.openAuth(uri,
conn = libvirt.openAuth(self._open_uri,
[valid_auth_options, authcb,
(authcb_data, valid_auth_options)],
open_flags)
if testopts:
self._fixup_virtinst_test_uri(conn, testopts)
self._fixup_virtinst_test_uri(conn)
self._libvirtconn = conn
###################
# Public URI bits #
###################
def is_remote(self):
if (hasattr(self, "_virtinst__fake_conn_remote") or
self._urisplits[2]):
return True
def get_uri_hostname(self):
return self._urisplits[2] or "localhost"
def get_uri_transport(self):
scheme = self._urisplits[0]
username = self._urisplits[1]
offset = scheme.find("+")
if offset != -1:
return [scheme[offset + 1:], username]
return [None, None]
def get_uri_driver(self):
scheme = self._urisplits[0]
offset = scheme.find("+")
if offset > 0:
return scheme[:offset]
return scheme
def is_session_uri(self):
return self._urisplits[3] == "/session"
def is_qemu(self):
return self._urisplits[0].startswith("qemu")
def is_qemu_system(self):
return (self.is_qemu() and self._urisplits[3] == "/system")
def is_qemu_session(self):
return (self.is_qemu() and self.is_session_uri())
def is_test(self):
return self._urisplits[0].startswith("test")
def is_xen(self):
return (self._urisplits[0].startswith("xen") or
self._urisplits[0].startswith("libxl"))
def is_lxc(self):
return self._urisplits[0].startswith("lxc")
def is_openvz(self):
return self._urisplits[0].startswith("openvz")
def is_container(self):
return self.is_lxc() or self.is_openvz()
###################
@ -105,11 +161,24 @@ class VirtualConnection(object):
"%s" % (cred[0], passwordcreds))
return passwordcb(creds)
def _fixup_virtinst_test_uri(self, conn, opts):
def _virtinst_uri_make_fake(self):
if "qemu" in self._test_opts:
return "qemu+abc:///system"
elif "xen" in self._test_opts:
return "xen+abc:///"
elif "lxc" in self._test_opts:
return "lxc+abc:///"
return self._open_uri
def _fixup_virtinst_test_uri(self, conn):
"""
This hack allows us to fake various drivers via passing a magic
URI string to virt-*. Helps with testing
"""
if not self._test_opts:
return
opts = self._test_opts.copy()
def sanitize_xml(xml):
import difflib
@ -148,7 +217,12 @@ class VirtualConnection(object):
conn.getCapabilities = lambda: capsxml
if ("qemu" in opts) or ("xen" in opts) or ("lxc" in opts):
opts.pop("qemu", None)
opts.pop("xen", None)
opts.pop("lxc", None)
conn.getVersion = lambda: 10000000000
conn.getURI = self._virtinst_uri_make_fake
origcreate = conn.createLinux
origdefine = conn.defineXML
@ -161,15 +235,6 @@ class VirtualConnection(object):
conn.createLinux = newcreate
conn.defineXML = newdefine
if "qemu" in opts:
opts.pop("qemu")
conn.getURI = lambda: "qemu+abc:///system"
if "xen" in opts:
opts.pop("xen")
conn.getURI = lambda: "xen+abc:///"
if "lxc" in opts:
opts.pop("lxc")
conn.getURI = lambda: "lxc+abc:///"
# These need to come after the HV setter, since that sets a default
# conn version

View File

@ -20,7 +20,6 @@
# MA 02110-1301 USA.
import libvirt
from virtinst import uriutil
# Flags for check_conn_support
@ -296,18 +295,23 @@ def set_rhel6(val):
def get_rhel6():
return _rhel6
# Pull a connection object from the passed libvirt object
def _get_conn_from_object(obj):
if not hasattr(obj, "_conn"):
return obj
return getattr(obj, "_conn")
conn = None
if hasattr(obj, "getURI"):
conn = obj
elif hasattr(obj, "_conn"):
conn = getattr(obj, "_conn")
if conn:
from virtinst import VirtualConnection
return VirtualConnection(conn.getURI())
return obj
# Check that command is present in the python bindings, and return the
# the requested function
def _get_command(funcname, objname=None, obj=None):
if not obj:
obj = libvirt
@ -322,22 +326,19 @@ def _get_command(funcname, objname=None, obj=None):
return getattr(obj, funcname)
# Make sure libvirt object 'objname' has function 'funcname'
def _has_command(funcname, objname=None, obj=None):
return bool(_get_command(funcname, objname, obj))
# Make sure libvirt object has flag 'flag_name'
def _get_flag(flag_name):
return _get_command(flag_name)
# Try to call the passed function, and look for signs that libvirt or driver
# doesn't support it
def _try_command(func, args, check_all_error=False):
try:
func(*args)
@ -355,16 +356,14 @@ def _try_command(func, args, check_all_error=False):
return True
# Version of the local libvirt library
def _local_lib_ver():
return libvirt.getVersion()
# Version of libvirt library/daemon on the connection (could be remote)
def _daemon_lib_ver(conn, uri, force_version, minimum_libvirt_version):
def _daemon_lib_ver(conn, is_remote, force_version, minimum_libvirt_version):
# Always force the required version if it's after the version which
# has getLibVersion
if force_version or minimum_libvirt_version >= 7004:
@ -372,7 +371,7 @@ def _daemon_lib_ver(conn, uri, force_version, minimum_libvirt_version):
else:
default_ret = 100000000000
if not uriutil.is_uri_remote(uri, conn=conn):
if not is_remote:
return _local_lib_ver()
if not _has_command("getLibVersion", obj=conn):
@ -386,8 +385,7 @@ def _daemon_lib_ver(conn, uri, force_version, minimum_libvirt_version):
# Return the hypervisor version
def _hv_ver(conn, uri):
drv_type = uriutil.get_uri_driver(uri)
def _hv_ver(conn, drv_type):
args = ()
cmd = _get_command("getVersion", obj=conn)
@ -436,6 +434,9 @@ def _check_support(conn, feature, data=None):
@returns: True if feature is supported, False otherwise
"""
is_remote = conn.is_remote()
drv_type = conn.get_uri_driver()
# Temporary hack to make this work
if "VirtualConnection" in repr(conn):
conn = getattr(conn, "_libvirtconn")
@ -450,8 +451,6 @@ def _check_support(conn, feature, data=None):
key_list.remove(key)
return support_info.get(key)
uri = conn.getURI()
drv_type = uriutil.get_uri_driver(uri)
is_rhel6 = get_rhel6()
force_version = get_value("force_version") or False
@ -473,9 +472,9 @@ def _check_support(conn, feature, data=None):
flag = get_value("flag")
actual_lib_ver = _local_lib_ver()
actual_daemon_ver = _daemon_lib_ver(conn, uri, force_version,
actual_daemon_ver = _daemon_lib_ver(conn, is_remote, force_version,
minimum_libvirt_version)
actual_drv_ver = _hv_ver(conn, uri)
actual_drv_ver = _hv_ver(conn, drv_type)
# Make sure there are no keys left in the key_list. This will
# ensure we didn't mistype anything above, or in the support_dict

View File

@ -1,178 +0,0 @@
#
# Copyright 2006-2013 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA.
#
import logging
def uri_split(uri):
"""
Parse a libvirt hypervisor uri into it's individual parts
@returns: tuple of the form (scheme (ex. 'qemu', 'xen+ssh'), username,
hostname, path (ex. '/system'), query,
fragment)
"""
def splitnetloc(url, start=0):
for c in '/?#': # the order is important!
delim = url.find(c, start)
if delim >= 0:
break
else:
delim = len(url)
return url[start:delim], url[delim:]
username = netloc = query = fragment = ''
i = uri.find(":")
if i > 0:
scheme, uri = uri[:i].lower(), uri[i + 1:]
if uri[:2] == '//':
netloc, uri = splitnetloc(uri, 2)
offset = netloc.find("@")
if offset > 0:
username = netloc[0:offset]
netloc = netloc[offset + 1:]
if '#' in uri:
uri, fragment = uri.split('#', 1)
if '?' in uri:
uri, query = uri.split('?', 1)
else:
scheme = uri.lower()
return scheme, username, netloc, uri, query, fragment
def is_uri_remote(uri, conn=None):
if conn and hasattr(conn, "_virtinst__fake_conn_remote"):
# Testing hack
return True
try:
split_uri = uri_split(uri)
netloc = split_uri[2]
if netloc == "":
return False
return True
except Exception, e:
logging.exception("Error parsing URI in is_remote: %s", e)
return True
def get_uri_hostname(uri):
try:
split_uri = uri_split(uri)
netloc = split_uri[2]
if netloc != "":
return netloc
except Exception, e:
logging.warning("Cannot parse URI %s: %s", uri, str(e))
return "localhost"
def get_uri_transport(uri):
try:
split_uri = uri_split(uri)
scheme = split_uri[0]
username = split_uri[1]
if scheme:
offset = scheme.index("+")
if offset > 0:
return [scheme[offset + 1:], username]
except:
pass
return [None, None]
def get_uri_driver(uri):
try:
split_uri = uri_split(uri)
scheme = split_uri[0]
if scheme:
offset = scheme.find("+")
if offset > 0:
return scheme[:offset]
return scheme
except Exception:
pass
return "xen"
def _get_uri_to_split(conn, uri):
if not conn and not uri:
return None
if type(conn) is str:
uri = conn
elif uri is None:
uri = conn.getURI()
return uri
def is_qemu_system(conn, uri=None):
uri = _get_uri_to_split(conn, uri)
if not uri:
return False
(scheme, ignore, ignore,
path, ignore, ignore) = uri_split(uri)
if path == "/system" and scheme.startswith("qemu"):
return True
return False
def is_qemu_session(conn):
return is_qemu(conn) and is_session_uri(conn)
def is_session_uri(conn, uri=None):
uri = _get_uri_to_split(conn, uri)
if not uri:
return False
(ignore, ignore, ignore,
path, ignore, ignore) = uri_split(uri)
return bool(path and path == "/session")
def is_test(conn, uri=None):
uri = _get_uri_to_split(conn, uri)
if not uri:
return False
scheme = uri_split(uri)[0]
return scheme.startswith("test")
def is_qemu(conn, uri=None):
uri = _get_uri_to_split(conn, uri)
if not uri:
return False
scheme = uri_split(uri)[0]
return scheme.startswith("qemu")
def is_xen(conn, uri=None):
uri = _get_uri_to_split(conn, uri)
if not uri:
return False
scheme = uri_split(uri)[0]
return scheme.startswith("xen")

View File

@ -27,8 +27,6 @@ import stat
import libvirt
import libxml2
from virtinst import uriutil
def listify(l):
if l is None:
@ -277,25 +275,28 @@ def generate_name(base, collision_cb, suffix="", lib_collision=True,
def default_bridge(conn):
if conn.is_remote():
return None
dev = default_route()
if not dev:
return None
if (dev is not None and uriutil.is_uri_remote(conn.getURI(), conn)):
# New style peth0 == phys dev, eth0 == bridge, eth0 == default route
if os.path.exists("/sys/class/net/%s/bridge" % dev):
return ["bridge", dev]
# New style peth0 == phys dev, eth0 == bridge, eth0 == default route
if os.path.exists("/sys/class/net/%s/bridge" % dev):
return ["bridge", dev]
# Old style, peth0 == phys dev, eth0 == netloop, xenbr0 == bridge,
# vif0.0 == netloop enslaved, eth0 == default route
try:
defn = int(dev[-1])
except:
defn = -1
if (defn >= 0 and
os.path.exists("/sys/class/net/peth%d/brport" % defn) and
os.path.exists("/sys/class/net/xenbr%d/bridge" % defn)):
return ["bridge", "xenbr%d" % defn]
# Old style, peth0 == phys dev, eth0 == netloop, xenbr0 == bridge,
# vif0.0 == netloop enslaved, eth0 == default route
try:
defn = int(dev[-1])
except:
defn = -1
if (defn >= 0 and
os.path.exists("/sys/class/net/peth%d/brport" % defn) and
os.path.exists("/sys/class/net/xenbr%d/bridge" % defn)):
return ["bridge", "xenbr%d" % defn]
return None
@ -541,9 +542,36 @@ def lookup_pool_by_path(conn, path):
return None
def _test():
import doctest
doctest.testmod()
def uri_split(uri):
"""
Parse a libvirt hypervisor uri into it's individual parts
@returns: tuple of the form (scheme (ex. 'qemu', 'xen+ssh'), username,
hostname, path (ex. '/system'), query,
fragment)
"""
def splitnetloc(url, start=0):
for c in '/?#': # the order is important!
delim = url.find(c, start)
if delim >= 0:
break
else:
delim = len(url)
return url[start:delim], url[delim:]
if __name__ == "__main__":
_test()
username = netloc = query = fragment = ''
i = uri.find(":")
if i > 0:
scheme, uri = uri[:i].lower(), uri[i + 1:]
if uri[:2] == '//':
netloc, uri = splitnetloc(uri, 2)
offset = netloc.find("@")
if offset > 0:
username = netloc[0:offset]
netloc = netloc[offset + 1:]
if '#' in uri:
uri, fragment = uri.split('#', 1)
if '?' in uri:
uri, query = uri.split('?', 1)
else:
scheme = uri.lower()
return scheme, username, netloc, uri, query, fragment