@@ -8,7 +8,6 @@ import ignoreErrors from 'promise-toolbox/ignoreErrors'
|
||||
import isEmpty from 'lodash/isEmpty.js'
|
||||
import map from 'lodash/map.js'
|
||||
import omit from 'lodash/omit.js'
|
||||
import { createClient as createRedisClient } from 'redis'
|
||||
import { v4 as generateUuid } from 'uuid'
|
||||
|
||||
import Collection, { ModelAlreadyExists } from '../collection.mjs'
|
||||
@@ -46,9 +45,9 @@ export default class Redis extends Collection {
|
||||
|
||||
this.indexes = indexes
|
||||
this.prefix = prefix
|
||||
const redis = (this.redis = connection || createRedisClient(uri))
|
||||
const redis = (this.redis = connection)
|
||||
|
||||
redis.sadd('xo::namespaces', namespace)::ignoreErrors()
|
||||
redis.sAdd('xo::namespaces', namespace)::ignoreErrors()
|
||||
|
||||
const key = `${prefix}:version`
|
||||
redis
|
||||
@@ -79,21 +78,21 @@ export default class Redis extends Collection {
|
||||
return
|
||||
}
|
||||
|
||||
await redis.sadd(`${prefix}::indexes`, indexes)
|
||||
await redis.sAdd(`${prefix}::indexes`, indexes)
|
||||
|
||||
await asyncMapSettled(indexes, index =>
|
||||
redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys))
|
||||
)
|
||||
|
||||
const idsIndex = `${prefix}_ids`
|
||||
await asyncMapSettled(redis.smembers(idsIndex), id =>
|
||||
redis.hgetall(`${prefix}:${id}`).then(values =>
|
||||
await asyncMapSettled(redis.sMembers(idsIndex), id =>
|
||||
redis.hGetAll(`${prefix}:${id}`).then(values =>
|
||||
values == null
|
||||
? redis.srem(idsIndex, id) // entry no longer exists
|
||||
? redis.sRem(idsIndex, id) // entry no longer exists
|
||||
: asyncMapSettled(indexes, index => {
|
||||
const value = values[index]
|
||||
if (value !== undefined) {
|
||||
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
return redis.sAdd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
})
|
||||
)
|
||||
@@ -107,7 +106,7 @@ export default class Redis extends Collection {
|
||||
const models = []
|
||||
return Promise.all(
|
||||
map(ids, id => {
|
||||
return redis.hgetall(prefix + id).then(model => {
|
||||
return redis.hGetAll(prefix + id).then(model => {
|
||||
// If empty, consider it a no match.
|
||||
if (isEmpty(model)) {
|
||||
return
|
||||
@@ -136,7 +135,7 @@ export default class Redis extends Collection {
|
||||
}
|
||||
const { id } = model
|
||||
|
||||
const newEntry = await redis.sadd(prefix + '_ids', id)
|
||||
const newEntry = await redis.sAdd(prefix + '_ids', id)
|
||||
|
||||
if (!newEntry) {
|
||||
if (!replace) {
|
||||
@@ -145,11 +144,11 @@ export default class Redis extends Collection {
|
||||
|
||||
// remove the previous values from indexes
|
||||
if (indexes.length !== 0) {
|
||||
const previous = await redis.hgetall(`${prefix}:${id}`)
|
||||
const previous = await redis.hGetAll(`${prefix}:${id}`)
|
||||
await asyncMapSettled(indexes, index => {
|
||||
const value = previous[index]
|
||||
if (value !== undefined) {
|
||||
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
return redis.sRem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -168,7 +167,7 @@ export default class Redis extends Collection {
|
||||
})
|
||||
|
||||
const key = `${prefix}:${id}`
|
||||
const promises = [redis.del(key), redis.hmset(key, ...params)]
|
||||
const promises = [redis.del(key), redis.hSet(key, ...params)]
|
||||
|
||||
// Update indexes.
|
||||
forEach(indexes, index => {
|
||||
@@ -178,7 +177,7 @@ export default class Redis extends Collection {
|
||||
}
|
||||
|
||||
const key = prefix + '_' + index + ':' + String(value).toLowerCase()
|
||||
promises.push(redis.sadd(key, id))
|
||||
promises.push(redis.sAdd(key, id))
|
||||
})
|
||||
|
||||
await Promise.all(promises)
|
||||
@@ -192,7 +191,7 @@ export default class Redis extends Collection {
|
||||
const { prefix, redis } = this
|
||||
|
||||
if (isEmpty(properties)) {
|
||||
return redis.smembers(prefix + '_ids').then(ids => this._extract(ids))
|
||||
return redis.sMembers(prefix + '_ids').then(ids => this._extract(ids))
|
||||
}
|
||||
|
||||
// Special treatment for the identifier.
|
||||
@@ -213,7 +212,7 @@ export default class Redis extends Collection {
|
||||
}
|
||||
|
||||
const keys = map(properties, (value, index) => `${prefix}_${index}:${String(value).toLowerCase()}`)
|
||||
return redis.sinter(...keys).then(ids => this._extract(ids))
|
||||
return redis.sInter(...keys).then(ids => this._extract(ids))
|
||||
}
|
||||
|
||||
_remove(ids) {
|
||||
@@ -224,20 +223,20 @@ export default class Redis extends Collection {
|
||||
const { indexes, prefix, redis } = this
|
||||
|
||||
// update main index
|
||||
let promise = redis.srem(prefix + '_ids', ...ids)
|
||||
let promise = redis.sRem(prefix + '_ids', ...ids)
|
||||
|
||||
// update other indexes
|
||||
if (indexes.length !== 0) {
|
||||
promise = Promise.all([
|
||||
promise,
|
||||
asyncMapSettled(ids, id =>
|
||||
redis.hgetall(`${prefix}:${id}`).then(
|
||||
redis.hGetAll(`${prefix}:${id}`).then(
|
||||
values =>
|
||||
values != null &&
|
||||
asyncMapSettled(indexes, index => {
|
||||
const value = values[index]
|
||||
if (value !== undefined) {
|
||||
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
return redis.sRem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
|
||||
import { createClient as createRedisClient } from 'redis'
|
||||
import appConf from 'app-conf'
|
||||
import fromCallback from 'promise-toolbox/fromCallback'
|
||||
import fromEvent from 'promise-toolbox/fromEvent'
|
||||
|
||||
import RedisCollection from './collection/redis.mjs'
|
||||
|
||||
@@ -18,7 +16,7 @@ async function getDb(namespace) {
|
||||
const { connection } = this
|
||||
return new RedisCollection({
|
||||
connection,
|
||||
indexes: await fromCallback.call(connection, 'smembers', `xo:${namespace}::indexes`),
|
||||
indexes: await connection.sMembers(`xo:${namespace}::indexes`),
|
||||
namespace,
|
||||
})
|
||||
}
|
||||
@@ -53,7 +51,7 @@ function sortKeys(object) {
|
||||
const COMMANDS = {
|
||||
async ls(args) {
|
||||
if (args.length === 0) {
|
||||
const namespaces = await fromCallback.call(this.connection, 'smembers', 'xo::namespaces')
|
||||
const namespaces = await this.connection.sMembers('xo::namespaces')
|
||||
namespaces.sort()
|
||||
for (const ns of namespaces) {
|
||||
console.log(ns)
|
||||
@@ -103,14 +101,15 @@ xo-server-logs ls <namespace> [<pattern>...]
|
||||
path,
|
||||
url,
|
||||
})
|
||||
await connection.connect()
|
||||
// await repl({ context: { redis: connection } })
|
||||
try {
|
||||
const fn = COMMANDS[args.shift()]
|
||||
assert(fn !== undefined, 'command must be one of: ' + Object.keys(COMMANDS).join(', '))
|
||||
|
||||
await fn.call({ connection, getDb }, args)
|
||||
} finally {
|
||||
connection.quit()
|
||||
await fromEvent(connection, 'end')
|
||||
await connection.quit()
|
||||
}
|
||||
}
|
||||
main(process.argv.slice(2))
|
||||
|
||||
@@ -44,11 +44,11 @@ export default class Xo extends EventEmitter {
|
||||
// Connects to Redis.
|
||||
{
|
||||
const { socket: path, uri: url } = config.redis || {}
|
||||
const redis = createRedisClient({ path, url })
|
||||
|
||||
this._redis = createRedisClient({
|
||||
path,
|
||||
url,
|
||||
})
|
||||
this._redis = redis
|
||||
this.hooks.on('start', () => redis.connect())
|
||||
this.hooks.on('stop', () => redis.quit())
|
||||
}
|
||||
|
||||
this.hooks.on('start', () => this._watchObjects())
|
||||
|
||||
Reference in New Issue
Block a user