From 08d7c189eb30ef715abb248a0ea2cf7cd5b6059d Mon Sep 17 00:00:00 2001 From: Nicolas Raynaud Date: Thu, 31 Aug 2017 08:35:48 -0700 Subject: [PATCH] feat(xosan): beta 2 (#598) --- src/api/xosan.js | 772 ++++++++++++++++++++++++++++++++++------------- yarn.lock | 30 +- 2 files changed, 574 insertions(+), 228 deletions(-) diff --git a/src/api/xosan.js b/src/api/xosan.js index 1264011ea..c72b31401 100644 --- a/src/api/xosan.js +++ b/src/api/xosan.js @@ -1,106 +1,120 @@ -import arp from 'arp-a' import createLogger from 'debug' import defer from 'golike-defer' import execa from 'execa' -import fromPairs from 'lodash/fromPairs' import fs from 'fs-extra' import map from 'lodash/map' -import splitLines from 'split-lines' -import { tap } from 'promise-toolbox' +import { tap, delay } from 'promise-toolbox' import { + includes, + isArray, + remove, filter, - includes + range } from 'lodash' - import { asyncMap, - pFromCallback, - splitFirst + parseXml } from '../utils' const debug = createLogger('xo:xosan') const SSH_KEY_FILE = 'id_rsa_xosan' const NETWORK_PREFIX = '172.31.100.' - -const XOSAN_VM_SYSTEM_DISK_SIZE = 10 * 1024 * 1024 * 1024 +const VM_FIRST_NUMBER = 101 +const GIGABYTE = 1024 * 1024 * 1024 +const XOSAN_VM_SYSTEM_DISK_SIZE = 10 * GIGABYTE const XOSAN_DATA_DISK_USEAGE_RATIO = 0.99 const XOSAN_MAX_DISK_SIZE = 2093050 * 1024 * 1024 // a bit under 2To const CURRENTLY_CREATING_SRS = {} -export async function getVolumeInfo ({ sr }) { +function _getIPToVMDict (xapi, sr) { + const dict = {} + const data = xapi.xo.getData(sr, 'xosan_config') + if (data && data.nodes) { + data.nodes.forEach(conf => { + try { + dict[conf.brickName] = {vm: xapi.getObject(conf.vm.id), sr: conf.underlyingSr} + } catch (e) { + // pass + } + }) + } + return dict +} + +function _getGlusterEndpoint (sr) { const xapi = this.getXapi(sr) - const giantIPtoVMDict = {} const data = xapi.xo.getData(sr, 'xosan_config') if (!data || !data.nodes) { return null } - const nodes = data.nodes - nodes.forEach(conf => { - giantIPtoVMDict[conf.vm.ip] = xapi.getObject(conf.vm.id) - }) - const oneHostAndVm = nodes[0] - const resultCmd = await remoteSsh(xapi, { - host: xapi.getObject(oneHostAndVm.host), - address: oneHostAndVm.vm.ip - }, 'gluster volume info xosan') - const result = resultCmd['stdout'] + return { xapi, data: data, hosts: map(data.nodes, node => xapi.getObject(node.host)), addresses: map(data.nodes, node => node.vm.ip) } +} - /* - Volume Name: xosan - Type: Disperse - Volume ID: 1d4d0e57-8b6b-43f9-9d40-c48be1df7548 - Status: Started - Snapshot Count: 0 - Number of Bricks: 1 x (2 + 1) = 3 - Transport-type: tcp - Bricks: - Brick1: 192.168.0.201:/bricks/brick1/xosan1 - Brick2: 192.168.0.202:/bricks/brick1/xosan1 - Brick3: 192.168.0.203:/bricks/brick1/xosan1 - Options Reconfigured: - client.event-threads: 16 - server.event-threads: 16 - performance.client-io-threads: on - nfs.disable: on - performance.readdir-ahead: on - transport.address-family: inet - features.shard: on - features.shard-block-size: 64MB - network.remote-dio: enable - cluster.eager-lock: enable - performance.io-cache: off - performance.read-ahead: off - performance.quick-read: off - performance.stat-prefetch: on - performance.strict-write-ordering: off - cluster.server-quorum-type: server - cluster.quorum-type: auto - */ - const info = fromPairs( - splitLines(result.trim()).map(line => - splitFirst(line, ':').map(val => val.trim()) - ) - ) +async function rateLimitedRetry (action, shouldRetry, retryCount = 20) { + let retryDelay = 500 * (1 + Math.random() / 20) + 
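/*
 * rateLimitedRetry (below) polls an async action until shouldRetry says the
 * cluster has settled, growing a jittered ~500ms delay by 10% per attempt.
 * With the default retryCount of 20 the cumulative wait stays around 30
 * seconds. A minimal usage sketch — getPeerList and isPeerPending are
 * hypothetical helpers, not part of this module:
 *
 *   const peers = await rateLimitedRetry(
 *     () => getPeerList(),               // action: re-read gluster peer status
 *     peers => peers.some(isPeerPending) // retry while any peer is still settling
 *   )
 */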
let result + while (retryCount > 0 && (result = await action()) && shouldRetry(result)) { + retryDelay *= 1.1 + debug('waiting ' + retryDelay + 'ms and retrying') + await delay(retryDelay) + retryCount-- + } + return result +} - const getNumber = item => +item.substr(5) - const brickKeys = filter(Object.keys(info), key => key.match(/^Brick[1-9]/)).sort((i1, i2) => getNumber(i1) - getNumber(i2)) +export async function getVolumeInfo ({ sr, infoType }) { + const glusterEndpoint = this::_getGlusterEndpoint(sr) - // expected brickKeys : [ 'Brick1', 'Brick2', 'Brick3' ] - info['Bricks'] = brickKeys.map(key => { - const ip = info[key].split(':')[0] - return { config: info[key], ip: ip, vm: giantIPtoVMDict[ip] } - }) - const entry = await pFromCallback(cb => arp.table(cb)) - if (entry) { - const brick = info['Bricks'].find(element => element.config.split(':')[0] === entry.ip) - if (brick) { - brick.mac = entry.mac + function parseHeal (parsed) { + const bricks = [] + parsed['healInfo']['bricks']['brick'].forEach(brick => { + bricks.push(brick) + if (brick['file'] && !isArray(brick['file'])) { + brick['file'] = [brick['file']] + } + }) + return {commandStatus: true, result: {bricks}} + } + + function parseStatus (parsed) { + const brickDictByUuid = {} + const volume = parsed['volStatus']['volumes']['volume'] + volume['node'].forEach(node => { + brickDictByUuid[node.peerid] = brickDictByUuid[node.peerid] || [] + brickDictByUuid[node.peerid].push(node) + }) + return { + commandStatus: true, + result: {nodes: brickDictByUuid, tasks: volume['tasks']} } } - return info + function parseInfo (parsed) { + const volume = parsed['volInfo']['volumes']['volume'] + volume['bricks'] = volume['bricks']['brick'] + volume['options'] = volume['options']['option'] + return {commandStatus: true, result: volume} + } + + const infoTypes = { + heal: {command: 'heal xosan info', handler: parseHeal}, + status: {command: 'status xosan', handler: parseStatus}, + statusDetail: {command: 'status xosan detail', handler: parseStatus}, + statusMem: {command: 'status xosan mem', handler: parseStatus}, + info: {command: 'info xosan', handler: parseInfo} + } + const foundType = infoTypes[infoType] + if (!foundType) { + throw new Error('getVolumeInfo(): "' + infoType + '" is an invalid type') + } + + const cmdShouldRetry = + result => !result['commandStatus'] && result.parsed && result.parsed['cliOutput']['opErrno'] === '30802' + const runCmd = async () => glusterCmd(glusterEndpoint, 'volume ' + foundType.command, true) + let commandResult = await rateLimitedRetry(runCmd, cmdShouldRetry) + return commandResult['commandStatus'] ? 
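/*
 * A hedged sketch of what a client sees from this method for the 'heal'
 * infoType (shape inferred from parseHeal above; xo.call is a hypothetical
 * JSON-RPC client wrapper and srId a placeholder):
 *
 *   const healInfo = await xo.call('xosan.getVolumeInfo', { sr: srId, infoType: 'heal' })
 *   // => { commandStatus: true, result: { bricks: [ { name: ..., file: [...] }, ... ] } }
 *
 * Transient "another transaction is in progress" failures (opErrno 30802)
 * are absorbed by rateLimitedRetry before this result is returned.
 */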
foundType.handler(commandResult.parsed['cliOutput']) : commandResult } getVolumeInfo.description = 'info on gluster volume' @@ -109,6 +123,9 @@ getVolumeInfo.permission = 'admin' getVolumeInfo.params = { sr: { type: 'string' + }, + infoType: { + type: 'string' } } getVolumeInfo.resolve = { @@ -118,41 +135,8 @@ function floor2048 (value) { return 2048 * Math.floor(value / 2048) } -async function copyVm (xapi, originalVm, params) { - return { vm: await xapi.copyVm(originalVm, params.sr), params } -} - -async function prepareGlusterVm (xapi, vmAndParam, xosanNetwork, increaseDataDisk = true) { - let vm = vmAndParam.vm - // refresh the object so that sizes are correct - const params = vmAndParam.params - const ip = params.xenstore_data['vm-data/ip'] - const sr = xapi.getObject(params.sr.$id) - await xapi._waitObjectState(sr.$id, sr => Boolean(sr.$PBDs)) - const host = sr.$PBDs[0].$host - const firstVif = vm.$VIFs[0] - if (xosanNetwork.$id !== firstVif.$network.$id) { - await xapi.call('VIF.move', firstVif.$ref, xosanNetwork.$ref) - } - await xapi.editVm(vm, { - name_label: params.name_label, - name_description: params.name_description - }) - await xapi.call('VM.set_xenstore_data', vm.$ref, params.xenstore_data) - if (increaseDataDisk) { - const dataDisk = vm.$VBDs.map(vbd => vbd.$VDI).find(vdi => vdi && vdi.name_label === 'xosan_data') - const srFreeSpace = sr.physical_size - sr.physical_utilisation - // we use a percentage because it looks like the VDI overhead is proportional - const newSize = floor2048((srFreeSpace + dataDisk.virtual_size) * XOSAN_DATA_DISK_USEAGE_RATIO) - await xapi._resizeVdi(dataDisk, Math.min(newSize, XOSAN_MAX_DISK_SIZE)) - } - await xapi.startVm(vm) - debug('waiting for boot of ', ip) - // wait until we find the assigned IP in the networks, we are just checking the boot is complete - const vmIsUp = vm => Boolean(vm.$guest_metrics && includes(vm.$guest_metrics.networks, ip)) - vm = await xapi._waitObjectState(vm.$id, vmIsUp) - debug('booted ', ip) - return { address: ip, host, vm } +async function copyVm (xapi, originalVm, sr) { + return { sr, vm: await xapi.copyVm(originalVm, sr) } } async function callPlugin (xapi, host, command, params) { @@ -160,20 +144,70 @@ async function callPlugin (xapi, host, command, params) { return JSON.parse(await xapi.call('host.call_plugin', host.$ref, 'xosan.py', command, params)) } -async function remoteSsh (xapi, hostAndAddress, cmd) { - const result = await callPlugin(xapi, hostAndAddress.host, 'run_ssh', { - destination: 'root@' + hostAndAddress.address, - cmd: cmd - }) - if (result.exit !== 0) { - throw new Error('ssh error: ' + JSON.stringify(result)) +async function remoteSsh (glusterEndpoint, cmd, ignoreError = false) { + let result + for (let address of glusterEndpoint.addresses) { + for (let host of glusterEndpoint.hosts) { + try { + result = await callPlugin(glusterEndpoint.xapi, host, 'run_ssh', {destination: 'root@' + address, cmd: cmd}) + break + } catch (exception) { + if (exception['code'] !== 'HOST_OFFLINE') { + throw exception + } + } + } + debug(result.command.join(' '), '\n =>exit:', result.exit, '\n =>err :', result.stderr, + '\n =>out (1000 chars) :', result.stdout.substring(0, 1000)) + // 255 seems to be ssh's own error codes. + if (result.exit !== 255) { + if (!ignoreError && result.exit !== 0) { + throw new Error('ssh error: ' + JSON.stringify(result)) + } + return result + } } - debug(result) - return result + throw new Error(result ? 
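/*
 * remoteSsh (above) tries every (address, host) pair: for each candidate VM
 * address it walks the pool hosts until one can run the plugin, and only
 * gives up on that address when ssh itself fails (exit code 255 is ssh's own
 * transport error). Worked failover order for a hypothetical 2x2 endpoint:
 *
 *   addresses: [a1, a2], hosts: [h1, h2]
 *   a1 via h1 -> HOST_OFFLINE, a1 via h2 -> exit 255 (VM a1 unreachable)
 *   a2 via h1 -> exit 0 -> result returned
 */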
'ssh error: ' + JSON.stringify(result) : 'no suitable SSH host: ' +
+    JSON.stringify(glusterEndpoint))
 }
 
-async function setPifIp (xapi, pif, address) {
-  await xapi.call('PIF.reconfigure_ip', pif.$ref, 'Static', address, '255.255.255.0', NETWORK_PREFIX + '1', '')
+function findErrorMessage (commandResult) {
+  if (commandResult['exit'] === 0 && commandResult.parsed) {
+    const cliOut = commandResult.parsed['cliOutput']
+    if (cliOut['opErrstr'] && cliOut['opErrstr'].length) {
+      return cliOut['opErrstr']
+    }
+    // "peer probe" returns its "already in peer" error in cliOutput/output
+    if (cliOut['output'] && cliOut['output'].length) {
+      return cliOut['output']
+    }
+  }
+  return commandResult['stderr'].length ? commandResult['stderr'] : commandResult['stdout']
+}
+
+async function glusterCmd (glusterEndpoint, cmd, ignoreError = false) {
+  const result = await remoteSsh(glusterEndpoint, `gluster --mode=script --xml ${cmd}`, true)
+  try {
+    result.parsed = parseXml(result['stdout'])
+  } catch (e) {
+    // pass, we never know if a message can be parsed or not, so we just try
+  }
+  if (result['exit'] === 0) {
+    const cliOut = result.parsed['cliOutput']
+    // we have found cases where opErrno was != 0 while opRet was 0, even though the operation was an error.
+    result.commandStatus = cliOut['opRet'].trim() === '0' && cliOut['opErrno'].trim() === '0'
+    result.error = findErrorMessage(result)
+  } else {
+    result.commandStatus = false
+    // the "gluster volume status" timeout error message is reported on stdout instead of stderr
+    result.error = findErrorMessage(result)
+  }
+  if (!ignoreError && !result.commandStatus) {
+    const error = new Error(`error in gluster "${result.error}"`)
+    error.result = result
+    throw error
+  }
+  return result
 }
 
 const createNetworkAndInsertHosts = defer.onFailure(async function ($onFailure, xapi, pif, vlan) {
@@ -182,14 +216,15 @@
     name: 'XOSAN network',
     description: 'XOSAN network',
     pifId: pif._xapiId,
-    mtu: 9000,
+    mtu: pif.mtu,
     vlan: +vlan
   })
   $onFailure(() => xapi.deleteNetwork(xosanNetwork))
-  await Promise.all(xosanNetwork.$PIFs.map(pif => setPifIp(xapi, pif, NETWORK_PREFIX + (hostIpLastNumber++))))
-
+  await Promise.all(xosanNetwork.$PIFs.map(pif => xapi.call('PIF.reconfigure_ip', pif.$ref, 'Static',
+    NETWORK_PREFIX + (hostIpLastNumber++), '255.255.255.0', NETWORK_PREFIX + '1', '')))
   return xosanNetwork
 })
+
 async function getOrCreateSshKey (xapi) {
   let sshKey = xapi.xo.getData(xapi.pool, 'xosan_ssh_key')
@@ -212,7 +247,29 @@
   return sshKey
 }
 
-async function configureGluster (redundancy, ipAndHosts, xapi, firstIpAndHost, glusterType, arbiter = null) {
+
+const _probePoolAndWaitForPresence = defer.onFailure(async function ($onFailure, glusterEndpoint, addresses) {
+  await asyncMap(addresses, async (address) => {
+    await glusterCmd(glusterEndpoint, 'peer probe ' + address)
+    $onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
+  })
+  function shouldRetry (peers) {
+    for (let peer of peers) {
+      if (peer.state === '4') {
+        return true
+      }
+      if (peer.state === '6') {
+        throw new Error(`${peer.hostname} is not in pool ("${peer.stateStr}")`)
+      }
+    }
+    return false
+  }
+
+  const getPoolStatus = async () => (await glusterCmd(glusterEndpoint, 'pool list')).parsed.cliOutput.peerStatus.peer
+  return rateLimitedRetry(getPoolStatus, shouldRetry)
+})
+
+async function configureGluster (redundancy, ipAndHosts, glusterEndpoint, glusterType, arbiter = null) {
   const
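/*
 * glusterCmd (above) always invokes gluster with --mode=script --xml, so
 * stdout parses into a document shaped like:
 *   <cliOutput><opRet>0</opRet><opErrno>0</opErrno><opErrstr/>...</cliOutput>
 * Both opRet and opErrno must be '0' for commandStatus to be true; the error
 * text is hunted through opErrstr, then output, then stderr/stdout. A typical
 * tolerant call, as used on cleanup paths:
 *
 *   const r = await glusterCmd(glusterEndpoint, 'peer detach ' + address, true)
 *   if (!r.commandStatus) debug('detach failed:', r.error)
 */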
configByType = { replica_arbiter: { creation: 'replica 3 arbiter 1', @@ -220,7 +277,7 @@ async function configureGluster (redundancy, ipAndHosts, xapi, firstIpAndHost, g }, replica: { creation: 'replica ' + redundancy + ' ', - extra: ['gluster volume set xosan cluster.data-self-heal on'] + extra: ['volume set xosan cluster.data-self-heal on'] }, disperse: { creation: 'disperse ' + ipAndHosts.length + ' redundancy ' + redundancy + ' ', @@ -228,33 +285,32 @@ async function configureGluster (redundancy, ipAndHosts, xapi, firstIpAndHost, g } } let brickVms = arbiter ? ipAndHosts.concat(arbiter) : ipAndHosts - for (let i = 1; i < brickVms.length; i++) { - await remoteSsh(xapi, firstIpAndHost, 'gluster peer probe ' + brickVms[i].address) - } + await _probePoolAndWaitForPresence(glusterEndpoint, map(brickVms.slice(1), bv => bv.address)) const creation = configByType[glusterType].creation - const volumeCreation = 'gluster volume create xosan ' + creation + - ' ' + brickVms.map(ipAndHost => (ipAndHost.address + ':/bricks/xosan/xosandir')).join(' ') + const volumeCreation = 'volume create xosan ' + creation + ' ' + + brickVms.map(ipAndHost => ipAndHost.brickName).join(' ') debug('creating volume: ', volumeCreation) - await remoteSsh(xapi, firstIpAndHost, volumeCreation) - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan network.remote-dio enable') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan cluster.eager-lock enable') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.io-cache off') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.read-ahead off') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.quick-read off') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.strict-write-ordering off') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan client.event-threads 8') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan server.event-threads 8') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.io-thread-count 64') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan performance.stat-prefetch on') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan features.shard on') - await remoteSsh(xapi, firstIpAndHost, 'gluster volume set xosan features.shard-block-size 512MB') + await glusterCmd(glusterEndpoint, volumeCreation) + await glusterCmd(glusterEndpoint, 'volume set xosan network.remote-dio enable') + await glusterCmd(glusterEndpoint, 'volume set xosan cluster.eager-lock enable') + await glusterCmd(glusterEndpoint, 'volume set xosan performance.io-cache off') + await glusterCmd(glusterEndpoint, 'volume set xosan performance.read-ahead off') + await glusterCmd(glusterEndpoint, 'volume set xosan performance.quick-read off') + await glusterCmd(glusterEndpoint, 'volume set xosan performance.strict-write-ordering off') + await glusterCmd(glusterEndpoint, 'volume set xosan client.event-threads 8') + await glusterCmd(glusterEndpoint, 'volume set xosan server.event-threads 8') + await glusterCmd(glusterEndpoint, 'volume set xosan performance.io-thread-count 64') + await glusterCmd(glusterEndpoint, 'volume set xosan performance.stat-prefetch on') + await glusterCmd(glusterEndpoint, 'volume set xosan features.shard on') + await glusterCmd(glusterEndpoint, 'volume set xosan features.shard-block-size 512MB') for (const confChunk of configByType[glusterType].extra) { - await remoteSsh(xapi, 
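/*
 * The volume tuning above could equally be table-driven; a minimal sketch
 * using the same glusterCmd helper (option list abbreviated):
 *
 *   const XOSAN_OPTIONS = [
 *     ['network.remote-dio', 'enable'],
 *     ['cluster.eager-lock', 'enable'],
 *     ['features.shard-block-size', '512MB']
 *   ]
 *   for (const [key, value] of XOSAN_OPTIONS) {
 *     await glusterCmd(glusterEndpoint, `volume set xosan ${key} ${value}`)
 *   }
 */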
firstIpAndHost, confChunk) + await glusterCmd(glusterEndpoint, confChunk) } - await remoteSsh(xapi, firstIpAndHost, 'gluster volume start xosan') + await glusterCmd(glusterEndpoint, 'volume start xosan') } -export const createSR = defer.onFailure(async function ($onFailure, { template, pif, vlan, srs, glusterType, redundancy }) { +export const createSR = defer.onFailure(async function ($onFailure, { template, pif, vlan, srs, glusterType, + redundancy, brickSize, memorySize }) { if (!this.requestResource) { throw new Error('requestResource is not a function') } @@ -263,7 +319,7 @@ export const createSR = defer.onFailure(async function ($onFailure, { template, return // TODO: throw an error } - let vmIpLastNumber = 101 + let vmIpLastNumber = VM_FIRST_NUMBER const xapi = this.getXapi(srs[0]) if (CURRENTLY_CREATING_SRS[xapi.pool.$id]) { throw new Error('createSR is already running for this pool') @@ -275,86 +331,70 @@ export const createSR = defer.onFailure(async function ($onFailure, { template, $onFailure(() => xapi.deleteNetwork(xosanNetwork)) const sshKey = await getOrCreateSshKey(xapi) const srsObjects = map(srs, srId => xapi.getObject(srId)) - - const vmParameters = map(srs, srId => { - const sr = xapi.getObject(srId) - const host = sr.$PBDs[0].$host - return { - sr, - host, - name_label: `XOSAN - ${sr.name_label} - ${host.name_label}`, - name_description: 'Xosan VM storing data on volume ' + sr.name_label, - // the values of the xenstore_data object *have* to be string, don't forget. - xenstore_data: { - 'vm-data/hostname': 'XOSAN' + sr.name_label, - 'vm-data/sshkey': sshKey.public, - 'vm-data/ip': NETWORK_PREFIX + (vmIpLastNumber++), - 'vm-data/mtu': String(xosanNetwork.MTU), - 'vm-data/vlan': String(vlan) - } - } - }) - await Promise.all(vmParameters.map(vmParam => callPlugin(xapi, vmParam.host, 'receive_ssh_keys', { + await Promise.all(srsObjects.map(sr => callPlugin(xapi, sr.$PBDs[0].$host, 'receive_ssh_keys', { private_key: sshKey.private, public_key: sshKey.public, force: 'true' }))) - const firstVM = await xapi.importVm( - await this.requestResource('xosan', template.id, template.version), - { srId: vmParameters[0].sr.$ref, type: 'xva' } - ) - $onFailure(() => xapi.deleteVm(firstVM)) - await xapi.editVm(firstVM, { - autoPoweron: true - }) - const copiedVms = await asyncMap(vmParameters.slice(1), param => - copyVm(xapi, firstVM, param)::tap(({ vm }) => + const firstSr = srsObjects[0] + const firstVM = await this::_importGlusterVM(xapi, template, firstSr) + $onFailure(() => xapi.deleteVm(firstVM, true)) + const copiedVms = await asyncMap(srsObjects.slice(1), sr => + copyVm(xapi, firstVM, sr)::tap(({ vm }) => $onFailure(() => xapi.deleteVm(vm)) ) ) - const vmsAndParams = [{ + const vmsAndSrs = [{ vm: firstVM, - params: vmParameters[0] + sr: firstSr }].concat(copiedVms) let arbiter = null if (srs.length === 2) { - const sr = vmParameters[0].sr - const arbiterConfig = { - sr: sr, - host: vmParameters[0].host, - name_label: vmParameters[0].name_label + ' arbiter', - name_description: 'Xosan VM storing data on volume ' + sr.name_label, - xenstore_data: { - 'vm-data/hostname': 'XOSAN' + sr.name_label + '_arb', - 'vm-data/sshkey': sshKey.public, - 'vm-data/ip': NETWORK_PREFIX + (vmIpLastNumber++), - 'vm-data/mtu': String(xosanNetwork.MTU), - 'vm-data/vlan': String(vlan) - } - } - const arbiterVm = await copyVm(xapi, firstVM, arbiterConfig) - $onFailure(() => xapi.deleteVm(arbiterVm.vm)) - arbiter = await prepareGlusterVm(xapi, arbiterVm, xosanNetwork, false) + const sr = firstSr + const 
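/*
 * A hedged sketch of the API call createSR backs, through a JSON-RPC client
 * (xo.call is a hypothetical wrapper and all ids are placeholders; template
 * comes from the 'xosan' resource catalog fetched by requestResource):
 *
 *   await xo.call('xosan.createSR', {
 *     template: { id: templateId, version: '...' },
 *     pif: pifId,
 *     vlan: '0',
 *     srs: [lvmSr1, lvmSr2, lvmSr3],
 *     glusterType: 'disperse',
 *     redundancy: 1
 *   })
 */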
arbiterIP = NETWORK_PREFIX + (vmIpLastNumber++)
+      const arbiterVm = await xapi.copyVm(firstVM, sr)
+      $onFailure(() => xapi.deleteVm(arbiterVm, true))
+      arbiter = await _prepareGlusterVm(xapi, sr, arbiterVm, xosanNetwork, arbiterIP, {labelSuffix: '_arbiter',
+        increaseDataDisk: false,
+        memorySize})
+      arbiter.arbiter = true
     }
-    const ipAndHosts = await Promise.all(map(vmsAndParams, vmAndParam => prepareGlusterVm(xapi, vmAndParam, xosanNetwork)))
-    const firstIpAndHost = ipAndHosts[0]
-    await configureGluster(redundancy, ipAndHosts, xapi, firstIpAndHost, glusterType, arbiter)
+    const ipAndHosts = await asyncMap(vmsAndSrs, vmAndSr => _prepareGlusterVm(xapi, vmAndSr.sr, vmAndSr.vm, xosanNetwork,
+      NETWORK_PREFIX + (vmIpLastNumber++), {maxDiskSize: brickSize, memorySize}))
+    const glusterEndpoint = { xapi, hosts: map(ipAndHosts, ih => ih.host), addresses: map(ipAndHosts, ih => ih.address) }
+    await configureGluster(redundancy, ipAndHosts, glusterEndpoint, glusterType, arbiter)
     debug('xosan gluster volume started')
-    const config = { server: firstIpAndHost.address + ':/xosan', backupserver: ipAndHosts[1].address }
-    const xosanSr = await xapi.call('SR.create', srsObjects[0].$PBDs[0].$host.$ref, config, 0, 'XOSAN', 'XOSAN', 'xosan', '', true, {})
+    // We use 10 IPs of the gluster VM range as backup, in the hope that even if the first VM gets destroyed we find at least
+    // one VM to mount the volfile from.
+    // It is not possible to edit the device_config after the SR is created, and this data is only used at mount time when
+    // rebooting the hosts.
+    const backupservers = map(range(VM_FIRST_NUMBER, VM_FIRST_NUMBER + 10), ipLastByte => NETWORK_PREFIX + ipLastByte).join(':')
+    const config = { server: ipAndHosts[0].address + ':/xosan', backupservers }
+    const xosanSrRef = await xapi.call('SR.create', firstSr.$PBDs[0].$host.$ref, config, 0, 'XOSAN', 'XOSAN',
+      'xosan', '', true, {})
+    // we just forget it because the cleanup actions are stacked in the $onFailure system
+    $onFailure(() => xapi.forgetSr(xosanSrRef))
     if (arbiter) {
       ipAndHosts.push(arbiter)
     }
-    // we just forget because the cleanup actions will be executed before.
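/*
 * The device_config assembled above ends up looking like (hedged example,
 * actual addresses depend on creation order):
 *   {
 *     server: '172.31.100.101:/xosan',
 *     backupservers: '172.31.100.101:172.31.100.102:...:172.31.100.110'
 *   }
 * i.e. backupservers enumerates VM_FIRST_NUMBER .. VM_FIRST_NUMBER + 9, so a
 * rebooting host can still fetch the volfile if the first VM is gone.
 */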
-    $onFailure(() => xapi.forgetSr(xosanSr))
-    await xapi.xo.setData(xosanSr, 'xosan_config', {
-      nodes: ipAndHosts.map(param => ({
-        host: param.host.$id,
-        vm: { id: param.vm.$id, ip: param.address }
-      })),
-      network: xosanNetwork.$id
+    const nodes = ipAndHosts.map(param => ({
+      brickName: param.brickName,
+      host: param.host.$id,
+      vm: {id: param.vm.$id, ip: param.address},
+      underlyingSr: param.underlyingSr.$id,
+      arbiter: !!param['arbiter']
+    }))
+    await xapi.xo.setData(xosanSrRef, 'xosan_config', {
+      version: 'beta2',
+      nodes: nodes,
+      template: template,
+      network: xosanNetwork.$id,
+      type: glusterType,
+      redundancy
     })
+    debug('scanning new SR')
+    await xapi.call('SR.scan', xosanSrRef)
   } finally {
     delete CURRENTLY_CREATING_SRS[xapi.pool.$id]
   }
@@ -388,6 +428,313 @@ createSR.resolve = {
   pif: ['pif', 'PIF', 'administrate']
 }
 
+async function umountDisk (localEndpoint, diskMountPoint) {
+  await remoteSsh(localEndpoint, `killall -v -w /usr/sbin/xfs_growfs; fuser -v ${diskMountPoint}; umount ${diskMountPoint} && sed -i '\\_${diskMountPoint}\\S_d' /etc/fstab && rm -rf ${diskMountPoint}`)
+}
+
+async function createNewDisk (xapi, sr, vm, diskSize) {
+  const newDisk = await xapi.createVdi(diskSize, {sr: sr, name_label: 'xosan_data', name_description: 'Created by XO'})
+  await xapi.attachVdiToVm(newDisk, vm)
+  let vbd = (await xapi._waitObjectState(newDisk.$id, disk => Boolean(disk.$VBDs.length))).$VBDs[0]
+  vbd = await xapi._waitObjectState(vbd.$id, vbd => Boolean(vbd.device.length))
+  return '/dev/' + vbd.device
+}
+
+async function mountNewDisk (localEndpoint, hostname, newDeviceFile) {
+  const brickRootCmd = 'bash -c \'mkdir -p /bricks; for TESTVAR in {1..9}; do TESTDIR="/bricks/xosan$TESTVAR" ;if mkdir $TESTDIR; then echo $TESTDIR; exit 0; fi ; done ; exit 1\''
+  const newBrickRoot = (await remoteSsh(localEndpoint, brickRootCmd)).stdout.trim()
+  const brickName = `${hostname}:${newBrickRoot}/xosandir`
+  const mountBrickCmd = `mkfs.xfs -i size=512 ${newDeviceFile}; mkdir -p ${newBrickRoot}; echo "${newDeviceFile} ${newBrickRoot} xfs defaults 0 0" >> /etc/fstab; mount -a`
+  await remoteSsh(localEndpoint, mountBrickCmd)
+  return brickName
+}
+
+async function replaceBrickOnSameVM (xosansr, previousBrick, newLvmSr, brickSize) {
+  // TODO: a bit of user input validation on 'previousBrick', it's going to ssh
+  const previousIp = previousBrick.split(':')[0]
+  brickSize = brickSize === undefined ?
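/*
 * Bricks created by mountNewDisk live under /bricks/xosanN/xosandir on a
 * dedicated xfs-formatted VDI, so a brick name looks like:
 *   172.31.100.101:/bricks/xosan1/xosandir
 * mkfs.xfs -i size=512 follows the usual gluster recommendation for brick
 * inode size, and the /etc/fstab entry makes the mount reboot-proof.
 */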
Infinity : brickSize + const xapi = this.getXapi(xosansr) + const data = xapi.xo.getData(xosansr, 'xosan_config') + const nodes = data.nodes + const nodeIndex = nodes.findIndex(node => node.vm.ip === previousIp) + const glusterEndpoint = this::_getGlusterEndpoint(xosansr) + const previousVM = _getIPToVMDict(xapi, xosansr)[previousBrick].vm + const newDeviceFile = await createNewDisk(xapi, newLvmSr, previousVM, brickSize) + const localEndpoint = { + xapi, + hosts: map(nodes, node => xapi.getObject(node.host)), + addresses: [previousIp] + } + const previousBrickRoot = previousBrick.split(':')[1].split('/').slice(0, 3).join('/') + const previousBrickDevice = (await remoteSsh(localEndpoint, `grep " ${previousBrickRoot} " /proc/mounts | cut -d ' ' -f 1 | sed 's_/dev/__'`)).stdout.trim() + const brickName = await mountNewDisk(localEndpoint, previousIp, newDeviceFile) + await glusterCmd(glusterEndpoint, `volume replace-brick xosan ${previousBrick} ${brickName} commit force`) + nodes[nodeIndex].brickName = brickName + nodes[nodeIndex].underlyingSr = newLvmSr + await xapi.xo.setData(xosansr, 'xosan_config', data) + await umountDisk(localEndpoint, previousBrickRoot) + const previousVBD = previousVM.$VBDs.find(vbd => vbd.device === previousBrickDevice) + await xapi.disconnectVbd(previousVBD) + await xapi.deleteVdi(previousVBD.VDI) + await xapi.call('SR.scan', xapi.getObject(xosansr).$ref) +} + +export async function replaceBrick ({ xosansr, previousBrick, newLvmSr, brickSize, onSameVM = true }) { + if (onSameVM) { + return this::replaceBrickOnSameVM(xosansr, previousBrick, newLvmSr, brickSize) + } + // TODO: a bit of user input validation on 'previousBrick', it's going to ssh + const previousIp = previousBrick.split(':')[0] + brickSize = brickSize === undefined ? Infinity : brickSize + const xapi = this.getXapi(xosansr) + const nodes = xapi.xo.getData(xosansr, 'xosan_config').nodes + const newIpAddress = _findAFreeIPAddress(nodes) + const nodeIndex = nodes.findIndex(node => node.vm.ip === previousIp) + const stayingNodes = filter(nodes, (node, index) => index !== nodeIndex) + const glusterEndpoint = { xapi, + hosts: map(stayingNodes, node => xapi.getObject(node.host)), + addresses: map(stayingNodes, node => node.vm.ip) } + const previousVMEntry = _getIPToVMDict(xapi, xosansr)[previousBrick] + const arbiter = nodes[nodeIndex].arbiter + let { data, newVM, addressAndHost } = await this::insertNewGlusterVm(xapi, xosansr, newLvmSr, + {labelSuffix: arbiter ? 
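/*
 * replaceBrick with onSameVM = false (this branch) builds a whole new gluster
 * VM and swaps it into the volume. A hedged call sketch — xo.call is a
 * hypothetical client wrapper, the ids are placeholders, and note that
 * onSameVM appears in the method signature (default true, which only moves
 * the brick to a new disk on the existing VM) but is not declared in
 * replaceBrick.params, so passing it over the API is an assumption:
 *
 *   await xo.call('xosan.replaceBrick', {
 *     xosansr: srId,
 *     previousBrick: '172.31.100.101:/bricks/xosan1/xosandir',
 *     newLvmSr: lvmSrId,
 *     onSameVM: false
 *   })
 */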
'_arbiter' : '', glusterEndpoint, newIpAddress, increaseDataDisk: !arbiter, brickSize}) + await glusterCmd(glusterEndpoint, `volume replace-brick xosan ${previousBrick} ${addressAndHost.brickName} commit force`) + await glusterCmd(glusterEndpoint, 'peer detach ' + previousIp) + data.nodes.splice(nodeIndex, 1, { + brickName: addressAndHost.brickName, + host: addressAndHost.host.$id, + arbiter: arbiter, + vm: {ip: addressAndHost.address, id: newVM.$id}, + underlyingSr: newLvmSr + }) + await xapi.xo.setData(xosansr, 'xosan_config', data) + if (previousVMEntry) { + await xapi.deleteVm(previousVMEntry.vm, true) + } + await xapi.call('SR.scan', xapi.getObject(xosansr).$ref) +} + +replaceBrick.description = 'replaceBrick brick in gluster volume' +replaceBrick.permission = 'admin' +replaceBrick.params = { + xosansr: { type: 'string' }, + previousBrick: { type: 'string' }, + newLvmSr: { type: 'string' }, + brickSize: { type: 'number' } +} + +replaceBrick.resolve = { + xosansr: ['sr', 'SR', 'administrate'] +} + +async function _prepareGlusterVm (xapi, lvmSr, newVM, xosanNetwork, ipAddress, {labelSuffix = '', increaseDataDisk = true, + maxDiskSize = Infinity, memorySize = 2 * GIGABYTE}) { + const host = lvmSr.$PBDs[0].$host + const xenstoreData = { + 'vm-data/hostname': 'XOSAN' + lvmSr.name_label + labelSuffix, + 'vm-data/sshkey': (await getOrCreateSshKey(xapi)).public, + 'vm-data/ip': ipAddress, + 'vm-data/mtu': String(xosanNetwork.MTU), + 'vm-data/vlan': String(xosanNetwork.$PIFs[0].vlan || 0) + } + const ip = ipAddress + const sr = xapi.getObject(lvmSr.$id) + // refresh the object so that sizes are correct + await xapi._waitObjectState(sr.$id, sr => Boolean(sr.$PBDs)) + const firstVif = newVM.$VIFs[0] + if (xosanNetwork.$id !== firstVif.$network.$id) { + try { + await xapi.call('VIF.move', firstVif.$ref, xosanNetwork.$ref) + } catch (error) { + if (error.code === 'MESSAGE_METHOD_UNKNOWN') { + // VIF.move has been introduced in xenserver 7.0 + await xapi.deleteVif(firstVif.$id) + await xapi.createVif(newVM.$id, xosanNetwork.$id, firstVif) + } + } + } + await xapi.addTag(newVM.$id, `XOSAN-${xapi.pool.name_label}`) + await xapi.editVm(newVM, { + name_label: `XOSAN - ${lvmSr.name_label} - ${host.name_label} ${labelSuffix}`, + name_description: 'Xosan VM storage', + // https://bugs.xenserver.org/browse/XSO-762 + memoryMax: memorySize, + memoryMin: memorySize, + memoryStaticMax: memorySize, + memory: memorySize + }) + await xapi.call('VM.set_xenstore_data', newVM.$ref, xenstoreData) + const rootDisk = newVM.$VBDs.map(vbd => vbd && vbd.$VDI).find(vdi => vdi && vdi.name_label === 'xosan_root') + const rootDiskSize = rootDisk.virtual_size + await xapi.startVm(newVM) + debug('waiting for boot of ', ip) + // wait until we find the assigned IP in the networks, we are just checking the boot is complete + const vmIsUp = vm => Boolean(vm.$guest_metrics && includes(vm.$guest_metrics.networks, ip)) + const vm = await xapi._waitObjectState(newVM.$id, vmIsUp) + debug('booted ', ip) + const localEndpoint = {xapi: xapi, hosts: [host], addresses: [ip]} + const srFreeSpace = sr.physical_size - sr.physical_utilisation + // we use a percentage because it looks like the VDI overhead is proportional + const newSize = Math.min(floor2048(Math.min(maxDiskSize - rootDiskSize, srFreeSpace * XOSAN_DATA_DISK_USEAGE_RATIO)), + XOSAN_MAX_DISK_SIZE) + const smallDiskSize = 1073741824 + const deviceFile = await createNewDisk(xapi, lvmSr, newVM, increaseDataDisk ? 
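/*
 * Disk sizing above, worked through: on a 1 TiB SR at 5% utilisation with no
 * brickSize cap (maxDiskSize = Infinity):
 *   srFreeSpace = physical_size - physical_utilisation ≈ 0.95 TiB
 *   newSize     = floor2048(min(Infinity - rootDiskSize, 0.95 TiB * 0.99))
 * then clamped to XOSAN_MAX_DISK_SIZE (a bit under 2TB). Arbiter VMs skip the
 * data-disk growth and get the 1 GiB token disk (smallDiskSize) instead.
 */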
newSize : smallDiskSize)
+  const brickName = await mountNewDisk(localEndpoint, ip, deviceFile)
+  return { address: ip, host, vm, underlyingSr: lvmSr, brickName }
+}
+
+async function _importGlusterVM (xapi, template, lvmsrId) {
+  const templateStream = await this.requestResource('xosan', template.id, template.version)
+  const newVM = await xapi.importVm(templateStream, { srId: lvmsrId, type: 'xva' })
+  await xapi.editVm(newVM, {
+    autoPoweron: true
+  })
+  return newVM
+}
+
+function _findAFreeIPAddress (nodes) {
+  return _findIPAddressOutsideList(map(nodes, n => n.vm.ip))
+}
+
+function _findIPAddressOutsideList (reservedList) {
+  const vmIpLastNumber = VM_FIRST_NUMBER
+  for (let i = vmIpLastNumber; i < 255; i++) {
+    const candidate = NETWORK_PREFIX + i
+    if (!reservedList.find(a => a === candidate)) {
+      return candidate
+    }
+  }
+  return null
+}
+
+const _median = arr => {
+  arr.sort((a, b) => a - b)
+  return arr[Math.floor(arr.length / 2)]
+}
+
+const insertNewGlusterVm = defer.onFailure(async function ($onFailure, xapi, xosansr, lvmsrId, {labelSuffix = '',
+  glusterEndpoint = null, ipAddress = null, increaseDataDisk = true, brickSize = Infinity}) {
+  const data = xapi.xo.getData(xosansr, 'xosan_config')
+  if (ipAddress === null) {
+    ipAddress = _findAFreeIPAddress(data.nodes)
+  }
+  const vmsMemories = []
+  for (let node of data.nodes) {
+    try {
+      vmsMemories.push(xapi.getObject(node.vm.id).memory_dynamic_max)
+    } catch (e) {
+      // pass
+    }
+  }
+  const xosanNetwork = xapi.getObject(data.network)
+  const srObject = xapi.getObject(lvmsrId)
+  // we can't simply copy an existing VM, because the existing gluster VMs' disks might be too large to be copied.
+  const newVM = await this::_importGlusterVM(xapi, data.template, lvmsrId)
+  $onFailure(() => xapi.deleteVm(newVM, true))
+  const addressAndHost = await _prepareGlusterVm(xapi, srObject, newVM, xosanNetwork, ipAddress, {labelSuffix,
+    increaseDataDisk,
+    maxDiskSize: brickSize,
+    memorySize: vmsMemories.length ?
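/*
 * The new VM's memory is the median of the existing nodes' dynamic max, with
 * a 2 GiB fallback. Note that _median sorts its argument in place and picks
 * the upper-middle element:
 *   _median([4, 2, 8])    // => 4 (sorted [2, 4, 8], index 1)
 *   _median([4, 2, 8, 6]) // => 6 (sorted [2, 4, 6, 8], index 2)
 */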
_median(vmsMemories) : 2 * GIGABYTE})
+  if (!glusterEndpoint) {
+    glusterEndpoint = this::_getGlusterEndpoint(xosansr)
+  }
+  await _probePoolAndWaitForPresence(glusterEndpoint, [addressAndHost.address])
+  return { data, newVM, addressAndHost, glusterEndpoint }
+})
+
+export const addBricks = defer.onFailure(async function ($onFailure, {xosansr, lvmsrs, brickSize}) {
+  const xapi = this.getXapi(xosansr)
+  if (CURRENTLY_CREATING_SRS[xapi.pool.$id]) {
+    throw new Error('a XOSAN operation is already running on this pool')
+  }
+  CURRENTLY_CREATING_SRS[xapi.pool.$id] = true
+  try {
+    const data = xapi.xo.getData(xosansr, 'xosan_config')
+    const usedAddresses = map(data.nodes, n => n.vm.ip)
+    const glusterEndpoint = this::_getGlusterEndpoint(xosansr)
+    const newAddresses = []
+    const newNodes = []
+    for (let newSr of lvmsrs) {
+      const ipAddress = _findIPAddressOutsideList(usedAddresses.concat(newAddresses))
+      newAddresses.push(ipAddress)
+      const {newVM, addressAndHost} = await this::insertNewGlusterVm(xapi, xosansr, newSr, { ipAddress, brickSize })
+      $onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + ipAddress, true))
+      $onFailure(() => xapi.deleteVm(newVM, true))
+      const brickName = addressAndHost.brickName
+      newNodes.push({brickName, host: addressAndHost.host.$id, vm: {id: newVM.$id, ip: ipAddress}, underlyingSr: newSr})
+    }
+    const arbiterNode = data.nodes.find(n => n['arbiter'])
+    if (arbiterNode) {
+      await glusterCmd(glusterEndpoint,
+        `volume remove-brick xosan replica ${data.nodes.length - 1} ${arbiterNode.brickName} force`)
+      data.nodes = data.nodes.filter(n => n !== arbiterNode)
+      data.type = 'replica'
+      await xapi.xo.setData(xosansr, 'xosan_config', data)
+      await glusterCmd(glusterEndpoint, 'peer detach ' + arbiterNode.vm.ip, true)
+      await xapi.deleteVm(arbiterNode.vm.id, true)
+    }
+    await glusterCmd(glusterEndpoint, `volume add-brick xosan ${newNodes.map(n => n.brickName).join(' ')}`)
+    data.nodes = data.nodes.concat(newNodes)
+    await xapi.xo.setData(xosansr, 'xosan_config', data)
+    await xapi.call('SR.scan', xapi.getObject(xosansr).$ref)
+  } finally {
+    delete CURRENTLY_CREATING_SRS[xapi.pool.$id]
+  }
+})
+
+addBricks.description = 'add bricks to XOSAN SR'
+addBricks.permission = 'admin'
+addBricks.params = {
+  xosansr: { type: 'string' },
+  lvmsrs: {
+    type: 'array',
+    items: {
+      type: 'string'
+    }
+  },
+  brickSize: {type: 'number'}
+}
+
+addBricks.resolve = {
+  xosansr: ['sr', 'SR', 'administrate'],
+  lvmsrs: ['sr', 'SR', 'administrate']
+}
+
+export const removeBricks = defer.onFailure(async function ($onFailure, { xosansr, bricks }) {
+  const xapi = this.getXapi(xosansr)
+  if (CURRENTLY_CREATING_SRS[xapi.pool.$id]) {
+    throw new Error('there is already a XOSAN operation running on this pool')
+  }
+  CURRENTLY_CREATING_SRS[xapi.pool.$id] = true
+  try {
+    const data = xapi.xo.getData(xosansr, 'xosan_config')
+    // NOTE: splitting on ':' assumes IPv4 brick addresses; IPv6 would need different parsing
+    const ips = map(bricks, b => b.split(':')[0])
+    const glusterEndpoint = this::_getGlusterEndpoint(xosansr)
+    // "peer detach" doesn't allow removal of localhost, so don't ssh into the nodes being removed
+    remove(glusterEndpoint.addresses, ip => ips.includes(ip))
+    const dict = _getIPToVMDict(xapi, xosansr)
+    const brickVMs = map(bricks, b => dict[b])
+    await glusterCmd(glusterEndpoint, `volume remove-brick xosan ${bricks.join(' ')} force`)
+    await asyncMap(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true))
+    remove(data.nodes, node => ips.includes(node.vm.ip))
+    await xapi.xo.setData(xosansr, 'xosan_config', data)
+    await xapi.call('SR.scan', xapi.getObject(xosansr).$ref)
+    await
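/*
 * removeBricks drops the bricks from the volume, detaches the peers, then
 * deletes the backing VMs. The endpoint addresses are pruned of the removed
 * IPs first because "peer detach" cannot be issued from the node being
 * detached. Hedged call sketch (xo.call is a hypothetical client wrapper,
 * the ids are placeholders):
 *
 *   await xo.call('xosan.removeBricks', {
 *     xosansr: srId,
 *     bricks: ['172.31.100.105:/bricks/xosan1/xosandir']
 *   })
 */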
asyncMap(brickVMs, vm => xapi.deleteVm(vm.vm, true)) + } finally { + delete CURRENTLY_CREATING_SRS[xapi.pool.$id] + } +}) + +removeBricks.description = 'remove brick from XOSAN SR' +removeBricks.permission = 'admin' +removeBricks.params = { + xosansr: { type: 'string' }, + bricks: { + type: 'array', + items: { type: 'string' } + } +} + export function checkSrIsBusy ({ poolId }) { return !!CURRENTLY_CREATING_SRS[poolId] } @@ -400,7 +747,7 @@ POSSIBLE_CONFIGURATIONS[2] = [{ layout: 'replica_arbiter', redundancy: 3, capaci POSSIBLE_CONFIGURATIONS[3] = [ { layout: 'disperse', redundancy: 1, capacity: 2 }, { layout: 'replica', redundancy: 3, capacity: 1 }] -POSSIBLE_CONFIGURATIONS[4] = [{ layout: 'replica', redundancy: 2, capacity: 1 }] +POSSIBLE_CONFIGURATIONS[4] = [{ layout: 'replica', redundancy: 2, capacity: 2 }] POSSIBLE_CONFIGURATIONS[5] = [{ layout: 'disperse', redundancy: 1, capacity: 4 }] POSSIBLE_CONFIGURATIONS[6] = [ { layout: 'disperse', redundancy: 2, capacity: 4 }, @@ -427,7 +774,7 @@ POSSIBLE_CONFIGURATIONS[15] = [ { layout: 'replica', redundancy: 3, capacity: 5 }] POSSIBLE_CONFIGURATIONS[16] = [{ layout: 'replica', redundancy: 2, capacity: 8 }] -export async function computeXosanPossibleOptions ({ lvmSrs }) { +export async function computeXosanPossibleOptions ({ lvmSrs, brickSize }) { const count = lvmSrs.length const configurations = POSSIBLE_CONFIGURATIONS[count] if (!configurations) { @@ -437,9 +784,9 @@ export async function computeXosanPossibleOptions ({ lvmSrs }) { const xapi = this.getXapi(lvmSrs[0]) const srs = map(lvmSrs, srId => xapi.getObject(srId)) const srSizes = map(srs, sr => sr.physical_size - sr.physical_utilisation) - const minSize = Math.min.apply(null, srSizes) - const brickSize = (minSize - XOSAN_VM_SYSTEM_DISK_SIZE) * XOSAN_DATA_DISK_USEAGE_RATIO - return configurations.map(conf => ({ ...conf, availableSpace: brickSize * conf.capacity })) + const minSize = Math.min.apply(null, srSizes.concat(brickSize)) + const finalBrickSize = Math.floor((minSize - XOSAN_VM_SYSTEM_DISK_SIZE) * XOSAN_DATA_DISK_USEAGE_RATIO) + return configurations.map(conf => ({ ...conf, availableSpace: finalBrickSize * conf.capacity })) } } @@ -449,6 +796,9 @@ computeXosanPossibleOptions.params = { items: { type: 'string' } + }, + brickSize: { + type: 'number' } } diff --git a/yarn.lock b/yarn.lock index cbc1d9946..cd7f3f936 100644 --- a/yarn.lock +++ b/yarn.lock @@ -240,10 +240,6 @@ argparse@^1.0.7: dependencies: sprintf-js "~1.0.2" -arp-a@^0.5.1: - version "0.5.1" - resolved "https://registry.yarnpkg.com/arp-a/-/arp-a-0.5.1.tgz#c6b8b2dd449b1389653518ca9b5cb10c555558d3" - arr-diff@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/arr-diff/-/arr-diff-2.0.0.tgz#8f3b827f955a8bd669697e4a4256ac3ceae356cf" @@ -2115,7 +2111,17 @@ escape-string-regexp@^1.0.2, escape-string-regexp@^1.0.5: version "1.0.5" resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz#1b61c0562190a8dff6ae3bb2cf0200ca130b86d4" -escodegen@1.x.x, escodegen@^1.6.1: +escodegen@1.x.x, escodegen@~1.3.2: + version "1.3.3" + resolved "https://registry.yarnpkg.com/escodegen/-/escodegen-1.3.3.tgz#f024016f5a88e046fd12005055e939802e6c5f23" + dependencies: + esprima "~1.1.1" + estraverse "~1.5.0" + esutils "~1.0.0" + optionalDependencies: + source-map "~0.1.33" + +escodegen@^1.6.1: version "1.8.1" resolved "https://registry.yarnpkg.com/escodegen/-/escodegen-1.8.1.tgz#5a5b53af4693110bebb0867aa3430dd3b70a1018" dependencies: @@ -2135,16 +2141,6 @@ escodegen@~0.0.24: optionalDependencies: 
source-map ">= 0.1.2" -escodegen@~1.3.2: - version "1.3.3" - resolved "https://registry.yarnpkg.com/escodegen/-/escodegen-1.3.3.tgz#f024016f5a88e046fd12005055e939802e6c5f23" - dependencies: - esprima "~1.1.1" - estraverse "~1.5.0" - esutils "~1.0.0" - optionalDependencies: - source-map "~0.1.33" - escope@^3.6.0: version "3.6.0" resolved "https://registry.yarnpkg.com/escope/-/escope-3.6.0.tgz#e01975e812781a163a6dadfdd80398dc64c889c3" @@ -5642,7 +5638,7 @@ read-pkg@^1.0.0: normalize-package-data "^2.3.2" path-type "^1.0.0" -readable-stream@1.1.x, readable-stream@^1.0.33, readable-stream@~1.1.9: +readable-stream@1.1.x, readable-stream@~1.1.9: version "1.1.14" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9" dependencies: @@ -5663,7 +5659,7 @@ readable-stream@2, readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stre string_decoder "~1.0.3" util-deprecate "~1.0.1" -"readable-stream@>=1.0.33-1 <1.1.0-0", readable-stream@~1.0.0, readable-stream@~1.0.17, readable-stream@~1.0.26, readable-stream@~1.0.27-1, readable-stream@~1.0.31: +"readable-stream@>=1.0.33-1 <1.1.0-0", readable-stream@^1.0.33, readable-stream@~1.0.0, readable-stream@~1.0.17, readable-stream@~1.0.26, readable-stream@~1.0.27-1, readable-stream@~1.0.31: version "1.0.34" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.0.34.tgz#125820e34bc842d2f2aaafafe4c2916ee32c157c" dependencies: