From 6f8577b4c175e7de7ccf24d2b88ca4f3589b8416 Mon Sep 17 00:00:00 2001 From: Ali F <25732808+ali-farooq0@users.noreply.github.com> Date: Tue, 23 Apr 2019 13:35:17 -0400 Subject: [PATCH] MM-14246 - Plugin framework: support transactional semantics with KV Store (#10634) * MM-14246 - Plugin framework: support transactional semantics with KV Store Rename old, new variable names Moving New function to the bottom * Made CompareAndUpdate sync, updated tests * Removed going through channel in CompareAndSetPluginKey * Inserting new key when oldValue is nil to KVCompareAndSet * Updated error text to include CompareAndSet --- app/plugin_api.go | 4 ++ app/plugin_api_test.go | 96 ++++++++++++++++++++++++++++ app/plugin_key_value_store.go | 21 ++++++ app/plugin_test.go | 57 +++++++++++++++++ plugin/api.go | 10 +++ plugin/client_rpc_generated.go | 31 +++++++++ plugin/plugintest/api.go | 23 +++++++ store/sqlstore/plugin_store.go | 45 +++++++++++++ store/store.go | 1 + store/storetest/mocks/PluginStore.go | 23 +++++++ 10 files changed, 311 insertions(+) diff --git a/app/plugin_api.go b/app/plugin_api.go index 1bb6f265f5..40344cc48f 100644 --- a/app/plugin_api.go +++ b/app/plugin_api.go @@ -662,6 +662,10 @@ func (api *PluginAPI) KVSet(key string, value []byte) *model.AppError { return api.app.SetPluginKey(api.id, key, value) } +func (api *PluginAPI) KVCompareAndSet(key string, oldValue, newValue []byte) (bool, *model.AppError) { + return api.app.CompareAndSetPluginKey(api.id, key, oldValue, newValue) +} + func (api *PluginAPI) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError { return api.app.SetPluginKeyWithExpiry(api.id, key, value, expireInSeconds) } diff --git a/app/plugin_api_test.go b/app/plugin_api_test.go index f487691df6..9dac5f62a5 100644 --- a/app/plugin_api_test.go +++ b/app/plugin_api_test.go @@ -659,3 +659,99 @@ func TestBasicAPIPlugins(t *testing.T) { } } } + +func TestPluginAPIKVCompareAndSet(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + api := th.SetupPluginAPI() + + testCases := []struct { + Description string + ExpectedValue []byte + }{ + { + Description: "Testing non-nil, non-empty value", + ExpectedValue: []byte("value1"), + }, + { + Description: "Testing empty value", + ExpectedValue: []byte(""), + }, + } + + for i, testCase := range testCases { + t.Run(testCase.Description, func(t *testing.T) { + expectedKey := fmt.Sprintf("Key%d", i) + expectedValueEmpty := []byte("") + expectedValue1 := testCase.ExpectedValue + expectedValue2 := []byte("value2") + expectedValue3 := []byte("value3") + + // Attempt update using an incorrect old value + updated, err := api.KVCompareAndSet(expectedKey, expectedValue2, expectedValue1) + require.Nil(t, err) + require.False(t, updated) + + // Make sure no key is already created + value, err := api.KVGet(expectedKey) + require.Nil(t, err) + require.Nil(t, value) + + // Insert using nil old value + updated, err = api.KVCompareAndSet(expectedKey, nil, expectedValue1) + require.Nil(t, err) + require.True(t, updated) + + // Get inserted value + value, err = api.KVGet(expectedKey) + require.Nil(t, err) + require.Equal(t, expectedValue1, value) + + // Attempt to insert again using nil old value + updated, err = api.KVCompareAndSet(expectedKey, nil, expectedValue2) + require.Nil(t, err) + require.False(t, updated) + + // Get old value to assert nothing has changed + value, err = api.KVGet(expectedKey) + require.Nil(t, err) + require.Equal(t, expectedValue1, value) + + // Update using correct old value + updated, err = api.KVCompareAndSet(expectedKey, expectedValue1, expectedValue2) + require.Nil(t, err) + require.True(t, updated) + + value, err = api.KVGet(expectedKey) + require.Nil(t, err) + require.Equal(t, expectedValue2, value) + + // Update using incorrect old value + updated, err = api.KVCompareAndSet(expectedKey, []byte("incorrect"), expectedValue3) + require.Nil(t, err) + require.False(t, updated) + + value, err = api.KVGet(expectedKey) + require.Nil(t, err) + require.Equal(t, expectedValue2, value) + + // Update using nil old value + updated, err = api.KVCompareAndSet(expectedKey, nil, expectedValue3) + require.Nil(t, err) + require.False(t, updated) + + value, err = api.KVGet(expectedKey) + require.Nil(t, err) + require.Equal(t, expectedValue2, value) + + // Update using empty old value + updated, err = api.KVCompareAndSet(expectedKey, expectedValueEmpty, expectedValue3) + require.Nil(t, err) + require.False(t, updated) + + value, err = api.KVGet(expectedKey) + require.Nil(t, err) + require.Equal(t, expectedValue2, value) + }) + } +} diff --git a/app/plugin_key_value_store.go b/app/plugin_key_value_store.go index a0ec13f18c..559b462f04 100644 --- a/app/plugin_key_value_store.go +++ b/app/plugin_key_value_store.go @@ -47,6 +47,27 @@ func (a *App) SetPluginKeyWithExpiry(pluginId string, key string, value []byte, return nil } +func (a *App) CompareAndSetPluginKey(pluginId string, key string, oldValue, newValue []byte) (bool, *model.AppError) { + kv := &model.PluginKeyValue{ + PluginId: pluginId, + Key: key, + Value: newValue, + } + + updated, err := a.Srv.Store.Plugin().CompareAndSet(kv, oldValue) + if err != nil { + mlog.Error("Failed to compare and set plugin key value", mlog.String("plugin_id", pluginId), mlog.String("key", key), mlog.Err(err)) + return updated, err + } + + // Clean up a previous entry using the hashed key, if it exists. + if result := <-a.Srv.Store.Plugin().Delete(pluginId, getKeyHash(key)); result.Err != nil { + mlog.Error("Failed to clean up previously hashed plugin key value", mlog.String("plugin_id", pluginId), mlog.String("key", key), mlog.Err(result.Err)) + } + + return updated, nil +} + func (a *App) GetPluginKey(pluginId string, key string) ([]byte, *model.AppError) { if result := <-a.Srv.Store.Plugin().Get(pluginId, key); result.Err == nil { return result.Data.(*model.PluginKeyValue).Value, nil diff --git a/app/plugin_test.go b/app/plugin_test.go index 01b03a912d..831a778b8c 100644 --- a/app/plugin_test.go +++ b/app/plugin_test.go @@ -125,6 +125,63 @@ func TestPluginKeyValueStore(t *testing.T) { assert.Equal(t, []string{"key", "key3", "key4", hashedKey2}, list) } +func TestPluginKeyValueStoreCompareAndSet(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + pluginId := "testpluginid" + + defer func() { + assert.Nil(t, th.App.DeletePluginKey(pluginId, "key")) + }() + + // Set using Set api for key2 + assert.Nil(t, th.App.SetPluginKey(pluginId, "key2", []byte("test"))) + ret, err := th.App.GetPluginKey(pluginId, "key2") + assert.Nil(t, err) + assert.Equal(t, []byte("test"), ret) + + // Attempt to insert value for key2 + updated, err := th.App.CompareAndSetPluginKey(pluginId, "key2", nil, []byte("test2")) + assert.Nil(t, err) + assert.False(t, updated) + ret, err = th.App.GetPluginKey(pluginId, "key2") + assert.Nil(t, err) + assert.Equal(t, []byte("test"), ret) + + // Insert new value for key + updated, err = th.App.CompareAndSetPluginKey(pluginId, "key", nil, []byte("test")) + assert.Nil(t, err) + assert.True(t, updated) + ret, err = th.App.GetPluginKey(pluginId, "key") + assert.Nil(t, err) + assert.Equal(t, []byte("test"), ret) + + // Should fail to insert again + updated, err = th.App.CompareAndSetPluginKey(pluginId, "key", nil, []byte("test3")) + assert.Nil(t, err) + assert.False(t, updated) + ret, err = th.App.GetPluginKey(pluginId, "key") + assert.Nil(t, err) + assert.Equal(t, []byte("test"), ret) + + // Test updating using incorrect old value + updated, err = th.App.CompareAndSetPluginKey(pluginId, "key", []byte("oldvalue"), []byte("test3")) + assert.Nil(t, err) + assert.False(t, updated) + ret, err = th.App.GetPluginKey(pluginId, "key") + assert.Nil(t, err) + assert.Equal(t, []byte("test"), ret) + + // Test updating using correct old value + updated, err = th.App.CompareAndSetPluginKey(pluginId, "key", []byte("test"), []byte("test2")) + assert.Nil(t, err) + assert.True(t, updated) + ret, err = th.App.GetPluginKey(pluginId, "key") + assert.Nil(t, err) + assert.Equal(t, []byte("test2"), ret) +} + func TestServePluginRequest(t *testing.T) { th := Setup(t).InitBasic() defer th.TearDown() diff --git a/plugin/api.go b/plugin/api.go index d43162525a..506c898704 100644 --- a/plugin/api.go +++ b/plugin/api.go @@ -459,6 +459,16 @@ type API interface { // KVSet will store a key-value pair, unique per plugin. KVSet(key string, value []byte) *model.AppError + // KVCompareAndSet will update a key-value pair, + // unique per plugin, to the given new value if the current value == the old value. + // Inserts a new key if oldValue == nil. + // Returns (false, err) if DB error occurred + // Returns (false, nil) if current value != old value or key already exists when inserting + // Returns (true, nil) if current value == old value or new key is inserted + // + // Minimum server version: 5.12 + KVCompareAndSet(key string, oldValue, newValue []byte) (bool, *model.AppError) + // KVSet will store a key-value pair, unique per plugin with an expiry time // // Minimum server version: 5.6 diff --git a/plugin/client_rpc_generated.go b/plugin/client_rpc_generated.go index 4c69375317..4d8815c96a 100644 --- a/plugin/client_rpc_generated.go +++ b/plugin/client_rpc_generated.go @@ -3490,6 +3490,37 @@ func (s *apiRPCServer) KVSet(args *Z_KVSetArgs, returns *Z_KVSetReturns) error { return nil } +type Z_KVCompareAndSetArgs struct { + A string + B []byte + C []byte +} + +type Z_KVCompareAndSetReturns struct { + A bool + B *model.AppError +} + +func (g *apiRPCClient) KVCompareAndSet(key string, oldValue, newValue []byte) (bool, *model.AppError) { + _args := &Z_KVCompareAndSetArgs{key, oldValue, newValue} + _returns := &Z_KVCompareAndSetReturns{} + if err := g.client.Call("Plugin.KVCompareAndSet", _args, _returns); err != nil { + log.Printf("RPC call to KVCompareAndSet API failed: %s", err.Error()) + } + return _returns.A, _returns.B +} + +func (s *apiRPCServer) KVCompareAndSet(args *Z_KVCompareAndSetArgs, returns *Z_KVCompareAndSetReturns) error { + if hook, ok := s.impl.(interface { + KVCompareAndSet(key string, oldValue, newValue []byte) (bool, *model.AppError) + }); ok { + returns.A, returns.B = hook.KVCompareAndSet(args.A, args.B, args.C) + } else { + return encodableError(fmt.Errorf("API KVCompareAndSet called but not implemented.")) + } + return nil +} + type Z_KVSetWithExpiryArgs struct { A string B []byte diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go index 83006bd16f..10d1b4b26e 100644 --- a/plugin/plugintest/api.go +++ b/plugin/plugintest/api.go @@ -1862,6 +1862,29 @@ func (_m *API) HasPermissionToTeam(userId string, teamId string, permission *mod return r0 } +// KVCompareAndSet provides a mock function with given fields: key, oldValue, newValue +func (_m *API) KVCompareAndSet(key string, oldValue []byte, newValue []byte) (bool, *model.AppError) { + ret := _m.Called(key, oldValue, newValue) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, []byte, []byte) bool); ok { + r0 = rf(key, oldValue, newValue) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, []byte, []byte) *model.AppError); ok { + r1 = rf(key, oldValue, newValue) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 +} + // KVDelete provides a mock function with given fields: key func (_m *API) KVDelete(key string) *model.AppError { ret := _m.Called(key) diff --git a/store/sqlstore/plugin_store.go b/store/sqlstore/plugin_store.go index ae7e65d2dd..818a77c30c 100644 --- a/store/sqlstore/plugin_store.go +++ b/store/sqlstore/plugin_store.go @@ -71,6 +71,51 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann }) } +func (ps SqlPluginStore) CompareAndSet(kv *model.PluginKeyValue, oldValue []byte) (bool, *model.AppError) { + if err := kv.IsValid(); err != nil { + return false, err + } + + if oldValue == nil { + // Insert if oldValue is nil + if err := ps.GetMaster().Insert(kv); err != nil { + // If the error is from unique constraints violation, it's the result of a + // race condition, return false and no error. Otherwise we have a real error and + // need to return it. + if IsUniqueConstraintError(err, []string{"PRIMARY", "PluginId", "Key", "PKey"}) { + return false, nil + } else { + return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError) + } + } + } else { + // Update if oldValue is not nil + updateResult, err := ps.GetMaster().Exec( + `UPDATE PluginKeyValueStore SET PValue = :New WHERE PluginId = :PluginId AND PKey = :Key AND PValue = :Old`, + map[string]interface{}{ + "PluginId": kv.PluginId, + "Key": kv.Key, + "Old": oldValue, + "New": kv.Value, + }, + ) + if err != nil { + return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError) + } + + if rowsAffected, err := updateResult.RowsAffected(); err != nil { + // Failed to update + return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError) + } else if rowsAffected == 0 { + // No rows were affected by the update, where condition was not satisfied, + // return false, but no error. + return false, nil + } + } + + return true, nil +} + func (ps SqlPluginStore) Get(pluginId, key string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { var kv *model.PluginKeyValue diff --git a/store/store.go b/store/store.go index db54f13aef..60c10f0f9e 100644 --- a/store/store.go +++ b/store/store.go @@ -524,6 +524,7 @@ type UserAccessTokenStore interface { type PluginStore interface { SaveOrUpdate(keyVal *model.PluginKeyValue) StoreChannel + CompareAndSet(keyVal *model.PluginKeyValue, oldValue []byte) (bool, *model.AppError) Get(pluginId, key string) StoreChannel Delete(pluginId, key string) StoreChannel DeleteAllForPlugin(PluginId string) StoreChannel diff --git a/store/storetest/mocks/PluginStore.go b/store/storetest/mocks/PluginStore.go index b85e17fdfb..71ff9d688a 100644 --- a/store/storetest/mocks/PluginStore.go +++ b/store/storetest/mocks/PluginStore.go @@ -13,6 +13,29 @@ type PluginStore struct { mock.Mock } +// CompareAndSet provides a mock function with given fields: keyVal, oldValue +func (_m *PluginStore) CompareAndSet(keyVal *model.PluginKeyValue, oldValue []byte) (bool, *model.AppError) { + ret := _m.Called(keyVal, oldValue) + + var r0 bool + if rf, ok := ret.Get(0).(func(*model.PluginKeyValue, []byte) bool); ok { + r0 = rf(keyVal, oldValue) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(*model.PluginKeyValue, []byte) *model.AppError); ok { + r1 = rf(keyVal, oldValue) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 +} + // Delete provides a mock function with given fields: pluginId, key func (_m *PluginStore) Delete(pluginId string, key string) store.StoreChannel { ret := _m.Called(pluginId, key)