Files
mattermost/store/sqlstore/plugin_store.go
Joshua Bezaleel Abednego 4ba6c35813 [MM-24664] Plugin_store queries squirrel refactor (#14523)
* Use squirrel to build query for plugin store

* Typo of PluginkeyValueStore to PluginKeyValueStore

* wrong parameter of queryBuilder in Get method

* Use casting to int for comparison on ExpireAt

* Revert query for CompareAndSet and CompareAndDelete temporarily

* Delete query of expired value

* Update query when oldValue is not nil

* Check count query when there is no row affected

* Delete query on CompareAndDelete

* Put squirrel in separate import group

Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
Co-authored-by: Agniva De Sarker <agnivade@yahoo.co.in>
2020-07-15 22:22:35 +05:30

384 lines
13 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sqlstore
import (
"bytes"
"database/sql"
"fmt"
"net/http"
sq "github.com/Masterminds/squirrel"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store"
)
// defaultPluginKeyFetchLimit is the number of keys List returns when the
// caller supplies a non-positive limit.
const defaultPluginKeyFetchLimit = 10
// SqlPluginStore is the SQL-backed plugin key-value store. It embeds SqlStore,
// so the connection and query-builder helpers used by its methods
// (GetMaster, GetReplica, DriverName, getQueryBuilder) come from the embedded value.
type SqlPluginStore struct {
SqlStore
}
// newSqlPluginStore wraps sqlStore in a SqlPluginStore and registers the
// PluginKeyValueStore table mapping on every connection returned by
// sqlStore.GetAllConns.
func newSqlPluginStore(sqlStore SqlStore) store.PluginStore {
	pluginStore := &SqlPluginStore{sqlStore}

	for _, conn := range sqlStore.GetAllConns() {
		kvTable := conn.AddTableWithName(model.PluginKeyValue{}, "PluginKeyValueStore")
		// PluginId and Key together form the (non auto-increment) table key.
		kvTable = kvTable.SetKeys(false, "PluginId", "Key")

		// Column size limits for the mapped fields.
		kvTable.ColMap("PluginId").SetMaxSize(190)
		kvTable.ColMap("Key").SetMaxSize(50)
		kvTable.ColMap("Value").SetMaxSize(8192)
	}

	return pluginStore
}
// createIndexesIfNotExists is a no-op: the body is empty, so this store
// creates no indexes here.
func (ps SqlPluginStore) createIndexesIfNotExists() {
}
// SaveOrUpdate writes kv to the PluginKeyValueStore table, replacing any
// existing row for the same plugin id and key.
//
// kv is validated first and the validation error is returned unchanged. A nil
// kv.Value is treated as a request to delete the key. On PostgreSQL the write
// is an UPDATE followed by an INSERT when no row was updated; on MySQL it is a
// single INSERT ... ON DUPLICATE KEY UPDATE. For any other driver name nothing
// is written.
//
// It returns kv on success. Every database or query-building failure is
// reported as an AppError with http.StatusInternalServerError.
func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) (*model.PluginKeyValue, *model.AppError) {
	if err := kv.IsValid(); err != nil {
		return nil, err
	}

	if kv.Value == nil {
		// Setting a key to nil is the same as removing it
		if err := ps.Delete(kv.PluginId, kv.Key); err != nil {
			return nil, err
		}
		return kv, nil
	}

	switch ps.DriverName() {
	case model.DATABASE_DRIVER_POSTGRES:
		// Unfortunately PostgreSQL pre-9.5 does not have an atomic upsert, so we use
		// separate update and insert queries to accomplish our upsert
		rowsAffected, err := ps.GetMaster().Update(kv)
		if err != nil {
			return nil, model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
		}
		if rowsAffected == 0 {
			// No rows were affected by the update, so let's try an insert.
			// kv already passed validation, so a failure here is a server-side
			// problem and is reported like every other database error.
			if err := ps.GetMaster().Insert(kv); err != nil {
				return nil, model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
			}
		}

	case model.DATABASE_DRIVER_MYSQL:
		query := ps.getQueryBuilder().
			Insert("PluginKeyValueStore").
			Columns("PluginId", "PKey", "PValue", "ExpireAt").
			Values(kv.PluginId, kv.Key, kv.Value, kv.ExpireAt).
			SuffixExpr(sq.Expr("ON DUPLICATE KEY UPDATE PValue = ?, ExpireAt = ?", kv.Value, kv.ExpireAt))

		queryString, args, err := query.ToSql()
		if err != nil {
			return nil, model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql.build_query.app_error", nil, err.Error(), http.StatusInternalServerError)
		}

		if _, err := ps.GetMaster().Exec(queryString, args...); err != nil {
			return nil, model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
		}
	}

	return kv, nil
}
// CompareAndSet writes kv only if the value currently stored for its plugin id
// and key matches oldValue, and reports whether the write took effect.
//
//   - kv is validated first; the validation error is returned unchanged.
//   - A nil kv.Value delegates to CompareAndDelete.
//   - A nil oldValue means "the key must not exist": any expired row for the key
//     is deleted, then kv is inserted. A unique-constraint violation on the
//     insert yields (false, nil).
//   - Otherwise the row is updated only when its PValue equals oldValue and it
//     has not expired (ExpireAt is 0 or in the future).
//
// The boolean result is true when the value was set (or, on MySQL, was already
// equal to the requested value) and false when the comparison failed. Query
// building and database failures are returned as AppErrors with
// http.StatusInternalServerError.
func (ps SqlPluginStore) CompareAndSet(kv *model.PluginKeyValue, oldValue []byte) (bool, *model.AppError) {
	if err := kv.IsValid(); err != nil {
		return false, err
	}

	if kv.Value == nil {
		// Setting a key to nil is the same as removing it
		return ps.CompareAndDelete(kv, oldValue)
	}

	if oldValue == nil {
		// Delete any existing, expired value.
		query := ps.getQueryBuilder().
			Delete("PluginKeyValueStore").
			Where(sq.Eq{"PluginId": kv.PluginId}).
			Where(sq.Eq{"PKey": kv.Key}).
			Where(sq.NotEq{"ExpireAt": int(0)}).
			Where(sq.Lt{"ExpireAt": model.GetMillis()})

		queryString, args, err := query.ToSql()
		if err != nil {
			return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql.build_query.app_error", nil, err.Error(), http.StatusInternalServerError)
		}

		if _, err := ps.GetMaster().Exec(queryString, args...); err != nil {
			return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.delete.app_error", nil, err.Error(), http.StatusInternalServerError)
		}

		// Insert if oldValue is nil
		if err := ps.GetMaster().Insert(kv); err != nil {
			// If the error is from unique constraints violation, it's the result of a
			// race condition, return false and no error. Otherwise we have a real error and
			// need to return it.
			if IsUniqueConstraintError(err, []string{"PRIMARY", "PluginId", "Key", "PKey", "pkey"}) {
				return false, nil
			}
			return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
		}

		return true, nil
	}

	currentTime := model.GetMillis()

	// Update if oldValue is not nil
	updateQuery := ps.getQueryBuilder().
		Update("PluginKeyValueStore").
		Set("PValue", kv.Value).
		Set("ExpireAt", kv.ExpireAt).
		Where(sq.Eq{"PluginId": kv.PluginId}).
		Where(sq.Eq{"PKey": kv.Key}).
		Where(sq.Eq{"PValue": oldValue}).
		Where(sq.Or{
			sq.Eq{"ExpireAt": int(0)},
			sq.Gt{"ExpireAt": currentTime},
		})

	updateSQL, updateArgs, err := updateQuery.ToSql()
	if err != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql.build_query.app_error", nil, err.Error(), http.StatusInternalServerError)
	}

	updateResult, err := ps.GetMaster().Exec(updateSQL, updateArgs...)
	if err != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
	}

	rowsAffected, err := updateResult.RowsAffected()
	if err != nil {
		// Failed to update
		return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	if rowsAffected != 0 {
		return true, nil
	}

	if ps.DriverName() != model.DATABASE_DRIVER_MYSQL || !bytes.Equal(oldValue, kv.Value) {
		// No rows were affected by the update, where condition was not satisfied,
		// return false, but no error.
		return false, nil
	}

	// ROW_COUNT on MySQL is zero even if the row existed but no changes to the row were required.
	// Check if the row exists with the required value to distinguish this case. Strictly speaking,
	// this isn't a good use of CompareAndSet anyway, since there's no corresponding guarantee of
	// atomicity. Nevertheless, let's return results consistent with Postgres and with what might
	// be expected in this case.
	countQuery := ps.getQueryBuilder().
		Select("COUNT(*)").
		From("PluginKeyValueStore").
		Where(sq.Eq{"PluginId": kv.PluginId}).
		Where(sq.Eq{"PKey": kv.Key}).
		Where(sq.Eq{"PValue": kv.Value}).
		Where(sq.Or{
			sq.Eq{"ExpireAt": int(0)},
			sq.Gt{"ExpireAt": currentTime},
		})

	countSQL, countArgs, err := countQuery.ToSql()
	if err != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql.build_query.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, err=%v", kv.PluginId, kv.Key, err.Error()), http.StatusInternalServerError)
	}

	// Read from the master, i.e. the same connection the UPDATE above ran on, so
	// the check sees the data the UPDATE was evaluated against. Reading through
	// GetReplica could observe lagging data (assuming it targets a read replica)
	// and misreport an existing row as missing.
	count, err := ps.GetMaster().SelectInt(countSQL, countArgs...)
	if err != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.compare_and_set.mysql_select.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, err=%v", kv.PluginId, kv.Key, err.Error()), http.StatusInternalServerError)
	}

	switch count {
	case 0:
		return false, nil
	case 1:
		return true, nil
	default:
		return false, model.NewAppError("SqlPluginStore.CompareAndSet", "store.sql_plugin_store.compare_and_set.too_many_rows.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, count=%d", kv.PluginId, kv.Key, count), http.StatusInternalServerError)
	}
}
// CompareAndDelete removes the row for kv's plugin id and key, but only when
// its stored value equals oldValue and the row has not expired (ExpireAt is 0
// or in the future). It reports whether a row was deleted.
//
// kv is validated first and the validation error is returned unchanged. A nil
// oldValue never matches and yields (false, nil) without touching the database.
func (ps SqlPluginStore) CompareAndDelete(kv *model.PluginKeyValue, oldValue []byte) (bool, *model.AppError) {
	if appErr := kv.IsValid(); appErr != nil {
		return false, appErr
	}

	// nil can't be stored, so there is nothing that could match.
	if oldValue == nil {
		return false, nil
	}

	notExpired := sq.Or{
		sq.Eq{"ExpireAt": int(0)},
		sq.Gt{"ExpireAt": model.GetMillis()},
	}
	deleteSQL, deleteArgs, buildErr := ps.getQueryBuilder().
		Delete("PluginKeyValueStore").
		Where(sq.Eq{"PluginId": kv.PluginId}).
		Where(sq.Eq{"PKey": kv.Key}).
		Where(sq.Eq{"PValue": oldValue}).
		Where(notExpired).
		ToSql()
	if buildErr != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndDelete", "store.sql.build_query.app_error", nil, buildErr.Error(), http.StatusInternalServerError)
	}

	result, execErr := ps.GetMaster().Exec(deleteSQL, deleteArgs...)
	if execErr != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndDelete", "store.sql_plugin_store.save.app_error", nil, execErr.Error(), http.StatusInternalServerError)
	}

	deleted, countErr := result.RowsAffected()
	if countErr != nil {
		return false, model.NewAppError("SqlPluginStore.CompareAndDelete", "store.sql_plugin_store.save.app_error", nil, countErr.Error(), http.StatusInternalServerError)
	}

	// Zero affected rows means the comparison did not match.
	return deleted != 0, nil
}
// SetWithOptions stores value under (pluginId, key) according to opt.
//
// opt is validated, then a PluginKeyValue is built from the arguments; either
// step's error is returned unchanged. With opt.Atomic set, the write goes
// through CompareAndSet against opt.OldValue; otherwise it goes through
// SaveOrUpdate and the result is true when a non-nil value comes back.
func (ps SqlPluginStore) SetWithOptions(pluginId string, key string, value []byte, opt model.PluginKVSetOptions) (bool, *model.AppError) {
	if appErr := opt.IsValid(); appErr != nil {
		return false, appErr
	}

	entry, appErr := model.NewPluginKeyValueFromOptions(pluginId, key, value, opt)
	if appErr != nil {
		return false, appErr
	}

	if !opt.Atomic {
		saved, saveErr := ps.SaveOrUpdate(entry)
		if saveErr != nil {
			return false, saveErr
		}
		return saved != nil, nil
	}

	return ps.CompareAndSet(entry, opt.OldValue)
}
// Get returns the stored key-value entry for (pluginId, key), ignoring rows
// whose ExpireAt is non-zero and not in the future.
//
// A failure to build the query is reported under
// "store.sql.build_query.app_error", like every other query-building failure
// in this store. When no matching row exists the AppError carries
// http.StatusNotFound; any other scan failure carries
// http.StatusInternalServerError.
func (ps SqlPluginStore) Get(pluginId, key string) (*model.PluginKeyValue, *model.AppError) {
	currentTime := model.GetMillis()

	query := ps.getQueryBuilder().Select("PluginId, PKey, PValue, ExpireAt").
		From("PluginKeyValueStore").
		Where(sq.Eq{"PluginId": pluginId}).
		Where(sq.Eq{"PKey": key}).
		Where(sq.Or{sq.Eq{"ExpireAt": 0}, sq.Gt{"ExpireAt": currentTime}})

	queryString, args, err := query.ToSql()
	if err != nil {
		return nil, model.NewAppError(
			"SqlPluginStore.Get",
			"store.sql.build_query.app_error",
			nil,
			fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, err.Error()),
			http.StatusInternalServerError,
		)
	}

	row := ps.GetReplica().Db.QueryRow(queryString, args...)

	var kv model.PluginKeyValue
	if err := row.Scan(&kv.PluginId, &kv.Key, &kv.Value, &kv.ExpireAt); err != nil {
		// A missing row is the caller's "not found"; anything else is a server fault.
		statusCode := http.StatusInternalServerError
		if err == sql.ErrNoRows {
			statusCode = http.StatusNotFound
		}
		return nil, model.NewAppError(
			"SqlPluginStore.Get",
			"store.sql_plugin_store.get.app_error",
			nil,
			fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, err.Error()),
			statusCode,
		)
	}

	return &kv, nil
}
// Delete removes the row stored for (pluginId, key), expired or not. It
// returns nil on success, including when no row matched, and an AppError with
// http.StatusInternalServerError when the query cannot be built or executed.
func (ps SqlPluginStore) Delete(pluginId, key string) *model.AppError {
	deleteSQL, deleteArgs, buildErr := ps.getQueryBuilder().
		Delete("PluginKeyValueStore").
		Where(sq.Eq{"PluginId": pluginId}).
		Where(sq.Eq{"Pkey": key}).
		ToSql()
	if buildErr != nil {
		details := fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, buildErr.Error())
		return model.NewAppError("SqlPluginStore.Delete", "store.sql.build_query.app_error", nil, details, http.StatusInternalServerError)
	}

	_, execErr := ps.GetMaster().Exec(deleteSQL, deleteArgs...)
	if execErr != nil {
		details := fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, execErr.Error())
		return model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, details, http.StatusInternalServerError)
	}

	return nil
}
// DeleteAllForPlugin removes every row stored for pluginId. It returns nil on
// success and an AppError with http.StatusInternalServerError when the query
// cannot be built or executed. Errors are labelled with this method's own
// name so they are not confused with failures of Delete.
func (ps SqlPluginStore) DeleteAllForPlugin(pluginId string) *model.AppError {
	query := ps.getQueryBuilder().
		Delete("PluginKeyValueStore").
		Where(sq.Eq{"PluginId": pluginId})

	queryString, args, err := query.ToSql()
	if err != nil {
		return model.NewAppError("SqlPluginStore.DeleteAllForPlugin", "store.sql.build_query.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, err.Error()), http.StatusInternalServerError)
	}

	if _, err := ps.GetMaster().Exec(queryString, args...); err != nil {
		return model.NewAppError("SqlPluginStore.DeleteAllForPlugin", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, err.Error()), http.StatusInternalServerError)
	}

	return nil
}
// DeleteAllExpired removes every row, across all plugins, whose ExpireAt is
// non-zero and earlier than the current time in milliseconds. It returns nil
// on success and an AppError with http.StatusInternalServerError when the
// query cannot be built or executed. Errors are labelled with this method's
// own name so they are not confused with failures of Delete.
func (ps SqlPluginStore) DeleteAllExpired() *model.AppError {
	currentTime := model.GetMillis()

	query := ps.getQueryBuilder().
		Delete("PluginKeyValueStore").
		Where(sq.NotEq{"ExpireAt": 0}).
		Where(sq.Lt{"ExpireAt": currentTime})

	queryString, args, err := query.ToSql()
	if err != nil {
		return model.NewAppError("SqlPluginStore.DeleteAllExpired", "store.sql.build_query.app_error", nil, fmt.Sprintf("current_time=%v, err=%v", currentTime, err.Error()), http.StatusInternalServerError)
	}

	if _, err := ps.GetMaster().Exec(queryString, args...); err != nil {
		return model.NewAppError("SqlPluginStore.DeleteAllExpired", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("current_time=%v, err=%v", currentTime, err.Error()), http.StatusInternalServerError)
	}

	return nil
}
// List returns one page of the non-expired keys stored for pluginId, ordered
// by key.
//
// A non-positive limit falls back to defaultPluginKeyFetchLimit and a negative
// offset is treated as 0. Query building and database failures are returned
// as AppErrors with http.StatusInternalServerError.
func (ps SqlPluginStore) List(pluginId string, offset int, limit int) ([]string, *model.AppError) {
	pageSize := limit
	if pageSize <= 0 {
		pageSize = defaultPluginKeyFetchLimit
	}

	skip := offset
	if skip < 0 {
		skip = 0
	}

	// A row is live when it never expires (ExpireAt == 0) or expires in the future.
	notExpired := sq.Or{
		sq.Eq{"ExpireAt": int(0)},
		sq.Gt{"ExpireAt": model.GetMillis()},
	}

	listSQL, listArgs, buildErr := ps.getQueryBuilder().
		Select("Pkey").
		From("PluginKeyValueStore").
		Where(sq.Eq{"PluginId": pluginId}).
		Where(notExpired).
		OrderBy("PKey").
		Limit(uint64(pageSize)).
		Offset(uint64(skip)).
		ToSql()
	if buildErr != nil {
		return nil, model.NewAppError("SqlPluginStore.List", "store.sql.build_query.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, buildErr.Error()), http.StatusInternalServerError)
	}

	var keys []string
	if _, selectErr := ps.GetReplica().Select(&keys, listSQL, listArgs...); selectErr != nil {
		return nil, model.NewAppError("SqlPluginStore.List", "store.sql_plugin_store.list.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, selectErr.Error()), http.StatusInternalServerError)
	}

	return keys, nil
}