connection: tick: Operate on combined object lists

Makes the flow a bit simpler, and allows us to break out polling without
having to pass around a ton of lists.
This commit is contained in:
Cole Robinson
2015-04-10 12:52:42 -04:00
parent 5357b91402
commit 838baf6946
7 changed files with 162 additions and 137 deletions

View File

@@ -233,7 +233,7 @@ class vmmConnection(vmmGObject):
def fetch_all_vols():
ret = []
for pool in self.list_pools():
for vol in pool.get_volumes().values():
for vol in pool.get_volumes():
try:
ret.append(vol.get_xmlobj(refresh_if_nec=False))
except Exception, e:
@@ -560,7 +560,7 @@ class vmmConnection(vmmGObject):
def get_vol_by_path(self, path):
for pool in self.list_pools():
for vol in pool.get_volumes().values():
for vol in pool.get_volumes():
try:
if vol.get_target_path() == path:
return vol
@@ -996,41 +996,6 @@ class vmmConnection(vmmGObject):
# Tick/Update methods #
#######################
def _update_nets(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nets())
if not dopoll or not self.is_network_capable():
return {}, {}, keymap
return pollhelpers.fetch_nets(self._backend, keymap,
(lambda obj, key: vmmNetwork(self, obj, key)))
def _update_pools(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_pools())
if not dopoll or not self.is_storage_capable():
return {}, {}, keymap
return pollhelpers.fetch_pools(self._backend, keymap,
(lambda obj, key: vmmStoragePool(self, obj, key)))
def _update_interfaces(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_interfaces())
if not dopoll or not self.is_interface_capable():
return {}, {}, keymap
return pollhelpers.fetch_interfaces(self._backend, keymap,
(lambda obj, key: vmmInterface(self, obj, key)))
def _update_nodedevs(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nodedevs())
if not dopoll or not self.is_nodedev_capable():
return {}, {}, keymap
return pollhelpers.fetch_nodedevs(self._backend, keymap,
(lambda obj, key: vmmNodeDevice(self, obj, key)))
def _update_vms(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_vms())
if not dopoll:
return {}, {}, keymap
return pollhelpers.fetch_vms(self._backend, keymap,
(lambda obj, key: vmmDomain(self, obj, key)))
def _send_object_signals(self, new_objects, gone_objects,
finish_connecting):
"""
@@ -1083,48 +1048,6 @@ class vmmConnection(vmmGObject):
if finish_connecting:
self._change_state(self._STATE_ACTIVE)
def schedule_priority_tick(self, **kwargs):
# args/kwargs are what is passed to def tick()
if "stats_update" not in kwargs:
kwargs["stats_update"] = False
self.idle_emit("priority-tick", kwargs)
def tick(self, *args, **kwargs):
e = None
try:
self._tick(*args, **kwargs)
except KeyboardInterrupt:
raise
except Exception, e:
pass
if e is None:
return
from_remote = getattr(libvirt, "VIR_FROM_REMOTE", None)
from_rpc = getattr(libvirt, "VIR_FROM_RPC", None)
sys_error = getattr(libvirt, "VIR_ERR_SYSTEM_ERROR", None)
dom = -1
code = -1
if isinstance(e, libvirt.libvirtError):
dom = e.get_error_domain()
code = e.get_error_code()
logging.debug("Error polling connection %s",
self.get_uri(), exc_info=True)
if (dom in [from_remote, from_rpc] and
code in [sys_error]):
e = None
logging.debug("Not showing user error since libvirtd "
"appears to have stopped.")
self._schedule_close()
if e:
raise e # pylint: disable=raising-bad-type
def _refresh_new_objects(self, newlist):
if not newlist:
return
@@ -1138,7 +1061,7 @@ class vmmConnection(vmmGObject):
pool.refresh()
def _refresh_volumes(p):
for vol in p.get_volumes().values():
for vol in p.get_volumes():
vol.refresh_xml()
self._start_thread(_refresh_volumes,
"pool=%s refreshing xml for volumes" % pool.get_name(),
@@ -1150,6 +1073,72 @@ class vmmConnection(vmmGObject):
self._start_thread(cb,
"refreshing xml for new %s" % newlist[0].__class__)
def _update_nets(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nets())
if not dopoll or not self.is_network_capable():
return [], [], keymap.values()
return pollhelpers.fetch_nets(self._backend, keymap,
(lambda obj, key: vmmNetwork(self, obj, key)))
def _update_pools(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_pools())
if not dopoll or not self.is_storage_capable():
return [], [], keymap.values()
return pollhelpers.fetch_pools(self._backend, keymap,
(lambda obj, key: vmmStoragePool(self, obj, key)))
def _update_interfaces(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_interfaces())
if not dopoll or not self.is_interface_capable():
return [], [], keymap.values()
return pollhelpers.fetch_interfaces(self._backend, keymap,
(lambda obj, key: vmmInterface(self, obj, key)))
def _update_nodedevs(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nodedevs())
if not dopoll or not self.is_nodedev_capable():
return [], [], keymap.values()
return pollhelpers.fetch_nodedevs(self._backend, keymap,
(lambda obj, key: vmmNodeDevice(self, obj, key)))
def _update_vms(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_vms())
if not dopoll:
return [], [], keymap.values()
return pollhelpers.fetch_vms(self._backend, keymap,
(lambda obj, key: vmmDomain(self, obj, key)))
def _poll(self, pollvm, pollnet, pollpool, polliface, pollnodedev):
"""
Helper called from tick() to do necessary polling and return
the relevant object lists
"""
gone_objects = []
new_objects = []
preexisting_objects = []
def _process_objects(polloutput):
gone, new, master = polloutput
gone_objects.extend(gone)
new_objects.extend(new)
preexisting_objects.extend([o for o in master if o not in new])
return new
self._refresh_new_objects(_process_objects(
self._update_nets(pollnet)))
self._refresh_new_objects(_process_objects(
self._update_pools(pollpool)))
self._refresh_new_objects(_process_objects(
self._update_interfaces(polliface)))
self._refresh_new_objects(_process_objects(
self._update_nodedevs(pollnodedev)))
# These are refreshing in their __init__ method, because the
# data is wanted immediately
_process_objects(self._update_vms(pollvm))
return gone_objects, new_objects, preexisting_objects
def _tick(self, stats_update,
pollvm=False, pollnet=False,
pollpool=False, polliface=False,
@@ -1157,7 +1146,9 @@ class vmmConnection(vmmGObject):
force=False):
"""
main update function: polls for new objects, updates stats, ...
@force: Perform the requested polling even if async events are in use
:param force: Perform the requested polling even if async events
are in use
"""
finish_connecting = False
@@ -1172,6 +1163,8 @@ class vmmConnection(vmmGObject):
return
finish_connecting = True
# We need to set this before the event check, since stats polling
# is independent of events
if not pollvm:
stats_update = False
@@ -1182,45 +1175,28 @@ class vmmConnection(vmmGObject):
self.hostinfo = self._backend.getInfo()
(goneNets, newNets, nets) = self._update_nets(pollnet)
self._refresh_new_objects(newNets.values())
(gonePools, newPools, pools) = self._update_pools(pollpool)
self._refresh_new_objects(newPools.values())
(goneInterfaces,
newInterfaces, interfaces) = self._update_interfaces(polliface)
self._refresh_new_objects(newInterfaces.values())
(goneNodedevs,
newNodedevs, nodedevs) = self._update_nodedevs(pollnodedev)
self._refresh_new_objects(newNodedevs.values())
# These are refreshing in their __init__ method, because the
# data is wanted immediately
(goneVMs, newVMs, vms) = self._update_vms(pollvm)
gone_objects = (goneVMs.values() + goneNets.values() +
gonePools.values() + goneInterfaces.values() +
goneNodedevs.values())
new_objects = (newVMs.values() + newNets.values() +
newPools.values() + newInterfaces.values() +
newNodedevs.values())
gone_objects, new_objects, preexisting_objects = self._poll(
pollvm, pollnet, pollpool, polliface, pollnodedev)
self.idle_add(self._send_object_signals,
new_objects, gone_objects, finish_connecting)
ticklist = []
if pollvm or stats_update:
ticklist.extend(vms.values())
if pollnet:
ticklist.extend(nets.values())
if pollpool:
ticklist.extend(pools.values())
if polliface:
ticklist.extend(interfaces.values())
if pollnodedev:
ticklist.extend(nodedevs.values())
for obj in ticklist:
# Only tick() pre-existing objects, since new objects will be
# initialized asynchronously and tick() would be redundant
for obj in preexisting_objects:
try:
if obj.reports_stats() and stats_update:
pass
elif obj.__class__ is vmmDomain and not pollvm:
continue
elif obj.__class__ is vmmNetwork and not pollnet:
continue
elif obj.__class__ is vmmStoragePool and not pollpool:
continue
elif obj.__class__ is vmmInterface and not polliface:
continue
elif obj.__class__ is vmmNodeDevice and not pollnodedev:
continue
obj.tick(stats_update=stats_update)
except Exception, e:
logging.exception("Tick for %s failed", obj)
@@ -1234,11 +1210,10 @@ class vmmConnection(vmmGObject):
"Ignoring.")
if stats_update:
self._recalculate_stats(vms.values())
self._recalculate_stats(
[o for o in preexisting_objects if o.reports_stats()])
self.idle_emit("resources-sampled")
return 1
def _recalculate_stats(self, vms):
if not self._backend.is_open():
return
@@ -1303,6 +1278,48 @@ class vmmConnection(vmmGObject):
self.record.insert(0, newStats)
def schedule_priority_tick(self, **kwargs):
# args/kwargs are what is passed to def tick()
if "stats_update" not in kwargs:
kwargs["stats_update"] = False
self.idle_emit("priority-tick", kwargs)
def tick(self, *args, **kwargs):
e = None
try:
self._tick(*args, **kwargs)
except KeyboardInterrupt:
raise
except Exception, e:
pass
if e is None:
return
from_remote = getattr(libvirt, "VIR_FROM_REMOTE", None)
from_rpc = getattr(libvirt, "VIR_FROM_RPC", None)
sys_error = getattr(libvirt, "VIR_ERR_SYSTEM_ERROR", None)
dom = -1
code = -1
if isinstance(e, libvirt.libvirtError):
dom = e.get_error_domain()
code = e.get_error_code()
logging.debug("Error polling connection %s",
self.get_uri(), exc_info=True)
if (dom in [from_remote, from_rpc] and
code in [sys_error]):
e = None
logging.debug("Not showing user error since libvirtd "
"appears to have stopped.")
self._schedule_close()
if e:
raise e # pylint: disable=raising-bad-type
########################
# Stats getter methods #
########################

View File

@@ -432,6 +432,8 @@ class vmmDomain(vmmLibvirtObject):
# Misc API getter methods #
###########################
def reports_stats(self):
return True
def _using_events(self):
return self.conn.using_domain_events

View File

@@ -150,6 +150,8 @@ class vmmLibvirtObject(vmmGObject):
# for vmmDomain == "pollvm"
raise NotImplementedError()
def reports_stats(self):
return False
def _using_events(self):
return False
def _check_supports_isactive(self):

View File

@@ -405,13 +405,13 @@ class vmmStorageList(vmmGObjectUI):
def _populate_vols(self):
list_widget = self.widget("vol-list")
pool = self._current_pool()
vols = pool and pool.get_volumes() or {}
vols = pool and pool.get_volumes() or []
model = list_widget.get_model()
list_widget.get_selection().unselect_all()
model.clear()
for key in vols.keys():
vol = vols[key]
for vol in vols:
key = vol.get_connkey()
try:
path = vol.get_target_path()

View File

@@ -115,7 +115,7 @@ class vmmStoragePool(vmmLibvirtObject):
vmmLibvirtObject.__init__(self, conn, backend, key, StoragePool)
self._last_refresh_time = 0
self._volumes = {}
self._volumes = []
##########################
@@ -179,18 +179,22 @@ class vmmStoragePool(vmmLibvirtObject):
###################
def get_volumes(self):
return self._volumes
return self._volumes[:]
def get_volume(self, key):
return self._volumes[key]
for vol in self.get_volumes():
if vol.get_connkey() == key:
return vol
return None
def _update_volumes(self):
if not self.is_active():
self._volumes = {}
self._volumes = []
return
keymap = dict((o.get_connkey(), o) for o in self._volumes)
(ignore, ignore, allvols) = pollhelpers.fetch_volumes(
self.conn.get_backend(), self.get_backend(), self._volumes.copy(),
self.conn.get_backend(), self.get_backend(), keymap,
lambda obj, key: vmmStorageVolume(self.conn, obj, key))
self._volumes = allvols

View File

@@ -193,10 +193,10 @@ class VirtualConnection(object):
if key in self._fetch_cache:
return self._fetch_cache[key]
ignore, ignore, ret = pollhelpers.fetch_vms(self, {},
lambda obj, ignore: obj)
ignore, ignore, ret = pollhelpers.fetch_vms(
self, {}, lambda obj, ignore: obj)
ret = [Guest(weakref.ref(self), parsexml=obj.XMLDesc(0))
for obj in ret.values()]
for obj in ret]
if self.cache_object_fetch:
self._fetch_cache[key] = ret
return ret
@@ -214,10 +214,10 @@ class VirtualConnection(object):
if key in self._fetch_cache:
return self._fetch_cache[key]
ignore, ignore, ret = pollhelpers.fetch_pools(self, {},
lambda obj, ignore: obj)
ignore, ignore, ret = pollhelpers.fetch_pools(
self, {}, lambda obj, ignore: obj)
ret = [StoragePool(weakref.ref(self), parsexml=obj.XMLDesc(0))
for obj in ret.values()]
for obj in ret]
if self.cache_object_fetch:
self._fetch_cache[key] = ret
return ret
@@ -244,7 +244,7 @@ class VirtualConnection(object):
ignore, ignore, vols = pollhelpers.fetch_volumes(
self, pool, {}, lambda obj, ignore: obj)
for vol in vols.values():
for vol in vols:
try:
xml = vol.XMLDesc(0)
ret.append(StorageVolume(weakref.ref(self), parsexml=xml))
@@ -271,7 +271,7 @@ class VirtualConnection(object):
ignore, ignore, ret = pollhelpers.fetch_nodedevs(
self, {}, lambda obj, ignore: obj)
ret = [NodeDevice.parse(weakref.ref(self), obj.XMLDesc(0))
for obj in ret.values()]
for obj in ret]
if self.cache_object_fetch:
self._fetch_cache[key] = ret
return ret

View File

@@ -49,7 +49,7 @@ def _new_poll_helper(origmap, typename, listfunc, buildfunc):
current[connkey] = origmap[connkey]
del(origmap[connkey])
return (origmap, new, current)
return (origmap.values(), new.values(), current.values())
def _old_poll_helper(origmap, typename,
@@ -107,7 +107,7 @@ def _old_poll_helper(origmap, typename,
except:
logging.exception("Couldn't fetch %s '%s'", typename, name)
return (origmap, new, current)
return (origmap.values(), new.values(), current.values())
def fetch_nets(backend, origmap, build_func):