DeleteAll for KV (#9431)

Expire K/V Values

Regenerate Code

pathfix

Update Expiry on Update

Check for Exit Signal

gofmt

Rewrote Go Routine

Remove tempoarily cleanup loop

fix expiretime

TEST: Expired Watchdog as GoRoutine

Check if Srv is nil

Use Scheduler/Worker for Expired Key CleanUp

add license

fix scheduler job type; DoJob Restructuring

Remove unused imports and constants

move db migration from 5.4 to 5.5
This commit is contained in:
Daniel Schalla
2018-10-10 19:55:12 +02:00
committed by Christopher Speller
parent bd04d7f756
commit c36e85c912
20 changed files with 501 additions and 4 deletions

View File

@@ -61,7 +61,7 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann
}
}
} else if ps.DriverName() == model.DATABASE_DRIVER_MYSQL {
if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue) VALUES(:PluginId, :Key, :Value) ON DUPLICATE KEY UPDATE PValue = :Value", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value}); err != nil {
if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue, ExpireAt) VALUES(:PluginId, :Key, :Value, :ExpireAt) ON DUPLICATE KEY UPDATE PValue = :Value, ExpireAt = :ExpireAt", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value, "ExpireAt": kv.ExpireAt}); err != nil {
result.Err = model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
@@ -74,8 +74,8 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann
func (ps SqlPluginStore) Get(pluginId, key string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var kv *model.PluginKeyValue
if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key", map[string]interface{}{"PluginId": pluginId, "Key": key}); err != nil {
currentTime := model.GetMillis()
if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND (ExpireAt = 0 OR ExpireAt > :CurrentTime)", map[string]interface{}{"PluginId": pluginId, "Key": key, "CurrentTime": currentTime}); err != nil {
if err == sql.ErrNoRows {
result.Err = model.NewAppError("SqlPluginStore.Get", "store.sql_plugin_store.get.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, err.Error()), http.StatusNotFound)
} else {
@@ -97,6 +97,27 @@ func (ps SqlPluginStore) Delete(pluginId, key string) store.StoreChannel {
})
}
func (ps SqlPluginStore) DeleteAllForPlugin(pluginId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId", map[string]interface{}{"PluginId": pluginId}); err != nil {
result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, err.Error()), http.StatusInternalServerError)
} else {
result.Data = true
}
})
}
func (ps SqlPluginStore) DeleteAllExpired() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
currentTime := model.GetMillis()
if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE ExpireAt != 0 AND ExpireAt < :CurrentTime", map[string]interface{}{"CurrentTime": currentTime}); err != nil {
result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("current_time=%v, err=%v", currentTime, err.Error()), http.StatusInternalServerError)
} else {
result.Data = true
}
})
}
func (ps SqlPluginStore) List(pluginId string, offset int, limit int) store.StoreChannel {
if limit <= 0 {
limit = DEFAULT_PLUGIN_KEY_FETCH_LIMIT

View File

@@ -510,7 +510,7 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) {
func UpgradeDatabaseToVersion55(sqlStore SqlStore) {
// TODO: Uncomment following condition when version 5.5.0 is released
// if shouldPerformUpgrade(sqlStore, VERSION_5_4_0, VERSION_5_5_0) {
sqlStore.CreateColumnIfNotExists("PluginKeyValueStore", "ExpireAt", "bigint(20)", "bigint", "0")
// saveSchemaVersion(sqlStore, VERSION_5_5_0)
// }
}

View File

@@ -502,6 +502,8 @@ type PluginStore interface {
SaveOrUpdate(keyVal *model.PluginKeyValue) StoreChannel
Get(pluginId, key string) StoreChannel
Delete(pluginId, key string) StoreChannel
DeleteAllForPlugin(PluginId string) StoreChannel
DeleteAllExpired() StoreChannel
List(pluginId string, page, perPage int) StoreChannel
}

View File

@@ -29,6 +29,38 @@ func (_m *PluginStore) Delete(pluginId string, key string) store.StoreChannel {
return r0
}
// DeleteAllExpired provides a mock function with given fields:
func (_m *PluginStore) DeleteAllExpired() store.StoreChannel {
ret := _m.Called()
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func() store.StoreChannel); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
}
}
return r0
}
// DeleteAllForPlugin provides a mock function with given fields: PluginId
func (_m *PluginStore) DeleteAllForPlugin(PluginId string) store.StoreChannel {
ret := _m.Called(PluginId)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok {
r0 = rf(PluginId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
}
}
return r0
}
// Get provides a mock function with given fields: pluginId, key
func (_m *PluginStore) Get(pluginId string, key string) store.StoreChannel {
ret := _m.Called(pluginId, key)

View File

@@ -13,7 +13,10 @@ import (
func TestPluginStore(t *testing.T, ss store.Store) {
t.Run("PluginSaveGet", func(t *testing.T) { testPluginSaveGet(t, ss) })
t.Run("PluginSaveGetExpiry", func(t *testing.T) { testPluginSaveGetExpiry(t, ss) })
t.Run("PluginDelete", func(t *testing.T) { testPluginDelete(t, ss) })
t.Run("PluginDeleteAll", func(t *testing.T) { testPluginDeleteAll(t, ss) })
t.Run("PluginDeleteExpired", func(t *testing.T) { testPluginDeleteExpired(t, ss) })
}
func testPluginSaveGet(t *testing.T, ss store.Store) {
@@ -21,6 +24,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) {
PluginId: model.NewId(),
Key: model.NewId(),
Value: []byte(model.NewId()),
ExpireAt: 0,
}
if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil {
@@ -38,6 +42,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) {
assert.Equal(t, kv.PluginId, received.PluginId)
assert.Equal(t, kv.Key, received.Key)
assert.Equal(t, kv.Value, received.Value)
assert.Equal(t, kv.ExpireAt, received.ExpireAt)
}
// Try inserting when already exists
@@ -56,6 +61,52 @@ func testPluginSaveGet(t *testing.T, ss store.Store) {
}
}
func testPluginSaveGetExpiry(t *testing.T, ss store.Store) {
kv := &model.PluginKeyValue{
PluginId: model.NewId(),
Key: model.NewId(),
Value: []byte(model.NewId()),
ExpireAt: model.GetMillis() + 30000,
}
if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil {
t.Fatal(result.Err)
}
defer func() {
<-ss.Plugin().Delete(kv.PluginId, kv.Key)
}()
if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err != nil {
t.Fatal(result.Err)
} else {
received := result.Data.(*model.PluginKeyValue)
assert.Equal(t, kv.PluginId, received.PluginId)
assert.Equal(t, kv.Key, received.Key)
assert.Equal(t, kv.Value, received.Value)
assert.Equal(t, kv.ExpireAt, received.ExpireAt)
}
kv = &model.PluginKeyValue{
PluginId: model.NewId(),
Key: model.NewId(),
Value: []byte(model.NewId()),
ExpireAt: model.GetMillis() - 5000,
}
if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil {
t.Fatal(result.Err)
}
defer func() {
<-ss.Plugin().Delete(kv.PluginId, kv.Key)
}()
if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err == nil {
t.Fatal("result.Err should not be nil")
}
}
func testPluginDelete(t *testing.T, ss store.Store) {
kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
PluginId: model.NewId(),
@@ -67,3 +118,67 @@ func testPluginDelete(t *testing.T, ss store.Store) {
t.Fatal(result.Err)
}
}
func testPluginDeleteAll(t *testing.T, ss store.Store) {
pluginId := model.NewId()
kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
PluginId: pluginId,
Key: model.NewId(),
Value: []byte(model.NewId()),
})).(*model.PluginKeyValue)
kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
PluginId: pluginId,
Key: model.NewId(),
Value: []byte(model.NewId()),
})).(*model.PluginKeyValue)
if result := <-ss.Plugin().DeleteAllForPlugin(pluginId); result.Err != nil {
t.Fatal(result.Err)
}
if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil {
t.Fatal("result.Err should not be nil")
}
if result := <-ss.Plugin().Get(pluginId, kv2.Key); result.Err == nil {
t.Fatal("result.Err should not be nil")
}
}
func testPluginDeleteExpired(t *testing.T, ss store.Store) {
pluginId := model.NewId()
kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
PluginId: pluginId,
Key: model.NewId(),
Value: []byte(model.NewId()),
ExpireAt: model.GetMillis() - 6000,
})).(*model.PluginKeyValue)
kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
PluginId: pluginId,
Key: model.NewId(),
Value: []byte(model.NewId()),
ExpireAt: 0,
})).(*model.PluginKeyValue)
if result := <-ss.Plugin().DeleteAllExpired(); result.Err != nil {
t.Fatal(result.Err)
}
if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil {
t.Fatal("result.Err should not be nil")
}
if result := <-ss.Plugin().Get(kv2.PluginId, kv2.Key); result.Err != nil {
t.Fatal(result.Err)
} else {
received := result.Data.(*model.PluginKeyValue)
assert.Equal(t, kv2.PluginId, received.PluginId)
assert.Equal(t, kv2.Key, received.Key)
assert.Equal(t, kv2.Value, received.Value)
assert.Equal(t, kv2.ExpireAt, received.ExpireAt)
}
}