mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Live: rename remote write backends to write configs (#41467)
This commit is contained in:
parent
e5d7be3e1c
commit
e2ed140de2
@ -441,10 +441,10 @@ func (hs *HTTPServer) registerRoutes() {
|
|||||||
liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin)
|
liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin)
|
||||||
liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin)
|
liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin)
|
||||||
liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin)
|
liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin)
|
||||||
liveRoute.Get("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsListHTTP), reqOrgAdmin)
|
liveRoute.Get("/write-configs", routing.Wrap(hs.Live.HandleWriteConfigsListHTTP), reqOrgAdmin)
|
||||||
liveRoute.Post("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsPostHTTP), reqOrgAdmin)
|
liveRoute.Post("/write-configs", routing.Wrap(hs.Live.HandleWriteConfigsPostHTTP), reqOrgAdmin)
|
||||||
liveRoute.Put("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsPutHTTP), reqOrgAdmin)
|
liveRoute.Put("/write-configs", routing.Wrap(hs.Live.HandleWriteConfigsPutHTTP), reqOrgAdmin)
|
||||||
liveRoute.Delete("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsDeleteHTTP), reqOrgAdmin)
|
liveRoute.Delete("/write-configs", routing.Wrap(hs.Live.HandleWriteConfigsDeleteHTTP), reqOrgAdmin)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1005,19 +1005,19 @@ type DryRunRuleStorage struct {
|
|||||||
ChannelRules []pipeline.ChannelRule
|
ChannelRules []pipeline.ChannelRule
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunRuleStorage) GetRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendGetCmd) (pipeline.RemoteWriteBackend, bool, error) {
|
func (s *DryRunRuleStorage) GetWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigGetCmd) (pipeline.WriteConfig, bool, error) {
|
||||||
return pipeline.RemoteWriteBackend{}, false, errors.New("not implemented by dry run rule storage")
|
return pipeline.WriteConfig{}, false, errors.New("not implemented by dry run rule storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunRuleStorage) CreateRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendCreateCmd) (pipeline.RemoteWriteBackend, error) {
|
func (s *DryRunRuleStorage) CreateWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigCreateCmd) (pipeline.WriteConfig, error) {
|
||||||
return pipeline.RemoteWriteBackend{}, errors.New("not implemented by dry run rule storage")
|
return pipeline.WriteConfig{}, errors.New("not implemented by dry run rule storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunRuleStorage) UpdateRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendUpdateCmd) (pipeline.RemoteWriteBackend, error) {
|
func (s *DryRunRuleStorage) UpdateWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigUpdateCmd) (pipeline.WriteConfig, error) {
|
||||||
return pipeline.RemoteWriteBackend{}, errors.New("not implemented by dry run rule storage")
|
return pipeline.WriteConfig{}, errors.New("not implemented by dry run rule storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunRuleStorage) DeleteRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendDeleteCmd) error {
|
func (s *DryRunRuleStorage) DeleteWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigDeleteCmd) error {
|
||||||
return errors.New("not implemented by dry run rule storage")
|
return errors.New("not implemented by dry run rule storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1033,7 +1033,7 @@ func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ pipe
|
|||||||
return errors.New("not implemented by dry run rule storage")
|
return errors.New("not implemented by dry run rule storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunRuleStorage) ListRemoteWriteBackends(_ context.Context, _ int64) ([]pipeline.RemoteWriteBackend, error) {
|
func (s *DryRunRuleStorage) ListWriteConfigs(_ context.Context, _ int64) ([]pipeline.WriteConfig, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1161,60 +1161,60 @@ func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) respo
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleRemoteWriteBackendsListHTTP ...
|
// HandleWriteConfigsListHTTP ...
|
||||||
func (g *GrafanaLive) HandleRemoteWriteBackendsListHTTP(c *models.ReqContext) response.Response {
|
func (g *GrafanaLive) HandleWriteConfigsListHTTP(c *models.ReqContext) response.Response {
|
||||||
backends, err := g.pipelineStorage.ListRemoteWriteBackends(c.Req.Context(), c.OrgId)
|
backends, err := g.pipelineStorage.ListWriteConfigs(c.Req.Context(), c.OrgId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Failed to get remote write backends", err)
|
return response.Error(http.StatusInternalServerError, "Failed to get write configs", err)
|
||||||
}
|
}
|
||||||
result := make([]pipeline.RemoteWriteBackendDto, 0, len(backends))
|
result := make([]pipeline.WriteConfigDto, 0, len(backends))
|
||||||
for _, b := range backends {
|
for _, b := range backends {
|
||||||
result = append(result, pipeline.RemoteWriteBackendToDto(b))
|
result = append(result, pipeline.WriteConfigToDto(b))
|
||||||
}
|
}
|
||||||
return response.JSON(http.StatusOK, util.DynMap{
|
return response.JSON(http.StatusOK, util.DynMap{
|
||||||
"remoteWriteBackends": result,
|
"writeConfigs": result,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleChannelRulesPostHTTP ...
|
// HandleWriteConfigsPostHTTP ...
|
||||||
func (g *GrafanaLive) HandleRemoteWriteBackendsPostHTTP(c *models.ReqContext) response.Response {
|
func (g *GrafanaLive) HandleWriteConfigsPostHTTP(c *models.ReqContext) response.Response {
|
||||||
body, err := ioutil.ReadAll(c.Req.Body)
|
body, err := ioutil.ReadAll(c.Req.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Error reading body", err)
|
return response.Error(http.StatusInternalServerError, "Error reading body", err)
|
||||||
}
|
}
|
||||||
var cmd pipeline.RemoteWriteBackendCreateCmd
|
var cmd pipeline.WriteConfigCreateCmd
|
||||||
err = json.Unmarshal(body, &cmd)
|
err = json.Unmarshal(body, &cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusBadRequest, "Error decoding remote write backend", err)
|
return response.Error(http.StatusBadRequest, "Error decoding write config create command", err)
|
||||||
}
|
}
|
||||||
result, err := g.pipelineStorage.CreateRemoteWriteBackend(c.Req.Context(), c.OrgId, cmd)
|
result, err := g.pipelineStorage.CreateWriteConfig(c.Req.Context(), c.OrgId, cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Failed to create remote write backend", err)
|
return response.Error(http.StatusInternalServerError, "Failed to create write config", err)
|
||||||
}
|
}
|
||||||
return response.JSON(http.StatusOK, util.DynMap{
|
return response.JSON(http.StatusOK, util.DynMap{
|
||||||
"remoteWriteBackend": pipeline.RemoteWriteBackendToDto(result),
|
"writeConfig": pipeline.WriteConfigToDto(result),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleChannelRulesPutHTTP ...
|
// HandleWriteConfigsPutHTTP ...
|
||||||
func (g *GrafanaLive) HandleRemoteWriteBackendsPutHTTP(c *models.ReqContext) response.Response {
|
func (g *GrafanaLive) HandleWriteConfigsPutHTTP(c *models.ReqContext) response.Response {
|
||||||
body, err := ioutil.ReadAll(c.Req.Body)
|
body, err := ioutil.ReadAll(c.Req.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Error reading body", err)
|
return response.Error(http.StatusInternalServerError, "Error reading body", err)
|
||||||
}
|
}
|
||||||
var cmd pipeline.RemoteWriteBackendUpdateCmd
|
var cmd pipeline.WriteConfigUpdateCmd
|
||||||
err = json.Unmarshal(body, &cmd)
|
err = json.Unmarshal(body, &cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusBadRequest, "Error decoding remote write backend", err)
|
return response.Error(http.StatusBadRequest, "Error decoding write config update command", err)
|
||||||
}
|
}
|
||||||
if cmd.UID == "" {
|
if cmd.UID == "" {
|
||||||
return response.Error(http.StatusBadRequest, "UID required", nil)
|
return response.Error(http.StatusBadRequest, "UID required", nil)
|
||||||
}
|
}
|
||||||
existingBackend, ok, err := g.pipelineStorage.GetRemoteWriteBackend(c.Req.Context(), c.OrgId, pipeline.RemoteWriteBackendGetCmd{
|
existingBackend, ok, err := g.pipelineStorage.GetWriteConfig(c.Req.Context(), c.OrgId, pipeline.WriteConfigGetCmd{
|
||||||
UID: cmd.UID,
|
UID: cmd.UID,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Failed to get remote write backend", err)
|
return response.Error(http.StatusInternalServerError, "Failed to get write config", err)
|
||||||
}
|
}
|
||||||
if ok {
|
if ok {
|
||||||
if cmd.SecureSettings == nil {
|
if cmd.SecureSettings == nil {
|
||||||
@ -1231,32 +1231,32 @@ func (g *GrafanaLive) HandleRemoteWriteBackendsPutHTTP(c *models.ReqContext) res
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result, err := g.pipelineStorage.UpdateRemoteWriteBackend(c.Req.Context(), c.OrgId, cmd)
|
result, err := g.pipelineStorage.UpdateWriteConfig(c.Req.Context(), c.OrgId, cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Failed to update remote write backend", err)
|
return response.Error(http.StatusInternalServerError, "Failed to update write config", err)
|
||||||
}
|
}
|
||||||
return response.JSON(http.StatusOK, util.DynMap{
|
return response.JSON(http.StatusOK, util.DynMap{
|
||||||
"remoteWriteBackend": pipeline.RemoteWriteBackendToDto(result),
|
"writeConfig": pipeline.WriteConfigToDto(result),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleChannelRulesDeleteHTTP ...
|
// HandleWriteConfigsDeleteHTTP ...
|
||||||
func (g *GrafanaLive) HandleRemoteWriteBackendsDeleteHTTP(c *models.ReqContext) response.Response {
|
func (g *GrafanaLive) HandleWriteConfigsDeleteHTTP(c *models.ReqContext) response.Response {
|
||||||
body, err := ioutil.ReadAll(c.Req.Body)
|
body, err := ioutil.ReadAll(c.Req.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Error reading body", err)
|
return response.Error(http.StatusInternalServerError, "Error reading body", err)
|
||||||
}
|
}
|
||||||
var cmd pipeline.RemoteWriteBackendDeleteCmd
|
var cmd pipeline.WriteConfigDeleteCmd
|
||||||
err = json.Unmarshal(body, &cmd)
|
err = json.Unmarshal(body, &cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusBadRequest, "Error decoding remote write backend", err)
|
return response.Error(http.StatusBadRequest, "Error decoding write config delete command", err)
|
||||||
}
|
}
|
||||||
if cmd.UID == "" {
|
if cmd.UID == "" {
|
||||||
return response.Error(http.StatusBadRequest, "UID required", nil)
|
return response.Error(http.StatusBadRequest, "UID required", nil)
|
||||||
}
|
}
|
||||||
err = g.pipelineStorage.DeleteRemoteWriteBackend(c.Req.Context(), c.OrgId, cmd)
|
err = g.pipelineStorage.DeleteWriteConfig(c.Req.Context(), c.OrgId, cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return response.Error(http.StatusInternalServerError, "Failed to delete remote write backend", err)
|
return response.Error(http.StatusInternalServerError, "Failed to delete write config", err)
|
||||||
}
|
}
|
||||||
return response.JSON(http.StatusOK, util.DynMap{})
|
return response.JSON(http.StatusOK, util.DynMap{})
|
||||||
}
|
}
|
||||||
|
@ -153,53 +153,53 @@ func typeRegistered(entityType string, registry []EntityInfo) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func RemoteWriteBackendToDto(b RemoteWriteBackend) RemoteWriteBackendDto {
|
func WriteConfigToDto(b WriteConfig) WriteConfigDto {
|
||||||
secureFields := make(map[string]bool, len(b.SecureSettings))
|
secureFields := make(map[string]bool, len(b.SecureSettings))
|
||||||
for k := range b.SecureSettings {
|
for k := range b.SecureSettings {
|
||||||
secureFields[k] = true
|
secureFields[k] = true
|
||||||
}
|
}
|
||||||
return RemoteWriteBackendDto{
|
return WriteConfigDto{
|
||||||
UID: b.UID,
|
UID: b.UID,
|
||||||
Settings: b.Settings,
|
Settings: b.Settings,
|
||||||
SecureFields: secureFields,
|
SecureFields: secureFields,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteBackendDto struct {
|
type WriteConfigDto struct {
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
Settings RemoteWriteSettings `json:"settings"`
|
Settings WriteSettings `json:"settings"`
|
||||||
SecureFields map[string]bool `json:"secureFields"`
|
SecureFields map[string]bool `json:"secureFields"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteBackendGetCmd struct {
|
type WriteConfigGetCmd struct {
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteBackendCreateCmd struct {
|
type WriteConfigCreateCmd struct {
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
Settings RemoteWriteSettings `json:"settings"`
|
Settings WriteSettings `json:"settings"`
|
||||||
SecureSettings map[string]string `json:"secureSettings"`
|
SecureSettings map[string]string `json:"secureSettings"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add version field later.
|
// TODO: add version field later.
|
||||||
type RemoteWriteBackendUpdateCmd struct {
|
type WriteConfigUpdateCmd struct {
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
Settings RemoteWriteSettings `json:"settings"`
|
Settings WriteSettings `json:"settings"`
|
||||||
SecureSettings map[string]string `json:"secureSettings"`
|
SecureSettings map[string]string `json:"secureSettings"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteBackendDeleteCmd struct {
|
type WriteConfigDeleteCmd struct {
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteBackend struct {
|
type WriteConfig struct {
|
||||||
OrgId int64 `json:"-"`
|
OrgId int64 `json:"-"`
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
Settings RemoteWriteSettings `json:"settings"`
|
Settings WriteSettings `json:"settings"`
|
||||||
SecureSettings map[string][]byte `json:"secureSettings,omitempty"`
|
SecureSettings map[string][]byte `json:"secureSettings,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r RemoteWriteBackend) Valid() (bool, string) {
|
func (r WriteConfig) Valid() (bool, string) {
|
||||||
if r.UID == "" {
|
if r.UID == "" {
|
||||||
return false, "uid required"
|
return false, "uid required"
|
||||||
}
|
}
|
||||||
@ -209,9 +209,7 @@ func (r RemoteWriteBackend) Valid() (bool, string) {
|
|||||||
return true, ""
|
return true, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteSettings struct {
|
type BasicAuth struct {
|
||||||
// Endpoint to send streaming frames to.
|
|
||||||
Endpoint string `json:"endpoint"`
|
|
||||||
// User is a user for remote write request.
|
// User is a user for remote write request.
|
||||||
User string `json:"user,omitempty"`
|
User string `json:"user,omitempty"`
|
||||||
// Password is a plain text non-encrypted password.
|
// Password is a plain text non-encrypted password.
|
||||||
@ -219,8 +217,15 @@ type RemoteWriteSettings struct {
|
|||||||
Password string `json:"password,omitempty"`
|
Password string `json:"password,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteWriteBackends struct {
|
type WriteSettings struct {
|
||||||
Backends []RemoteWriteBackend `json:"remoteWriteBackends"`
|
// Endpoint to send streaming frames to.
|
||||||
|
Endpoint string `json:"endpoint"`
|
||||||
|
// BasicAuth is an optional basic auth settings.
|
||||||
|
BasicAuth *BasicAuth `json:"basicAuth,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfigs struct {
|
||||||
|
Configs []WriteConfig `json:"writeConfigs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChannelRules struct {
|
type ChannelRules struct {
|
||||||
@ -276,11 +281,11 @@ type ChannelRuleDeleteCmd struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Storage interface {
|
type Storage interface {
|
||||||
ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error)
|
ListWriteConfigs(_ context.Context, orgID int64) ([]WriteConfig, error)
|
||||||
GetRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendGetCmd) (RemoteWriteBackend, bool, error)
|
GetWriteConfig(_ context.Context, orgID int64, cmd WriteConfigGetCmd) (WriteConfig, bool, error)
|
||||||
CreateRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendCreateCmd) (RemoteWriteBackend, error)
|
CreateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigCreateCmd) (WriteConfig, error)
|
||||||
UpdateRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendUpdateCmd) (RemoteWriteBackend, error)
|
UpdateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigUpdateCmd) (WriteConfig, error)
|
||||||
DeleteRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendDeleteCmd) error
|
DeleteWriteConfig(_ context.Context, orgID int64, cmd WriteConfigDeleteCmd) error
|
||||||
ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)
|
ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)
|
||||||
CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error)
|
CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error)
|
||||||
UpdateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error)
|
UpdateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error)
|
||||||
@ -422,23 +427,31 @@ func (f *StorageRuleBuilder) extractFrameConditionChecker(config *FrameCondition
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) getWritePassword(remoteWriteBackend RemoteWriteBackend) (string, error) {
|
func (f *StorageRuleBuilder) constructBasicAuth(writeConfig WriteConfig) (*BasicAuth, error) {
|
||||||
|
if writeConfig.Settings.BasicAuth == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
var password string
|
var password string
|
||||||
hasSecurePassword := len(remoteWriteBackend.SecureSettings["password"]) > 0
|
hasSecurePassword := len(writeConfig.SecureSettings["basicAuthPassword"]) > 0
|
||||||
if hasSecurePassword {
|
if hasSecurePassword {
|
||||||
passwordBytes, err := f.EncryptionService.Decrypt(context.Background(), remoteWriteBackend.SecureSettings["password"], setting.SecretKey)
|
passwordBytes, err := f.EncryptionService.Decrypt(context.Background(), writeConfig.SecureSettings["basicAuthPassword"], setting.SecretKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("password can't be decrypted: %w", err)
|
return nil, fmt.Errorf("basicAuthPassword can't be decrypted: %w", err)
|
||||||
}
|
}
|
||||||
password = string(passwordBytes)
|
password = string(passwordBytes)
|
||||||
} else {
|
} else {
|
||||||
// Use plain text password (should be removed upon database integration).
|
// Use plain text password (should be removed upon database integration).
|
||||||
password = remoteWriteBackend.Settings.Password
|
if writeConfig.Settings.BasicAuth != nil {
|
||||||
|
password = writeConfig.Settings.BasicAuth.Password
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return password, nil
|
return &BasicAuth{
|
||||||
|
User: writeConfig.Settings.BasicAuth.User,
|
||||||
|
Password: password,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, remoteWriteBackends []RemoteWriteBackend) (FrameOutputter, error) {
|
func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, writeConfigs []WriteConfig) (FrameOutputter, error) {
|
||||||
if config == nil {
|
if config == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -456,7 +469,7 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig,
|
|||||||
var outputters []FrameOutputter
|
var outputters []FrameOutputter
|
||||||
for _, outConf := range config.MultipleOutputterConfig.Outputters {
|
for _, outConf := range config.MultipleOutputterConfig.Outputters {
|
||||||
out := outConf
|
out := outConf
|
||||||
outputter, err := f.extractFrameOutputter(&out, remoteWriteBackends)
|
outputter, err := f.extractFrameOutputter(&out, writeConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -475,7 +488,7 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
outputter, err := f.extractFrameOutputter(config.ConditionalOutputConfig.Outputter, remoteWriteBackends)
|
outputter, err := f.extractFrameOutputter(config.ConditionalOutputConfig.Outputter, writeConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -489,36 +502,34 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig,
|
|||||||
if config.RemoteWriteOutputConfig == nil {
|
if config.RemoteWriteOutputConfig == nil {
|
||||||
return nil, missingConfiguration
|
return nil, missingConfiguration
|
||||||
}
|
}
|
||||||
remoteWriteBackend, ok := f.getRemoteWriteBackend(config.RemoteWriteOutputConfig.UID, remoteWriteBackends)
|
writeConfig, ok := f.getWriteConfig(config.RemoteWriteOutputConfig.UID, writeConfigs)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unknown remote write backend uid: %s", config.RemoteWriteOutputConfig.UID)
|
return nil, fmt.Errorf("unknown write config uid: %s", config.RemoteWriteOutputConfig.UID)
|
||||||
}
|
}
|
||||||
password, err := f.getWritePassword(remoteWriteBackend)
|
basicAuth, err := f.constructBasicAuth(writeConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error getting password: %w", err)
|
return nil, fmt.Errorf("error getting password: %w", err)
|
||||||
}
|
}
|
||||||
return NewRemoteWriteFrameOutput(
|
return NewRemoteWriteFrameOutput(
|
||||||
remoteWriteBackend.Settings.Endpoint,
|
writeConfig.Settings.Endpoint,
|
||||||
remoteWriteBackend.Settings.User,
|
basicAuth,
|
||||||
password,
|
|
||||||
config.RemoteWriteOutputConfig.SampleMilliseconds,
|
config.RemoteWriteOutputConfig.SampleMilliseconds,
|
||||||
), nil
|
), nil
|
||||||
case FrameOutputTypeLoki:
|
case FrameOutputTypeLoki:
|
||||||
if config.LokiOutputConfig == nil {
|
if config.LokiOutputConfig == nil {
|
||||||
return nil, missingConfiguration
|
return nil, missingConfiguration
|
||||||
}
|
}
|
||||||
remoteWriteBackend, ok := f.getRemoteWriteBackend(config.LokiOutputConfig.UID, remoteWriteBackends)
|
writeConfig, ok := f.getWriteConfig(config.LokiOutputConfig.UID, writeConfigs)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
||||||
}
|
}
|
||||||
password, err := f.getWritePassword(remoteWriteBackend)
|
basicAuth, err := f.constructBasicAuth(writeConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error getting password: %w", err)
|
return nil, fmt.Errorf("error getting password: %w", err)
|
||||||
}
|
}
|
||||||
return NewLokiFrameOutput(
|
return NewLokiFrameOutput(
|
||||||
remoteWriteBackend.Settings.Endpoint,
|
writeConfig.Settings.Endpoint,
|
||||||
remoteWriteBackend.Settings.User,
|
basicAuth,
|
||||||
password,
|
|
||||||
), nil
|
), nil
|
||||||
case FrameOutputTypeChangeLog:
|
case FrameOutputTypeChangeLog:
|
||||||
if config.ChangeLogOutputConfig == nil {
|
if config.ChangeLogOutputConfig == nil {
|
||||||
@ -530,7 +541,7 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, remoteWriteBackends []RemoteWriteBackend) (DataOutputter, error) {
|
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, writeConfigs []WriteConfig) (DataOutputter, error) {
|
||||||
if config == nil {
|
if config == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -545,18 +556,17 @@ func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, r
|
|||||||
if config.LokiOutputConfig == nil {
|
if config.LokiOutputConfig == nil {
|
||||||
return nil, missingConfiguration
|
return nil, missingConfiguration
|
||||||
}
|
}
|
||||||
remoteWriteBackend, ok := f.getRemoteWriteBackend(config.LokiOutputConfig.UID, remoteWriteBackends)
|
writeConfig, ok := f.getWriteConfig(config.LokiOutputConfig.UID, writeConfigs)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
||||||
}
|
}
|
||||||
password, err := f.getWritePassword(remoteWriteBackend)
|
basicAuth, err := f.constructBasicAuth(writeConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error getting password: %w", err)
|
return nil, fmt.Errorf("error constructing basicAuth: %w", err)
|
||||||
}
|
}
|
||||||
return NewLokiDataOutput(
|
return NewLokiDataOutput(
|
||||||
remoteWriteBackend.Settings.Endpoint,
|
writeConfig.Settings.Endpoint,
|
||||||
remoteWriteBackend.Settings.User,
|
basicAuth,
|
||||||
password,
|
|
||||||
), nil
|
), nil
|
||||||
case DataOutputTypeBuiltin:
|
case DataOutputTypeBuiltin:
|
||||||
return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil
|
return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil
|
||||||
@ -567,13 +577,13 @@ func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) getRemoteWriteBackend(uid string, remoteWriteBackends []RemoteWriteBackend) (RemoteWriteBackend, bool) {
|
func (f *StorageRuleBuilder) getWriteConfig(uid string, writeConfigs []WriteConfig) (WriteConfig, bool) {
|
||||||
for _, rwb := range remoteWriteBackends {
|
for _, rwb := range writeConfigs {
|
||||||
if rwb.UID == uid {
|
if rwb.UID == uid {
|
||||||
return rwb, true
|
return rwb, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return RemoteWriteBackend{}, false
|
return WriteConfig{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error) {
|
func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error) {
|
||||||
@ -582,7 +592,7 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteWriteBackends, err := f.Storage.ListRemoteWriteBackends(ctx, orgID)
|
writeConfigs, err := f.Storage.ListWriteConfigs(ctx, orgID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -622,7 +632,7 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li
|
|||||||
|
|
||||||
var dataOutputters []DataOutputter
|
var dataOutputters []DataOutputter
|
||||||
for _, outConfig := range ruleConfig.Settings.DataOutputters {
|
for _, outConfig := range ruleConfig.Settings.DataOutputters {
|
||||||
out, err := f.extractDataOutputter(outConfig, remoteWriteBackends)
|
out, err := f.extractDataOutputter(outConfig, writeConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err)
|
return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err)
|
||||||
}
|
}
|
||||||
@ -632,7 +642,7 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li
|
|||||||
|
|
||||||
var outputters []FrameOutputter
|
var outputters []FrameOutputter
|
||||||
for _, outConfig := range ruleConfig.Settings.FrameOutputters {
|
for _, outConfig := range ruleConfig.Settings.FrameOutputters {
|
||||||
out, err := f.extractFrameOutputter(outConfig, remoteWriteBackends)
|
out, err := f.extractFrameOutputter(outConfig, writeConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error building frame outputter for %s: %w", rule.Pattern, err)
|
return nil, fmt.Errorf("error building frame outputter for %s: %w", rule.Pattern, err)
|
||||||
}
|
}
|
||||||
|
@ -10,9 +10,9 @@ type LokiDataOutput struct {
|
|||||||
lokiWriter *lokiWriter
|
lokiWriter *lokiWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLokiDataOutput(endpoint, user, password string) *LokiDataOutput {
|
func NewLokiDataOutput(endpoint string, basicAuth *BasicAuth) *LokiDataOutput {
|
||||||
return &LokiDataOutput{
|
return &LokiDataOutput{
|
||||||
lokiWriter: newLokiWriter(endpoint, user, password),
|
lokiWriter: newLokiWriter(endpoint, basicAuth),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,8 +103,10 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
|
|||||||
DataOutputters: []DataOutputter{
|
DataOutputters: []DataOutputter{
|
||||||
NewLokiDataOutput(
|
NewLokiDataOutput(
|
||||||
os.Getenv("GF_LIVE_LOKI_ENDPOINT"),
|
os.Getenv("GF_LIVE_LOKI_ENDPOINT"),
|
||||||
os.Getenv("GF_LIVE_LOKI_USER"),
|
&BasicAuth{
|
||||||
os.Getenv("GF_LIVE_LOKI_PASSWORD"),
|
User: os.Getenv("GF_LIVE_LOKI_USER"),
|
||||||
|
Password: os.Getenv("GF_LIVE_LOKI_PASSWORD"),
|
||||||
|
},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}),
|
Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}),
|
||||||
@ -112,8 +114,10 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
|
|||||||
NewManagedStreamFrameOutput(f.ManagedStream),
|
NewManagedStreamFrameOutput(f.ManagedStream),
|
||||||
NewRemoteWriteFrameOutput(
|
NewRemoteWriteFrameOutput(
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
&BasicAuth{
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
||||||
|
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
||||||
|
},
|
||||||
1000,
|
1000,
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@ -298,8 +302,10 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
|
|||||||
NewManagedStreamFrameOutput(f.ManagedStream),
|
NewManagedStreamFrameOutput(f.ManagedStream),
|
||||||
NewRemoteWriteFrameOutput(
|
NewRemoteWriteFrameOutput(
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
&BasicAuth{
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
||||||
|
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
||||||
|
},
|
||||||
0,
|
0,
|
||||||
),
|
),
|
||||||
NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{
|
NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{
|
||||||
@ -333,8 +339,10 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
|
|||||||
NewManagedStreamFrameOutput(f.ManagedStream),
|
NewManagedStreamFrameOutput(f.ManagedStream),
|
||||||
NewRemoteWriteFrameOutput(
|
NewRemoteWriteFrameOutput(
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
&BasicAuth{
|
||||||
os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
||||||
|
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
||||||
|
},
|
||||||
0,
|
0,
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
|
@ -20,9 +20,9 @@ type LokiFrameOutput struct {
|
|||||||
lokiWriter *lokiWriter
|
lokiWriter *lokiWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLokiFrameOutput(endpoint, user, password string) *LokiFrameOutput {
|
func NewLokiFrameOutput(endpoint string, basicAuth *BasicAuth) *LokiFrameOutput {
|
||||||
return &LokiFrameOutput{
|
return &LokiFrameOutput{
|
||||||
lokiWriter: newLokiWriter(endpoint, user, password),
|
lokiWriter: newLokiWriter(endpoint, basicAuth),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,18 +65,14 @@ type lokiWriter struct {
|
|||||||
buffer []LokiStream
|
buffer []LokiStream
|
||||||
|
|
||||||
// Endpoint to send streaming frames to.
|
// Endpoint to send streaming frames to.
|
||||||
endpoint string
|
endpoint string
|
||||||
// User is a user for remote write request.
|
basicAuth *BasicAuth
|
||||||
user string
|
|
||||||
// Password for remote write endpoint.
|
|
||||||
password string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLokiWriter(endpoint, user, password string) *lokiWriter {
|
func newLokiWriter(endpoint string, basicAuth *BasicAuth) *lokiWriter {
|
||||||
w := &lokiWriter{
|
w := &lokiWriter{
|
||||||
endpoint: endpoint,
|
endpoint: endpoint,
|
||||||
user: user,
|
basicAuth: basicAuth,
|
||||||
password: password,
|
|
||||||
httpClient: &http.Client{
|
httpClient: &http.Client{
|
||||||
Timeout: 2 * time.Second,
|
Timeout: 2 * time.Second,
|
||||||
},
|
},
|
||||||
@ -129,8 +125,8 @@ func (w *lokiWriter) flush(streams []LokiStream) error {
|
|||||||
return fmt.Errorf("error constructing loki push request: %w", err)
|
return fmt.Errorf("error constructing loki push request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
if w.user != "" {
|
if w.basicAuth != nil {
|
||||||
req.SetBasicAuth(w.user, w.password)
|
req.SetBasicAuth(w.basicAuth.User, w.basicAuth.Password)
|
||||||
}
|
}
|
||||||
|
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
|
@ -21,10 +21,10 @@ type RemoteWriteFrameOutput struct {
|
|||||||
|
|
||||||
// Endpoint to send streaming frames to.
|
// Endpoint to send streaming frames to.
|
||||||
Endpoint string
|
Endpoint string
|
||||||
// User is a user for remote write request.
|
|
||||||
User string
|
// BasicAuth is an optional basic auth params.
|
||||||
// Password for remote write endpoint.
|
BasicAuth *BasicAuth
|
||||||
Password string
|
|
||||||
// SampleMilliseconds allow defining an interval to sample points inside a channel
|
// SampleMilliseconds allow defining an interval to sample points inside a channel
|
||||||
// when outputting to remote write endpoint (on __name__ label basis). For example
|
// when outputting to remote write endpoint (on __name__ label basis). For example
|
||||||
// when having a 20Hz stream and SampleMilliseconds 1000 then only one point in a
|
// when having a 20Hz stream and SampleMilliseconds 1000 then only one point in a
|
||||||
@ -38,11 +38,10 @@ type RemoteWriteFrameOutput struct {
|
|||||||
buffer []prompb.TimeSeries
|
buffer []prompb.TimeSeries
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemoteWriteFrameOutput(endpoint, user, password string, sampleMilliseconds int64) *RemoteWriteFrameOutput {
|
func NewRemoteWriteFrameOutput(endpoint string, basicAuth *BasicAuth, sampleMilliseconds int64) *RemoteWriteFrameOutput {
|
||||||
out := &RemoteWriteFrameOutput{
|
out := &RemoteWriteFrameOutput{
|
||||||
Endpoint: endpoint,
|
Endpoint: endpoint,
|
||||||
User: user,
|
BasicAuth: basicAuth,
|
||||||
Password: password,
|
|
||||||
SampleMilliseconds: sampleMilliseconds,
|
SampleMilliseconds: sampleMilliseconds,
|
||||||
httpClient: &http.Client{Timeout: 2 * time.Second},
|
httpClient: &http.Client{Timeout: 2 * time.Second},
|
||||||
}
|
}
|
||||||
@ -153,8 +152,8 @@ func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error {
|
|||||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
||||||
req.Header.Set("Content-Encoding", "snappy")
|
req.Header.Set("Content-Encoding", "snappy")
|
||||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||||
if out.User != "" {
|
if out.BasicAuth != nil {
|
||||||
req.SetBasicAuth(out.User, out.Password)
|
req.SetBasicAuth(out.BasicAuth.User, out.BasicAuth.Password)
|
||||||
}
|
}
|
||||||
|
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
|
@ -51,7 +51,7 @@ func TestRemoteWriteFrameOutput_sample(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
out := NewRemoteWriteFrameOutput("", "", "", 500)
|
out := NewRemoteWriteFrameOutput("", nil, 500)
|
||||||
sampledTimeSeries := out.sample(timeSeries)
|
sampledTimeSeries := out.sample(timeSeries)
|
||||||
require.Len(t, sampledTimeSeries, 2)
|
require.Len(t, sampledTimeSeries, 2)
|
||||||
|
|
||||||
@ -123,7 +123,7 @@ func TestRemoteWriteFrameOutput_sample_merge(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
out := NewRemoteWriteFrameOutput("", "", "", 50)
|
out := NewRemoteWriteFrameOutput("", nil, 50)
|
||||||
sampledTimeSeries := out.sample(timeSeries)
|
sampledTimeSeries := out.sample(timeSeries)
|
||||||
require.Len(t, sampledTimeSeries, 2)
|
require.Len(t, sampledTimeSeries, 2)
|
||||||
|
|
||||||
|
@ -20,45 +20,37 @@ type FileStorage struct {
|
|||||||
EncryptionService encryption.Service
|
EncryptionService encryption.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error) {
|
func (f *FileStorage) ListWriteConfigs(_ context.Context, orgID int64) ([]WriteConfig, error) {
|
||||||
cfgfile := filepath.Join(f.DataPath, "pipeline", "remote-write-backends.json")
|
writeConfigs, err := f.readWriteConfigs()
|
||||||
var backends []RemoteWriteBackend
|
|
||||||
// Safe to ignore gosec warning G304.
|
|
||||||
// nolint:gosec
|
|
||||||
backendBytes, err := ioutil.ReadFile(cfgfile)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backends, fmt.Errorf("can't read %s file: %w", cfgfile, err)
|
return nil, fmt.Errorf("can't read write configs: %w", err)
|
||||||
}
|
}
|
||||||
var remoteWriteBackends RemoteWriteBackends
|
var orgConfigs []WriteConfig
|
||||||
err = json.Unmarshal(backendBytes, &remoteWriteBackends)
|
for _, b := range writeConfigs.Configs {
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("can't unmarshal remote-write-backends.json data: %w", err)
|
|
||||||
}
|
|
||||||
for _, b := range remoteWriteBackends.Backends {
|
|
||||||
if b.OrgId == orgID || (orgID == 1 && b.OrgId == 0) {
|
if b.OrgId == orgID || (orgID == 1 && b.OrgId == 0) {
|
||||||
backends = append(backends, b)
|
orgConfigs = append(orgConfigs, b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return backends, nil
|
return orgConfigs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) GetRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendGetCmd) (RemoteWriteBackend, bool, error) {
|
func (f *FileStorage) GetWriteConfig(_ context.Context, orgID int64, cmd WriteConfigGetCmd) (WriteConfig, bool, error) {
|
||||||
remoteWriteBackends, err := f.readRemoteWriteBackends()
|
writeConfigs, err := f.readWriteConfigs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackend{}, false, fmt.Errorf("can't remote write backends: %w", err)
|
return WriteConfig{}, false, fmt.Errorf("can't read write configs: %w", err)
|
||||||
}
|
}
|
||||||
for _, existingBackend := range remoteWriteBackends.Backends {
|
for _, existingBackend := range writeConfigs.Configs {
|
||||||
if uidMatch(orgID, cmd.UID, existingBackend) {
|
if uidMatch(orgID, cmd.UID, existingBackend) {
|
||||||
return existingBackend, true, nil
|
return existingBackend, true, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return RemoteWriteBackend{}, false, nil
|
return WriteConfig{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) CreateRemoteWriteBackend(ctx context.Context, orgID int64, cmd RemoteWriteBackendCreateCmd) (RemoteWriteBackend, error) {
|
func (f *FileStorage) CreateWriteConfig(ctx context.Context, orgID int64, cmd WriteConfigCreateCmd) (WriteConfig, error) {
|
||||||
remoteWriteBackends, err := f.readRemoteWriteBackends()
|
writeConfigs, err := f.readWriteConfigs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("can't read remote write backends: %w", err)
|
return WriteConfig{}, fmt.Errorf("can't read write configs: %w", err)
|
||||||
}
|
}
|
||||||
if cmd.UID == "" {
|
if cmd.UID == "" {
|
||||||
cmd.UID = util.GenerateShortUID()
|
cmd.UID = util.GenerateShortUID()
|
||||||
@ -66,10 +58,10 @@ func (f *FileStorage) CreateRemoteWriteBackend(ctx context.Context, orgID int64,
|
|||||||
|
|
||||||
secureSettings, err := f.EncryptionService.EncryptJsonData(ctx, cmd.SecureSettings, setting.SecretKey)
|
secureSettings, err := f.EncryptionService.EncryptJsonData(ctx, cmd.SecureSettings, setting.SecretKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("error encrypting data: %w", err)
|
return WriteConfig{}, fmt.Errorf("error encrypting data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
backend := RemoteWriteBackend{
|
backend := WriteConfig{
|
||||||
OrgId: orgID,
|
OrgId: orgID,
|
||||||
UID: cmd.UID,
|
UID: cmd.UID,
|
||||||
Settings: cmd.Settings,
|
Settings: cmd.Settings,
|
||||||
@ -78,30 +70,30 @@ func (f *FileStorage) CreateRemoteWriteBackend(ctx context.Context, orgID int64,
|
|||||||
|
|
||||||
ok, reason := backend.Valid()
|
ok, reason := backend.Valid()
|
||||||
if !ok {
|
if !ok {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("invalid remote write backend: %s", reason)
|
return WriteConfig{}, fmt.Errorf("invalid write config: %s", reason)
|
||||||
}
|
}
|
||||||
for _, existingBackend := range remoteWriteBackends.Backends {
|
for _, existingBackend := range writeConfigs.Configs {
|
||||||
if uidMatch(orgID, backend.UID, existingBackend) {
|
if uidMatch(orgID, backend.UID, existingBackend) {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("backend already exists in org: %s", backend.UID)
|
return WriteConfig{}, fmt.Errorf("backend already exists in org: %s", backend.UID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
remoteWriteBackends.Backends = append(remoteWriteBackends.Backends, backend)
|
writeConfigs.Configs = append(writeConfigs.Configs, backend)
|
||||||
err = f.saveRemoteWriteBackends(orgID, remoteWriteBackends)
|
err = f.saveWriteConfigs(orgID, writeConfigs)
|
||||||
return backend, err
|
return backend, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) UpdateRemoteWriteBackend(ctx context.Context, orgID int64, cmd RemoteWriteBackendUpdateCmd) (RemoteWriteBackend, error) {
|
func (f *FileStorage) UpdateWriteConfig(ctx context.Context, orgID int64, cmd WriteConfigUpdateCmd) (WriteConfig, error) {
|
||||||
remoteWriteBackends, err := f.readRemoteWriteBackends()
|
writeConfigs, err := f.readWriteConfigs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("can't read remote write backends: %w", err)
|
return WriteConfig{}, fmt.Errorf("can't read write configs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
secureSettings, err := f.EncryptionService.EncryptJsonData(ctx, cmd.SecureSettings, setting.SecretKey)
|
secureSettings, err := f.EncryptionService.EncryptJsonData(ctx, cmd.SecureSettings, setting.SecretKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("error encrypting data: %w", err)
|
return WriteConfig{}, fmt.Errorf("error encrypting data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
backend := RemoteWriteBackend{
|
backend := WriteConfig{
|
||||||
OrgId: orgID,
|
OrgId: orgID,
|
||||||
UID: cmd.UID,
|
UID: cmd.UID,
|
||||||
Settings: cmd.Settings,
|
Settings: cmd.Settings,
|
||||||
@ -110,35 +102,35 @@ func (f *FileStorage) UpdateRemoteWriteBackend(ctx context.Context, orgID int64,
|
|||||||
|
|
||||||
ok, reason := backend.Valid()
|
ok, reason := backend.Valid()
|
||||||
if !ok {
|
if !ok {
|
||||||
return RemoteWriteBackend{}, fmt.Errorf("invalid channel rule: %s", reason)
|
return WriteConfig{}, fmt.Errorf("invalid channel rule: %s", reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
index := -1
|
index := -1
|
||||||
|
|
||||||
for i, existingBackend := range remoteWriteBackends.Backends {
|
for i, existingBackend := range writeConfigs.Configs {
|
||||||
if uidMatch(orgID, backend.UID, existingBackend) {
|
if uidMatch(orgID, backend.UID, existingBackend) {
|
||||||
index = i
|
index = i
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if index > -1 {
|
if index > -1 {
|
||||||
remoteWriteBackends.Backends[index] = backend
|
writeConfigs.Configs[index] = backend
|
||||||
} else {
|
} else {
|
||||||
return f.CreateRemoteWriteBackend(ctx, orgID, RemoteWriteBackendCreateCmd(cmd))
|
return f.CreateWriteConfig(ctx, orgID, WriteConfigCreateCmd(cmd))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.saveRemoteWriteBackends(orgID, remoteWriteBackends)
|
err = f.saveWriteConfigs(orgID, writeConfigs)
|
||||||
return backend, err
|
return backend, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) DeleteRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendDeleteCmd) error {
|
func (f *FileStorage) DeleteWriteConfig(_ context.Context, orgID int64, cmd WriteConfigDeleteCmd) error {
|
||||||
remoteWriteBackends, err := f.readRemoteWriteBackends()
|
writeConfigs, err := f.readWriteConfigs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't read remote write backends: %w", err)
|
return fmt.Errorf("can't read write configs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
index := -1
|
index := -1
|
||||||
for i, existingBackend := range remoteWriteBackends.Backends {
|
for i, existingBackend := range writeConfigs.Configs {
|
||||||
if uidMatch(orgID, cmd.UID, existingBackend) {
|
if uidMatch(orgID, cmd.UID, existingBackend) {
|
||||||
index = i
|
index = i
|
||||||
break
|
break
|
||||||
@ -146,12 +138,12 @@ func (f *FileStorage) DeleteRemoteWriteBackend(_ context.Context, orgID int64, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
if index > -1 {
|
if index > -1 {
|
||||||
remoteWriteBackends.Backends = removeRemoteWriteBackendByIndex(remoteWriteBackends.Backends, index)
|
writeConfigs.Configs = removeWriteConfigByIndex(writeConfigs.Configs, index)
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("remote write backend not found")
|
return fmt.Errorf("write config not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
return f.saveRemoteWriteBackends(orgID, remoteWriteBackends)
|
return f.saveWriteConfigs(orgID, writeConfigs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) {
|
func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) {
|
||||||
@ -198,7 +190,7 @@ func patternMatch(orgID int64, pattern string, existingRule ChannelRule) bool {
|
|||||||
return pattern == existingRule.Pattern && (existingRule.OrgId == orgID || (existingRule.OrgId == 0 && orgID == 1))
|
return pattern == existingRule.Pattern && (existingRule.OrgId == orgID || (existingRule.OrgId == 0 && orgID == 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
func uidMatch(orgID int64, uid string, existingBackend RemoteWriteBackend) bool {
|
func uidMatch(orgID int64, uid string, existingBackend WriteConfig) bool {
|
||||||
return uid == existingBackend.UID && (existingBackend.OrgId == orgID || (existingBackend.OrgId == 0 && orgID == 1))
|
return uid == existingBackend.UID && (existingBackend.OrgId == orgID || (existingBackend.OrgId == 0 && orgID == 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -306,44 +298,44 @@ func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, cmd Chan
|
|||||||
return f.saveChannelRules(orgID, channelRules)
|
return f.saveChannelRules(orgID, channelRules)
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeRemoteWriteBackendByIndex(s []RemoteWriteBackend, index int) []RemoteWriteBackend {
|
func removeWriteConfigByIndex(s []WriteConfig, index int) []WriteConfig {
|
||||||
return append(s[:index], s[index+1:]...)
|
return append(s[:index], s[index+1:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) remoteWriteFilePath() string {
|
func (f *FileStorage) writeConfigsFilePath() string {
|
||||||
return filepath.Join(f.DataPath, "pipeline", "remote-write-backends.json")
|
return filepath.Join(f.DataPath, "pipeline", "write-configs.json")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) readRemoteWriteBackends() (RemoteWriteBackends, error) {
|
func (f *FileStorage) readWriteConfigs() (WriteConfigs, error) {
|
||||||
filePath := f.remoteWriteFilePath()
|
filePath := f.writeConfigsFilePath()
|
||||||
// Safe to ignore gosec warning G304.
|
// Safe to ignore gosec warning G304.
|
||||||
// nolint:gosec
|
// nolint:gosec
|
||||||
bytes, err := ioutil.ReadFile(filePath)
|
bytes, err := ioutil.ReadFile(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackends{}, fmt.Errorf("can't read %s file: %w", filePath, err)
|
return WriteConfigs{}, fmt.Errorf("can't read %s file: %w", filePath, err)
|
||||||
}
|
}
|
||||||
var remoteWriteBackends RemoteWriteBackends
|
var writeConfigs WriteConfigs
|
||||||
err = json.Unmarshal(bytes, &remoteWriteBackends)
|
err = json.Unmarshal(bytes, &writeConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return RemoteWriteBackends{}, fmt.Errorf("can't unmarshal %s data: %w", filePath, err)
|
return WriteConfigs{}, fmt.Errorf("can't unmarshal %s data: %w", filePath, err)
|
||||||
}
|
}
|
||||||
return remoteWriteBackends, nil
|
return writeConfigs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileStorage) saveRemoteWriteBackends(_ int64, remoteWriteBackends RemoteWriteBackends) error {
|
func (f *FileStorage) saveWriteConfigs(_ int64, writeConfigs WriteConfigs) error {
|
||||||
filePath := f.remoteWriteFilePath()
|
filePath := f.writeConfigsFilePath()
|
||||||
// Safe to ignore gosec warning G304.
|
// Safe to ignore gosec warning G304.
|
||||||
// nolint:gosec
|
// nolint:gosec
|
||||||
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't open channel remote write backends file: %w", err)
|
return fmt.Errorf("can't open channel write configs file: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = file.Close() }()
|
defer func() { _ = file.Close() }()
|
||||||
enc := json.NewEncoder(file)
|
enc := json.NewEncoder(file)
|
||||||
enc.SetIndent("", " ")
|
enc.SetIndent("", " ")
|
||||||
err = enc.Encode(remoteWriteBackends)
|
err = enc.Encode(writeConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't save remote write backends to file: %w", err)
|
return fmt.Errorf("can't save write configs to file: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -15,9 +15,9 @@ export default function CloudAdminPage() {
|
|||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
getBackendSrv()
|
getBackendSrv()
|
||||||
.get(`api/live/remote-write-backends`)
|
.get(`api/live/write-configs`)
|
||||||
.then((data) => {
|
.then((data) => {
|
||||||
setCloud(data.remoteWriteBackends);
|
setCloud(data.writeConfigs);
|
||||||
})
|
})
|
||||||
.catch((e) => {
|
.catch((e) => {
|
||||||
if (e.data) {
|
if (e.data) {
|
||||||
|
Loading…
Reference in New Issue
Block a user