mirror of
https://github.com/finos/SymphonyElectron.git
synced 2024-11-25 02:10:32 -06:00
SEARCH-438
- Implemented the real-time indexing in a queue and flush
This commit is contained in:
parent
6b1cb11db7
commit
1cbc443024
@ -192,7 +192,8 @@
|
||||
sendMessage.addEventListener('click', function () {
|
||||
if (realTimeIndexing.value !== "") {
|
||||
let message = realTimeIndexing.value;
|
||||
resultsEl.innerHTML = search.realTimeIndexing(message);
|
||||
search.batchRealTimeIndexing(JSON.parse(message));
|
||||
resultsEl.innerHTML = 'success';
|
||||
} else {
|
||||
resultsEl.innerHTML = "Please check the entered value"
|
||||
}
|
||||
|
39
js/search/queue.js
Normal file
39
js/search/queue.js
Normal file
@ -0,0 +1,39 @@
|
||||
let messagesData = [];
|
||||
|
||||
let makeBoundTimedCollector = function(isIndexing, timeout, callback) {
|
||||
let timer;
|
||||
|
||||
return function (...args) {
|
||||
if (!timer){
|
||||
timer = setTimeout(function(){
|
||||
if (!isIndexing) {
|
||||
flush(getQueue())
|
||||
}
|
||||
}, timeout);
|
||||
}
|
||||
|
||||
let queue = getQueue();
|
||||
queue.push(args[0]);
|
||||
|
||||
if (!isIndexing()) {
|
||||
flush(queue);
|
||||
}
|
||||
};
|
||||
|
||||
function flush(queue) {
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
resetQueue();
|
||||
callback(JSON.stringify(queue));
|
||||
}
|
||||
|
||||
function getQueue(){
|
||||
return messagesData
|
||||
}
|
||||
|
||||
function resetQueue(){
|
||||
messagesData = []
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = makeBoundTimedCollector;
|
@ -8,6 +8,10 @@ const app = electron.app;
|
||||
const path = require('path');
|
||||
const isDevEnv = require('../utils/misc.js').isDevEnv;
|
||||
const isMac = require('../utils/misc.js').isMac;
|
||||
const makeBoundTimedCollector = require('./queue');
|
||||
const searchConfig = require('./searchConfig');
|
||||
const log = require('../log.js');
|
||||
const logLevels = require('../enums/logLevels.js');
|
||||
|
||||
// Search library
|
||||
const libSymphonySearch = require('./searchLibrary');
|
||||
@ -26,14 +30,6 @@ const TEMP_REAL_TIME_INDEX = isDevEnv ? './data/temp_realtime_index' : path.join
|
||||
const INDEX_PREFIX = isDevEnv ? './data/search_index' : path.join(userData, 'data/search_index');
|
||||
// Folder contains real time, batch and user index
|
||||
const INDEX_DATA_FOLDER = isDevEnv ? './data' : path.join(userData, 'data');
|
||||
//3 Months
|
||||
const SEARCH_PERIOD_SUBTRACTOR = 3 * 31 * 24 * 60 * 60 * 1000;
|
||||
const MINIMUM_DATE = '0000000000000';
|
||||
const MAXIMUM_DATE = '9999999999999';
|
||||
const INDEX_VERSION = 'v1';
|
||||
|
||||
const SORT_BY_SCORE = 0;
|
||||
const BATCH_RANDOM_INDEX_PATH_LENGTH = 20;
|
||||
|
||||
// library path contractor
|
||||
const winArchPath = process.arch === 'ia32' ? 'library/indexvalidator-x86.exe' : 'library/indexvalidator-x64.exe';
|
||||
@ -58,13 +54,16 @@ class Search {
|
||||
this.isInitialized = false;
|
||||
this.userId = userId;
|
||||
this.key = key;
|
||||
this.indexFolderName = INDEX_PREFIX + '_' + this.userId + '_' + INDEX_VERSION;
|
||||
this.indexFolderName = INDEX_PREFIX + '_' + this.userId + '_' + searchConfig.INDEX_VERSION;
|
||||
this.dataFolder = INDEX_DATA_FOLDER;
|
||||
this.realTimeIndex = TEMP_REAL_TIME_INDEX;
|
||||
this.batchIndex = TEMP_BATCH_INDEX_FOLDER;
|
||||
this.messageData = [];
|
||||
this.isRealTimeIndexing = false;
|
||||
this.crypto = new Crypto(userId, key);
|
||||
this.decryptAndInit();
|
||||
this.collector = makeBoundTimedCollector(this.checkIsRealTimeIndexing.bind(this),
|
||||
searchConfig.REAL_TIME_INDEXING_TIME, this.realTimeIndexing.bind(this));
|
||||
}
|
||||
|
||||
decryptAndInit() {
|
||||
@ -96,10 +95,10 @@ class Search {
|
||||
libSymphonySearch.symSERemoveFolder(this.batchIndex);
|
||||
Search.indexValidator(this.indexFolderName);
|
||||
Search.indexValidator(this.realTimeIndex);
|
||||
let indexDateStartFrom = new Date().getTime() - SEARCH_PERIOD_SUBTRACTOR;
|
||||
let indexDateStartFrom = new Date().getTime() - searchConfig.SEARCH_PERIOD_SUBTRACTOR;
|
||||
// Deleting all the messages except 3 Months from now
|
||||
libSymphonySearch.symSEDeleteMessages(this.indexFolderName, null,
|
||||
MINIMUM_DATE, indexDateStartFrom.toString());
|
||||
searchConfig.MINIMUM_DATE, indexDateStartFrom.toString());
|
||||
this.isInitialized = true;
|
||||
}
|
||||
|
||||
@ -112,23 +111,34 @@ class Search {
|
||||
indexBatch(messages) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!messages) {
|
||||
log.send(logLevels.ERROR, 'Messages was not provided for batch indexing');
|
||||
reject(new Error('Messages is required'));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(JSON.parse(messages) instanceof Array)) {
|
||||
reject(new Error('Messages must be an array'));
|
||||
try {
|
||||
let msg = JSON.parse(messages);
|
||||
if (!(msg instanceof Array)) {
|
||||
log.send(logLevels.ERROR, 'Message must be an array batch indexing');
|
||||
reject(new Error('Messages must be an array'));
|
||||
return;
|
||||
}
|
||||
} catch(e) {
|
||||
log.send(logLevels.ERROR, 'Batch indexing parse Error -> ' + e);
|
||||
reject(new Error(e));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.isInitialized) {
|
||||
log.send(logLevels.ERROR, 'Library not initialized');
|
||||
reject(new Error('Library not initialized'));
|
||||
return;
|
||||
}
|
||||
|
||||
const indexId = randomString.generate(BATCH_RANDOM_INDEX_PATH_LENGTH);
|
||||
const indexId = randomString.generate(searchConfig.BATCH_RANDOM_INDEX_PATH_LENGTH);
|
||||
libSymphonySearch.symSECreatePartialIndexAsync(this.batchIndex, indexId, messages, (err, res) => {
|
||||
if (err) {
|
||||
log.send(logLevels.ERROR, 'Error indexing the batch ->' + err);
|
||||
reject(new Error(err));
|
||||
}
|
||||
resolve(res);
|
||||
@ -144,6 +154,7 @@ class Search {
|
||||
return new Promise((resolve, reject) => {
|
||||
libSymphonySearch.symSEMergePartialIndexAsync(this.indexFolderName, this.batchIndex, (err, res) => {
|
||||
if (err) {
|
||||
log.send(logLevels.ERROR, 'Error merging the index ->' + err);
|
||||
reject(new Error(err));
|
||||
}
|
||||
libSymphonySearch.symSERemoveFolder(this.batchIndex);
|
||||
@ -152,26 +163,51 @@ class Search {
|
||||
});
|
||||
}
|
||||
|
||||
batchRealTimeIndexing(message) {
|
||||
this.collector(message);
|
||||
}
|
||||
|
||||
checkIsRealTimeIndexing() {
|
||||
return this.isRealTimeIndexing
|
||||
}
|
||||
|
||||
/**
|
||||
* An array of messages to be indexed
|
||||
* in real time
|
||||
* @param message
|
||||
*/
|
||||
realTimeIndexing(message) {
|
||||
|
||||
if (!message) {
|
||||
log.send(logLevels.ERROR, 'Error message not provided for real-time indexing');
|
||||
return new Error('Message is required');
|
||||
}
|
||||
|
||||
if (!(JSON.parse(message) instanceof Array)){
|
||||
return new Error('Message must be an array');
|
||||
try {
|
||||
let msg = JSON.parse(message);
|
||||
if (!(msg instanceof Array)) {
|
||||
log.send(logLevels.ERROR, 'Message must be an array real-time indexing');
|
||||
return (new Error('Messages must be an array'));
|
||||
}
|
||||
} catch(e) {
|
||||
log.send(logLevels.ERROR, 'Real-time indexing parse Error -> ' + e);
|
||||
return (new Error(e));
|
||||
}
|
||||
|
||||
if (!this.isInitialized) {
|
||||
log.send(logLevels.ERROR, 'Library not initialized');
|
||||
return new Error('Library not initialized');
|
||||
}
|
||||
|
||||
let result = libSymphonySearch.symSEIndexRealTime(this.realTimeIndex, message);
|
||||
return result === 0 ? "Successful" : result
|
||||
this.isRealTimeIndexing = true;
|
||||
return libSymphonySearch.symSEIndexRealTimeAsync(this.realTimeIndex, message, (err, result) => {
|
||||
this.isRealTimeIndexing = false;
|
||||
if (err) {
|
||||
log.send(logLevels.ERROR, 'Indexing the real-time data -> ' + err);
|
||||
return new Error(err);
|
||||
}
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -211,6 +247,7 @@ class Search {
|
||||
return this.crypto.encryption().then(() => {
|
||||
return 'Success'
|
||||
}).catch((e) => {
|
||||
log.send(logLevels.ERROR, 'Encrypting the index folder failed ->' + e);
|
||||
return (new Error(e));
|
||||
});
|
||||
}
|
||||
@ -238,11 +275,13 @@ class Search {
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!this.isInitialized) {
|
||||
log.send(logLevels.ERROR, 'Library not initialized');
|
||||
reject(new Error('Library not initialized'));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!fs.existsSync(this.indexFolderName) || !fs.existsSync(this.realTimeIndex)) {
|
||||
log.send(logLevels.ERROR, 'Index folder does not exist.');
|
||||
reject('Index folder does not exist.');
|
||||
return;
|
||||
}
|
||||
@ -254,8 +293,8 @@ class Search {
|
||||
return;
|
||||
}
|
||||
|
||||
let sd = new Date().getTime() - SEARCH_PERIOD_SUBTRACTOR;
|
||||
let sd_time = MINIMUM_DATE;
|
||||
let sd = new Date().getTime() - searchConfig.SEARCH_PERIOD_SUBTRACTOR;
|
||||
let sd_time = searchConfig.MINIMUM_DATE;
|
||||
if (startDate && startDate !== "" && typeof startDate === 'object') {
|
||||
sd_time = new Date(startDate).getTime();
|
||||
if (sd_time >= sd) {
|
||||
@ -263,7 +302,7 @@ class Search {
|
||||
}
|
||||
}
|
||||
|
||||
let ed_time = MAXIMUM_DATE;
|
||||
let ed_time = searchConfig.MAXIMUM_DATE;
|
||||
if (endDate && endDate !== "" && typeof endDate === 'object') {
|
||||
ed_time = new Date(endDate).getTime();
|
||||
}
|
||||
@ -277,7 +316,7 @@ class Search {
|
||||
}
|
||||
|
||||
if (!_sortOrder && _sortOrder === "" && typeof _sortOrder !== 'number' && Math.round(_sortOrder) !== _sortOrder) {
|
||||
_sortOrder = SORT_BY_SCORE;
|
||||
_sortOrder = searchConfig.SORT_BY_SCORE;
|
||||
}
|
||||
|
||||
const returnedResult = libSymphonySearch.symSESearch(this.indexFolderName, this.realTimeIndex, q, sd_time.toString(), ed_time.toString(), _offset, _limit, _sortOrder);
|
||||
@ -298,17 +337,20 @@ class Search {
|
||||
getLatestMessageTimestamp() {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!this.isInitialized) {
|
||||
log.send(logLevels.ERROR, 'Library not initialized');
|
||||
reject('Not initialized');
|
||||
return;
|
||||
}
|
||||
|
||||
if (!fs.existsSync(this.indexFolderName)) {
|
||||
log.send(logLevels.ERROR, 'Index folder does not exist.');
|
||||
reject('Index folder does not exist.');
|
||||
return;
|
||||
}
|
||||
|
||||
libSymphonySearch.symSEGetLastMessageTimestampAsync(this.indexFolderName, (err, res) => {
|
||||
if (err) {
|
||||
log.send(logLevels.ERROR, 'Error getting the index timestamp ->' + err);
|
||||
reject(new Error(err));
|
||||
}
|
||||
const returnedResult = res;
|
||||
@ -449,6 +491,7 @@ class Search {
|
||||
if (data.status === 'OK') {
|
||||
return data;
|
||||
}
|
||||
log.send(logLevels.ERROR, 'Unable validate index folder');
|
||||
return new Error('Unable validate index folder')
|
||||
} catch (err) {
|
||||
throw new Error(err);
|
||||
|
11
js/search/searchConfig.js
Normal file
11
js/search/searchConfig.js
Normal file
@ -0,0 +1,11 @@
|
||||
const searchConfig = {
|
||||
SEARCH_PERIOD_SUBTRACTOR: 3 * 31 * 24 * 60 * 60 * 1000,
|
||||
REAL_TIME_INDEXING_TIME: 60000,
|
||||
MINIMUM_DATE: '0000000000000',
|
||||
MAXIMUM_DATE: '9999999999999',
|
||||
INDEX_VERSION: 'v1',
|
||||
SORT_BY_SCORE: 0,
|
||||
BATCH_RANDOM_INDEX_PATH_LENGTH: 20
|
||||
};
|
||||
|
||||
module.exports = searchConfig;
|
Loading…
Reference in New Issue
Block a user