mirror of
https://github.com/finos/SymphonyElectron.git
synced 2025-02-25 18:55:29 -06:00
Merge pull request #288 from keerthi16/SEARCH-563
SEARCH-563 (Deleting the temp files)
This commit is contained in:
commit
d49cffb185
@ -10,6 +10,9 @@ let Transform = stream.Transform;
|
||||
let util = require('util');
|
||||
let crypto = require('crypto');
|
||||
|
||||
const log = require('../log.js');
|
||||
const logLevels = require('../enums/logLevels.js');
|
||||
|
||||
let KEY_LENGTH = 32; // bytes
|
||||
let GCM_NONCE_LENGTH = 12; //bytes
|
||||
let GCM_MAC_LENGTH = 16; //bytes
|
||||
@ -151,7 +154,8 @@ DecryptionStream.prototype._transform = function(chunk, enc, cb) {
|
||||
DecryptionStream.prototype._flush = function(cb) {
|
||||
let mac = pullOutMac(this._cipherTextChunks);
|
||||
if (!mac) {
|
||||
return this.emit('error', new Error('Decryption failed: bad cipher text.'));
|
||||
log.send(logLevels.ERROR, 'Crypto: Decryption failed: bad cipher text.');
|
||||
return cb();
|
||||
}
|
||||
this._decipher.setAuthTag(mac);
|
||||
let decrypted = this._cipherTextChunks.map(function(item) {
|
||||
@ -160,6 +164,7 @@ DecryptionStream.prototype._flush = function(cb) {
|
||||
try {
|
||||
this._decipher.final();
|
||||
} catch (e) {
|
||||
log.send(logLevels.ERROR, 'Crypto: Decryption failed: ' + e);
|
||||
return cb();
|
||||
}
|
||||
decrypted.forEach(function(item) {
|
||||
|
@ -20,10 +20,11 @@ class Crypto {
|
||||
constructor(userId, key) {
|
||||
this.indexDataFolder = `${searchConfig.FOLDERS_CONSTANTS.PREFIX_NAME_PATH}_${userId}_${searchConfig.INDEX_VERSION}`;
|
||||
this.permanentIndexName = `${searchConfig.FOLDERS_CONSTANTS.PREFIX_NAME}_${userId}_${searchConfig.INDEX_VERSION}`;
|
||||
this.dump = DUMP_PATH;
|
||||
this.key = key;
|
||||
this.encryptedIndex = `${DUMP_PATH}/${this.permanentIndexName}.enc`;
|
||||
this.dataFolder = searchConfig.FOLDERS_CONSTANTS.INDEX_PATH;
|
||||
this.lz4Temp = `${DUMP_PATH}/${this.permanentIndexName}${searchConfig.TAR_LZ4_EXT}`;
|
||||
this.decryptedTemp = `${DUMP_PATH}/decrypted${searchConfig.TAR_LZ4_EXT}`;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,22 +52,37 @@ class Crypto {
|
||||
if (response && response.stderr) {
|
||||
log.send(logLevels.WARN, 'Crypto: Child process stderr while compression, ' + response.stderr);
|
||||
}
|
||||
const input = fs.createReadStream(`${this.dump}/${this.permanentIndexName}${searchConfig.TAR_LZ4_EXT}`);
|
||||
const input = fs.createReadStream(this.lz4Temp);
|
||||
const outputEncryption = fs.createWriteStream(this.encryptedIndex);
|
||||
let config = {
|
||||
key: key
|
||||
};
|
||||
const encrypt = crypto.encrypt(config);
|
||||
let encrypt;
|
||||
try {
|
||||
encrypt = crypto.encrypt(config);
|
||||
} catch (e) {
|
||||
log.send(logLevels.ERROR, 'Error encrypting : ' + e);
|
||||
if (fs.existsSync(this.lz4Temp)) {
|
||||
fs.unlinkSync(this.lz4Temp);
|
||||
}
|
||||
reject();
|
||||
return;
|
||||
}
|
||||
|
||||
let encryptionProcess = input.pipe(encrypt).pipe(outputEncryption);
|
||||
|
||||
encryptionProcess.on('finish', (err) => {
|
||||
if (err) {
|
||||
log.send(logLevels.ERROR, 'Crypto: Error while encrypting the compressed file: ' + err);
|
||||
if (fs.existsSync(this.lz4Temp)) {
|
||||
fs.unlinkSync(this.lz4Temp);
|
||||
}
|
||||
reject(new Error(err));
|
||||
return;
|
||||
}
|
||||
fs.unlinkSync(`${this.dump}/${this.permanentIndexName}${searchConfig.TAR_LZ4_EXT}`);
|
||||
if (fs.existsSync(this.lz4Temp)) {
|
||||
fs.unlinkSync(this.lz4Temp);
|
||||
}
|
||||
resolve('Success');
|
||||
});
|
||||
});
|
||||
@ -88,23 +104,33 @@ class Crypto {
|
||||
}
|
||||
|
||||
const input = fs.createReadStream(this.encryptedIndex);
|
||||
const output = fs.createWriteStream(`${this.dump}/decrypted${searchConfig.TAR_LZ4_EXT}`);
|
||||
const output = fs.createWriteStream(this.decryptedTemp);
|
||||
let config = {
|
||||
key: this.key
|
||||
};
|
||||
const decrypt = crypto.decrypt(config);
|
||||
let decrypt;
|
||||
try {
|
||||
decrypt = crypto.decrypt(config);
|
||||
} catch (e) {
|
||||
log.send(logLevels.ERROR, 'Error decrypting : ' + e);
|
||||
if (fs.existsSync(this.decryptedTemp)) {
|
||||
fs.unlinkSync(this.decryptedTemp);
|
||||
}
|
||||
reject();
|
||||
return;
|
||||
}
|
||||
|
||||
let decryptionProcess = input.pipe(decrypt).pipe(output);
|
||||
|
||||
decryptionProcess.on('finish', () => {
|
||||
|
||||
if (!fs.existsSync(`${this.dump}/decrypted${searchConfig.TAR_LZ4_EXT}`)){
|
||||
if (!fs.existsSync(this.decryptedTemp)){
|
||||
log.send(logLevels.ERROR, 'decrypted.tar.lz4 file not found');
|
||||
reject();
|
||||
return;
|
||||
}
|
||||
|
||||
lz4.deCompression(`${this.dump}/decrypted${searchConfig.TAR_LZ4_EXT}`,(error, response) => {
|
||||
lz4.deCompression(this.decryptedTemp,(error, response) => {
|
||||
if (error) {
|
||||
log.send(logLevels.ERROR, 'Crypto: Error while deCompression, ' + error);
|
||||
// no return, need to unlink if error
|
||||
@ -113,7 +139,7 @@ class Crypto {
|
||||
if (response && response.stderr) {
|
||||
log.send(logLevels.WARN, 'Crypto: Child process stderr while deCompression, ' + response.stderr);
|
||||
}
|
||||
fs.unlink(`${this.dump}/decrypted${searchConfig.TAR_LZ4_EXT}`, () => {
|
||||
fs.unlink(this.decryptedTemp, () => {
|
||||
resolve('success');
|
||||
});
|
||||
})
|
||||
|
@ -115,6 +115,12 @@ class Search {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!fs.existsSync(this.dataFolder)) {
|
||||
log.send(logLevels.ERROR, 'User index folder not found');
|
||||
reject(new Error('User index folder not found'));
|
||||
return;
|
||||
}
|
||||
|
||||
const indexId = randomString.generate(searchConfig.BATCH_RANDOM_INDEX_PATH_LENGTH);
|
||||
libSymphonySearch.symSECreatePartialIndexAsync(this.batchIndex, indexId, messages, (err, res) => {
|
||||
if (err) {
|
||||
@ -133,6 +139,13 @@ class Search {
|
||||
*/
|
||||
mergeIndexBatches() {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
if (!fs.existsSync(this.dataFolder)) {
|
||||
log.send(logLevels.ERROR, 'User index folder not found');
|
||||
reject(new Error('User index folder not found'));
|
||||
return;
|
||||
}
|
||||
|
||||
libSymphonySearch.symSEMergePartialIndexAsync(this.indexFolderName, this.batchIndex, (err, res) => {
|
||||
if (err) {
|
||||
log.send(logLevels.ERROR, 'Error merging the index ->' + err);
|
||||
@ -186,6 +199,11 @@ class Search {
|
||||
throw new Error('Library not initialized');
|
||||
}
|
||||
|
||||
if (!fs.existsSync(this.dataFolder)) {
|
||||
log.send(logLevels.ERROR, 'User index folder not found');
|
||||
throw new Error('User index folder not found');
|
||||
}
|
||||
|
||||
this.isRealTimeIndexing = true;
|
||||
return libSymphonySearch.symSEIndexRealTimeAsync(this.realTimeIndex, message, (err, result) => {
|
||||
this.isRealTimeIndexing = false;
|
||||
|
@ -54,7 +54,9 @@ describe('Tests for Search', function() {
|
||||
realTimeIndexPath = path.join(userConfigDir, 'data', 'temp_realtime_index');
|
||||
tempBatchPath = path.join(userConfigDir, 'data', 'temp_batch_indexes');
|
||||
dataFolderPath = path.join(searchConfig.FOLDERS_CONSTANTS.EXEC_PATH, '..', 'data');
|
||||
|
||||
if (fs.existsSync(dataFolderPath)) {
|
||||
fs.unlinkSync(dataFolderPath)
|
||||
}
|
||||
done();
|
||||
});
|
||||
});
|
||||
@ -347,7 +349,7 @@ describe('Tests for Search', function() {
|
||||
|
||||
expect(function () {
|
||||
SearchApi.realTimeIndexing()
|
||||
}).toThrow(new Error('RealTime Indexing: Messages is required'));
|
||||
}).toThrow();
|
||||
|
||||
SearchApi.isInitialized = false;
|
||||
expect(function () {
|
||||
|
Loading…
Reference in New Issue
Block a user