2019-11-29 12:59:40 +01:00
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
2017-11-27 17:23:35 -05:00
package sqlstore
import (
MM-21328: Fix KVCompareAndSet when new==old (#13612)
`KVCompareAndSet(key, sameValue, sameValue)` can fail spuriously on MySQL if the underlying `UPDATE` requires no actual changes. As per the [MySQL documentation](https://dev.mysql.com/doc/refman/8.0/en/mysql-affected-rows.html), we can't rely on rows affected in this case:
> For UPDATE statements, the affected-rows value by default is the number of rows actually changed. If you specify the CLIENT_FOUND_ROWS flag to mysql_real_connect() when connecting to mysqld, the affected-rows value is the number of rows “found”; that is, matched by the WHERE clause.
It's not tenable to change `CLIENT_FOUND_ROWS` for all connections, so handle this case in the code instead by running a `SELECT` after the fact. Note that `KVCompareAndSet` has no guarantee of atomicity in this case, but neither would the `CompareAndSwap` by which this method was inspired.
Finally, note that no changes are required for Postgres, which has sane semantics as the default.
Fixes: https://mattermost.atlassian.net/browse/MM-21328
Co-authored-by: mattermod <mattermod@users.noreply.github.com>
2020-01-16 15:29:10 -04:00
"bytes"
2017-11-27 17:23:35 -05:00
"database/sql"
"fmt"
"net/http"
2019-11-28 14:39:38 +01:00
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store"
2017-11-27 17:23:35 -05:00
)
2018-10-03 13:04:37 -07:00
// DEFAULT_PLUGIN_KEY_FETCH_LIMIT is the page size List falls back to when the
// caller passes a limit that is zero or negative.
const DEFAULT_PLUGIN_KEY_FETCH_LIMIT = 10
2017-11-27 17:23:35 -05:00
// SqlPluginStore is the SQL-backed plugin key-value store. It embeds SqlStore,
// so the connection accessors used by its methods (GetMaster, GetReplica,
// DriverName, ...) are promoted from the embedded value.
type SqlPluginStore struct {
SqlStore
}
func NewSqlPluginStore ( sqlStore SqlStore ) store . PluginStore {
s := & SqlPluginStore { sqlStore }
for _ , db := range sqlStore . GetAllConns ( ) {
table := db . AddTableWithName ( model . PluginKeyValue { } , "PluginKeyValueStore" ) . SetKeys ( false , "PluginId" , "Key" )
2017-12-05 18:19:33 -05:00
table . ColMap ( "PluginId" ) . SetMaxSize ( 190 )
table . ColMap ( "Key" ) . SetMaxSize ( 50 )
2017-11-27 17:23:35 -05:00
table . ColMap ( "Value" ) . SetMaxSize ( 8192 )
}
return s
}
// CreateIndexesIfNotExists is intentionally a no-op: this store creates no
// indexes beyond the table keys registered in NewSqlPluginStore.
func ( ps SqlPluginStore ) CreateIndexesIfNotExists ( ) {
}
2019-07-09 09:08:28 -04:00
func ( ps SqlPluginStore ) SaveOrUpdate ( kv * model . PluginKeyValue ) ( * model . PluginKeyValue , * model . AppError ) {
if err := kv . IsValid ( ) ; err != nil {
return nil , err
}
2017-11-27 17:23:35 -05:00
2020-01-31 09:58:48 -04:00
if kv . Value == nil {
// Setting a key to nil is the same as removing it
err := ps . Delete ( kv . PluginId , kv . Key )
if err != nil {
return nil , err
}
return kv , nil
}
2019-07-09 09:08:28 -04:00
if ps . DriverName ( ) == model . DATABASE_DRIVER_POSTGRES {
// Unfortunately PostgreSQL pre-9.5 does not have an atomic upsert, so we use
// separate update and insert queries to accomplish our upsert
if rowsAffected , err := ps . GetMaster ( ) . Update ( kv ) ; err != nil {
return nil , model . NewAppError ( "SqlPluginStore.SaveOrUpdate" , "store.sql_plugin_store.save.app_error" , nil , err . Error ( ) , http . StatusInternalServerError )
} else if rowsAffected == 0 {
// No rows were affected by the update, so let's try an insert
if err := ps . GetMaster ( ) . Insert ( kv ) ; err != nil {
// If the error is from unique constraints violation, it's the result of a
// valid race and we can report success. Otherwise we have a real error and
// need to return it
if ! IsUniqueConstraintError ( err , [ ] string { "PRIMARY" , "PluginId" , "Key" , "PKey" } ) {
return nil , model . NewAppError ( "SqlPluginStore.SaveOrUpdate" , "store.sql_plugin_store.save.app_error" , nil , err . Error ( ) , http . StatusInternalServerError )
2017-11-27 17:23:35 -05:00
}
}
}
2019-07-09 09:08:28 -04:00
} else if ps . DriverName ( ) == model . DATABASE_DRIVER_MYSQL {
if _ , err := ps . GetMaster ( ) . Exec ( "INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue, ExpireAt) VALUES(:PluginId, :Key, :Value, :ExpireAt) ON DUPLICATE KEY UPDATE PValue = :Value, ExpireAt = :ExpireAt" , map [ string ] interface { } { "PluginId" : kv . PluginId , "Key" : kv . Key , "Value" : kv . Value , "ExpireAt" : kv . ExpireAt } ) ; err != nil {
return nil , model . NewAppError ( "SqlPluginStore.SaveOrUpdate" , "store.sql_plugin_store.save.app_error" , nil , err . Error ( ) , http . StatusInternalServerError )
}
}
2017-11-27 17:23:35 -05:00
2019-07-09 09:08:28 -04:00
return kv , nil
2017-11-27 17:23:35 -05:00
}
2019-04-23 13:35:17 -04:00
func ( ps SqlPluginStore ) CompareAndSet ( kv * model . PluginKeyValue , oldValue [ ] byte ) ( bool , * model . AppError ) {
if err := kv . IsValid ( ) ; err != nil {
return false , err
}
2019-08-21 23:25:38 -03:00
if kv . Value == nil {
// Setting a key to nil is the same as removing it
return ps . CompareAndDelete ( kv , oldValue )
}
2019-04-23 13:35:17 -04:00
if oldValue == nil {
// Insert if oldValue is nil
if err := ps . GetMaster ( ) . Insert ( kv ) ; err != nil {
// If the error is from unique constraints violation, it's the result of a
// race condition, return false and no error. Otherwise we have a real error and
// need to return it.
if IsUniqueConstraintError ( err , [ ] string { "PRIMARY" , "PluginId" , "Key" , "PKey" } ) {
return false , nil
} else {
return false , model . NewAppError ( "SqlPluginStore.CompareAndSet" , "store.sql_plugin_store.save.app_error" , nil , err . Error ( ) , http . StatusInternalServerError )
}
}
} else {
// Update if oldValue is not nil
updateResult , err := ps . GetMaster ( ) . Exec (
2019-11-04 09:49:54 -03:00
` UPDATE PluginKeyValueStore SET PValue = :New, ExpireAt = :ExpireAt WHERE PluginId = :PluginId AND PKey = :Key AND PValue = :Old ` ,
2019-04-23 13:35:17 -04:00
map [ string ] interface { } {
"PluginId" : kv . PluginId ,
"Key" : kv . Key ,
"Old" : oldValue ,
"New" : kv . Value ,
2019-11-04 09:49:54 -03:00
"ExpireAt" : kv . ExpireAt ,
2019-04-23 13:35:17 -04:00
} ,
)
if err != nil {
return false , model . NewAppError ( "SqlPluginStore.CompareAndSet" , "store.sql_plugin_store.save.app_error" , nil , err . Error ( ) , http . StatusInternalServerError )
}
if rowsAffected , err := updateResult . RowsAffected ( ) ; err != nil {
// Failed to update
return false , model . NewAppError ( "SqlPluginStore.CompareAndSet" , "store.sql_plugin_store.save.app_error" , nil , err . Error ( ) , http . StatusInternalServerError )
} else if rowsAffected == 0 {
MM-21328: Fix KVCompareAndSet when new==old (#13612)
`KVCompareAndSet(key, sameValue, sameValue)` can fail spuriously on MySQL if the underlying `UPDATE` requires no actual changes. As per the [MySQL documentation](https://dev.mysql.com/doc/refman/8.0/en/mysql-affected-rows.html), we can't rely on rows affected in this case:
> For UPDATE statements, the affected-rows value by default is the number of rows actually changed. If you specify the CLIENT_FOUND_ROWS flag to mysql_real_connect() when connecting to mysqld, the affected-rows value is the number of rows “found”; that is, matched by the WHERE clause.
It's not tenable to change `CLIENT_FOUND_ROWS` for the all connection, so handle this case in the code instead by running a `SELECT` after the fact. Note that `KVCompareAndSet` has no guarantee of atomicity in this case, but neither would `CompareAndSwap` on which this is method was inspired.
Finally, note that no changes are required for Postgres, which has sane semantics as the default.
Fixes: https://mattermost.atlassian.net/browse/MM-21328
Co-authored-by: mattermod <mattermod@users.noreply.github.com>
2020-01-16 15:29:10 -04:00
if ps . DriverName ( ) == model . DATABASE_DRIVER_MYSQL && bytes . Equal ( oldValue , kv . Value ) {
// ROW_COUNT on MySQL is zero even if the row existed but no changes to the row were required.
// Check if the row exists with the required value to distinguish this case. Strictly speaking,
// this isn't a good use of CompareAndSet anyway, since there's no corresponding guarantee of
// atomicity. Nevertheless, let's return results consistent with Postgres and with what might
// be expected in this case.
count , err := ps . GetReplica ( ) . SelectInt (
"SELECT COUNT(*) FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND PValue = :Value" ,
map [ string ] interface { } {
"PluginId" : kv . PluginId ,
"Key" : kv . Key ,
"Value" : kv . Value ,
} ,
)
if err != nil {
return false , model . NewAppError ( "SqlPluginStore.CompareAndSet" , "store.sql_plugin_store.compare_and_set.mysql_select.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, key=%v, err=%v" , kv . PluginId , kv . Key , err . Error ( ) ) , http . StatusInternalServerError )
}
if count == 0 {
return false , nil
} else if count == 1 {
return true , nil
} else {
return false , model . NewAppError ( "SqlPluginStore.CompareAndSet" , "store.sql_plugin_store.compare_and_set.too_many_rows.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, key=%v, count=%d" , kv . PluginId , kv . Key , count ) , http . StatusInternalServerError )
}
}
2019-04-23 13:35:17 -04:00
// No rows were affected by the update, where condition was not satisfied,
// return false, but no error.
return false , nil
}
}
return true , nil
}
2019-08-21 23:25:38 -03:00
// CompareAndDelete removes the row for kv's (PluginId, Key) only if its stored
// value equals oldValue.
//
// A nil oldValue can never match, because nil values are not stored, so the
// call returns (false, nil) without touching the database. Expired rows are
// treated as absent, using the same predicate as Get, and therefore never
// match.
//
// It returns true when a row was deleted, false with a nil error when nothing
// matched, and false with an AppError on validation or database failure.
func (ps SqlPluginStore) CompareAndDelete(kv *model.PluginKeyValue, oldValue []byte) (bool, *model.AppError) {
	if err := kv.IsValid(); err != nil {
		return false, err
	}

	if oldValue == nil {
		// nil can't be stored. Return showing that we didn't do anything
		return false, nil
	}

	deleteResult, err := ps.GetMaster().Exec(
		`DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND PValue = :Old AND (ExpireAt = 0 OR ExpireAt > :CurrentTime)`,
		map[string]interface{}{
			"PluginId":    kv.PluginId,
			"Key":         kv.Key,
			"Old":         oldValue,
			"CurrentTime": model.GetMillis(),
		},
	)
	if err != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndDelete", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
	}

	rowsAffected, err := deleteResult.RowsAffected()
	if err != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndDelete", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
	}

	return rowsAffected != 0, nil
}
2019-12-03 10:46:15 +01:00
func ( ps SqlPluginStore ) SetWithOptions ( pluginId string , key string , value [ ] byte , opt model . PluginKVSetOptions ) ( bool , * model . AppError ) {
2019-11-04 09:49:54 -03:00
if err := opt . IsValid ( ) ; err != nil {
return false , err
}
kv , err := model . NewPluginKeyValueFromOptions ( pluginId , key , value , opt )
if err != nil {
return false , err
}
if opt . Atomic {
2019-12-03 10:46:15 +01:00
return ps . CompareAndSet ( kv , opt . OldValue )
2019-11-04 09:49:54 -03:00
}
savedKv , err := ps . SaveOrUpdate ( kv )
if err != nil {
return false , err
}
return savedKv != nil , nil
}
2019-07-08 14:13:10 +03:00
func ( ps SqlPluginStore ) Get ( pluginId , key string ) ( * model . PluginKeyValue , * model . AppError ) {
var kv * model . PluginKeyValue
currentTime := model . GetMillis ( )
if err := ps . GetReplica ( ) . SelectOne ( & kv , "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND (ExpireAt = 0 OR ExpireAt > :CurrentTime)" , map [ string ] interface { } { "PluginId" : pluginId , "Key" : key , "CurrentTime" : currentTime } ) ; err != nil {
if err == sql . ErrNoRows {
return nil , model . NewAppError ( "SqlPluginStore.Get" , "store.sql_plugin_store.get.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, key=%v, err=%v" , pluginId , key , err . Error ( ) ) , http . StatusNotFound )
2017-11-27 17:23:35 -05:00
}
2019-07-08 14:13:10 +03:00
return nil , model . NewAppError ( "SqlPluginStore.Get" , "store.sql_plugin_store.get.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, key=%v, err=%v" , pluginId , key , err . Error ( ) ) , http . StatusInternalServerError )
}
return kv , nil
2017-11-27 17:23:35 -05:00
}
2019-07-16 16:10:17 +03:00
func ( ps SqlPluginStore ) Delete ( pluginId , key string ) * model . AppError {
if _ , err := ps . GetMaster ( ) . Exec ( "DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key" , map [ string ] interface { } { "PluginId" : pluginId , "Key" : key } ) ; err != nil {
return model . NewAppError ( "SqlPluginStore.Delete" , "store.sql_plugin_store.delete.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, key=%v, err=%v" , pluginId , key , err . Error ( ) ) , http . StatusInternalServerError )
}
return nil
2017-11-27 17:23:35 -05:00
}
2018-10-03 13:04:37 -07:00
2019-07-10 21:54:38 +03:00
func ( ps SqlPluginStore ) DeleteAllForPlugin ( pluginId string ) * model . AppError {
if _ , err := ps . GetMaster ( ) . Exec ( "DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId" , map [ string ] interface { } { "PluginId" : pluginId } ) ; err != nil {
return model . NewAppError ( "SqlPluginStore.Delete" , "store.sql_plugin_store.delete.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, err=%v" , pluginId , err . Error ( ) ) , http . StatusInternalServerError )
}
return nil
2018-10-10 19:55:12 +02:00
}
2019-07-09 18:15:35 +03:00
func ( ps SqlPluginStore ) DeleteAllExpired ( ) * model . AppError {
currentTime := model . GetMillis ( )
if _ , err := ps . GetMaster ( ) . Exec ( "DELETE FROM PluginKeyValueStore WHERE ExpireAt != 0 AND ExpireAt < :CurrentTime" , map [ string ] interface { } { "CurrentTime" : currentTime } ) ; err != nil {
return model . NewAppError ( "SqlPluginStore.Delete" , "store.sql_plugin_store.delete.app_error" , nil , fmt . Sprintf ( "current_time=%v, err=%v" , currentTime , err . Error ( ) ) , http . StatusInternalServerError )
}
return nil
2018-10-10 19:55:12 +02:00
}
2019-07-08 18:35:33 +03:00
func ( ps SqlPluginStore ) List ( pluginId string , offset int , limit int ) ( [ ] string , * model . AppError ) {
2018-10-03 13:04:37 -07:00
if limit <= 0 {
limit = DEFAULT_PLUGIN_KEY_FETCH_LIMIT
}
if offset <= 0 {
offset = 0
}
2019-07-08 18:35:33 +03:00
var keys [ ] string
_ , err := ps . GetReplica ( ) . Select ( & keys , "SELECT PKey FROM PluginKeyValueStore WHERE PluginId = :PluginId order by PKey limit :Limit offset :Offset" , map [ string ] interface { } { "PluginId" : pluginId , "Limit" : limit , "Offset" : offset } )
if err != nil {
return nil , model . NewAppError ( "SqlPluginStore.List" , "store.sql_plugin_store.list.app_error" , nil , fmt . Sprintf ( "plugin_id=%v, err=%v" , pluginId , err . Error ( ) ) , http . StatusInternalServerError )
}
return keys , nil
2018-10-03 13:04:37 -07:00
}