feat(xosan): beta 2 (#598)

Nicolas Raynaud 2017-08-31 08:35:48 -07:00 committed by Julien Fontanet
parent 5e11e87a0c
commit 08d7c189eb
2 changed files with 574 additions and 228 deletions


@@ -1,106 +1,120 @@
import arp from 'arp-a'
import createLogger from 'debug'
import defer from 'golike-defer'
import execa from 'execa'
import fromPairs from 'lodash/fromPairs'
import fs from 'fs-extra'
import map from 'lodash/map'
import splitLines from 'split-lines'
import { tap } from 'promise-toolbox'
import { tap, delay } from 'promise-toolbox'
import {
includes,
isArray,
remove,
filter,
includes
range
} from 'lodash'
import {
asyncMap,
pFromCallback,
splitFirst
parseXml
} from '../utils'
const debug = createLogger('xo:xosan')
const SSH_KEY_FILE = 'id_rsa_xosan'
const NETWORK_PREFIX = '172.31.100.'
const XOSAN_VM_SYSTEM_DISK_SIZE = 10 * 1024 * 1024 * 1024
const VM_FIRST_NUMBER = 101
const GIGABYTE = 1024 * 1024 * 1024
const XOSAN_VM_SYSTEM_DISK_SIZE = 10 * GIGABYTE
const XOSAN_DATA_DISK_USEAGE_RATIO = 0.99
const XOSAN_MAX_DISK_SIZE = 2093050 * 1024 * 1024 // a bit under 2 TB
const CURRENTLY_CREATING_SRS = {}
export async function getVolumeInfo ({ sr }) {
function _getIPToVMDict (xapi, sr) {
const dict = {}
const data = xapi.xo.getData(sr, 'xosan_config')
if (data && data.nodes) {
data.nodes.forEach(conf => {
try {
dict[conf.brickName] = {vm: xapi.getObject(conf.vm.id), sr: conf.underlyingSr}
} catch (e) {
// pass
}
})
}
return dict
}
function _getGlusterEndpoint (sr) {
const xapi = this.getXapi(sr)
const giantIPtoVMDict = {}
const data = xapi.xo.getData(sr, 'xosan_config')
if (!data || !data.nodes) {
return null
}
const nodes = data.nodes
nodes.forEach(conf => {
giantIPtoVMDict[conf.vm.ip] = xapi.getObject(conf.vm.id)
})
const oneHostAndVm = nodes[0]
const resultCmd = await remoteSsh(xapi, {
host: xapi.getObject(oneHostAndVm.host),
address: oneHostAndVm.vm.ip
}, 'gluster volume info xosan')
const result = resultCmd['stdout']
return { xapi, data: data, hosts: map(data.nodes, node => xapi.getObject(node.host)), addresses: map(data.nodes, node => node.vm.ip) }
}
/*
Volume Name: xosan
Type: Disperse
Volume ID: 1d4d0e57-8b6b-43f9-9d40-c48be1df7548
Status: Started
Snapshot Count: 0
Number of Bricks: 1 x (2 + 1) = 3
Transport-type: tcp
Bricks:
Brick1: 192.168.0.201:/bricks/brick1/xosan1
Brick2: 192.168.0.202:/bricks/brick1/xosan1
Brick3: 192.168.0.203:/bricks/brick1/xosan1
Options Reconfigured:
client.event-threads: 16
server.event-threads: 16
performance.client-io-threads: on
nfs.disable: on
performance.readdir-ahead: on
transport.address-family: inet
features.shard: on
features.shard-block-size: 64MB
network.remote-dio: enable
cluster.eager-lock: enable
performance.io-cache: off
performance.read-ahead: off
performance.quick-read: off
performance.stat-prefetch: on
performance.strict-write-ordering: off
cluster.server-quorum-type: server
cluster.quorum-type: auto
*/
const info = fromPairs(
splitLines(result.trim()).map(line =>
splitFirst(line, ':').map(val => val.trim())
)
)
async function rateLimitedRetry (action, shouldRetry, retryCount = 20) {
let retryDelay = 500 * (1 + Math.random() / 20)
let result
while (retryCount > 0 && (result = await action()) && shouldRetry(result)) {
retryDelay *= 1.1
debug('waiting ' + retryDelay + 'ms and retrying')
await delay(retryDelay)
retryCount--
}
return result
}
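// Usage sketch of the helper above, mirroring the call made later in getVolumeInfo
// (all identifiers are the ones defined in this file, nothing new is assumed):
//   const result = await rateLimitedRetry(
//     () => glusterCmd(glusterEndpoint, 'volume status xosan', true),
//     r => !r.commandStatus && r.parsed && r.parsed.cliOutput.opErrno === '30802'
//   )
// i.e. re-run the gluster command while it keeps returning the opErrno treated here as transient.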
const getNumber = item => +item.substr(5)
const brickKeys = filter(Object.keys(info), key => key.match(/^Brick[1-9]/)).sort((i1, i2) => getNumber(i1) - getNumber(i2))
export async function getVolumeInfo ({ sr, infoType }) {
const glusterEndpoint = this::_getGlusterEndpoint(sr)
// expected brickKeys : [ 'Brick1', 'Brick2', 'Brick3' ]
info['Bricks'] = brickKeys.map(key => {
const ip = info[key].split(':')[0]
return { config: info[key], ip: ip, vm: giantIPtoVMDict[ip] }
})
const entry = await pFromCallback(cb => arp.table(cb))
if (entry) {
const brick = info['Bricks'].find(element => element.config.split(':')[0] === entry.ip)
if (brick) {
brick.mac = entry.mac
function parseHeal (parsed) {
const bricks = []
parsed['healInfo']['bricks']['brick'].forEach(brick => {
bricks.push(brick)
if (brick['file'] && !isArray(brick['file'])) {
brick['file'] = [brick['file']]
}
})
return {commandStatus: true, result: {bricks}}
}
function parseStatus (parsed) {
const brickDictByUuid = {}
const volume = parsed['volStatus']['volumes']['volume']
volume['node'].forEach(node => {
brickDictByUuid[node.peerid] = brickDictByUuid[node.peerid] || []
brickDictByUuid[node.peerid].push(node)
})
return {
commandStatus: true,
result: {nodes: brickDictByUuid, tasks: volume['tasks']}
}
}
return info
function parseInfo (parsed) {
const volume = parsed['volInfo']['volumes']['volume']
volume['bricks'] = volume['bricks']['brick']
volume['options'] = volume['options']['option']
return {commandStatus: true, result: volume}
}
const infoTypes = {
heal: {command: 'heal xosan info', handler: parseHeal},
status: {command: 'status xosan', handler: parseStatus},
statusDetail: {command: 'status xosan detail', handler: parseStatus},
statusMem: {command: 'status xosan mem', handler: parseStatus},
info: {command: 'info xosan', handler: parseInfo}
}
const foundType = infoTypes[infoType]
if (!foundType) {
throw new Error('getVolumeInfo(): "' + infoType + '" is an invalid type')
}
const cmdShouldRetry =
result => !result['commandStatus'] && result.parsed && result.parsed['cliOutput']['opErrno'] === '30802'
const runCmd = async () => glusterCmd(glusterEndpoint, 'volume ' + foundType.command, true)
let commandResult = await rateLimitedRetry(runCmd, cmdShouldRetry)
return commandResult['commandStatus'] ? foundType.handler(commandResult.parsed['cliOutput']) : commandResult
}
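// Hedged usage example, assuming the usual xo-cli invocation of xo-server API methods:
//   xo-cli xosan.getVolumeInfo sr=<sr-id> infoType=heal
// on success the handlers above return an object of the form { commandStatus: true, result: ... }.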
getVolumeInfo.description = 'info on gluster volume'
@@ -109,6 +123,9 @@ getVolumeInfo.permission = 'admin'
getVolumeInfo.params = {
sr: {
type: 'string'
},
infoType: {
type: 'string'
}
}
getVolumeInfo.resolve = {
@@ -118,41 +135,8 @@ function floor2048 (value) {
return 2048 * Math.floor(value / 2048)
}
async function copyVm (xapi, originalVm, params) {
return { vm: await xapi.copyVm(originalVm, params.sr), params }
}
async function prepareGlusterVm (xapi, vmAndParam, xosanNetwork, increaseDataDisk = true) {
let vm = vmAndParam.vm
// refresh the object so that sizes are correct
const params = vmAndParam.params
const ip = params.xenstore_data['vm-data/ip']
const sr = xapi.getObject(params.sr.$id)
await xapi._waitObjectState(sr.$id, sr => Boolean(sr.$PBDs))
const host = sr.$PBDs[0].$host
const firstVif = vm.$VIFs[0]
if (xosanNetwork.$id !== firstVif.$network.$id) {
await xapi.call('VIF.move', firstVif.$ref, xosanNetwork.$ref)
}
await xapi.editVm(vm, {
name_label: params.name_label,
name_description: params.name_description
})
await xapi.call('VM.set_xenstore_data', vm.$ref, params.xenstore_data)
if (increaseDataDisk) {
const dataDisk = vm.$VBDs.map(vbd => vbd.$VDI).find(vdi => vdi && vdi.name_label === 'xosan_data')
const srFreeSpace = sr.physical_size - sr.physical_utilisation
// we use a percentage because it looks like the VDI overhead is proportional
const newSize = floor2048((srFreeSpace + dataDisk.virtual_size) * XOSAN_DATA_DISK_USEAGE_RATIO)
await xapi._resizeVdi(dataDisk, Math.min(newSize, XOSAN_MAX_DISK_SIZE))
}
await xapi.startVm(vm)
debug('waiting for boot of ', ip)
// wait until we find the assigned IP in the networks; we are just checking that the boot is complete
const vmIsUp = vm => Boolean(vm.$guest_metrics && includes(vm.$guest_metrics.networks, ip))
vm = await xapi._waitObjectState(vm.$id, vmIsUp)
debug('booted ', ip)
return { address: ip, host, vm }
async function copyVm (xapi, originalVm, sr) {
return { sr, vm: await xapi.copyVm(originalVm, sr) }
}
async function callPlugin (xapi, host, command, params) {
@@ -160,20 +144,70 @@ async function callPlugin (xapi, host, command, params) {
return JSON.parse(await xapi.call('host.call_plugin', host.$ref, 'xosan.py', command, params))
}
async function remoteSsh (xapi, hostAndAddress, cmd) {
const result = await callPlugin(xapi, hostAndAddress.host, 'run_ssh', {
destination: 'root@' + hostAndAddress.address,
cmd: cmd
})
if (result.exit !== 0) {
throw new Error('ssh error: ' + JSON.stringify(result))
async function remoteSsh (glusterEndpoint, cmd, ignoreError = false) {
let result
for (let address of glusterEndpoint.addresses) {
for (let host of glusterEndpoint.hosts) {
try {
result = await callPlugin(glusterEndpoint.xapi, host, 'run_ssh', {destination: 'root@' + address, cmd: cmd})
break
} catch (exception) {
if (exception['code'] !== 'HOST_OFFLINE') {
throw exception
}
}
}
debug(result.command.join(' '), '\n =>exit:', result.exit, '\n =>err :', result.stderr,
'\n =>out (1000 chars) :', result.stdout.substring(0, 1000))
// 255 seems to be ssh's own error code (the connection failed rather than the remote command).
if (result.exit !== 255) {
if (!ignoreError && result.exit !== 0) {
throw new Error('ssh error: ' + JSON.stringify(result))
}
return result
}
}
debug(result)
return result
throw new Error(result ? 'ssh error: ' + JSON.stringify(result) : 'no suitable SSH host: ' +
JSON.stringify(glusterEndpoint))
}
async function setPifIp (xapi, pif, address) {
await xapi.call('PIF.reconfigure_ip', pif.$ref, 'Static', address, '255.255.255.0', NETWORK_PREFIX + '1', '')
function findErrorMessage (commandResut) {
if (commandResut['exit'] === 0 && commandResut.parsed) {
const cliOut = commandResut.parsed['cliOutput']
if (cliOut['opErrstr'] && cliOut['opErrstr'].length) {
return cliOut['opErrstr']
}
// "peer probe" returns it's "already in peer" error in cliOutput/output
if (cliOut['output'] && cliOut['output'].length) {
return cliOut['output']
}
}
return commandResut['stderr'].length ? commandResut['stderr'] : commandResut['stdout']
}
async function glusterCmd (glusterEndpoint, cmd, ignoreError = false) {
const result = await remoteSsh(glusterEndpoint, `gluster --mode=script --xml ${cmd}`, true)
try {
result.parsed = parseXml(result['stdout'])
} catch (e) {
// pass, we never know if a message can be parsed or not, so we just try
}
if (result['exit'] === 0) {
const cliOut = result.parsed['cliOutput']
// we have found cases where opErrno was != 0 while opRet was 0, even though the operation was actually an error.
result.commandStatus = cliOut['opRet'].trim() === '0' && cliOut['opErrno'].trim() === '0'
result.error = findErrorMessage(result)
} else {
result.commandStatus = false
// "gluster volume status" timeout error message is reported on stdout instead of stderr
result.error = findErrorMessage(result)
}
if (!ignoreError && !result.commandStatus) {
const error = new Error(`error in gluster "${result.error}"`)
error.result = result
throw error
}
return result
}
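// For reference, `gluster --mode=script --xml <cmd>` wraps its answer in a <cliOutput>
// element; a successful call looks roughly like (sketch, values assumed):
//   <cliOutput><opRet>0</opRet><opErrno>0</opErrno><opErrstr/><output>success</output></cliOutput>
// parseXml() exposes it as result.parsed.cliOutput, which is what the checks above read.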
const createNetworkAndInsertHosts = defer.onFailure(async function ($onFailure, xapi, pif, vlan) {
@@ -182,14 +216,15 @@ const createNetworkAndInsertHosts = defer.onFailure(async function ($onFailure,
name: 'XOSAN network',
description: 'XOSAN network',
pifId: pif._xapiId,
mtu: 9000,
mtu: pif.mtu,
vlan: +vlan
})
$onFailure(() => xapi.deleteNetwork(xosanNetwork))
await Promise.all(xosanNetwork.$PIFs.map(pif => setPifIp(xapi, pif, NETWORK_PREFIX + (hostIpLastNumber++))))
await Promise.all(xosanNetwork.$PIFs.map(pif => xapi.call('PIF.reconfigure_ip', pif.$ref, 'Static',
NETWORK_PREFIX + (hostIpLastNumber++), '255.255.255.0', NETWORK_PREFIX + '1', '')))
return xosanNetwork
})
async function getOrCreateSshKey (xapi) {
let sshKey = xapi.xo.getData(xapi.pool, 'xosan_ssh_key')
@@ -212,7 +247,29 @@ async function getOrCreateSshKey (xapi) {
return sshKey
}
async function configureGluster (redundancy, ipAndHosts, xapi, firstIpAndHost, glusterType, arbiter = null) {
const _probePoolAndWaitForPresence = defer.onFailure(async function ($onFailure, glusterEndpoint, addresses) {
await asyncMap(addresses, async (address) => {
await glusterCmd(glusterEndpoint, 'peer probe ' + address)
$onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
})
function shouldRetry (peers) {
for (let peer of peers) {
if (peer.state === '4') {
return true
}
if (peer.state === '6') {
throw new Error(`${peer.hostname} is not in pool ("${peer.stateStr}")`)
}
}
return false
}
const getPoolStatus = async () => (await glusterCmd(glusterEndpoint, 'pool list')).parsed.cliOutput.peerStatus.peer
return rateLimitedRetry(getPoolStatus, shouldRetry)
})
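// Peer probing is asynchronous: "pool list" is polled until no peer is left in the
// transient state '4', while a peer reaching state '6' is treated as rejected and
// aborts the operation (the state codes are the ones checked in shouldRetry above).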
async function configureGluster (redundancy, ipAndHosts, glusterEndpoint, glusterType, arbiter = null) {
const configByType = {
replica_arbiter: {
creation: 'replica 3 arbiter 1',
@@ -220,7 +277,7 @@ async function configureGluster (redundancy, ipAndHosts, xapi, firstIpAndHost, g
},
replica: {
creation: 'replica ' + redundancy + ' ',
extra: ['gluster volume set xosan cluster.data-self-heal on']
extra: ['volume set xosan cluster.data-self-heal on']
},
disperse: {
creation: 'disperse ' + ipAndHosts.length + ' redundancy ' + redundancy + ' ',
@@ -228,33 +285,32 @@ async function configureGluster (redundancy, ipAndHosts, xapi, firstIpAndHost, g
}
}
let brickVms = arbiter ? ipAndHosts.concat(arbiter) : ipAndHosts
for (let i = 1; i < brickVms.length; i++) {
await remoteSsh(xapi, firstIpAndHost, 'gluster peer probe ' + brickVms[i].address)
}
await _probePoolAndWaitForPresence(glusterEndpoint, map(brickVms.slice(1), bv => bv.address))
const creation = configByType[glusterType].creation
const volumeCreation = 'gluster volume create xosan ' + creation +
' ' + brickVms.map(ipAndHost => (ipAndHost.address + ':/bricks/xosan/xosandir')).join(' ')
const volumeCreation = 'volume create xosan ' + creation + ' ' +
brickVms.map(ipAndHost => ipAndHost.brickName).join(' ')
debug('creating volume: ', volumeCreation)
await remoteSsh(xapi, firstIpAndHost, volumeCreation)
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan network.remote-dio enable')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan cluster.eager-lock enable')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.io-cache off')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.read-ahead off')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.quick-read off')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.strict-write-ordering off')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan client.event-threads 8')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan server.event-threads 8')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.io-thread-count 64')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.stat-prefetch on')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan features.shard on')
await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan features.shard-block-size 512MB')
await glusterCmd(glusterEndpoint, volumeCreation)
await glusterCmd(glusterEndpoint, 'volume set xosan network.remote-dio enable')
await glusterCmd(glusterEndpoint, 'volume set xosan cluster.eager-lock enable')
await glusterCmd(glusterEndpoint, 'volume set xosan performance.io-cache off')
await glusterCmd(glusterEndpoint, 'volume set xosan performance.read-ahead off')
await glusterCmd(glusterEndpoint, 'volume set xosan performance.quick-read off')
await glusterCmd(glusterEndpoint, 'volume set xosan performance.strict-write-ordering off')
await glusterCmd(glusterEndpoint, 'volume set xosan client.event-threads 8')
await glusterCmd(glusterEndpoint, 'volume set xosan server.event-threads 8')
await glusterCmd(glusterEndpoint, 'volume set xosan performance.io-thread-count 64')
await glusterCmd(glusterEndpoint, 'volume set xosan performance.stat-prefetch on')
await glusterCmd(glusterEndpoint, 'volume set xosan features.shard on')
await glusterCmd(glusterEndpoint, 'volume set xosan features.shard-block-size 512MB')
for (const confChunk of configByType[glusterType].extra) {
await remoteSsh(xapi, firstIpAndHost, confChunk)
await glusterCmd(glusterEndpoint, confChunk)
}
await remoteSsh(xapi, firstIpAndHost, 'gluster volume start xosan')
await glusterCmd(glusterEndpoint, 'volume start xosan')
}
export const createSR = defer.onFailure(async function ($onFailure, { template, pif, vlan, srs, glusterType, redundancy }) {
export const createSR = defer.onFailure(async function ($onFailure, { template, pif, vlan, srs, glusterType,
redundancy, brickSize, memorySize }) {
if (!this.requestResource) {
throw new Error('requestResource is not a function')
}
@@ -263,7 +319,7 @@ export const createSR = defer.onFailure(async function ($onFailure, { template,
return // TODO: throw an error
}
let vmIpLastNumber = 101
let vmIpLastNumber = VM_FIRST_NUMBER
const xapi = this.getXapi(srs[0])
if (CURRENTLY_CREATING_SRS[xapi.pool.$id]) {
throw new Error('createSR is already running for this pool')
@@ -275,86 +331,70 @@ export const createSR = defer.onFailure(async function ($onFailure, { template,
$onFailure(() => xapi.deleteNetwork(xosanNetwork))
const sshKey = await getOrCreateSshKey(xapi)
const srsObjects = map(srs, srId => xapi.getObject(srId))
const vmParameters = map(srs, srId => {
const sr = xapi.getObject(srId)
const host = sr.$PBDs[0].$host
return {
sr,
host,
name_label: `XOSAN - ${sr.name_label} - ${host.name_label}`,
name_description: 'Xosan VM storing data on volume ' + sr.name_label,
// the values of the xenstore_data object *have* to be string, don't forget.
xenstore_data: {
'vm-data/hostname': 'XOSAN' + sr.name_label,
'vm-data/sshkey': sshKey.public,
'vm-data/ip': NETWORK_PREFIX + (vmIpLastNumber++),
'vm-data/mtu': String(xosanNetwork.MTU),
'vm-data/vlan': String(vlan)
}
}
})
await Promise.all(vmParameters.map(vmParam => callPlugin(xapi, vmParam.host, 'receive_ssh_keys', {
await Promise.all(srsObjects.map(sr => callPlugin(xapi, sr.$PBDs[0].$host, 'receive_ssh_keys', {
private_key: sshKey.private,
public_key: sshKey.public,
force: 'true'
})))
const firstVM = await xapi.importVm(
await this.requestResource('xosan', template.id, template.version),
{ srId: vmParameters[0].sr.$ref, type: 'xva' }
)
$onFailure(() => xapi.deleteVm(firstVM))
await xapi.editVm(firstVM, {
autoPoweron: true
})
const copiedVms = await asyncMap(vmParameters.slice(1), param =>
copyVm(xapi, firstVM, param)::tap(({ vm }) =>
const firstSr = srsObjects[0]
const firstVM = await this::_importGlusterVM(xapi, template, firstSr)
$onFailure(() => xapi.deleteVm(firstVM, true))
const copiedVms = await asyncMap(srsObjects.slice(1), sr =>
copyVm(xapi, firstVM, sr)::tap(({ vm }) =>
$onFailure(() => xapi.deleteVm(vm))
)
)
const vmsAndParams = [{
const vmsAndSrs = [{
vm: firstVM,
params: vmParameters[0]
sr: firstSr
}].concat(copiedVms)
let arbiter = null
if (srs.length === 2) {
const sr = vmParameters[0].sr
const arbiterConfig = {
sr: sr,
host: vmParameters[0].host,
name_label: vmParameters[0].name_label + ' arbiter',
name_description: 'Xosan VM storing data on volume ' + sr.name_label,
xenstore_data: {
'vm-data/hostname': 'XOSAN' + sr.name_label + '_arb',
'vm-data/sshkey': sshKey.public,
'vm-data/ip': NETWORK_PREFIX + (vmIpLastNumber++),
'vm-data/mtu': String(xosanNetwork.MTU),
'vm-data/vlan': String(vlan)
}
}
const arbiterVm = await copyVm(xapi, firstVM, arbiterConfig)
$onFailure(() => xapi.deleteVm(arbiterVm.vm))
arbiter = await prepareGlusterVm(xapi, arbiterVm, xosanNetwork, false)
const sr = firstSr
const arbiterIP = NETWORK_PREFIX + (vmIpLastNumber++)
const arbiterVm = await xapi.copyVm(firstVM, sr)
$onFailure(() => xapi.deleteVm(arbiterVm, true))
arbiter = await _prepareGlusterVm(xapi, sr, arbiterVm, xosanNetwork, arbiterIP, {labelSuffix: '_arbiter',
increaseDataDisk: false,
memorySize})
arbiter.arbiter = true
}
const ipAndHosts = await Promise.all(map(vmsAndParams, vmAndParam => prepareGlusterVm(xapi, vmAndParam, xosanNetwork)))
const firstIpAndHost = ipAndHosts[0]
await configureGluster(redundancy, ipAndHosts, xapi, firstIpAndHost, glusterType, arbiter)
const ipAndHosts = await asyncMap(vmsAndSrs, vmAndSr => _prepareGlusterVm(xapi, vmAndSr.sr, vmAndSr.vm, xosanNetwork,
NETWORK_PREFIX + (vmIpLastNumber++), {maxDiskSize: brickSize, memorySize}))
const glusterEndpoint = { xapi, hosts: map(ipAndHosts, ih => ih.host), addresses: map(ipAndHosts, ih => ih.address) }
await configureGluster(redundancy, ipAndHosts, glusterEndpoint, glusterType, arbiter)
debug('xosan gluster volume started')
const config = { server: firstIpAndHost.address + ':/xosan', backupserver: ipAndHosts[1].address }
const xosanSr = await xapi.call('SR.create', srsObjects[0].$PBDs[0].$host.$ref, config, 0, 'XOSAN', 'XOSAN', 'xosan', '', true, {})
// We use 10 IPs of the gluster VM range as backup servers, in the hope that even if the first VM gets destroyed
// at least one of them is still around to serve the volfile.
// It is not possible to edit the device_config after the SR is created and this data is only used at mount time when rebooting
// the hosts.
const backupservers = map(range(VM_FIRST_NUMBER, VM_FIRST_NUMBER + 10), ipLastByte => NETWORK_PREFIX + ipLastByte).join(':')
const config = { server: ipAndHosts[0].address + ':/xosan', backupservers }
const xosanSrRef = await xapi.call('SR.create', firstSr.$PBDs[0].$host.$ref, config, 0, 'XOSAN', 'XOSAN',
'xosan', '', true, {})
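// For reference, the positional XenAPI SR.create arguments used above are: host ref,
// device_config, physical_size (0 here), name_label, name_description, SR type
// ('xosan'), content_type, shared and sm_config.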
// we just forget because the cleanup actions are stacked in the $onFailure system
$onFailure(() => xapi.forgetSr(xosanSrRef))
if (arbiter) {
ipAndHosts.push(arbiter)
}
// we just forget because the cleanup actions will be executed before.
$onFailure(() => xapi.forgetSr(xosanSr))
await xapi.xo.setData(xosanSr, 'xosan_config', {
nodes: ipAndHosts.map(param => ({
host: param.host.$id,
vm: { id: param.vm.$id, ip: param.address }
})),
network: xosanNetwork.$id
const nodes = ipAndHosts.map(param => ({
brickName: param.brickName,
host: param.host.$id,
vm: {id: param.vm.$id, ip: param.address},
underlyingSr: param.underlyingSr.$id,
arbiter: !!param['arbiter']
}))
await xapi.xo.setData(xosanSrRef, 'xosan_config', {
version: 'beta2',
nodes: nodes,
template: template,
network: xosanNetwork.$id,
type: glusterType,
redundancy
})
debug('scanning new SR')
await xapi.call('SR.scan', xosanSrRef)
} finally {
delete CURRENTLY_CREATING_SRS[xapi.pool.$id]
}
@@ -388,6 +428,313 @@ createSR.resolve = {
pif: ['pif', 'PIF', 'administrate']
}
async function umountDisk (localEndpoint, diskMountPoint) {
await remoteSsh(localEndpoint, `killall -v -w /usr/sbin/xfs_growfs; fuser -v ${diskMountPoint}; umount ${diskMountPoint} && sed -i '\\_${diskMountPoint}\\S_d' /etc/fstab && rm -rf ${diskMountPoint}`)
}
async function createNewDisk (xapi, sr, vm, diskSize) {
const newDisk = await xapi.createVdi(diskSize, {sr: sr, name_label: 'xosan_data', name_description: 'Created by XO'})
await xapi.attachVdiToVm(newDisk, vm)
let vbd = await xapi._waitObjectState(newDisk.$id, disk => Boolean(disk.$VBDs.length)).$VBDs[0]
vbd = await xapi._waitObjectState(vbd.$id, vbd => Boolean(vbd.device.length))
return '/dev/' + vbd.device
}
async function mountNewDisk (localEndpoint, hostname, newDeviceFiledeviceFile) {
const brickRootCmd = 'bash -c \'mkdir -p /bricks; for TESTVAR in {1..9}; do TESTDIR="/bricks/xosan$TESTVAR" ;if mkdir $TESTDIR; then echo $TESTDIR; exit 0; fi ; done ; exit 1\''
const newBrickRoot = (await remoteSsh(localEndpoint, brickRootCmd)).stdout.trim()
const brickName = `${hostname}:${newBrickRoot}/xosandir`
const mountBrickCmd = `mkfs.xfs -i size=512 ${newDeviceFiledeviceFile}; mkdir -p ${newBrickRoot}; echo "${newDeviceFiledeviceFile} ${newBrickRoot} xfs defaults 0 0" >> /etc/fstab; mount -a`
await remoteSsh(localEndpoint, mountBrickCmd)
return brickName
}
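// The returned brick name has the form '172.31.100.101:/bricks/xosan1/xosandir';
// the "hostname" argument is in fact the VM's XOSAN IP address, as passed by the callers.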
async function replaceBrickOnSameVM (xosansr, previousBrick, newLvmSr, brickSize) {
// TODO: a bit of user input validation on 'previousBrick', it ends up in an ssh command
const previousIp = previousBrick.split(':')[0]
brickSize = brickSize === undefined ? Infinity : brickSize
const xapi = this.getXapi(xosansr)
const data = xapi.xo.getData(xosansr, 'xosan_config')
const nodes = data.nodes
const nodeIndex = nodes.findIndex(node => node.vm.ip === previousIp)
const glusterEndpoint = this::_getGlusterEndpoint(xosansr)
const previousVM = _getIPToVMDict(xapi, xosansr)[previousBrick].vm
const newDeviceFile = await createNewDisk(xapi, newLvmSr, previousVM, brickSize)
const localEndpoint = {
xapi,
hosts: map(nodes, node => xapi.getObject(node.host)),
addresses: [previousIp]
}
const previousBrickRoot = previousBrick.split(':')[1].split('/').slice(0, 3).join('/')
const previousBrickDevice = (await remoteSsh(localEndpoint, `grep " ${previousBrickRoot} " /proc/mounts | cut -d ' ' -f 1 | sed 's_/dev/__'`)).stdout.trim()
const brickName = await mountNewDisk(localEndpoint, previousIp, newDeviceFile)
await glusterCmd(glusterEndpoint, `volume replace-brick xosan ${previousBrick} ${brickName} commit force`)
nodes[nodeIndex].brickName = brickName
nodes[nodeIndex].underlyingSr = newLvmSr
await xapi.xo.setData(xosansr, 'xosan_config', data)
await umountDisk(localEndpoint, previousBrickRoot)
const previousVBD = previousVM.$VBDs.find(vbd => vbd.device === previousBrickDevice)
await xapi.disconnectVbd(previousVBD)
await xapi.deleteVdi(previousVBD.VDI)
await xapi.call('SR.scan', xapi.getObject(xosansr).$ref)
}
export async function replaceBrick ({ xosansr, previousBrick, newLvmSr, brickSize, onSameVM = true }) {
if (onSameVM) {
return this::replaceBrickOnSameVM(xosansr, previousBrick, newLvmSr, brickSize)
}
// TODO: a bit of user input validation on 'previousBrick', it ends up in an ssh command
const previousIp = previousBrick.split(':')[0]
brickSize = brickSize === undefined ? Infinity : brickSize
const xapi = this.getXapi(xosansr)
const nodes = xapi.xo.getData(xosansr, 'xosan_config').nodes
const newIpAddress = _findAFreeIPAddress(nodes)
const nodeIndex = nodes.findIndex(node => node.vm.ip === previousIp)
const stayingNodes = filter(nodes, (node, index) => index !== nodeIndex)
const glusterEndpoint = { xapi,
hosts: map(stayingNodes, node => xapi.getObject(node.host)),
addresses: map(stayingNodes, node => node.vm.ip) }
const previousVMEntry = _getIPToVMDict(xapi, xosansr)[previousBrick]
const arbiter = nodes[nodeIndex].arbiter
let { data, newVM, addressAndHost } = await this::insertNewGlusterVm(xapi, xosansr, newLvmSr,
{labelSuffix: arbiter ? '_arbiter' : '', glusterEndpoint, newIpAddress, increaseDataDisk: !arbiter, brickSize})
await glusterCmd(glusterEndpoint, `volume replace-brick xosan ${previousBrick} ${addressAndHost.brickName} commit force`)
await glusterCmd(glusterEndpoint, 'peer detach ' + previousIp)
data.nodes.splice(nodeIndex, 1, {
brickName: addressAndHost.brickName,
host: addressAndHost.host.$id,
arbiter: arbiter,
vm: {ip: addressAndHost.address, id: newVM.$id},
underlyingSr: newLvmSr
})
await xapi.xo.setData(xosansr, 'xosan_config', data)
if (previousVMEntry) {
await xapi.deleteVm(previousVMEntry.vm, true)
}
await xapi.call('SR.scan', xapi.getObject(xosansr).$ref)
}
replaceBrick.description = 'replace a brick in the gluster volume'
replaceBrick.permission = 'admin'
replaceBrick.params = {
xosansr: { type: 'string' },
previousBrick: { type: 'string' },
newLvmSr: { type: 'string' },
brickSize: { type: 'number' }
}
replaceBrick.resolve = {
xosansr: ['sr', 'SR', 'administrate']
}
async function _prepareGlusterVm (xapi, lvmSr, newVM, xosanNetwork, ipAddress, {labelSuffix = '', increaseDataDisk = true,
maxDiskSize = Infinity, memorySize = 2 * GIGABYTE}) {
const host = lvmSr.$PBDs[0].$host
const xenstoreData = {
'vm-data/hostname': 'XOSAN' + lvmSr.name_label + labelSuffix,
'vm-data/sshkey': (await getOrCreateSshKey(xapi)).public,
'vm-data/ip': ipAddress,
'vm-data/mtu': String(xosanNetwork.MTU),
'vm-data/vlan': String(xosanNetwork.$PIFs[0].vlan || 0)
}
const ip = ipAddress
const sr = xapi.getObject(lvmSr.$id)
// refresh the object so that sizes are correct
await xapi._waitObjectState(sr.$id, sr => Boolean(sr.$PBDs))
const firstVif = newVM.$VIFs[0]
if (xosanNetwork.$id !== firstVif.$network.$id) {
try {
await xapi.call('VIF.move', firstVif.$ref, xosanNetwork.$ref)
} catch (error) {
if (error.code === 'MESSAGE_METHOD_UNKNOWN') {
// VIF.move has been introduced in xenserver 7.0
await xapi.deleteVif(firstVif.$id)
await xapi.createVif(newVM.$id, xosanNetwork.$id, firstVif)
}
}
}
await xapi.addTag(newVM.$id, `XOSAN-${xapi.pool.name_label}`)
await xapi.editVm(newVM, {
name_label: `XOSAN - ${lvmSr.name_label} - ${host.name_label} ${labelSuffix}`,
name_description: 'Xosan VM storage',
// https://bugs.xenserver.org/browse/XSO-762
memoryMax: memorySize,
memoryMin: memorySize,
memoryStaticMax: memorySize,
memory: memorySize
})
await xapi.call('VM.set_xenstore_data', newVM.$ref, xenstoreData)
const rootDisk = newVM.$VBDs.map(vbd => vbd && vbd.$VDI).find(vdi => vdi && vdi.name_label === 'xosan_root')
const rootDiskSize = rootDisk.virtual_size
await xapi.startVm(newVM)
debug('waiting for boot of ', ip)
// wait until we find the assigned IP in the networks; we are just checking that the boot is complete
const vmIsUp = vm => Boolean(vm.$guest_metrics && includes(vm.$guest_metrics.networks, ip))
const vm = await xapi._waitObjectState(newVM.$id, vmIsUp)
debug('booted ', ip)
const localEndpoint = {xapi: xapi, hosts: [host], addresses: [ip]}
const srFreeSpace = sr.physical_size - sr.physical_utilisation
// we use a percentage because it looks like the VDI overhead is proportional
const newSize = Math.min(floor2048(Math.min(maxDiskSize - rootDiskSize, srFreeSpace * XOSAN_DATA_DISK_USEAGE_RATIO)),
XOSAN_MAX_DISK_SIZE)
const smallDiskSize = 1073741824
const deviceFile = await createNewDisk(xapi, lvmSr, newVM, increaseDataDisk ? newSize : smallDiskSize)
const brickName = await mountNewDisk(localEndpoint, ip, deviceFile)
return { address: ip, host, vm, underlyingSr: lvmSr, brickName }
}
async function _importGlusterVM (xapi, template, lvmsrId) {
const templateStream = await this.requestResource('xosan', template.id, template.version)
const newVM = await xapi.importVm(templateStream, { srId: lvmsrId, type: 'xva' })
await xapi.editVm(newVM, {
autoPoweron: true
})
return newVM
}
function _findAFreeIPAddress (nodes) {
return _findIPAddressOutsideList(map(nodes, n => n.vm.ip))
}
function _findIPAddressOutsideList (reservedList) {
const vmIpLastNumber = 101
for (let i = vmIpLastNumber; i < 255; i++) {
const candidate = NETWORK_PREFIX + i
if (!reservedList.find(a => a === candidate)) {
return candidate
}
}
return null
}
const _median = arr => {
arr.sort((a, b) => a - b)
return arr[Math.floor(arr.length / 2)]
}
const insertNewGlusterVm = defer.onFailure(async function ($onFailure, xapi, xosansr, lvmsrId, {labelSuffix = '',
glusterEndpoint = null, ipAddress = null, increaseDataDisk = true, brickSize = Infinity}) {
const data = xapi.xo.getData(xosansr, 'xosan_config')
if (ipAddress === null) {
ipAddress = _findAFreeIPAddress(data.nodes)
}
const vmsMemories = []
for (let node of data.nodes) {
try {
vmsMemories.push(xapi.getObject(node.vm.id).memory_dynamic_max)
} catch (e) {
// pass
}
}
const xosanNetwork = xapi.getObject(data.network)
const srObject = xapi.getObject(lvmsrId)
// can't really copy an existing VM, because the disks of existing gluster VMs might be too large to be copied.
const newVM = await this::_importGlusterVM(xapi, data.template, lvmsrId)
$onFailure(() => xapi.deleteVm(newVM, true))
const addressAndHost = await _prepareGlusterVm(xapi, srObject, newVM, xosanNetwork, ipAddress, {labelSuffix,
increaseDataDisk,
maxDiskSize: brickSize,
memorySize: vmsMemories.length ? _median(vmsMemories) : 2 * GIGABYTE})
if (!glusterEndpoint) {
glusterEndpoint = this::_getGlusterEndpoint(xosansr)
}
await _probePoolAndWaitForPresence(glusterEndpoint, [addressAndHost.address])
return { data, newVM, addressAndHost, glusterEndpoint }
})
export const addBricks = defer.onFailure(async function ($onFailure, {xosansr, lvmsrs, brickSize}) {
const xapi = this.getXapi(xosansr)
if (CURRENTLY_CREATING_SRS[xapi.pool.$id]) {
throw new Error('createSR is already running for this pool')
}
CURRENTLY_CREATING_SRS[xapi.pool.$id] = true
try {
const data = xapi.xo.getData(xosansr, 'xosan_config')
const usedAddresses = map(data.nodes, n => n.vm.ip)
const glusterEndpoint = this::_getGlusterEndpoint(xosansr)
const newAddresses = []
const newNodes = []
for (let newSr of lvmsrs) {
const ipAddress = _findIPAddressOutsideList(usedAddresses.concat(newAddresses))
newAddresses.push(ipAddress)
const {newVM, addressAndHost} = await this::insertNewGlusterVm(xapi, xosansr, newSr, { ipAddress, brickSize })
$onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + ipAddress, true))
$onFailure(() => xapi.deleteVm(newVM, true))
const brickName = addressAndHost.brickName
newNodes.push({brickName, host: addressAndHost.host.$id, vm: {id: newVM.$id, ip: ipAddress}, underlyingSr: newSr})
}
const arbiterNode = data.nodes.find(n => n['arbiter'])
if (arbiterNode) {
await glusterCmd(glusterEndpoint,
`volume remove-brick xosan replica ${data.nodes.length - 1} ${arbiterNode.brickName} force`)
data.nodes = data.nodes.filter(n => n !== arbiterNode)
data.type = 'replica'
await xapi.xo.setData(xosansr, 'xosan_config', data)
await glusterCmd(glusterEndpoint, 'peer detach ' + arbiterNode.vm.ip, true)
await xapi.deleteVm(arbiterNode.vm.id, true)
}
await glusterCmd(glusterEndpoint, `volume add-brick xosan ${newNodes.map(n => n.brickName).join(' ')}`)
data.nodes = data.nodes.concat(newNodes)
await xapi.xo.setData(xosansr, 'xosan_config', data)
await xapi.call('SR.scan', xapi.getObject(xosansr).$ref)
} finally {
delete CURRENTLY_CREATING_SRS[xapi.pool.$id]
}
})
addBricks.description = 'add brick to XOSAN SR'
addBricks.permission = 'admin'
addBricks.params = {
xosansr: { type: 'string' },
lvmsrs: {
type: 'array',
items: {
type: 'string'
} },
brickSize: {type: 'number'}
}
addBricks.resolve = {
xosansr: ['sr', 'SR', 'administrate'],
lvmsrs: ['sr', 'SR', 'administrate']
}
export const removeBricks = defer.onFailure(async function ($onFailure, { xosansr, bricks }) {
const xapi = this.getXapi(xosansr)
if (CURRENTLY_CREATING_SRS[xapi.pool.$id]) {
throw new Error('there is already a XOSAN operation running on this pool')
}
CURRENTLY_CREATING_SRS[xapi.pool.$id] = true
try {
const data = xapi.xo.getData(xosansr, 'xosan_config')
// IPV6
const ips = map(bricks, b => b.split(':')[0])
const glusterEndpoint = this::_getGlusterEndpoint(xosansr)
// "peer detach" doesn't allow removal of locahost
remove(glusterEndpoint.addresses, ip => ips.includes(ip))
const dict = _getIPToVMDict(xapi, xosansr)
const brickVMs = map(bricks, b => dict[b])
await glusterCmd(glusterEndpoint, `volume remove-brick xosan ${bricks.join(' ')} force`)
await asyncMap(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true))
remove(data.nodes, node => ips.includes(node.vm.ip))
await xapi.xo.setData(xosansr, 'xosan_config', data)
await xapi.call('SR.scan', xapi.getObject(xosansr).$ref)
await asyncMap(brickVMs, vm => xapi.deleteVm(vm.vm, true))
} finally {
delete CURRENTLY_CREATING_SRS[xapi.pool.$id]
}
})
removeBricks.description = 'remove brick from XOSAN SR'
removeBricks.permission = 'admin'
removeBricks.params = {
xosansr: { type: 'string' },
bricks: {
type: 'array',
items: { type: 'string' }
}
}
export function checkSrIsBusy ({ poolId }) {
return !!CURRENTLY_CREATING_SRS[poolId]
}
@@ -400,7 +747,7 @@ POSSIBLE_CONFIGURATIONS[2] = [{ layout: 'replica_arbiter', redundancy: 3, capaci
POSSIBLE_CONFIGURATIONS[3] = [
{ layout: 'disperse', redundancy: 1, capacity: 2 },
{ layout: 'replica', redundancy: 3, capacity: 1 }]
POSSIBLE_CONFIGURATIONS[4] = [{ layout: 'replica', redundancy: 2, capacity: 1 }]
POSSIBLE_CONFIGURATIONS[4] = [{ layout: 'replica', redundancy: 2, capacity: 2 }]
POSSIBLE_CONFIGURATIONS[5] = [{ layout: 'disperse', redundancy: 1, capacity: 4 }]
POSSIBLE_CONFIGURATIONS[6] = [
{ layout: 'disperse', redundancy: 2, capacity: 4 },
@@ -427,7 +774,7 @@ POSSIBLE_CONFIGURATIONS[15] = [
{ layout: 'replica', redundancy: 3, capacity: 5 }]
POSSIBLE_CONFIGURATIONS[16] = [{ layout: 'replica', redundancy: 2, capacity: 8 }]
export async function computeXosanPossibleOptions ({ lvmSrs }) {
export async function computeXosanPossibleOptions ({ lvmSrs, brickSize }) {
const count = lvmSrs.length
const configurations = POSSIBLE_CONFIGURATIONS[count]
if (!configurations) {
@@ -437,9 +784,9 @@ export async function computeXosanPossibleOptions ({ lvmSrs }) {
const xapi = this.getXapi(lvmSrs[0])
const srs = map(lvmSrs, srId => xapi.getObject(srId))
const srSizes = map(srs, sr => sr.physical_size - sr.physical_utilisation)
const minSize = Math.min.apply(null, srSizes)
const brickSize = (minSize - XOSAN_VM_SYSTEM_DISK_SIZE) * XOSAN_DATA_DISK_USEAGE_RATIO
return configurations.map(conf => ({ ...conf, availableSpace: brickSize * conf.capacity }))
const minSize = Math.min.apply(null, srSizes.concat(brickSize))
const finalBrickSize = Math.floor((minSize - XOSAN_VM_SYSTEM_DISK_SIZE) * XOSAN_DATA_DISK_USEAGE_RATIO)
return configurations.map(conf => ({ ...conf, availableSpace: finalBrickSize * conf.capacity }))
}
}
@@ -449,6 +796,9 @@ computeXosanPossibleOptions.params = {
items: {
type: 'string'
}
},
brickSize: {
type: 'number'
}
}


@@ -240,10 +240,6 @@ argparse@^1.0.7:
dependencies:
sprintf-js "~1.0.2"
arp-a@^0.5.1:
version "0.5.1"
resolved "https://registry.yarnpkg.com/arp-a/-/arp-a-0.5.1.tgz#c6b8b2dd449b1389653518ca9b5cb10c555558d3"
arr-diff@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/arr-diff/-/arr-diff-2.0.0.tgz#8f3b827f955a8bd669697e4a4256ac3ceae356cf"
@@ -2115,7 +2111,17 @@ escape-string-regexp@^1.0.2, escape-string-regexp@^1.0.5:
version "1.0.5"
resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz#1b61c0562190a8dff6ae3bb2cf0200ca130b86d4"
escodegen@1.x.x, escodegen@^1.6.1:
escodegen@1.x.x, escodegen@~1.3.2:
version "1.3.3"
resolved "https://registry.yarnpkg.com/escodegen/-/escodegen-1.3.3.tgz#f024016f5a88e046fd12005055e939802e6c5f23"
dependencies:
esprima "~1.1.1"
estraverse "~1.5.0"
esutils "~1.0.0"
optionalDependencies:
source-map "~0.1.33"
escodegen@^1.6.1:
version "1.8.1"
resolved "https://registry.yarnpkg.com/escodegen/-/escodegen-1.8.1.tgz#5a5b53af4693110bebb0867aa3430dd3b70a1018"
dependencies:
@@ -2135,16 +2141,6 @@ escodegen@~0.0.24:
optionalDependencies:
source-map ">= 0.1.2"
escodegen@~1.3.2:
version "1.3.3"
resolved "https://registry.yarnpkg.com/escodegen/-/escodegen-1.3.3.tgz#f024016f5a88e046fd12005055e939802e6c5f23"
dependencies:
esprima "~1.1.1"
estraverse "~1.5.0"
esutils "~1.0.0"
optionalDependencies:
source-map "~0.1.33"
escope@^3.6.0:
version "3.6.0"
resolved "https://registry.yarnpkg.com/escope/-/escope-3.6.0.tgz#e01975e812781a163a6dadfdd80398dc64c889c3"
@@ -5642,7 +5638,7 @@ read-pkg@^1.0.0:
normalize-package-data "^2.3.2"
path-type "^1.0.0"
readable-stream@1.1.x, readable-stream@^1.0.33, readable-stream@~1.1.9:
readable-stream@1.1.x, readable-stream@~1.1.9:
version "1.1.14"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9"
dependencies:
@@ -5663,7 +5659,7 @@ readable-stream@2, readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stre
string_decoder "~1.0.3"
util-deprecate "~1.0.1"
"readable-stream@>=1.0.33-1 <1.1.0-0", readable-stream@~1.0.0, readable-stream@~1.0.17, readable-stream@~1.0.26, readable-stream@~1.0.27-1, readable-stream@~1.0.31:
"readable-stream@>=1.0.33-1 <1.1.0-0", readable-stream@^1.0.33, readable-stream@~1.0.0, readable-stream@~1.0.17, readable-stream@~1.0.26, readable-stream@~1.0.27-1, readable-stream@~1.0.31:
version "1.0.34"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.0.34.tgz#125820e34bc842d2f2aaafafe4c2916ee32c157c"
dependencies: