engine: Switch tick threading model to use queues

And allow other parts of the API to request a priority tick that is
dispatched from the main tick thread.
This commit is contained in:
Cole Robinson
2013-07-06 20:03:42 -04:00
parent d427800f11
commit 76887c9a32
10 changed files with 81 additions and 64 deletions

View File

@@ -816,7 +816,7 @@ class vmmCloneVM(vmmGObjectUI):
self.err.show_err(error, details=details)
else:
self.close()
self.conn.tick(noStatsUpdate=True)
self.conn.schedule_priority_tick()
def _async_clone(self, asyncjob):
try:

View File

@@ -67,7 +67,9 @@ class vmmConnection(vmmGObject):
"mediadev-removed": (GObject.SignalFlags.RUN_FIRST, None, [str]),
"resources-sampled": (GObject.SignalFlags.RUN_FIRST, None, []),
"state-changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"connect-error": (GObject.SignalFlags.RUN_FIRST, None, [str, str, bool]),
"connect-error": (GObject.SignalFlags.RUN_FIRST, None,
[str, str, bool]),
"priority-tick": (GObject.SignalFlags.RUN_FIRST, None, [object]),
}
STATE_DISCONNECTED = 0
@@ -85,7 +87,6 @@ class vmmConnection(vmmGObject):
self.state = self.STATE_DISCONNECTED
self.connectThread = None
self.connectError = None
self._ticklock = threading.Lock()
self._backend = virtinst.VirtualConnection(self._uri)
self._caps = None
@@ -945,7 +946,7 @@ class vmmConnection(vmmGObject):
if self.state == self.STATE_ACTIVE:
logging.debug("%s capabilities:\n%s",
self.get_uri(), self.caps.xml)
self.tick()
self.schedule_priority_tick()
if self.state == self.STATE_DISCONNECTED:
if self.connectError:
@@ -1168,14 +1169,10 @@ class vmmConnection(vmmGObject):
ignore = obj
self.emit(signal, key)
def tick(self, noStatsUpdate=False):
try:
self._ticklock.acquire()
self._tick(noStatsUpdate)
finally:
self._ticklock.release()
def schedule_priority_tick(self, obj=None):
self.emit("priority-tick", obj or self)
def _tick(self, noStatsUpdate=False):
def tick(self, noStatsUpdate=False):
""" main update function: polls for new objects, updates stats, ..."""
if self.state != self.STATE_ACTIVE:
return

View File

@@ -18,8 +18,9 @@
# MA 02110-1301 USA.
#
import threading
import logging
import threading
import time
# pylint: disable=E0611
from gi.repository import GObject
@@ -1896,7 +1897,14 @@ class vmmCreate(vmmGObjectUI):
logging.debug("Install completed")
# Make sure we pick up the domain object
self.conn.tick(noStatsUpdate=True)
# Wait for VM to show up
self.conn.schedule_priority_tick()
count = 0
while (guest.uuid not in self.conn.vms) and (count < 100):
count += 1
time.sleep(.1)
vm = self.conn.get_vm(guest.uuid)
vm.tick()

View File

@@ -1124,8 +1124,7 @@ class vmmCreateInterface(vmmGObjectUI):
self.err.show_err(error,
details=details)
else:
# FIXME: Hmm, shouldn't we emit a signal here rather than do this?
self.conn.tick(noStatsUpdate=True)
self.conn.schedule_priority_tick()
self.close()
def do_install(self, asyncjob, activate):

View File

@@ -1013,7 +1013,7 @@ class vmmCreateNetwork(vmmGObjectUI):
self.err.show_err(_("Error creating virtual network: %s" % str(e)))
return
self.conn.tick(noStatsUpdate=True)
self.conn.schedule_priority_tick()
self.close()
def validate_name(self):

View File

@@ -452,6 +452,7 @@ class vmmCreatePool(vmmGObjectUI):
self.err.show_err(error,
details=details)
else:
self.conn.schedule_priority_tick()
self.close()
def _async_pool_create(self, asyncjob, build):

View File

@@ -158,13 +158,11 @@ class vmmDeleteDialog(vmmGObjectUI):
self.topwin.set_sensitive(True)
self.topwin.get_window().set_cursor(Gdk.Cursor.new(Gdk.CursorType.TOP_LEFT_ARROW))
conn = self.conn
if error is not None:
self.err.show_err(error, details=details)
conn.tick(noStatsUpdate=True)
self.conn.schedule_priority_tick()
self.close()
def _async_delete(self, asyncjob, paths):

View File

@@ -1404,9 +1404,6 @@ class vmmDomain(vmmLibvirtObject):
self._enable_net_poll = self.config.get_stats_enable_net_poll()
if self._enable_net_poll and len(self.record) > 1:
# resample the current value before calculating the rate in
# self.tick() otherwise we'd get a huge spike when switching
# from 0 to bytes_transfered_so_far
rxBytes, txBytes = self._sample_network_traffic()
self.record[0]["netRxKB"] = rxBytes / 1024
self.record[0]["netTxKB"] = txBytes / 1024
@@ -1415,9 +1412,6 @@ class vmmDomain(vmmLibvirtObject):
self._enable_disk_poll = self.config.get_stats_enable_disk_poll()
if self._enable_disk_poll and len(self.record) > 1:
# resample the current value before calculating the rate in
# self.tick() otherwise we'd get a huge spike when switching
# from 0 to bytes_transfered_so_far
rdBytes, wrBytes = self._sample_disk_io()
self.record[0]["diskRdKB"] = rdBytes / 1024
self.record[0]["diskWrKB"] = wrBytes / 1024

