chore(xo-server/XapiStats): improve the cache implementation (#3859)
This commit is contained in:
@@ -3,10 +3,10 @@ import limitConcurrency from 'limit-concurrency-decorator'
|
||||
import synchronized from 'decorator-synchronized'
|
||||
import { BaseError } from 'make-error'
|
||||
import {
|
||||
defaults,
|
||||
endsWith,
|
||||
findKey,
|
||||
forEach,
|
||||
get,
|
||||
identity,
|
||||
map,
|
||||
mapValues,
|
||||
@@ -52,11 +52,6 @@ const RRD_POINTS_PER_STEP = {
|
||||
// Utils
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// Return current local timestamp in seconds
|
||||
function getCurrentTimestamp() {
|
||||
return Date.now() / 1000
|
||||
}
|
||||
|
||||
function convertNanToNull(value) {
|
||||
return isNaN(value) ? null : value
|
||||
}
|
||||
@@ -78,23 +73,8 @@ const computeValues = (dataRow, legendIndex, transformValue = identity) =>
|
||||
const combineStats = (stats, path, combineValues) =>
|
||||
zipWith(...map(stats, path), (...values) => combineValues(values))
|
||||
|
||||
// It browse the object in depth and initialise it's properties
|
||||
// The targerPath can be a string or an array containing the depth
|
||||
// targetPath: [a, b, c] => a.b.c
|
||||
const getValuesFromDepth = (obj, targetPath) => {
|
||||
if (typeof targetPath === 'string') {
|
||||
return (obj[targetPath] = [])
|
||||
}
|
||||
|
||||
forEach(targetPath, (path, key) => {
|
||||
if (obj[path] === undefined) {
|
||||
obj = obj[path] = targetPath.length - 1 === key ? [] : {}
|
||||
return
|
||||
}
|
||||
obj = obj[path]
|
||||
})
|
||||
return obj
|
||||
}
|
||||
const createGetProperty = (obj, property, defaultValue) =>
|
||||
defaults(obj, { [property]: defaultValue })[property]
|
||||
|
||||
const testMetric = (test, type) =>
|
||||
typeof test === 'string'
|
||||
@@ -297,45 +277,26 @@ export default class XapiStats {
|
||||
.then(response => response.readAll().then(JSON5.parse))
|
||||
}
|
||||
|
||||
async _getNextTimestamp(xapi, host, step) {
|
||||
const currentTimeStamp = await getServerTimestamp(xapi, host.$ref)
|
||||
const maxDuration = step * RRD_POINTS_PER_STEP[step]
|
||||
const lastTimestamp = get(this._statsByObject, [
|
||||
host.uuid,
|
||||
step,
|
||||
'endTimestamp',
|
||||
])
|
||||
// To avoid multiple requests, we keep a cash for the stats and
|
||||
// only return it if we not exceed a step
|
||||
_getCachedStats(uuid, step, currentTimeStamp) {
|
||||
const statsByObject = this._statsByObject
|
||||
|
||||
if (
|
||||
lastTimestamp === undefined ||
|
||||
currentTimeStamp - lastTimestamp + step > maxDuration
|
||||
) {
|
||||
return currentTimeStamp - maxDuration + step
|
||||
}
|
||||
return lastTimestamp
|
||||
}
|
||||
|
||||
_getStats(hostUuid, step, vmUuid) {
|
||||
const hostStats = this._statsByObject[hostUuid][step]
|
||||
|
||||
// Return host stats
|
||||
if (vmUuid === undefined) {
|
||||
return {
|
||||
interval: step,
|
||||
...hostStats,
|
||||
}
|
||||
const stats = statsByObject[uuid]?.[step]
|
||||
if (stats === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// Return vm stats
|
||||
return {
|
||||
interval: step,
|
||||
endTimestamp: hostStats.endTimestamp,
|
||||
...this._statsByObject[vmUuid][step],
|
||||
if (stats.endTimestamp + step < currentTimeStamp) {
|
||||
delete statsByObject[uuid][step]
|
||||
return
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
@synchronized.withKey((_, { host }) => host.uuid)
|
||||
async _getAndUpdateStats(xapi, { host, vmUuid, granularity }) {
|
||||
async _getAndUpdateStats(xapi, { host, uuid, granularity }) {
|
||||
const step =
|
||||
granularity === undefined
|
||||
? RRD_STEP_SECONDS
|
||||
@@ -347,103 +308,93 @@ export default class XapiStats {
|
||||
)
|
||||
}
|
||||
|
||||
// Limit the number of http requests
|
||||
const hostUuid = host.uuid
|
||||
const currentTimeStamp = await getServerTimestamp(xapi, host.$ref)
|
||||
|
||||
if (
|
||||
!(
|
||||
vmUuid !== undefined &&
|
||||
get(this._statsByObject, [vmUuid, step]) === undefined
|
||||
) &&
|
||||
get(this._statsByObject, [hostUuid, step, 'localTimestamp']) + step >
|
||||
getCurrentTimestamp()
|
||||
) {
|
||||
return this._getStats(hostUuid, step, vmUuid)
|
||||
const stats = this._getCachedStats(uuid, step, currentTimeStamp)
|
||||
if (stats !== undefined) {
|
||||
return stats
|
||||
}
|
||||
|
||||
const timestamp = await this._getNextTimestamp(xapi, host, step)
|
||||
const json = await this._getJson(xapi, host, timestamp, step)
|
||||
if (json.meta.step !== step) {
|
||||
const maxDuration = step * RRD_POINTS_PER_STEP[step]
|
||||
|
||||
// To avoid crossing over the boundary, we ask for one less step
|
||||
const optimumTimestamp = currentTimeStamp - maxDuration + step
|
||||
const json = await this._getJson(xapi, host, optimumTimestamp, step)
|
||||
|
||||
const actualStep = json.meta.step
|
||||
if (json.data.length > 0) {
|
||||
// fetched data is organized from the newest to the oldest
|
||||
// but this implementation requires it in the other direction
|
||||
json.data.reverse()
|
||||
json.meta.legend.forEach((legend, index) => {
|
||||
const [, type, uuid, metricType] = /^AVERAGE:([^:]+):(.+):(.+)$/.exec(
|
||||
legend
|
||||
)
|
||||
|
||||
const metrics = STATS[type]
|
||||
if (metrics === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const { metric, testResult } = findMetric(metrics, metricType)
|
||||
if (metric === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const xoObjectStats = createGetProperty(this._statsByObject, uuid, {})
|
||||
let stepStats = xoObjectStats[actualStep]
|
||||
if (
|
||||
stepStats === undefined ||
|
||||
stepStats.endTimestamp !== json.meta.end
|
||||
) {
|
||||
stepStats = xoObjectStats[actualStep] = {
|
||||
endTimestamp: json.meta.end,
|
||||
interval: actualStep,
|
||||
}
|
||||
}
|
||||
|
||||
const path =
|
||||
metric.getPath !== undefined
|
||||
? metric.getPath(testResult)
|
||||
: [findKey(metrics, metric)]
|
||||
|
||||
const lastKey = path.length - 1
|
||||
let metricStats = createGetProperty(stepStats, 'stats', {})
|
||||
path.forEach((property, key) => {
|
||||
if (key === lastKey) {
|
||||
metricStats[property] = computeValues(
|
||||
json.data,
|
||||
index,
|
||||
metric.transformValue
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
metricStats = createGetProperty(metricStats, property, {})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
if (actualStep !== step) {
|
||||
throw new FaultyGranularity(
|
||||
`Unable to get the true granularity: ${json.meta.step}`
|
||||
`Unable to get the true granularity: ${actualStep}`
|
||||
)
|
||||
}
|
||||
|
||||
// It exists data
|
||||
if (json.data.length !== 0) {
|
||||
// Warning: Sometimes, the json.xport.meta.start value does not match with the
|
||||
// timestamp of the oldest data value
|
||||
// So, we use the timestamp of the oldest data value !
|
||||
const startTimestamp = json.data[json.meta.rows - 1].t
|
||||
const endTimestamp = get(this._statsByObject, [
|
||||
hostUuid,
|
||||
step,
|
||||
'endTimestamp',
|
||||
])
|
||||
|
||||
const statsOffset = endTimestamp - startTimestamp + step
|
||||
if (endTimestamp !== undefined && statsOffset > 0) {
|
||||
const parseOffset = statsOffset / step
|
||||
// Remove useless data
|
||||
// Note: Older values are at end of json.data.row
|
||||
json.data.splice(json.data.length - parseOffset)
|
||||
return (
|
||||
this._statsByObject[uuid]?.[step] ?? {
|
||||
endTimestamp: currentTimeStamp,
|
||||
interval: step,
|
||||
stats: {},
|
||||
}
|
||||
|
||||
// It exists useful data
|
||||
if (json.data.length > 0) {
|
||||
// reorder data
|
||||
json.data.reverse()
|
||||
forEach(json.meta.legend, (legend, index) => {
|
||||
const [, type, uuid, metricType] = /^AVERAGE:([^:]+):(.+):(.+)$/.exec(
|
||||
legend
|
||||
)
|
||||
|
||||
const metrics = STATS[type]
|
||||
if (metrics === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const { metric, testResult } = findMetric(metrics, metricType)
|
||||
|
||||
if (metric === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const path =
|
||||
metric.getPath !== undefined
|
||||
? metric.getPath(testResult)
|
||||
: [findKey(metrics, metric)]
|
||||
|
||||
const metricValues = getValuesFromDepth(this._statsByObject, [
|
||||
uuid,
|
||||
step,
|
||||
'stats',
|
||||
...path,
|
||||
])
|
||||
|
||||
metricValues.push(
|
||||
...computeValues(json.data, index, metric.transformValue)
|
||||
)
|
||||
|
||||
// remove older Values
|
||||
metricValues.splice(
|
||||
0,
|
||||
metricValues.length - RRD_POINTS_PER_STEP[step]
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Update timestamp
|
||||
const hostStats = this._statsByObject[hostUuid][step]
|
||||
hostStats.endTimestamp = json.meta.end
|
||||
hostStats.localTimestamp = getCurrentTimestamp()
|
||||
return this._getStats(hostUuid, step, vmUuid)
|
||||
)
|
||||
}
|
||||
|
||||
getHostStats(xapi, hostId, granularity) {
|
||||
const host = xapi.getObject(hostId)
|
||||
return this._getAndUpdateStats(xapi, {
|
||||
host: xapi.getObject(hostId),
|
||||
host,
|
||||
uuid: host.uuid,
|
||||
granularity,
|
||||
})
|
||||
}
|
||||
@@ -457,7 +408,7 @@ export default class XapiStats {
|
||||
|
||||
return this._getAndUpdateStats(xapi, {
|
||||
host,
|
||||
vmUuid: vm.uuid,
|
||||
uuid: vm.uuid,
|
||||
granularity,
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user