Merge pull request #241 from keerthi16/SEARCH-438

SEARCH-438 (Queuing the real-time indexing)
This commit is contained in:
Vikas Shashidhar
2017-11-23 21:53:41 +05:30
committed by GitHub
10 changed files with 129 additions and 24 deletions

View File

@@ -192,7 +192,8 @@
sendMessage.addEventListener('click', function () {
if (realTimeIndexing.value !== "") {
let message = realTimeIndexing.value;
resultsEl.innerHTML = search.realTimeIndexing(message);
search.batchRealTimeIndexing(JSON.parse(message));
resultsEl.innerHTML = 'success';
} else {
resultsEl.innerHTML = "Please check the entered value"
}

39
js/search/queue.js Normal file
View File

@@ -0,0 +1,39 @@
let messagesData = [];
let makeBoundTimedCollector = function(isIndexing, timeout, callback) {
let timer;
return function (...args) {
if (!timer){
timer = setTimeout(function(){
if (!isIndexing) {
flush(getQueue());
}
}, timeout);
}
let queue = getQueue();
queue.push(args[0]);
if (!isIndexing()) {
flush(queue);
}
};
function flush(queue) {
clearTimeout(timer);
timer = null;
resetQueue();
callback(JSON.stringify(queue));
}
function getQueue(){
return messagesData;
}
function resetQueue(){
messagesData = [];
}
};
module.exports = makeBoundTimedCollector;

View File

@@ -8,6 +8,10 @@ const app = electron.app;
const path = require('path');
const isDevEnv = require('../utils/misc.js').isDevEnv;
const isMac = require('../utils/misc.js').isMac;
const makeBoundTimedCollector = require('./queue');
const searchConfig = require('./searchConfig');
const log = require('../log.js');
const logLevels = require('../enums/logLevels.js');
// Search library
const libSymphonySearch = require('./searchLibrary');
@@ -26,14 +30,6 @@ const TEMP_REAL_TIME_INDEX = isDevEnv ? './data/temp_realtime_index' : path.join
const INDEX_PREFIX = isDevEnv ? './data/search_index' : path.join(userData, 'data/search_index');
// Folder contains real time, batch and user index
const INDEX_DATA_FOLDER = isDevEnv ? './data' : path.join(userData, 'data');
//3 Months
const SEARCH_PERIOD_SUBTRACTOR = 3 * 31 * 24 * 60 * 60 * 1000;
const MINIMUM_DATE = '0000000000000';
const MAXIMUM_DATE = '9999999999999';
const INDEX_VERSION = 'v1';
const SORT_BY_SCORE = 0;
const BATCH_RANDOM_INDEX_PATH_LENGTH = 20;
// library path contractor
const winArchPath = process.arch === 'ia32' ? 'library/indexvalidator-x86.exe' : 'library/indexvalidator-x64.exe';
@@ -58,13 +54,16 @@ class Search {
this.isInitialized = false;
this.userId = userId;
this.key = key;
this.indexFolderName = INDEX_PREFIX + '_' + this.userId + '_' + INDEX_VERSION;
this.indexFolderName = INDEX_PREFIX + '_' + this.userId + '_' + searchConfig.INDEX_VERSION;
this.dataFolder = INDEX_DATA_FOLDER;
this.realTimeIndex = TEMP_REAL_TIME_INDEX;
this.batchIndex = TEMP_BATCH_INDEX_FOLDER;
this.messageData = [];
this.isRealTimeIndexing = false;
this.crypto = new Crypto(userId, key);
this.decryptAndInit();
this.collector = makeBoundTimedCollector(this.checkIsRealTimeIndexing.bind(this),
searchConfig.REAL_TIME_INDEXING_TIME, this.realTimeIndexing.bind(this));
}
/**
@@ -100,10 +99,10 @@ class Search {
libSymphonySearch.symSERemoveFolder(this.batchIndex);
Search.indexValidator(this.indexFolderName);
Search.indexValidator(this.realTimeIndex);
let indexDateStartFrom = new Date().getTime() - SEARCH_PERIOD_SUBTRACTOR;
let indexDateStartFrom = new Date().getTime() - searchConfig.SEARCH_PERIOD_SUBTRACTOR;
// Deleting all the messages except 3 Months from now
libSymphonySearch.symSEDeleteMessages(this.indexFolderName, null,
MINIMUM_DATE, indexDateStartFrom.toString());
searchConfig.MINIMUM_DATE, indexDateStartFrom.toString());
this.isInitialized = true;
}
@@ -116,24 +115,36 @@ class Search {
indexBatch(messages) {
return new Promise((resolve, reject) => {
if (!messages) {
reject(new Error('Messages is required'));
log.send(logLevels.ERROR, 'Batch Indexing: Messages not provided');
reject(new Error('Batch Indexing: Messages is required'));
return;
}
if (!(JSON.parse(messages) instanceof Array)) {
reject(new Error('Messages must be an array'));
try {
let msg = JSON.parse(messages);
if (!(msg instanceof Array)) {
log.send(logLevels.ERROR, 'Batch Indexing: Messages must be an array');
reject(new Error('Batch Indexing: Messages must be an array'));
return;
}
} catch(e) {
log.send(logLevels.ERROR, 'Batch Indexing: parse error -> ' + e);
reject(new Error(e));
return;
}
if (!this.isInitialized) {
log.send(logLevels.ERROR, 'Library not initialized');
reject(new Error('Library not initialized'));
return;
}
const indexId = randomString.generate(BATCH_RANDOM_INDEX_PATH_LENGTH);
const indexId = randomString.generate(searchConfig.BATCH_RANDOM_INDEX_PATH_LENGTH);
libSymphonySearch.symSECreatePartialIndexAsync(this.batchIndex, indexId, messages, (err, res) => {
if (err) {
log.send(logLevels.ERROR, 'Batch Indexing: error ->' + err);
reject(new Error(err));
return;
}
resolve(res);
});
@@ -148,7 +159,9 @@ class Search {
return new Promise((resolve, reject) => {
libSymphonySearch.symSEMergePartialIndexAsync(this.indexFolderName, this.batchIndex, (err, res) => {
if (err) {
log.send(logLevels.ERROR, 'Error merging the index ->' + err);
reject(new Error(err));
return;
}
libSymphonySearch.symSERemoveFolder(this.batchIndex);
resolve(res);
@@ -156,6 +169,24 @@ class Search {
});
}
/**
* Batching the real time
* messages for queue and flush
* @param {Object} message
*/
batchRealTimeIndexing(message) {
this.collector(message);
}
/**
* Returns the current state of the
* real-time indexing
* @returns {boolean}
*/
checkIsRealTimeIndexing() {
return this.isRealTimeIndexing;
}
/**
* An array of messages to be indexed
* in real time
@@ -163,19 +194,35 @@ class Search {
*/
realTimeIndexing(message) {
if (!message) {
return new Error('Message is required');
log.send(logLevels.ERROR, 'RealTime Indexing: Messages not provided');
return new Error('RealTime Indexing: Messages is required');
}
if (!(JSON.parse(message) instanceof Array)){
return new Error('Message must be an array');
try {
let msg = JSON.parse(message);
if (!(msg instanceof Array)) {
log.send(logLevels.ERROR, 'RealTime Indexing: Messages must be an array real-time indexing');
return (new Error('RealTime Indexing: Messages must be an array'));
}
} catch(e) {
log.send(logLevels.ERROR, 'RealTime Indexing: parse error -> ' + e);
return (new Error(e));
}
if (!this.isInitialized) {
log.send(logLevels.ERROR, 'Library not initialized');
return new Error('Library not initialized');
}
let result = libSymphonySearch.symSEIndexRealTime(this.realTimeIndex, message);
return result === 0 ? "Successful" : result
this.isRealTimeIndexing = true;
return libSymphonySearch.symSEIndexRealTimeAsync(this.realTimeIndex, message, (err, result) => {
this.isRealTimeIndexing = false;
if (err) {
log.send(logLevels.ERROR, 'RealTime Indexing: error -> ' + err);
return new Error(err);
}
return result;
});
}
/**
@@ -215,6 +262,7 @@ class Search {
return this.crypto.encryption().then(() => {
return 'Success'
}).catch((e) => {
log.send(logLevels.ERROR, 'Encrypting the index folder failed ->' + e);
return (new Error(e));
});
}
@@ -242,11 +290,13 @@ class Search {
return new Promise((resolve, reject) => {
if (!this.isInitialized) {
log.send(logLevels.ERROR, 'Library not initialized');
reject(new Error('Library not initialized'));
return;
}
if (!fs.existsSync(this.indexFolderName) || !fs.existsSync(this.realTimeIndex)) {
log.send(logLevels.ERROR, 'Index folder does not exist.');
reject('Index folder does not exist.');
return;
}
@@ -258,7 +308,7 @@ class Search {
return;
}
let searchPeriod = new Date().getTime() - SEARCH_PERIOD_SUBTRACTOR;
let searchPeriod = new Date().getTime() - searchConfig.SEARCH_PERIOD_SUBTRACTOR;
let startDateTime = searchPeriod;
if (startDate) {
startDateTime = new Date(parseInt(startDate, 10)).getTime();
@@ -267,7 +317,7 @@ class Search {
}
}
let endDateTime = MAXIMUM_DATE;
let endDateTime = searchConfig.MAXIMUM_DATE;
if (endDate) {
let eTime = new Date(parseInt(endDate, 10)).getTime();
if (eTime) {
@@ -284,7 +334,7 @@ class Search {
}
if (!_sortOrder && _sortOrder === "" && typeof _sortOrder !== 'number' && Math.round(_sortOrder) !== _sortOrder) {
_sortOrder = SORT_BY_SCORE;
_sortOrder = searchConfig.SORT_BY_SCORE;
}
const returnedResult = libSymphonySearch.symSESearch(this.indexFolderName, this.realTimeIndex, q, startDateTime.toString(), endDateTime.toString(), _offset, _limit, _sortOrder);
@@ -305,17 +355,20 @@ class Search {
getLatestMessageTimestamp() {
return new Promise((resolve, reject) => {
if (!this.isInitialized) {
log.send(logLevels.ERROR, 'Library not initialized');
reject('Not initialized');
return;
}
if (!fs.existsSync(this.indexFolderName)) {
log.send(logLevels.ERROR, 'Index folder does not exist.');
reject('Index folder does not exist.');
return;
}
libSymphonySearch.symSEGetLastMessageTimestampAsync(this.indexFolderName, (err, res) => {
if (err) {
log.send(logLevels.ERROR, 'Error getting the index timestamp ->' + err);
reject(new Error(err));
}
const returnedResult = res;
@@ -456,6 +509,7 @@ class Search {
if (data.status === 'OK') {
return data;
}
log.send(logLevels.ERROR, 'Unable validate index folder');
return new Error('Unable validate index folder')
} catch (err) {
throw new Error(err);

11
js/search/searchConfig.js Normal file
View File

@@ -0,0 +1,11 @@
const searchConfig = {
SEARCH_PERIOD_SUBTRACTOR: 3 * 31 * 24 * 60 * 60 * 1000,
REAL_TIME_INDEXING_TIME: 60000,
MINIMUM_DATE: '0000000000000',
MAXIMUM_DATE: '9999999999999',
INDEX_VERSION: 'v1',
SORT_BY_SCORE: 0,
BATCH_RANDOM_INDEX_PATH_LENGTH: 20
};
module.exports = searchConfig;

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.