View File

@@ -25,8 +25,8 @@ from gi.repository import Gtk
# pylint: enable=E0611
import logging
import Queue
import threading
import os
import libvirt
import virtinst
@@ -54,7 +54,6 @@ from virtManager.delete import vmmDeleteDialog
# gtk3/pygobject has issues here as of Fedora 18
debug_ref_leaks = False
DETAILS_PERF = 1
DETAILS_CONFIG = 2
DETAILS_CONSOLE = 3
@@ -90,8 +89,13 @@ class vmmEngine(vmmGObject):
self.application.connect("activate", self._activate)
self._appwindow = Gtk.Window()
self._tick_thread = None
self._tick_counter = 0
self._tick_thread_slow = False
self._tick_thread = threading.Thread(name="Tick thread",
target=self._handle_tick_queue,
args=())
self._tick_thread.daemon = True
self._tick_queue = Queue.PriorityQueue(100)
self.inspection = None
self._create_inspection_thread()
@@ -116,6 +120,8 @@ class vmmEngine(vmmGObject):
self.schedule_timer()
self.load_stored_uris()
self._tick_thread.start()
self.tick()
@@ -260,48 +266,61 @@ class vmmEngine(vmmGObject):
self.timer = self.timeout_add(interval, self.tick)
def tick(self):
if self._tick_thread and self._tick_thread.isAlive():
def _add_obj_to_tick_queue(self, obj, isprio):
if self._tick_queue.full():
if not self._tick_thread_slow:
logging.debug("Tick is slow, not running at requested rate.")
self._tick_thread_slow = True
return 1
return
self._tick_thread = threading.Thread(name="Tick thread",
target=self._tick, args=())
self._tick_thread.daemon = True
self._tick_thread.start()
return 1
self._tick_counter += 1
self._tick_queue.put((isprio and 0 or 100,
self._tick_counter, obj))
def _tick(self):
def _schedule_priority_tick(self, conn, obj):
ignore = conn
self._add_obj_to_tick_queue(obj, True)
def tick(self):
for uri in self.conns.keys():
conn = self.conns[uri]["conn"]
try:
conn.tick()
except KeyboardInterrupt:
raise
except libvirt.libvirtError, e:
from_remote = getattr(libvirt, "VIR_FROM_REMOTE", None)
from_rpc = getattr(libvirt, "VIR_FROM_RPC", None)
sys_error = getattr(libvirt, "VIR_ERR_SYSTEM_ERROR", None)
dom = e.get_error_domain()
code = e.get_error_code()
if (dom in [from_remote, from_rpc] and
code in [sys_error]):
logging.exception("Could not refresh connection %s", uri)
logging.debug("Closing connection since libvirtd "
"appears to have stopped")
else:
error_msg = _("Error polling connection '%s': %s") \
% (conn.get_uri(), e)
self.idle_add(lambda: self.err.show_err(error_msg))
self.idle_add(conn.close)
self._add_obj_to_tick_queue(conn, False)
return 1
def _handle_tick_queue(self):
while True:
prio, ignore, obj = self._tick_queue.get()
self._tick_single_conn(obj, prio == 0)
self._tick_queue.task_done()
return 1
def _tick_single_conn(self, conn, noStatsUpdate):
try:
conn.tick(noStatsUpdate=noStatsUpdate)
except KeyboardInterrupt:
raise
except libvirt.libvirtError, e:
from_remote = getattr(libvirt, "VIR_FROM_REMOTE", None)
from_rpc = getattr(libvirt, "VIR_FROM_RPC", None)
sys_error = getattr(libvirt, "VIR_ERR_SYSTEM_ERROR", None)
dom = e.get_error_domain()
code = e.get_error_code()
if (dom in [from_remote, from_rpc] and
code in [sys_error]):
logging.exception("Could not refresh connection %s",
conn.get_uri())
logging.debug("Closing connection since libvirtd "
"appears to have stopped")
else:
error_msg = _("Error polling connection '%s': %s") \
% (conn.get_uri(), e)
self.idle_add(lambda: self.err.show_err(error_msg))
self.idle_add(conn.close)
def increment_window_counter(self, src):
ignore = src
self.windows += 1
@@ -430,7 +449,7 @@ class vmmEngine(vmmGObject):
conn.connect("vm-removed", self._do_vm_removed)
conn.connect("state-changed", self._do_conn_changed)
conn.tick()
conn.connect("priority-tick", self._schedule_priority_tick)
self.emit("conn-added", conn)
return conn
@@ -744,6 +763,7 @@ class vmmEngine(vmmGObject):
self._show_vm_helper(self.get_manager(), uri, uuid,
page=DETAILS_PERF, forcepage=True)
#######################################
# Domain actions run/destroy/save ... #
#######################################

View File

@@ -492,8 +492,8 @@ class vmmMigrateDialog(vmmGObjectUI):
self.err.show_err(error,
details=details)
else:
self.conn.tick(noStatsUpdate=True)
destconn.tick(noStatsUpdate=True)
self.conn.schedule_priority_tick()
destconn.schedule_priority_tick()
self.close()
def _async_set_max_downtime(self, vm, max_downtime, migrate_thread):