Adding initial retry layer version (#14954)

* Adding initial retry layer version

* Some simplification around the generated code

* Generating retry layer again

* Improving naming generation in store generated layers

* Address PR review comments

* Updating store layers

* Addressing PR review comments

* fixing lint errors

* Updating store layers

* Adding license header

* Applying the retry layer to the reaction_store

* Regenerating retry layer
This commit is contained in:
Jesús Espino
2020-08-12 20:05:16 +02:00
committed by GitHub
parent c89c56ab2e
commit 1b141678fe
12 changed files with 9780 additions and 3549 deletions

View File

@@ -46,6 +46,7 @@ import (
"github.com/mattermost/mattermost-server/v5/services/tracing"
"github.com/mattermost/mattermost-server/v5/store"
"github.com/mattermost/mattermost-server/v5/store/localcachelayer"
"github.com/mattermost/mattermost-server/v5/store/retrylayer"
"github.com/mattermost/mattermost-server/v5/store/searchlayer"
"github.com/mattermost/mattermost-server/v5/store/sqlstore"
"github.com/mattermost/mattermost-server/v5/store/timerlayer"
@@ -293,7 +294,7 @@ func NewServer(options ...Option) (*Server, error) {
s.sqlStore = sqlstore.NewSqlSupplier(s.Config().SqlSettings, s.Metrics)
searchStore := searchlayer.NewSearchLayer(
localcachelayer.NewLocalCacheLayer(
s.sqlStore,
retrylayer.New(s.sqlStore),
s.Metrics,
s.Cluster,
s.CacheProvider,

View File

@@ -28,6 +28,10 @@ func isError(typeName string) bool {
return strings.Contains(typeName, APP_ERROR_TYPE) || strings.Contains(typeName, ERROR_TYPE)
}
func isAppError(typeName string) bool {
return strings.Contains(typeName, APP_ERROR_TYPE)
}
func main() {
if err := buildTimerLayer(); err != nil {
log.Fatal(err)
@@ -35,6 +39,22 @@ func main() {
if err := buildOpenTracingLayer(); err != nil {
log.Fatal(err)
}
if err := buildRetryLayer(); err != nil {
log.Fatal(err)
}
}
func buildRetryLayer() error {
code, err := generateLayer("RetryLayer", "retry_layer.go.tmpl")
if err != nil {
return err
}
formatedCode, err := format.Source(code)
if err != nil {
return err
}
return ioutil.WriteFile(path.Join("retrylayer/retrylayer.go"), formatedCode, 0644)
}
func buildTimerLayer() error {
@@ -213,19 +233,33 @@ func generateLayer(name, templateFile string) ([]byte, error) {
},
"genResultsVars": func(results []string) string {
vars := []string{}
for i := range results {
vars = append(vars, fmt.Sprintf("resultVar%d", i))
for i, typeName := range results {
if isError(typeName) {
vars = append(vars, "err")
} else if i == 0 {
vars = append(vars, "result")
} else {
vars = append(vars, fmt.Sprintf("resultVar%d", i))
}
}
return strings.Join(vars, ", ")
},
"errorToBoolean": func(results []string) string {
for i, typeName := range results {
for _, typeName := range results {
if isError(typeName) {
return fmt.Sprintf("resultVar%d == nil", i)
return fmt.Sprintf("err == nil")
}
}
return "true"
},
"isAppError": func(results []string) bool {
for _, typeName := range results {
if isAppError(typeName) {
return true
}
}
return false
},
"errorPresent": func(results []string) bool {
for _, typeName := range results {
if isError(typeName) {
@@ -235,9 +269,9 @@ func generateLayer(name, templateFile string) ([]byte, error) {
return false
},
"errorVar": func(results []string) string {
for i, typeName := range results {
for _, typeName := range results {
if isError(typeName) {
return fmt.Sprintf("resultVar%d", i)
return "err"
}
}
return ""
@@ -262,6 +296,19 @@ func generateLayer(name, templateFile string) ([]byte, error) {
}
return strings.Join(paramsWithType, ", ")
},
"joinParamsWithTypeOutsideStore": func(params []methodParam) string {
paramsWithType := []string{}
for _, param := range params {
if param.Type == "ChannelSearchOpts" || param.Type == "UserGetByIdsOpts" {
paramsWithType = append(paramsWithType, fmt.Sprintf("%s store.%s", param.Name, param.Type))
} else if param.Type == "*UserGetByIdsOpts" {
paramsWithType = append(paramsWithType, fmt.Sprintf("%s *store.UserGetByIdsOpts", param.Name))
} else {
paramsWithType = append(paramsWithType, fmt.Sprintf("%s %s", param.Name, param.Type))
}
}
return strings.Join(paramsWithType, ", ")
},
}
t := template.Must(template.New(templateFile).Funcs(myFuncs).ParseFiles("layer_generators/" + templateFile))

View File

@@ -0,0 +1,104 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
// Code generated by "make store-layers"
// DO NOT EDIT
package retrylayer
import (
"context"
"github.com/lib/pq"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store"
"github.com/pkg/errors"
"github.com/go-sql-driver/mysql"
)
const mySQLDeadlockCode = uint16(1213)
type {{.Name}} struct {
store.Store
{{range $index, $element := .SubStores}} {{$index}}Store store.{{$index}}Store
{{end}}
}
{{range $index, $element := .SubStores}}func (s *{{$.Name}}) {{$index}}() store.{{$index}}Store {
return s.{{$index}}Store
}
{{end}}
{{range $index, $element := .SubStores}}type {{$.Name}}{{$index}}Store struct {
store.{{$index}}Store
Root *{{$.Name}}
}
{{end}}
func isRepeatableError(err error) bool {
var pqErr *pq.Error
var mysqlErr *mysql.MySQLError
switch {
case errors.As(errors.Cause(err), &pqErr):
if pqErr.Code == "40001" || pqErr.Code == "40P01" {
return true
}
case errors.As(errors.Cause(err), &mysqlErr):
if mysqlErr.Number == mySQLDeadlockCode {
return true
}
}
return false
}
{{range $substoreName, $substore := .SubStores}}
{{range $index, $element := $substore.Methods}}
func (s *{{$.Name}}{{$substoreName}}Store) {{$index}}({{$element.Params | joinParamsWithTypeOutsideStore}}) {{$element.Results | joinResultsForSignature}} {
{{if $element.Results | len | eq 0}}
s.{{$substoreName}}Store.{{$index}}({{$element.Params | joinParams}})
{{else}}
{{if $element.Results | errorPresent}}
{{if $element.Results | isAppError}}
return s.{{$substoreName}}Store.{{$index}}({{$element.Params | joinParams}})
{{else}}
tries := 0
for {
{{$element.Results | genResultsVars}} := s.{{$substoreName}}Store.{{$index}}({{$element.Params | joinParams}})
if {{$element.Results | errorVar}} == nil {
return {{$element.Results | genResultsVars}}
}
if !isRepeatableError({{$element.Results | errorVar}}) {
return {{$element.Results | genResultsVars}}
}
tries++
if tries >= 3 {
{{$element.Results | errorVar}} = errors.Wrap({{$element.Results | errorVar}}, "giving up after 3 consecutive repeatable transaction failures")
return {{$element.Results | genResultsVars}}
}
}
{{end}}
{{else}}
return s.{{$substoreName}}Store.{{$index}}({{$element.Params | joinParams}})
{{end}}
{{end}}
}
{{end}}
{{end}}
{{range $index, $element := .Methods}}
func (s *{{$.Name}}) {{$index}}({{$element.Params | joinParamsWithTypeOutsideStore}}) {{$element.Results | joinResultsForSignature}} {
{{if $element.Results | len | eq 0}}s.Store.{{$index}}({{$element.Params | joinParams}})
{{else}}return s.Store.{{$index}}({{$element.Params | joinParams}})
{{end}}}
{{end}}
func New(childStore store.Store) *{{.Name}} {
newStore := {{.Name}}{
Store: childStore,
}
{{range $substoreName, $substore := .SubStores}}
newStore.{{$substoreName}}Store = &{{$.Name}}{{$substoreName}}Store{{"{"}}{{$substoreName}}Store: childStore.{{$substoreName}}(), Root: &newStore}{{end}}
return &newStore
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,117 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package retrylayer
import (
"testing"
"github.com/go-sql-driver/mysql"
"github.com/lib/pq"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store/storetest/mocks"
"github.com/pkg/errors"
)
func genStore() *mocks.Store {
mock := &mocks.Store{}
mock.On("Audit").Return(&mocks.AuditStore{})
mock.On("Bot").Return(&mocks.BotStore{})
mock.On("Channel").Return(&mocks.ChannelStore{})
mock.On("ChannelMemberHistory").Return(&mocks.ChannelMemberHistoryStore{})
mock.On("ClusterDiscovery").Return(&mocks.ClusterDiscoveryStore{})
mock.On("Command").Return(&mocks.CommandStore{})
mock.On("CommandWebhook").Return(&mocks.CommandWebhookStore{})
mock.On("Compliance").Return(&mocks.ComplianceStore{})
mock.On("Emoji").Return(&mocks.EmojiStore{})
mock.On("FileInfo").Return(&mocks.FileInfoStore{})
mock.On("Group").Return(&mocks.GroupStore{})
mock.On("Job").Return(&mocks.JobStore{})
mock.On("License").Return(&mocks.LicenseStore{})
mock.On("LinkMetadata").Return(&mocks.LinkMetadataStore{})
mock.On("OAuth").Return(&mocks.OAuthStore{})
mock.On("Plugin").Return(&mocks.PluginStore{})
mock.On("Post").Return(&mocks.PostStore{})
mock.On("Preference").Return(&mocks.PreferenceStore{})
mock.On("Reaction").Return(&mocks.ReactionStore{})
mock.On("Role").Return(&mocks.RoleStore{})
mock.On("Scheme").Return(&mocks.SchemeStore{})
mock.On("Session").Return(&mocks.SessionStore{})
mock.On("Status").Return(&mocks.StatusStore{})
mock.On("System").Return(&mocks.SystemStore{})
mock.On("Team").Return(&mocks.TeamStore{})
mock.On("TermsOfService").Return(&mocks.TermsOfServiceStore{})
mock.On("Token").Return(&mocks.TokenStore{})
mock.On("User").Return(&mocks.UserStore{})
mock.On("UserAccessToken").Return(&mocks.UserAccessTokenStore{})
mock.On("UserTermsOfService").Return(&mocks.UserTermsOfServiceStore{})
mock.On("Webhook").Return(&mocks.WebhookStore{})
return mock
}
func TestRetry(t *testing.T) {
t.Run("on regular error should not retry", func(t *testing.T) {
mock := genStore()
mockBotStore := mock.Bot().(*mocks.BotStore)
mockBotStore.On("Get", "test", false).Return(nil, errors.New("regular error")).Times(1)
mock.On("Bot").Return(&mockBotStore)
layer := New(mock)
layer.Bot().Get("test", false)
mockBotStore.AssertExpectations(t)
})
t.Run("on success should not retry", func(t *testing.T) {
mock := genStore()
mockBotStore := mock.Bot().(*mocks.BotStore)
mockBotStore.On("Get", "test", false).Return(&model.Bot{}, nil).Times(1)
mock.On("Bot").Return(&mockBotStore)
layer := New(mock)
layer.Bot().Get("test", false)
mockBotStore.AssertExpectations(t)
})
t.Run("on mysql repeatable error should retry", func(t *testing.T) {
mock := genStore()
mockBotStore := mock.Bot().(*mocks.BotStore)
mysqlErr := mysql.MySQLError{Number: uint16(1213), Message: "Deadlock"}
mockBotStore.On("Get", "test", false).Return(nil, errors.Wrap(&mysqlErr, "test-error")).Times(3)
mock.On("Bot").Return(&mockBotStore)
layer := New(mock)
layer.Bot().Get("test", false)
mockBotStore.AssertExpectations(t)
})
t.Run("on mysql not repeatable error should not retry", func(t *testing.T) {
mock := genStore()
mockBotStore := mock.Bot().(*mocks.BotStore)
mysqlErr := mysql.MySQLError{Number: uint16(1000), Message: "Not repeatable error"}
mockBotStore.On("Get", "test", false).Return(nil, errors.Wrap(&mysqlErr, "test-error")).Times(1)
mock.On("Bot").Return(&mockBotStore)
layer := New(mock)
layer.Bot().Get("test", false)
mockBotStore.AssertExpectations(t)
})
t.Run("on postgres repeatable error should retry", func(t *testing.T) {
for _, errCode := range []string{"40001", "40P01"} {
t.Run("error "+errCode, func(t *testing.T) {
mock := genStore()
mockBotStore := mock.Bot().(*mocks.BotStore)
pqErr := pq.Error{Code: pq.ErrorCode(errCode)}
mockBotStore.On("Get", "test", false).Return(nil, errors.Wrap(&pqErr, "test-error")).Times(3)
mock.On("Bot").Return(&mockBotStore)
layer := New(mock)
layer.Bot().Get("test", false)
mockBotStore.AssertExpectations(t)
})
}
})
t.Run("on postgres not repeatable error should not retry", func(t *testing.T) {
mock := genStore()
mockBotStore := mock.Bot().(*mocks.BotStore)
pqErr := pq.Error{Code: "20000"}
mockBotStore.On("Get", "test", false).Return(nil, errors.Wrap(&pqErr, "test-error")).Times(1)
mock.On("Bot").Return(&mockBotStore)
layer := New(mock)
layer.Bot().Get("test", false)
mockBotStore.AssertExpectations(t)
})
}

View File

@@ -736,28 +736,25 @@ func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64)
}
var newChannel *model.Channel
err := store.WithDeadlockRetry(func() error {
transaction, err := s.GetMaster().Begin()
if err != nil {
return errors.Wrap(err, "begin_transaction")
}
defer finalizeTransaction(transaction)
transaction, err := s.GetMaster().Begin()
if err != nil {
return nil, errors.Wrap(err, "begin_transaction")
}
defer finalizeTransaction(transaction)
newChannel, err = s.saveChannelT(transaction, channel, maxChannelsPerTeam)
if err != nil {
return err
}
newChannel, err = s.saveChannelT(transaction, channel, maxChannelsPerTeam)
if err != nil {
return newChannel, err
}
// Additionally propagate the write to the PublicChannels table.
if err := s.upsertPublicChannelT(transaction, newChannel); err != nil {
return errors.Wrap(err, "upsert_public_channel")
}
// Additionally propagate the write to the PublicChannels table.
if err = s.upsertPublicChannelT(transaction, newChannel); err != nil {
return nil, errors.Wrap(err, "upsert_public_channel")
}
if err := transaction.Commit(); err != nil {
return errors.Wrap(err, "commit_transaction")
}
return nil
})
if err = transaction.Commit(); err != nil {
return nil, errors.Wrap(err, "commit_transaction")
}
// There are cases when in case of conflict, the original channel value is returned.
// So we return both and let the caller do the checks.
return newChannel, err

View File

@@ -56,24 +56,18 @@ func (s *SqlReactionStore) Save(reaction *model.Reaction) (*model.Reaction, erro
}
func (s *SqlReactionStore) Delete(reaction *model.Reaction) (*model.Reaction, error) {
err := store.WithDeadlockRetry(func() error {
transaction, err := s.GetMaster().Begin()
if err != nil {
return errors.Wrap(err, "begin_transaction")
}
defer finalizeTransaction(transaction)
if err := deleteReactionAndUpdatePost(transaction, reaction); err != nil {
return errors.Wrap(err, "deleteReactionAndUpdatePost")
}
if err := transaction.Commit(); err != nil {
return errors.Wrap(err, "commit_transaction")
}
return nil
})
transaction, err := s.GetMaster().Begin()
if err != nil {
return nil, errors.Wrap(err, "failed to delete reaction")
return nil, errors.Wrap(err, "begin_transaction")
}
defer finalizeTransaction(transaction)
if err := deleteReactionAndUpdatePost(transaction, reaction); err != nil {
return nil, errors.Wrap(err, "deleteReactionAndUpdatePost")
}
if err := transaction.Commit(); err != nil {
return nil, errors.Wrap(err, "commit_transaction")
}
return reaction, nil
@@ -127,28 +121,22 @@ func (s *SqlReactionStore) DeleteAllWithEmojiName(emojiName string) error {
return errors.Wrapf(err, "failed to get Reactions with emojiName=%s", emojiName)
}
err := store.WithDeadlockRetry(func() error {
_, err := s.GetMaster().Exec(
`DELETE FROM
Reactions
WHERE
EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName})
return err
})
_, err := s.GetMaster().Exec(
`DELETE FROM
Reactions
WHERE
EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName})
if err != nil {
return errors.Wrapf(err, "failed to delete Reactions with emojiName=%s", emojiName)
}
for _, reaction := range reactions {
reaction := reaction
err := store.WithDeadlockRetry(func() error {
_, err := s.GetMaster().Exec(UPDATE_POST_HAS_REACTIONS_ON_DELETE_QUERY,
map[string]interface{}{
"PostId": reaction.PostId,
"UpdateAt": model.GetMillis(),
})
return err
})
_, err := s.GetMaster().Exec(UPDATE_POST_HAS_REACTIONS_ON_DELETE_QUERY,
map[string]interface{}{
"PostId": reaction.PostId,
"UpdateAt": model.GetMillis(),
})
if err != nil {
mlog.Warn("Unable to update Post.HasReactions while removing reactions",
mlog.String("post_id", reaction.PostId),

View File

@@ -9,11 +9,7 @@ import (
"context"
"time"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
type StoreResult struct {
@@ -775,31 +771,3 @@ type UserGetByIdsOpts struct {
// Since filters the users based on their UpdateAt timestamp.
Since int64
}
const mySQLDeadlockCode = uint16(1213)
// WithDeadlockRetry retries a given f if it throws a deadlock error.
// It breaks after a threshold and propagates the error upwards.
// TODO: This can be a separate retry layer in itself where transaction retries
// are automatically applied.
func WithDeadlockRetry(f func() error) error {
var err error
for i := 0; i < 3; i++ {
err = f()
if err == nil {
// No error, return nil.
return nil
}
// XXX: Possibly add check for postgres deadlocks later.
// But deadlocks are very rarely seen in postgres.
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == mySQLDeadlockCode {
mlog.Warn("A deadlock happened. Retrying.", mlog.Err(err))
// This is a deadlock, retry.
continue
}
// Some other error, return as-is.
return err
}
return errors.Wrap(err, "giving up after 3 consecutive deadlocks")
}

View File

@@ -1,56 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package store
import (
"testing"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type deadlock struct {
numRetries int
hasRetried int
}
func newDeadlock(numRetries int) *deadlock {
return &deadlock{
numRetries: numRetries,
}
}
func (d *deadlock) f() error {
if d.numRetries == d.hasRetried {
return nil
}
d.hasRetried++
return &mysql.MySQLError{
Number: mySQLDeadlockCode,
}
}
func TestDeadlockRetry(t *testing.T) {
t.Run("NoDeadlock", func(t *testing.T) {
d := newDeadlock(0)
err := WithDeadlockRetry(d.f)
require.NoError(t, err)
assert.Equal(t, 0, d.hasRetried)
})
t.Run("1Deadlock", func(t *testing.T) {
d := newDeadlock(1)
err := WithDeadlockRetry(d.f)
require.NoError(t, err)
assert.Equal(t, 1, d.hasRetried)
})
t.Run("AlwaysDeadlock", func(t *testing.T) {
d := newDeadlock(4)
err := WithDeadlockRetry(d.f)
require.Error(t, err)
assert.Equal(t, 3, d.hasRetried)
})
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store"
"github.com/mattermost/mattermost-server/v5/store/retrylayer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -429,6 +430,8 @@ func testReactionBulkGetForPosts(t *testing.T, ss store.Store) {
// testReactionDeadlock is a best-case attempt to recreate the deadlock scenario.
// It at least deadlocks 2 times out of 5.
func testReactionDeadlock(t *testing.T, ss store.Store) {
ss = retrylayer.New(ss)
post, err := ss.Post().Save(&model.Post{
ChannelId: model.NewId(),
UserId: model.NewId(),

File diff suppressed because it is too large Load Diff