New parser for host and vms stats.

The data are cached for other http requests.
This commit is contained in:
wescoeur 2015-10-26 15:14:54 +01:00
parent d56cca7873
commit 6fbfece4ff
5 changed files with 546 additions and 189 deletions

View File

@ -107,6 +107,7 @@
"gulp-plumber": "^1.0.0",
"gulp-sourcemaps": "^1.5.1",
"gulp-watch": "^4.2.2",
"json5": "^0.4.0",
"leche": "^2.1.1",
"mocha": "^2.2.1",
"must": "^0.13.1",

View File

@ -283,97 +283,10 @@ installAllPatches.resolve = {
exports.installAllPatches = installAllPatches
#---------------------------------------------------------------------
points = {}
stats = $coroutine ({host, granularity}) ->
granularity = if granularity then granularity else 0
# granularity: 0: every 5 sec along last 10 minutes, 1: every minute along last 2 hours, 2: every hour along past week, 3: everyday along past year
# see http://xenserver.org/partners/developing-products-for-xenserver/18-sdk-development/96-xs-dev-rrds.html
if points[host.id] and (Date.now() - points[host.id].timestamp < 5000)
return points[host.id]
# select the AVERAGE values
granularity = {0:0, 1:1, 2:4, 3:7}[granularity]
xapi = @getXAPI host
{body} = response = yield got(
"https://#{host.address}/host_rrd?session_id=#{xapi.sessionId}",
{ rejectUnauthorized: false }
)
if response.statusCode isnt 200
throw new Error('Cannot fetch the RRDs')
json = parseXml(body)
# Find index of needed objects for getting their values after
cpusIndexes = []
pifsIndexes = []
memoryFreeIndex = []
memoryIndex = []
loadIndex = []
index = 0
$forEach(json.rrd.ds, (value, i) ->
if /^cpu[0-9]+$/.test(value.name)
cpusIndexes.push(i)
else if startsWith(value.name, 'pif_eth') && endsWith(value.name, '_tx')
pifsIndexes.push(i)
else if startsWith(value.name, 'pif_eth') && endsWith(value.name, '_rx')
pifsIndexes.push(i)
else if startsWith(value.name, 'loadavg')
loadIndex.push(i)
else if startsWith(value.name, 'memory_free_kib')
memoryFreeIndex.push(i)
else if startsWith(value.name, 'memory_total_kib')
memoryIndex.push(i)
return
)
memoryFree = []
memoryUsed = []
memory = []
load = []
cpus = []
pifs = []
date = []
archive = json.rrd.rra[granularity]
dateStep = json.rrd.step * archive.pdp_per_row
baseDate = json.rrd.lastupdate - (json.rrd.lastupdate % dateStep)
numStep = archive.database.row.length - 1
$forEach archive.database.row, (n, key) ->
memoryFree.push(Math.round(parseInt(n.v[memoryFreeIndex])))
memoryUsed.push(Math.round(parseInt(n.v[memoryIndex])-(n.v[memoryFreeIndex])))
memory.push(parseInt(n.v[memoryIndex]))
load.push(if n.v[loadIndex] == 'NaN' then null else n.v[loadIndex])
date.push(baseDate - (dateStep * (numStep - key)))
# build the multi dimensional arrays
$forEach cpusIndexes, (value, key) ->
cpus[key] ?= []
cpus[key].push(n.v[value]*100)
return
$forEach pifsIndexes, (value, key) ->
pifs[key] ?= []
pifs[key].push(if n.v[value] == 'NaN' then null else n.v[value]) # * (if key % 2 then -1 else 1))
return
return
points[host.id] = {
memoryFree: memoryFree
memoryUsed: memoryUsed
memory: memory
date: date
cpus: cpus
pifs: pifs
load: load
timestamp: Date.now()
}
# the final object
return points[host.id]
stats = yield @getXapiHostStats(host, granularity)
return stats
stats.description = 'returns statistic of the host'

View File

@ -776,107 +776,9 @@ detachPci.resolve = {
exports.detachPci = detachPci
#---------------------------------------------------------------------
points = {}
stats = $coroutine ({vm, granularity}) ->
granularity = if granularity then granularity else 0
# granularity: 0: every 5 sec along last 10 minutes, 1: every minute along last 2 hours, 2: every hour along past week, 3: everyday along past year
# see http://xenserver.org/partners/developing-products-for-xenserver/18-sdk-development/96-xs-dev-rrds.html
if points[vm.id] and (Date.now() - points[vm.id].timestamp < 5000)
return points[vm.id]
xapi = @getXAPI vm
host = @getObject vm.$container
do (type = host.type) =>
if type is 'pool'
host = @getObject host.master, 'host'
else unless type is 'host'
throw new Error "unexpected type: got #{type} instead of host"
{body} = response = yield got(
"https://#{host.address}/vm_rrd?session_id=#{xapi.sessionId}&uuid=#{vm.id}",
{ rejectUnauthorized: false }
)
if response.statusCode isnt 200
throw new Error('Cannot fetch the RRDs')
json = parseXml(body)
# Find index of needed objects for getting their values after
cpusIndexes = []
vifsIndexes = []
xvdsIndexes = []
memoryFreeIndex = []
memoryIndex = []
index = 0
$forEach(json.rrd.ds, (value, i) ->
if /^cpu[0-9]+$/.test(value.name)
cpusIndexes.push(i)
else if startsWith(value.name, 'vif_') && endsWith(value.name, '_tx')
vifsIndexes.push(i)
else if startsWith(value.name, 'vif_') && endsWith(value.name, '_rx')
vifsIndexes.push(i)
else if startsWith(value.name, 'vbd_xvd') && endsWith(value.name, '_write', 14)
xvdsIndexes.push(i)
else if startsWith(value.name, 'vbd_xvd') && endsWith(value.name, '_read', 13)
xvdsIndexes.push(i)
else if startsWith(value.name, 'memory_internal_free')
memoryFreeIndex.push(i)
else if endsWith(value.name, 'memory')
memoryIndex.push(i)
return
)
memoryFree = []
memoryUsed = []
memory = []
cpus = []
vifs = []
xvds = []
date = []
archive = json.rrd.rra[granularity]
dateStep = json.rrd.step * archive.pdp_per_row
baseDate = json.rrd.lastupdate - (json.rrd.lastupdate % dateStep)
numStep = archive.database.row.length - 1
$forEach archive.database.row, (n, key) ->
# WARNING! memoryFree is in Kb not in b, memory is in b
memoryFree.push(n.v[memoryFreeIndex]*1024)
memoryUsed.push(Math.round(parseInt(n.v[memoryIndex])-(n.v[memoryFreeIndex]*1024)))
memory.push(parseInt(n.v[memoryIndex]))
date.push(baseDate - (dateStep * (numStep - key)))
# build the multi dimensional arrays
$forEach cpusIndexes, (value, key) ->
cpus[key] ?= []
cpus[key].push(n.v[value]*100)
return
$forEach vifsIndexes, (value, key) ->
vifs[key] ?= []
vifs[key].push(if n.v[value] == 'NaN' then null else n.v[value]) # * (if key % 2 then -1 else 1))
return
$forEach xvdsIndexes, (value, key) ->
xvds[key] ?= []
xvds[key].push(if n.v[value] == 'NaN' then null else n.v[value]) # * (if key % 2 then -1 else 1))
return
return
points[vm.id] = {
memoryFree: memoryFree
memoryUsed: memoryUsed
memory: memory
date: date
cpus: cpus
vifs: vifs
xvds: xvds
timestamp: Date.now()
}
# the final object
return points[vm.id]
stats = yield @getXapiVmStats(vm, granularity)
return stats
stats.params = {
id: { type: 'string' }

519
src/xapi-stats.js Normal file
View File

@ -0,0 +1,519 @@
import got from 'got'
import JSON5 from 'json5'
import { BaseError } from 'make-error'
const RRD_STEP_SECONDS = 5
const RRD_STEP_MINUTES = 60
const RRD_STEP_HOURS = 3600
const RRD_STEP_DAYS = 86400
const RRD_STEP_FROM_STRING = {
'seconds': RRD_STEP_SECONDS,
'minutes': RRD_STEP_MINUTES,
'hours': RRD_STEP_HOURS,
'days': RRD_STEP_DAYS
}
const RRD_POINTS_PER_STEP = {
[RRD_STEP_SECONDS]: 120,
[RRD_STEP_MINUTES]: 120,
[RRD_STEP_HOURS]: 168,
[RRD_STEP_DAYS]: 366
}
export class XapiStatsError extends BaseError {}
export class UnknownLegendFormat extends XapiStatsError {
constructor (line) {
super('Unknown legend line: ' + line)
}
}
export class FaultyGranularity extends XapiStatsError {
constructor (msg) {
super(msg)
}
}
// -------------------------------------------------------------------
// Utils
// -------------------------------------------------------------------
function makeUrl (hostname, sessionId, timestamp) {
return `https://${hostname}/rrd_updates?session_id=${sessionId}&start=${timestamp}&cf=AVERAGE&host=true&json=true`
}
// Return current local timestamp in seconds
function getCurrentTimestamp () {
return Date.now() / 1000
}
function convertNanToNull (value) {
return isNaN(value) ? null : value
}
// -------------------------------------------------------------------
// Stats
// -------------------------------------------------------------------
function getNewHostStats () {
return {
cpus: [],
pifs: {
rx: [],
tx: []
},
load: [],
memory: [],
memoryFree: [],
memoryUsed: []
}
}
function getNewVmStats () {
return {
cpus: [],
vifs: {
rx: [],
tx: []
},
xvds: {
r: {},
w: {}
},
memory: [],
memoryFree: [],
memoryUsed: []
}
}
// -------------------------------------------------------------------
// Stats legends
// -------------------------------------------------------------------
function getNewHostLegends () {
return {
cpus: [],
pifs: {
rx: [],
tx: []
},
load: null,
memoryFree: null,
memory: null
}
}
function getNewVmLegends () {
return {
cpus: [],
vifs: {
rx: [],
tx: []
},
xvds: {
r: [],
w: []
},
memoryFree: null,
memory: null
}
}
// Compute one legend line for one host
function parseOneHostLegend (hostLegend, type, index) {
let resReg
if ((resReg = /^cpu([0-9]+)$/.exec(type)) !== null) {
hostLegend.cpus[resReg[1]] = index
} else if ((resReg = /^pif_eth([0-9]+)_(rx|tx)$/.exec(type)) !== null) {
if (resReg[2] === 'rx') {
hostLegend.pifs.rx[resReg[1]] = index
} else {
hostLegend.pifs.tx[resReg[1]] = index
}
} else if (type === 'loadavg') {
hostLegend.load = index
} else if (type === 'memory_free_kib') {
hostLegend.memoryFree = index
} else if (type === 'memory_total_kib') {
hostLegend.memory = index
}
}
// Compute one legend line for one vm
function parseOneVmLegend (vmLegend, type, index) {
let resReg
if ((resReg = /^cpu([0-9]+)$/.exec(type)) !== null) {
vmLegend.cpus[resReg[1]] = index
} else if ((resReg = /^vif_([0-9]+)_(rx|tx)$/.exec(type)) !== null) {
if (resReg[2] === 'rx') {
vmLegend.vifs.rx[resReg[1]] = index
} else {
vmLegend.vifs.tx[resReg[1]] = index
}
} else if ((resReg = /^vbd_xvd(.)_(read|write)$/.exec(type))) {
if (resReg[2] === 'read') {
vmLegend.xvds.r[resReg[1]] = index
} else {
vmLegend.xvds.w[resReg[1]] = index
}
} else if (type === 'memory_internal_free') {
vmLegend.memoryFree = index
} else if (type.endsWith('memory')) {
vmLegend.memory = index
}
}
// Compute Stats Legends for host and vms from RRD update
function parseLegends (json) {
const hostLegends = getNewHostLegends()
const vmsLegends = {}
json.meta.legend.forEach((value, index) => {
const parsedLine = /^AVERAGE:(host|vm):(.+):(.+)$/.exec(value)
if (parsedLine === null) {
throw new UnknownLegendFormat(value)
}
const [ , name, uuid, type, , ] = parsedLine
if (name !== 'vm') {
parseOneHostLegend(hostLegends, type, index)
} else {
if (vmsLegends[uuid] === undefined) {
vmsLegends[uuid] = getNewVmLegends()
}
parseOneVmLegend(vmsLegends[uuid], type, index)
}
})
return [hostLegends, vmsLegends]
}
export default class XapiStats {
constructor () {
this._vms = {}
this._hosts = {}
}
// -------------------------------------------------------------------
// Remove stats (Helper)
// -------------------------------------------------------------------
_removeOlderStats (source, dest, pointsPerStep) {
for (const key in source) {
if (key === 'cpus') {
for (const cpuIndex in source.cpus) {
dest.cpus[cpuIndex].splice(0, dest.cpus[cpuIndex].length - pointsPerStep)
}
// If the number of cpus has been decreased, remove !
let offset
if ((offset = dest.cpus.length - source.cpus.length) > 0) {
dest.cpus.splice(-offset)
}
} else if (key.endsWith('ifs')) {
// For each pif or vif
for (const ifType in source[key]) {
for (const pifIndex in source[key][ifType]) {
dest[key][ifType][pifIndex].splice(0, dest[key][ifType][pifIndex].length - pointsPerStep)
}
// If the number of pifs has been decreased, remove !
let offset
if ((offset = dest[key][ifType].length - source[key][ifType].length) > 0) {
dest[key][ifType].splice(-offset)
}
}
} else if (key === 'xvds') {
for (const xvdType in source.xvds) {
for (const xvdLetter in source.xvds[xvdType]) {
dest.xvds[xvdType][xvdLetter].splice(0, dest.xvds[xvdType][xvdLetter].length - pointsPerStep)
}
// If the number of xvds has been decreased, remove !
// FIXME
}
} else if (key === 'load') {
dest.load.splice(0, dest[key].length - pointsPerStep)
} else if (key === 'memory') {
// Load, memory, memoryFree, memoryUsed
const length = dest.memory.length - pointsPerStep
dest.memory.splice(0, length)
dest.memoryFree.splice(0, length)
dest.memoryUsed.splice(0, length)
}
}
}
// -------------------------------------------------------------------
// HOST: Computation and stats update
// -------------------------------------------------------------------
// Compute one stats row for one host
_parseRowHostStats (hostLegends, hostStats, values) {
// Cpus
hostLegends.cpus.forEach((cpuIndex, index) => {
if (hostStats.cpus[index] === undefined) {
hostStats.cpus[index] = []
}
hostStats.cpus[index].push(values[cpuIndex] * 100)
})
// Pifs
for (const pifType in hostLegends.pifs) {
hostLegends.pifs[pifType].forEach((pifIndex, index) => {
if (hostStats.pifs[pifType][index] === undefined) {
hostStats.pifs[pifType][index] = []
}
hostStats.pifs[pifType][index].push(convertNanToNull(values[pifIndex]))
})
}
// Load
hostStats.load.push(convertNanToNull(values[hostLegends.load]))
// Memory
const memory = values[hostLegends.memory]
const memoryFree = values[hostLegends.memoryFree]
hostStats.memory.push(memory)
if (hostLegends.memoryFree !== undefined) {
hostStats.memoryFree.push(memoryFree)
hostStats.memoryUsed.push(memory - memoryFree)
}
}
// Compute stats for host from RRD update
_parseHostStats (json, hostname, hostLegends, step) {
const host = this._hosts[hostname][step]
if (host.stats === undefined) {
host.stats = getNewHostStats()
}
for (const row of json.data) {
this._parseRowHostStats(hostLegends, host.stats, row.values)
}
}
// -------------------------------------------------------------------
// VM: Computation and stats update
// -------------------------------------------------------------------
// Compute stats for vms from RRD update
_parseRowVmStats (vmLegends, vmStats, values) {
// Cpus
vmLegends.cpus.forEach((cpuIndex, index) => {
if (vmStats.cpus[index] === undefined) {
vmStats.cpus[index] = []
}
vmStats.cpus[index].push(values[cpuIndex] * 100)
})
// Vifs
for (const vifType in vmLegends.vifs) {
vmLegends.vifs[vifType].forEach((vifIndex, index) => {
if (vmStats.vifs[vifType][index] === undefined) {
vmStats.vifs[vifType][index] = []
}
vmStats.vifs[vifType][index].push(convertNanToNull(values[vifIndex]))
})
}
// Xvds
for (const xvdType in vmLegends.xvds) {
for (const index in vmLegends.xvds[xvdType]) {
if (vmStats.xvds[xvdType][index] === undefined) {
vmStats.xvds[xvdType][index] = []
}
vmStats.xvds[xvdType][index].push(convertNanToNull(values[vmLegends.xvds[xvdType][index]]))
}
}
// Memory
// WARNING! memoryFree is in Kb not in b, memory is in b
const memory = values[vmLegends.memory]
const memoryFree = values[vmLegends.memoryFree] * 1024
vmStats.memory.push(memory)
if (vmLegends.memoryFree !== undefined) {
vmStats.memoryFree.push(memoryFree)
vmStats.memoryUsed.push(memory - memoryFree)
}
}
// Compute stats for vms
_parseVmsStats (json, hostname, vmsLegends, step) {
if (this._vms[hostname][step] === undefined) {
this._vms[hostname][step] = {}
}
const vms = this._vms[hostname][step]
for (const uuid in vmsLegends) {
if (vms[uuid] === undefined) {
vms[uuid] = getNewVmStats()
}
}
for (const row of json.data) {
for (const uuid in vmsLegends) {
this._parseRowVmStats(vmsLegends[uuid], vms[uuid], row.values)
}
}
}
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// Execute one http request on a XenServer for get stats
// Return stats (Json format) or throws got exception
async _getJson (url) {
const response = await got(url, { rejectUnauthorized: false })
return JSON5.parse(response.body)
}
_getLastTimestamp (hostname, step) {
if (this._hosts[hostname][step] === undefined) {
return Math.floor(getCurrentTimestamp()) - step * RRD_POINTS_PER_STEP[step] + step
}
return this._hosts[hostname][step].endTimestamp
}
_getPoints (hostname, step, vmId) {
// Return host points
if (vmId === undefined) {
return this._hosts[hostname][step]
}
// Return vm points
const points = { endTimestamp: this._hosts[hostname][step].endTimestamp }
if (this._vms[hostname][step] !== undefined) {
points.stats = this._vms[hostname][step][vmId]
}
return points
}
async _getAndUpdatePoints (hostname, granularity, sessionId, vmId) {
console.log('my granularity: ' + granularity)
// Get granularity to use
const step = (granularity === undefined || granularity === 0)
? RRD_STEP_SECONDS : RRD_STEP_FROM_STRING[granularity]
if (step === undefined) {
throw new FaultyGranularity(`Unknown granularity: '${granularity}'. Use 'seconds', 'minutes', 'hours', or 'days'.`)
}
// Limit the number of http requests
if (this._hosts[hostname] === undefined) {
this._hosts[hostname] = {}
this._vms[hostname] = {}
}
if (this._hosts[hostname][step] !== undefined &&
this._hosts[hostname][step].localTimestamp + step > getCurrentTimestamp()) {
return this._getPoints(hostname, step, vmId)
}
// Check if we are in the good interval, use this._hosts[hostname][step].localTimestamp
// for avoid bad requests
// TODO
// Get json
const timestamp = this._getLastTimestamp(hostname, step)
let json = await this._getJson(makeUrl(hostname, sessionId, timestamp))
// Check if the granularity is linked to 5 seconds
// If it's not the case, we retry other url with the json timestamp
if (json.meta.step !== step) {
console.log(`RRD call: Expected step: ${json.meta.step}, received step: ${json.meta.step}. Retry with other timestamp`)
// Approximately: half points are asked
// FIXME: Not the best solution
json = await this._getJson(makeUrl(hostname, sessionId, json.meta.end - step * (RRD_POINTS_PER_STEP[step] / 2) + step))
if (json.meta.step !== step) {
throw new FaultyGranularity(`Unable to get the true granularity: json.meta.step`)
}
}
// Make new backup slot if necessary
if (this._hosts[hostname][step] === undefined) {
this._hosts[hostname][step] = {
endTimestamp: 0,
localTimestamp: 0
}
}
// It exists data
if (json.data.length !== 0) {
// Warning: Sometimes, the json.xport.meta.start value does not match with the
// timestamp of the oldest data value
// So, we use the timestamp of the oldest data value !
const startTimestamp = json.data[json.meta.rows - 1].t
// Remove useless data and reorder
// Note: Older values are at end of json.data.row
const parseOffset = (this._hosts[hostname][step].endTimestamp - startTimestamp + step) / step
json.data.splice(json.data.length - parseOffset)
json.data.reverse()
// It exists useful data
if (json.data.length > 0) {
const [hostLegends, vmsLegends] = parseLegends(json)
// Compute and update host/vms stats
this._parseVmsStats(json, hostname, vmsLegends, step)
this._parseHostStats(json, hostname, hostLegends, step)
// Remove older stats
this._removeOlderStats(hostLegends, this._hosts[hostname][step].stats, RRD_POINTS_PER_STEP[step])
for (const uuid in vmsLegends) {
this._removeOlderStats(vmsLegends[uuid], this._vms[hostname][step][uuid], RRD_POINTS_PER_STEP[step])
}
}
}
// Update timestamp
this._hosts[hostname][step].endTimestamp = json.meta.end
this._hosts[hostname][step].localTimestamp = getCurrentTimestamp()
return this._getPoints(hostname, step, vmId)
}
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// Warning: This functions returns one reference on internal data
// So, data can be changed by a parallel call on this functions
// It is forbidden to modify the returned data
// Return host stats
async getHostPoints (hostname, granularity, sessionId) {
return this._getAndUpdatePoints(hostname, granularity, sessionId)
}
// Return vms stats
async getVmPoints (hostname, granularity, sessionId, vmId) {
return this._getAndUpdatePoints(hostname, granularity, sessionId, vmId)
}
}

View File

@ -22,6 +22,7 @@ import {EventEmitter} from 'events'
import * as xapiObjectsToXo from './xapi-objects-to-xo'
import Connection from './connection'
import Xapi from './xapi'
import XapiStats from './xapi-stats'
import {Acls} from './models/acl'
import {autobind} from './decorators'
import {
@ -118,6 +119,9 @@ export default class Xo extends EventEmitter {
// Connections to Xen servers.
this._xapis = createRawObject()
// Stats utils.
this._xapiStats = new XapiStats()
// Connections to users.
this._nextConId = 0
this._connections = createRawObject()
@ -912,6 +916,24 @@ export default class Xo extends EventEmitter {
return xapi
}
getXapiVmStats (vm, granularity) {
const xapi = this.getXAPI(vm)
let host = this.getObject(vm.$container)
if (host.type === 'pool') {
host = this.getObject(host.master, 'host')
} else if (host.type !== 'host') {
throw new Error(`unexpected type: got ${host.type} instead of host`)
}
return this._xapiStats.getVmPoints(host.address, granularity, xapi.sessionId, vm.id)
}
getXapiHostStats (host, granularity) {
const xapi = this.getXAPI(host)
return this._xapiStats.getHostPoints(host.address, granularity, xapi.sessionId)
}
async mergeXenPools (sourceId, targetId, force = false) {
const sourceXapi = this.getXAPI(sourceId)
const {