Files
mattermost/store/layered_store.go
Jonathan 375c0632fa PLT-7503: Create Message Export Scheduled Task and CLI Command (#7612)
* Created message export scheduled task

* Added CLI command to immediately kick off an export job

* Added email addresses for users joining and leaving the channel to the export

* Added support for both MySQL and PostgreSQL

* Fixing gofmt error

* Added a new ChannelMemberHistory store and associated tests

* Updating the ChannelMemberHistory channel as users create/join/leave channels

* Added user email to the message export object so it can be included in the actiance export xml

* Don't fail to log a leave event if a corresponding join event wasn't logged

* Adding copyright notices

* Adding message export settings to daily diagnostics report

* Added System Console integration for message export

* Cleaned up TODOs

* Made batch size configurable

* Added export from timestamp to CLI command

* Made ChannelMemberHistory table updates best effort

* Added a context-based timeout option to the message export CLI

* Minor PR updates/improvements

* Removed unnecessary fields from MessageExport object to reduce query overhead

* Removed JSON functions from the message export query in an effort to optimize performance

* Changed the way that channel member history queries and purges work to better account for edge cases

* Fixing a test I missed with the last refactor

* Added file copy functionality to file backend, improved config validation, added default config values

* Fixed file copy tests

* More concise use of the testing libraries

* Fixed context leak error

* Changed default export path to correctly place an 'export' directory under the 'data' directory

* Can't delete records from a read replica

* Fixed copy file tests

* Start job workers when license is applied, if configured to do so

* Suggestions from the PR

* Moar unit tests

* Fixed test imports
2017-11-30 09:07:04 -05:00

221 lines
5.5 KiB
Go

// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package store
import (
"context"
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/mattermost-server/einterfaces"
"github.com/mattermost/mattermost-server/model"
)
const (
ENABLE_EXPERIMENTAL_REDIS = false
)
type LayeredStoreDatabaseLayer interface {
LayeredStoreSupplier
Store
}
type LayeredStore struct {
TmpContext context.Context
ReactionStore ReactionStore
DatabaseLayer LayeredStoreDatabaseLayer
LocalCacheLayer *LocalCacheSupplier
RedisLayer *RedisSupplier
LayerChainHead LayeredStoreSupplier
}
func NewLayeredStore(db LayeredStoreDatabaseLayer, metrics einterfaces.MetricsInterface, cluster einterfaces.ClusterInterface) Store {
store := &LayeredStore{
TmpContext: context.TODO(),
DatabaseLayer: db,
LocalCacheLayer: NewLocalCacheSupplier(metrics, cluster),
}
store.ReactionStore = &LayeredReactionStore{store}
// Setup the chain
if ENABLE_EXPERIMENTAL_REDIS {
l4g.Debug("Experimental redis enabled.")
store.RedisLayer = NewRedisSupplier()
store.RedisLayer.SetChainNext(store.DatabaseLayer)
store.LayerChainHead = store.RedisLayer
} else {
store.LocalCacheLayer.SetChainNext(store.DatabaseLayer)
store.LayerChainHead = store.LocalCacheLayer
}
return store
}
type QueryFunction func(LayeredStoreSupplier) *LayeredStoreSupplierResult
func (s *LayeredStore) RunQuery(queryFunction QueryFunction) StoreChannel {
storeChannel := make(StoreChannel)
go func() {
result := queryFunction(s.LayerChainHead)
storeChannel <- result.StoreResult
}()
return storeChannel
}
func (s *LayeredStore) Team() TeamStore {
return s.DatabaseLayer.Team()
}
func (s *LayeredStore) Channel() ChannelStore {
return s.DatabaseLayer.Channel()
}
func (s *LayeredStore) Post() PostStore {
return s.DatabaseLayer.Post()
}
func (s *LayeredStore) User() UserStore {
return s.DatabaseLayer.User()
}
func (s *LayeredStore) Audit() AuditStore {
return s.DatabaseLayer.Audit()
}
func (s *LayeredStore) ClusterDiscovery() ClusterDiscoveryStore {
return s.DatabaseLayer.ClusterDiscovery()
}
func (s *LayeredStore) Compliance() ComplianceStore {
return s.DatabaseLayer.Compliance()
}
func (s *LayeredStore) Session() SessionStore {
return s.DatabaseLayer.Session()
}
func (s *LayeredStore) OAuth() OAuthStore {
return s.DatabaseLayer.OAuth()
}
func (s *LayeredStore) System() SystemStore {
return s.DatabaseLayer.System()
}
func (s *LayeredStore) Webhook() WebhookStore {
return s.DatabaseLayer.Webhook()
}
func (s *LayeredStore) Command() CommandStore {
return s.DatabaseLayer.Command()
}
func (s *LayeredStore) CommandWebhook() CommandWebhookStore {
return s.DatabaseLayer.CommandWebhook()
}
func (s *LayeredStore) Preference() PreferenceStore {
return s.DatabaseLayer.Preference()
}
func (s *LayeredStore) License() LicenseStore {
return s.DatabaseLayer.License()
}
func (s *LayeredStore) Token() TokenStore {
return s.DatabaseLayer.Token()
}
func (s *LayeredStore) Emoji() EmojiStore {
return s.DatabaseLayer.Emoji()
}
func (s *LayeredStore) Status() StatusStore {
return s.DatabaseLayer.Status()
}
func (s *LayeredStore) FileInfo() FileInfoStore {
return s.DatabaseLayer.FileInfo()
}
func (s *LayeredStore) Reaction() ReactionStore {
return s.ReactionStore
}
func (s *LayeredStore) Job() JobStore {
return s.DatabaseLayer.Job()
}
func (s *LayeredStore) UserAccessToken() UserAccessTokenStore {
return s.DatabaseLayer.UserAccessToken()
}
func (s *LayeredStore) ChannelMemberHistory() ChannelMemberHistoryStore {
return s.DatabaseLayer.ChannelMemberHistory()
}
func (s *LayeredStore) Plugin() PluginStore {
return s.DatabaseLayer.Plugin()
}
func (s *LayeredStore) MarkSystemRanUnitTests() {
s.DatabaseLayer.MarkSystemRanUnitTests()
}
func (s *LayeredStore) Close() {
s.DatabaseLayer.Close()
}
func (s *LayeredStore) DropAllTables() {
s.DatabaseLayer.DropAllTables()
}
func (s *LayeredStore) TotalMasterDbConnections() int {
return s.DatabaseLayer.TotalMasterDbConnections()
}
func (s *LayeredStore) TotalReadDbConnections() int {
return s.DatabaseLayer.TotalReadDbConnections()
}
func (s *LayeredStore) TotalSearchDbConnections() int {
return s.DatabaseLayer.TotalSearchDbConnections()
}
type LayeredReactionStore struct {
*LayeredStore
}
func (s *LayeredReactionStore) Save(reaction *model.Reaction) StoreChannel {
return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionSave(s.TmpContext, reaction)
})
}
func (s *LayeredReactionStore) Delete(reaction *model.Reaction) StoreChannel {
return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionDelete(s.TmpContext, reaction)
})
}
func (s *LayeredReactionStore) GetForPost(postId string, allowFromCache bool) StoreChannel {
return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionGetForPost(s.TmpContext, postId)
})
}
func (s *LayeredReactionStore) DeleteAllWithEmojiName(emojiName string) StoreChannel {
return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionDeleteAllWithEmojiName(s.TmpContext, emojiName)
})
}
func (s *LayeredReactionStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel {
return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionPermanentDeleteBatch(s.TmpContext, endTime, limit)
})
}