mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Live: generate ts definitions for pipeline configs (#41544)
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
parent
466eaeb4f0
commit
4cd2575ad0
8
Makefile
8
Makefile
@ -7,7 +7,7 @@ WIRE_TAGS = "oss"
|
|||||||
-include local/Makefile
|
-include local/Makefile
|
||||||
include .bingo/Variables.mk
|
include .bingo/Variables.mk
|
||||||
|
|
||||||
.PHONY: all deps-go deps-js deps build-go build-server build-cli build-js build build-docker-dev build-docker-full lint-go golangci-lint test-go test-js test run run-frontend clean devenv devenv-down protobuf drone help
|
.PHONY: all deps-go deps-js deps build-go build-server build-cli build-js build build-docker-dev build-docker-full lint-go golangci-lint test-go test-js gen-ts test run run-frontend clean devenv devenv-down protobuf drone help
|
||||||
|
|
||||||
GO = go
|
GO = go
|
||||||
GO_FILES ?= ./pkg/...
|
GO_FILES ?= ./pkg/...
|
||||||
@ -147,6 +147,12 @@ clean: ## Clean up intermediate build artifacts.
|
|||||||
rm -rf node_modules
|
rm -rf node_modules
|
||||||
rm -rf public/build
|
rm -rf public/build
|
||||||
|
|
||||||
|
gen-ts:
|
||||||
|
@echo "generating TypeScript definitions"
|
||||||
|
go get github.com/tkrajina/typescriptify-golang-structs/typescriptify@v0.1.7
|
||||||
|
tscriptify -interface -package=github.com/grafana/grafana/pkg/services/live/pipeline -import="import { FieldConfig } from '@grafana/data'" -target=public/app/features/live/pipeline/models.gen.ts pkg/services/live/pipeline/config.go
|
||||||
|
go mod tidy
|
||||||
|
|
||||||
# This repository's configuration is protected (https://readme.drone.io/signature/).
|
# This repository's configuration is protected (https://readme.drone.io/signature/).
|
||||||
# Use this make target to regenerate the configuration YAML files when
|
# Use this make target to regenerate the configuration YAML files when
|
||||||
# you modify starlark files.
|
# you modify starlark files.
|
||||||
|
@ -1,86 +1,10 @@
|
|||||||
package pipeline
|
package pipeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/services/secrets"
|
|
||||||
|
|
||||||
"github.com/centrifugal/centrifuge"
|
|
||||||
"github.com/grafana/grafana/pkg/models"
|
"github.com/grafana/grafana/pkg/models"
|
||||||
"github.com/grafana/grafana/pkg/services/live/managedstream"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/pipeline/pattern"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/pipeline/tree"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type JsonAutoSettings struct{}
|
|
||||||
|
|
||||||
type ConverterConfig struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
AutoJsonConverterConfig *AutoJsonConverterConfig `json:"jsonAuto,omitempty"`
|
|
||||||
ExactJsonConverterConfig *ExactJsonConverterConfig `json:"jsonExact,omitempty"`
|
|
||||||
AutoInfluxConverterConfig *AutoInfluxConverterConfig `json:"influxAuto,omitempty"`
|
|
||||||
JsonFrameConverterConfig *JsonFrameConverterConfig `json:"jsonFrame,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type FrameProcessorConfig struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
DropFieldsProcessorConfig *DropFieldsFrameProcessorConfig `json:"dropFields,omitempty"`
|
|
||||||
KeepFieldsProcessorConfig *KeepFieldsFrameProcessorConfig `json:"keepFields,omitempty"`
|
|
||||||
MultipleProcessorConfig *MultipleFrameProcessorConfig `json:"multiple,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type MultipleFrameProcessorConfig struct {
|
|
||||||
Processors []FrameProcessorConfig `json:"processors"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type MultipleOutputterConfig struct {
|
|
||||||
Outputters []FrameOutputterConfig `json:"outputs"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ManagedStreamOutputConfig struct{}
|
|
||||||
|
|
||||||
type ConditionalOutputConfig struct {
|
|
||||||
Condition *FrameConditionCheckerConfig `json:"condition"`
|
|
||||||
Outputter *FrameOutputterConfig `json:"output"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type RemoteWriteOutputConfig struct {
|
|
||||||
UID string `json:"uid"`
|
|
||||||
SampleMilliseconds int64 `json:"sampleMilliseconds"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type LokiOutputConfig struct {
|
|
||||||
UID string `json:"uid"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type FrameOutputterConfig struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
ManagedStreamConfig *ManagedStreamOutputConfig `json:"managedStream,omitempty"`
|
|
||||||
MultipleOutputterConfig *MultipleOutputterConfig `json:"multiple,omitempty"`
|
|
||||||
RedirectOutputConfig *RedirectOutputConfig `json:"redirect,omitempty"`
|
|
||||||
ConditionalOutputConfig *ConditionalOutputConfig `json:"conditional,omitempty"`
|
|
||||||
ThresholdOutputConfig *ThresholdOutputConfig `json:"threshold,omitempty"`
|
|
||||||
RemoteWriteOutputConfig *RemoteWriteOutputConfig `json:"remoteWrite,omitempty"`
|
|
||||||
LokiOutputConfig *LokiOutputConfig `json:"loki,omitempty"`
|
|
||||||
ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type DataOutputterConfig struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
RedirectDataOutputConfig *RedirectDataOutputConfig `json:"redirect,omitempty"`
|
|
||||||
LokiOutputConfig *LokiOutputConfig `json:"loki,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type MultipleSubscriberConfig struct {
|
|
||||||
Subscribers []SubscriberConfig `json:"subscribers"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type SubscriberConfig struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ChannelAuthCheckConfig is used to define auth rules for a channel.
|
// ChannelAuthCheckConfig is used to define auth rules for a channel.
|
||||||
type ChannelAuthCheckConfig struct {
|
type ChannelAuthCheckConfig struct {
|
||||||
RequireRole models.RoleType `json:"role,omitempty"`
|
RequireRole models.RoleType `json:"role,omitempty"`
|
||||||
@ -109,148 +33,86 @@ type ChannelRule struct {
|
|||||||
Settings ChannelRuleSettings `json:"settings"`
|
Settings ChannelRuleSettings `json:"settings"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r ChannelRule) Valid() (bool, string) {
|
type ConverterConfig struct {
|
||||||
ok, reason := pattern.Valid(r.Pattern)
|
Type string `json:"type" ts_type:"Omit<keyof ConverterConfig, 'type'>"`
|
||||||
if !ok {
|
AutoJsonConverterConfig *AutoJsonConverterConfig `json:"jsonAuto,omitempty"`
|
||||||
return false, fmt.Sprintf("invalid pattern: %s", reason)
|
ExactJsonConverterConfig *ExactJsonConverterConfig `json:"jsonExact,omitempty"`
|
||||||
}
|
AutoInfluxConverterConfig *AutoInfluxConverterConfig `json:"influxAuto,omitempty"`
|
||||||
if r.Settings.Converter != nil {
|
JsonFrameConverterConfig *JsonFrameConverterConfig `json:"jsonFrame,omitempty"`
|
||||||
if !typeRegistered(r.Settings.Converter.Type, ConvertersRegistry) {
|
|
||||||
return false, fmt.Sprintf("unknown converter type: %s", r.Settings.Converter.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(r.Settings.Subscribers) > 0 {
|
|
||||||
for _, sub := range r.Settings.Subscribers {
|
|
||||||
if !typeRegistered(sub.Type, SubscribersRegistry) {
|
|
||||||
return false, fmt.Sprintf("unknown subscriber type: %s", sub.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(r.Settings.FrameProcessors) > 0 {
|
|
||||||
for _, proc := range r.Settings.FrameProcessors {
|
|
||||||
if !typeRegistered(proc.Type, FrameProcessorsRegistry) {
|
|
||||||
return false, fmt.Sprintf("unknown processor type: %s", proc.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(r.Settings.FrameOutputters) > 0 {
|
|
||||||
for _, out := range r.Settings.FrameOutputters {
|
|
||||||
if !typeRegistered(out.Type, FrameOutputsRegistry) {
|
|
||||||
return false, fmt.Sprintf("unknown output type: %s", out.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true, ""
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func typeRegistered(entityType string, registry []EntityInfo) bool {
|
type DropFieldsFrameProcessorConfig struct {
|
||||||
for _, info := range registry {
|
FieldNames []string `json:"fieldNames"`
|
||||||
if info.Type == entityType {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func WriteConfigToDto(b WriteConfig) WriteConfigDto {
|
type KeepFieldsFrameProcessorConfig struct {
|
||||||
secureFields := make(map[string]bool, len(b.SecureSettings))
|
FieldNames []string `json:"fieldNames"`
|
||||||
for k := range b.SecureSettings {
|
|
||||||
secureFields[k] = true
|
|
||||||
}
|
|
||||||
return WriteConfigDto{
|
|
||||||
UID: b.UID,
|
|
||||||
Settings: b.Settings,
|
|
||||||
SecureFields: secureFields,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WriteConfigDto struct {
|
type FrameProcessorConfig struct {
|
||||||
UID string `json:"uid"`
|
Type string `json:"type" ts_type:"Omit<keyof FrameProcessorConfig, 'type'>"`
|
||||||
Settings WriteSettings `json:"settings"`
|
DropFieldsProcessorConfig *DropFieldsFrameProcessorConfig `json:"dropFields,omitempty"`
|
||||||
SecureFields map[string]bool `json:"secureFields"`
|
KeepFieldsProcessorConfig *KeepFieldsFrameProcessorConfig `json:"keepFields,omitempty"`
|
||||||
|
MultipleProcessorConfig *MultipleFrameProcessorConfig `json:"multiple,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type WriteConfigGetCmd struct {
|
type MultipleFrameProcessorConfig struct {
|
||||||
|
Processors []FrameProcessorConfig `json:"processors"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type MultipleOutputterConfig struct {
|
||||||
|
Outputters []FrameOutputterConfig `json:"outputs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConditionalOutputConfig struct {
|
||||||
|
Condition *FrameConditionCheckerConfig `json:"condition"`
|
||||||
|
Outputter *FrameOutputterConfig `json:"output"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoteWriteOutputConfig struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
SampleMilliseconds int64 `json:"sampleMilliseconds"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type LokiOutputConfig struct {
|
||||||
UID string `json:"uid"`
|
UID string `json:"uid"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type WriteConfigCreateCmd struct {
|
type MultipleSubscriberConfig struct {
|
||||||
UID string `json:"uid"`
|
Subscribers []SubscriberConfig `json:"subscribers"`
|
||||||
Settings WriteSettings `json:"settings"`
|
|
||||||
SecureSettings map[string]string `json:"secureSettings"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add version field later.
|
type SubscriberConfig struct {
|
||||||
type WriteConfigUpdateCmd struct {
|
Type string `json:"type" ts_type:"Omit<keyof SubscriberConfig, 'type'>"`
|
||||||
UID string `json:"uid"`
|
MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"`
|
||||||
Settings WriteSettings `json:"settings"`
|
|
||||||
SecureSettings map[string]string `json:"secureSettings"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WriteConfigDeleteCmd struct {
|
// RedirectDataOutputConfig ...
|
||||||
UID string `json:"uid"`
|
type RedirectDataOutputConfig struct {
|
||||||
|
Channel string `json:"channel"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type WriteConfig struct {
|
type DataOutputterConfig struct {
|
||||||
OrgId int64 `json:"-"`
|
Type string `json:"type" ts_type:"Omit<keyof DataOutputterConfig, 'type'>"`
|
||||||
UID string `json:"uid"`
|
RedirectDataOutputConfig *RedirectDataOutputConfig `json:"redirect,omitempty"`
|
||||||
Settings WriteSettings `json:"settings"`
|
LokiOutputConfig *LokiOutputConfig `json:"loki,omitempty"`
|
||||||
SecureSettings map[string][]byte `json:"secureSettings,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r WriteConfig) Valid() (bool, string) {
|
type FrameOutputterConfig struct {
|
||||||
if r.UID == "" {
|
Type string `json:"type" ts_type:"Omit<keyof FrameOutputterConfig, 'type'>"`
|
||||||
return false, "uid required"
|
ManagedStreamConfig *ManagedStreamOutputConfig `json:"managedStream,omitempty"`
|
||||||
}
|
MultipleOutputterConfig *MultipleOutputterConfig `json:"multiple,omitempty"`
|
||||||
if r.Settings.Endpoint == "" {
|
RedirectOutputConfig *RedirectOutputConfig `json:"redirect,omitempty"`
|
||||||
return false, "endpoint required"
|
ConditionalOutputConfig *ConditionalOutputConfig `json:"conditional,omitempty"`
|
||||||
}
|
ThresholdOutputConfig *ThresholdOutputConfig `json:"threshold,omitempty"`
|
||||||
return true, ""
|
RemoteWriteOutputConfig *RemoteWriteOutputConfig `json:"remoteWrite,omitempty"`
|
||||||
}
|
LokiOutputConfig *LokiOutputConfig `json:"loki,omitempty"`
|
||||||
|
ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"`
|
||||||
type BasicAuth struct {
|
|
||||||
// User is a user for remote write request.
|
|
||||||
User string `json:"user,omitempty"`
|
|
||||||
// Password is a plain text non-encrypted password.
|
|
||||||
// TODO: remove after integrating with the database.
|
|
||||||
Password string `json:"password,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type WriteSettings struct {
|
|
||||||
// Endpoint to send streaming frames to.
|
|
||||||
Endpoint string `json:"endpoint"`
|
|
||||||
// BasicAuth is an optional basic auth settings.
|
|
||||||
BasicAuth *BasicAuth `json:"basicAuth,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type WriteConfigs struct {
|
|
||||||
Configs []WriteConfig `json:"writeConfigs"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ChannelRules struct {
|
|
||||||
Rules []ChannelRule `json:"rules"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkRulesValid(orgID int64, rules []ChannelRule) (ok bool, reason string) {
|
|
||||||
t := tree.New()
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
reason = fmt.Sprintf("%v", r)
|
|
||||||
ok = false
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for _, rule := range rules {
|
|
||||||
if rule.OrgId == orgID || (rule.OrgId == 0 && orgID == 1) {
|
|
||||||
t.AddRoute("/"+rule.Pattern, struct{}{})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ok = true
|
|
||||||
return ok, reason
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type MultipleFrameConditionCheckerConfig struct {
|
type MultipleFrameConditionCheckerConfig struct {
|
||||||
Type ConditionType `json:"type"`
|
ConditionType ConditionType `json:"conditionType"`
|
||||||
Conditions []FrameConditionCheckerConfig `json:"conditions"`
|
Conditions []FrameConditionCheckerConfig `json:"conditions"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NumberCompareFrameConditionConfig struct {
|
type NumberCompareFrameConditionConfig struct {
|
||||||
@ -260,407 +122,33 @@ type NumberCompareFrameConditionConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FrameConditionCheckerConfig struct {
|
type FrameConditionCheckerConfig struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type" ts_type:"Omit<keyof FrameConditionCheckerConfig, 'type'>"`
|
||||||
MultipleConditionCheckerConfig *MultipleFrameConditionCheckerConfig `json:"multiple,omitempty"`
|
MultipleConditionCheckerConfig *MultipleFrameConditionCheckerConfig `json:"multiple,omitempty"`
|
||||||
NumberCompareConditionConfig *NumberCompareFrameConditionConfig `json:"numberCompare,omitempty"`
|
NumberCompareConditionConfig *NumberCompareFrameConditionConfig `json:"numberCompare,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChannelRuleCreateCmd struct {
|
type AutoJsonConverterConfig struct {
|
||||||
Pattern string `json:"pattern"`
|
FieldTips map[string]Field `json:"fieldTips,omitempty"`
|
||||||
Settings ChannelRuleSettings `json:"settings"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChannelRuleUpdateCmd struct {
|
// Field description.
|
||||||
Pattern string `json:"pattern"`
|
type Field struct {
|
||||||
Settings ChannelRuleSettings `json:"settings"`
|
Name string `json:"name"`
|
||||||
|
Type data.FieldType `json:"type"`
|
||||||
|
Value string `json:"value"` // Can be JSONPath or Goja script.
|
||||||
|
Labels []Label `json:"labels,omitempty"`
|
||||||
|
Config *data.FieldConfig `json:"config,omitempty" ts_type:"FieldConfig"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChannelRuleDeleteCmd struct {
|
type ExactJsonConverterConfig struct {
|
||||||
Pattern string `json:"pattern"`
|
Fields []Field `json:"fields"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Storage interface {
|
// AutoInfluxConverterConfig ...
|
||||||
ListWriteConfigs(_ context.Context, orgID int64) ([]WriteConfig, error)
|
type AutoInfluxConverterConfig struct {
|
||||||
GetWriteConfig(_ context.Context, orgID int64, cmd WriteConfigGetCmd) (WriteConfig, bool, error)
|
FrameFormat string `json:"frameFormat"`
|
||||||
CreateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigCreateCmd) (WriteConfig, error)
|
|
||||||
UpdateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigUpdateCmd) (WriteConfig, error)
|
|
||||||
DeleteWriteConfig(_ context.Context, orgID int64, cmd WriteConfigDeleteCmd) error
|
|
||||||
ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)
|
|
||||||
CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error)
|
|
||||||
UpdateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error)
|
|
||||||
DeleteChannelRule(_ context.Context, orgID int64, cmd ChannelRuleDeleteCmd) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type StorageRuleBuilder struct {
|
type JsonFrameConverterConfig struct{}
|
||||||
Node *centrifuge.Node
|
|
||||||
ManagedStream *managedstream.Runner
|
|
||||||
FrameStorage *FrameStorage
|
|
||||||
Storage Storage
|
|
||||||
ChannelHandlerGetter ChannelHandlerGetter
|
|
||||||
SecretsService secrets.Service
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscriber, error) {
|
type ManagedStreamOutputConfig struct{}
|
||||||
if config == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
|
||||||
switch config.Type {
|
|
||||||
case SubscriberTypeBuiltin:
|
|
||||||
return NewBuiltinSubscriber(f.ChannelHandlerGetter), nil
|
|
||||||
case SubscriberTypeManagedStream:
|
|
||||||
return NewManagedStreamSubscriber(f.ManagedStream), nil
|
|
||||||
case SubscriberTypeMultiple:
|
|
||||||
if config.MultipleSubscriberConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
var subscribers []Subscriber
|
|
||||||
for _, outConf := range config.MultipleSubscriberConfig.Subscribers {
|
|
||||||
out := outConf
|
|
||||||
sub, err := f.extractSubscriber(&out)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
subscribers = append(subscribers, sub)
|
|
||||||
}
|
|
||||||
return NewMultipleSubscriber(subscribers...), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown subscriber type: %s", config.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converter, error) {
|
|
||||||
if config == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
|
||||||
switch config.Type {
|
|
||||||
case ConverterTypeJsonAuto:
|
|
||||||
if config.AutoJsonConverterConfig == nil {
|
|
||||||
config.AutoJsonConverterConfig = &AutoJsonConverterConfig{}
|
|
||||||
}
|
|
||||||
return NewAutoJsonConverter(*config.AutoJsonConverterConfig), nil
|
|
||||||
case ConverterTypeJsonExact:
|
|
||||||
if config.ExactJsonConverterConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewExactJsonConverter(*config.ExactJsonConverterConfig), nil
|
|
||||||
case ConverterTypeJsonFrame:
|
|
||||||
if config.JsonFrameConverterConfig == nil {
|
|
||||||
config.JsonFrameConverterConfig = &JsonFrameConverterConfig{}
|
|
||||||
}
|
|
||||||
return NewJsonFrameConverter(*config.JsonFrameConverterConfig), nil
|
|
||||||
case ConverterTypeInfluxAuto:
|
|
||||||
if config.AutoInfluxConverterConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewAutoInfluxConverter(*config.AutoInfluxConverterConfig), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown converter type: %s", config.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractFrameProcessor(config *FrameProcessorConfig) (FrameProcessor, error) {
|
|
||||||
if config == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
|
||||||
switch config.Type {
|
|
||||||
case FrameProcessorTypeDropFields:
|
|
||||||
if config.DropFieldsProcessorConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewDropFieldsFrameProcessor(*config.DropFieldsProcessorConfig), nil
|
|
||||||
case FrameProcessorTypeKeepFields:
|
|
||||||
if config.KeepFieldsProcessorConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewKeepFieldsFrameProcessor(*config.KeepFieldsProcessorConfig), nil
|
|
||||||
case FrameProcessorTypeMultiple:
|
|
||||||
if config.MultipleProcessorConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
var processors []FrameProcessor
|
|
||||||
for _, outConf := range config.MultipleProcessorConfig.Processors {
|
|
||||||
out := outConf
|
|
||||||
proc, err := f.extractFrameProcessor(&out)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
processors = append(processors, proc)
|
|
||||||
}
|
|
||||||
return NewMultipleFrameProcessor(processors...), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown processor type: %s", config.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractFrameConditionChecker(config *FrameConditionCheckerConfig) (FrameConditionChecker, error) {
|
|
||||||
if config == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
|
||||||
switch config.Type {
|
|
||||||
case FrameConditionCheckerTypeNumberCompare:
|
|
||||||
if config.NumberCompareConditionConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
c := *config.NumberCompareConditionConfig
|
|
||||||
return NewFrameNumberCompareCondition(c.FieldName, c.Op, c.Value), nil
|
|
||||||
case FrameConditionCheckerTypeMultiple:
|
|
||||||
var conditions []FrameConditionChecker
|
|
||||||
if config.MultipleConditionCheckerConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
for _, outConf := range config.MultipleConditionCheckerConfig.Conditions {
|
|
||||||
out := outConf
|
|
||||||
cond, err := f.extractFrameConditionChecker(&out)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
conditions = append(conditions, cond)
|
|
||||||
}
|
|
||||||
return NewMultipleFrameConditionChecker(config.MultipleConditionCheckerConfig.Type, conditions...), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown condition type: %s", config.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) constructBasicAuth(writeConfig WriteConfig) (*BasicAuth, error) {
|
|
||||||
if writeConfig.Settings.BasicAuth == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
var password string
|
|
||||||
hasSecurePassword := len(writeConfig.SecureSettings["basicAuthPassword"]) > 0
|
|
||||||
if hasSecurePassword {
|
|
||||||
passwordBytes, err := f.SecretsService.Decrypt(context.Background(), writeConfig.SecureSettings["basicAuthPassword"])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("basicAuthPassword can't be decrypted: %w", err)
|
|
||||||
}
|
|
||||||
password = string(passwordBytes)
|
|
||||||
} else {
|
|
||||||
// Use plain text password (should be removed upon database integration).
|
|
||||||
if writeConfig.Settings.BasicAuth != nil {
|
|
||||||
password = writeConfig.Settings.BasicAuth.Password
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &BasicAuth{
|
|
||||||
User: writeConfig.Settings.BasicAuth.User,
|
|
||||||
Password: password,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, writeConfigs []WriteConfig) (FrameOutputter, error) {
|
|
||||||
if config == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
|
||||||
switch config.Type {
|
|
||||||
case FrameOutputTypeRedirect:
|
|
||||||
if config.RedirectOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewRedirectFrameOutput(*config.RedirectOutputConfig), nil
|
|
||||||
case FrameOutputTypeMultiple:
|
|
||||||
if config.MultipleOutputterConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
var outputters []FrameOutputter
|
|
||||||
for _, outConf := range config.MultipleOutputterConfig.Outputters {
|
|
||||||
out := outConf
|
|
||||||
outputter, err := f.extractFrameOutputter(&out, writeConfigs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
outputters = append(outputters, outputter)
|
|
||||||
}
|
|
||||||
return NewMultipleFrameOutput(outputters...), nil
|
|
||||||
case FrameOutputTypeManagedStream:
|
|
||||||
return NewManagedStreamFrameOutput(f.ManagedStream), nil
|
|
||||||
case FrameOutputTypeLocalSubscribers:
|
|
||||||
return NewLocalSubscribersFrameOutput(f.Node), nil
|
|
||||||
case FrameOutputTypeConditional:
|
|
||||||
if config.ConditionalOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
condition, err := f.extractFrameConditionChecker(config.ConditionalOutputConfig.Condition)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
outputter, err := f.extractFrameOutputter(config.ConditionalOutputConfig.Outputter, writeConfigs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return NewConditionalOutput(condition, outputter), nil
|
|
||||||
case FrameOutputTypeThreshold:
|
|
||||||
if config.ThresholdOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewThresholdOutput(f.FrameStorage, *config.ThresholdOutputConfig), nil
|
|
||||||
case FrameOutputTypeRemoteWrite:
|
|
||||||
if config.RemoteWriteOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
writeConfig, ok := f.getWriteConfig(config.RemoteWriteOutputConfig.UID, writeConfigs)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("unknown write config uid: %s", config.RemoteWriteOutputConfig.UID)
|
|
||||||
}
|
|
||||||
basicAuth, err := f.constructBasicAuth(writeConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error getting password: %w", err)
|
|
||||||
}
|
|
||||||
return NewRemoteWriteFrameOutput(
|
|
||||||
writeConfig.Settings.Endpoint,
|
|
||||||
basicAuth,
|
|
||||||
config.RemoteWriteOutputConfig.SampleMilliseconds,
|
|
||||||
), nil
|
|
||||||
case FrameOutputTypeLoki:
|
|
||||||
if config.LokiOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
writeConfig, ok := f.getWriteConfig(config.LokiOutputConfig.UID, writeConfigs)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
|
||||||
}
|
|
||||||
basicAuth, err := f.constructBasicAuth(writeConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error getting password: %w", err)
|
|
||||||
}
|
|
||||||
return NewLokiFrameOutput(
|
|
||||||
writeConfig.Settings.Endpoint,
|
|
||||||
basicAuth,
|
|
||||||
), nil
|
|
||||||
case FrameOutputTypeChangeLog:
|
|
||||||
if config.ChangeLogOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewChangeLogFrameOutput(f.FrameStorage, *config.ChangeLogOutputConfig), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown output type: %s", config.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, writeConfigs []WriteConfig) (DataOutputter, error) {
|
|
||||||
if config == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
|
||||||
switch config.Type {
|
|
||||||
case DataOutputTypeRedirect:
|
|
||||||
if config.RedirectDataOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
return NewRedirectDataOutput(*config.RedirectDataOutputConfig), nil
|
|
||||||
case DataOutputTypeLoki:
|
|
||||||
if config.LokiOutputConfig == nil {
|
|
||||||
return nil, missingConfiguration
|
|
||||||
}
|
|
||||||
writeConfig, ok := f.getWriteConfig(config.LokiOutputConfig.UID, writeConfigs)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
|
||||||
}
|
|
||||||
basicAuth, err := f.constructBasicAuth(writeConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error constructing basicAuth: %w", err)
|
|
||||||
}
|
|
||||||
return NewLokiDataOutput(
|
|
||||||
writeConfig.Settings.Endpoint,
|
|
||||||
basicAuth,
|
|
||||||
), nil
|
|
||||||
case DataOutputTypeBuiltin:
|
|
||||||
return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil
|
|
||||||
case DataOutputTypeLocalSubscribers:
|
|
||||||
return NewLocalSubscribersDataOutput(f.Node), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown data output type: %s", config.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) getWriteConfig(uid string, writeConfigs []WriteConfig) (WriteConfig, bool) {
|
|
||||||
for _, rwb := range writeConfigs {
|
|
||||||
if rwb.UID == uid {
|
|
||||||
return rwb, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return WriteConfig{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error) {
|
|
||||||
channelRules, err := f.Storage.ListChannelRules(ctx, orgID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
writeConfigs, err := f.Storage.ListWriteConfigs(ctx, orgID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var rules []*LiveChannelRule
|
|
||||||
|
|
||||||
for _, ruleConfig := range channelRules {
|
|
||||||
rule := &LiveChannelRule{
|
|
||||||
OrgId: orgID,
|
|
||||||
Pattern: ruleConfig.Pattern,
|
|
||||||
}
|
|
||||||
|
|
||||||
if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Subscribe != nil {
|
|
||||||
rule.SubscribeAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Subscribe.RequireRole)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Publish != nil {
|
|
||||||
rule.PublishAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Publish.RequireRole)
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error building converter for %s: %w", rule.Pattern, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var processors []FrameProcessor
|
|
||||||
for _, procConfig := range ruleConfig.Settings.FrameProcessors {
|
|
||||||
proc, err := f.extractFrameProcessor(procConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error building processor for %s: %w", rule.Pattern, err)
|
|
||||||
}
|
|
||||||
processors = append(processors, proc)
|
|
||||||
}
|
|
||||||
rule.FrameProcessors = processors
|
|
||||||
|
|
||||||
var dataOutputters []DataOutputter
|
|
||||||
for _, outConfig := range ruleConfig.Settings.DataOutputters {
|
|
||||||
out, err := f.extractDataOutputter(outConfig, writeConfigs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err)
|
|
||||||
}
|
|
||||||
dataOutputters = append(dataOutputters, out)
|
|
||||||
}
|
|
||||||
rule.DataOutputters = dataOutputters
|
|
||||||
|
|
||||||
var outputters []FrameOutputter
|
|
||||||
for _, outConfig := range ruleConfig.Settings.FrameOutputters {
|
|
||||||
out, err := f.extractFrameOutputter(outConfig, writeConfigs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error building frame outputter for %s: %w", rule.Pattern, err)
|
|
||||||
}
|
|
||||||
outputters = append(outputters, out)
|
|
||||||
}
|
|
||||||
rule.FrameOutputters = outputters
|
|
||||||
|
|
||||||
var subscribers []Subscriber
|
|
||||||
for _, subConfig := range ruleConfig.Settings.Subscribers {
|
|
||||||
sub, err := f.extractSubscriber(subConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error building subscriber for %s: %w", rule.Pattern, err)
|
|
||||||
}
|
|
||||||
subscribers = append(subscribers, sub)
|
|
||||||
}
|
|
||||||
rule.Subscribers = subscribers
|
|
||||||
|
|
||||||
rules = append(rules, rule)
|
|
||||||
}
|
|
||||||
|
|
||||||
return rules, nil
|
|
||||||
}
|
|
||||||
|
@ -6,11 +6,6 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/services/live/convert"
|
"github.com/grafana/grafana/pkg/services/live/convert"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutoInfluxConverterConfig ...
|
|
||||||
type AutoInfluxConverterConfig struct {
|
|
||||||
FrameFormat string `json:"frameFormat"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutoInfluxConverter decodes Influx line protocol input and transforms it
|
// AutoInfluxConverter decodes Influx line protocol input and transforms it
|
||||||
// to several ChannelFrame objects where Channel is constructed from original
|
// to several ChannelFrame objects where Channel is constructed from original
|
||||||
// channel + / + <metric_name>.
|
// channel + / + <metric_name>.
|
||||||
|
@ -5,10 +5,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AutoJsonConverterConfig struct {
|
|
||||||
FieldTips map[string]Field `json:"fieldTips,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type AutoJsonConverter struct {
|
type AutoJsonConverter struct {
|
||||||
config AutoJsonConverterConfig
|
config AutoJsonConverterConfig
|
||||||
nowTimeFunc func() time.Time
|
nowTimeFunc func() time.Time
|
||||||
|
@ -13,10 +13,6 @@ import (
|
|||||||
"github.com/ohler55/ojg/oj"
|
"github.com/ohler55/ojg/oj"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ExactJsonConverterConfig struct {
|
|
||||||
Fields []Field `json:"fields"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExactJsonConverter can convert JSON to a single data.Frame according to
|
// ExactJsonConverter can convert JSON to a single data.Frame according to
|
||||||
// user-defined field configuration and value extraction rules.
|
// user-defined field configuration and value extraction rules.
|
||||||
type ExactJsonConverter struct {
|
type ExactJsonConverter struct {
|
||||||
|
@ -7,8 +7,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JsonFrameConverterConfig struct{}
|
|
||||||
|
|
||||||
// JsonFrameConverter decodes single data.Frame from JSON.
|
// JsonFrameConverter decodes single data.Frame from JSON.
|
||||||
type JsonFrameConverter struct {
|
type JsonFrameConverter struct {
|
||||||
config JsonFrameConverterConfig
|
config JsonFrameConverterConfig
|
||||||
|
@ -5,11 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RedirectDataOutputConfig ...
|
|
||||||
type RedirectDataOutputConfig struct {
|
|
||||||
Channel string `json:"channel"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// RedirectDataOutput passes processing control to the rule defined
|
// RedirectDataOutput passes processing control to the rule defined
|
||||||
// for a configured channel.
|
// for a configured channel.
|
||||||
type RedirectDataOutput struct {
|
type RedirectDataOutput struct {
|
||||||
|
@ -6,10 +6,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DropFieldsFrameProcessorConfig struct {
|
|
||||||
FieldNames []string `json:"fieldNames"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// DropFieldsFrameProcessor can drop specified fields from a data.Frame.
|
// DropFieldsFrameProcessor can drop specified fields from a data.Frame.
|
||||||
type DropFieldsFrameProcessor struct {
|
type DropFieldsFrameProcessor struct {
|
||||||
config DropFieldsFrameProcessorConfig
|
config DropFieldsFrameProcessorConfig
|
||||||
|
@ -6,10 +6,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
)
|
)
|
||||||
|
|
||||||
type KeepFieldsFrameProcessorConfig struct {
|
|
||||||
FieldNames []string `json:"fieldNames"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeepFieldsFrameProcessor can keep specified fields in a data.Frame dropping all other fields.
|
// KeepFieldsFrameProcessor can keep specified fields in a data.Frame dropping all other fields.
|
||||||
type KeepFieldsFrameProcessor struct {
|
type KeepFieldsFrameProcessor struct {
|
||||||
config KeepFieldsFrameProcessorConfig
|
config KeepFieldsFrameProcessorConfig
|
||||||
|
161
pkg/services/live/pipeline/models.go
Normal file
161
pkg/services/live/pipeline/models.go
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
package pipeline
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/services/live/pipeline/pattern"
|
||||||
|
"github.com/grafana/grafana/pkg/services/live/pipeline/tree"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r ChannelRule) Valid() (bool, string) {
|
||||||
|
ok, reason := pattern.Valid(r.Pattern)
|
||||||
|
if !ok {
|
||||||
|
return false, fmt.Sprintf("invalid pattern: %s", reason)
|
||||||
|
}
|
||||||
|
if r.Settings.Converter != nil {
|
||||||
|
if !typeRegistered(r.Settings.Converter.Type, ConvertersRegistry) {
|
||||||
|
return false, fmt.Sprintf("unknown converter type: %s", r.Settings.Converter.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(r.Settings.Subscribers) > 0 {
|
||||||
|
for _, sub := range r.Settings.Subscribers {
|
||||||
|
if !typeRegistered(sub.Type, SubscribersRegistry) {
|
||||||
|
return false, fmt.Sprintf("unknown subscriber type: %s", sub.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(r.Settings.FrameProcessors) > 0 {
|
||||||
|
for _, proc := range r.Settings.FrameProcessors {
|
||||||
|
if !typeRegistered(proc.Type, FrameProcessorsRegistry) {
|
||||||
|
return false, fmt.Sprintf("unknown processor type: %s", proc.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(r.Settings.FrameOutputters) > 0 {
|
||||||
|
for _, out := range r.Settings.FrameOutputters {
|
||||||
|
if !typeRegistered(out.Type, FrameOutputsRegistry) {
|
||||||
|
return false, fmt.Sprintf("unknown output type: %s", out.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func typeRegistered(entityType string, registry []EntityInfo) bool {
|
||||||
|
for _, info := range registry {
|
||||||
|
if info.Type == entityType {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func WriteConfigToDto(b WriteConfig) WriteConfigDto {
|
||||||
|
secureFields := make(map[string]bool, len(b.SecureSettings))
|
||||||
|
for k := range b.SecureSettings {
|
||||||
|
secureFields[k] = true
|
||||||
|
}
|
||||||
|
return WriteConfigDto{
|
||||||
|
UID: b.UID,
|
||||||
|
Settings: b.Settings,
|
||||||
|
SecureFields: secureFields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfigDto struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
Settings WriteSettings `json:"settings"`
|
||||||
|
SecureFields map[string]bool `json:"secureFields"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfigGetCmd struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfigCreateCmd struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
Settings WriteSettings `json:"settings"`
|
||||||
|
SecureSettings map[string]string `json:"secureSettings"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: add version field later.
|
||||||
|
type WriteConfigUpdateCmd struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
Settings WriteSettings `json:"settings"`
|
||||||
|
SecureSettings map[string]string `json:"secureSettings"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfigDeleteCmd struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfig struct {
|
||||||
|
OrgId int64 `json:"-"`
|
||||||
|
UID string `json:"uid"`
|
||||||
|
Settings WriteSettings `json:"settings"`
|
||||||
|
SecureSettings map[string][]byte `json:"secureSettings,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r WriteConfig) Valid() (bool, string) {
|
||||||
|
if r.UID == "" {
|
||||||
|
return false, "uid required"
|
||||||
|
}
|
||||||
|
if r.Settings.Endpoint == "" {
|
||||||
|
return false, "endpoint required"
|
||||||
|
}
|
||||||
|
return true, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type BasicAuth struct {
|
||||||
|
// User is a user for remote write request.
|
||||||
|
User string `json:"user,omitempty"`
|
||||||
|
// Password is a plain text non-encrypted password.
|
||||||
|
// TODO: remove after integrating with the database.
|
||||||
|
Password string `json:"password,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteSettings struct {
|
||||||
|
// Endpoint to send streaming frames to.
|
||||||
|
Endpoint string `json:"endpoint"`
|
||||||
|
// BasicAuth is an optional basic auth settings.
|
||||||
|
BasicAuth *BasicAuth `json:"basicAuth,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteConfigs struct {
|
||||||
|
Configs []WriteConfig `json:"writeConfigs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelRules struct {
|
||||||
|
Rules []ChannelRule `json:"rules"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkRulesValid(orgID int64, rules []ChannelRule) (ok bool, reason string) {
|
||||||
|
t := tree.New()
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
reason = fmt.Sprintf("%v", r)
|
||||||
|
ok = false
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for _, rule := range rules {
|
||||||
|
if rule.OrgId == orgID || (rule.OrgId == 0 && orgID == 1) {
|
||||||
|
t.AddRoute("/"+rule.Pattern, struct{}{})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ok = true
|
||||||
|
return ok, reason
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelRuleCreateCmd struct {
|
||||||
|
Pattern string `json:"pattern"`
|
||||||
|
Settings ChannelRuleSettings `json:"settings"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelRuleUpdateCmd struct {
|
||||||
|
Pattern string `json:"pattern"`
|
||||||
|
Settings ChannelRuleSettings `json:"settings"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelRuleDeleteCmd struct {
|
||||||
|
Pattern string `json:"pattern"`
|
||||||
|
}
|
@ -169,15 +169,6 @@ type Label struct {
|
|||||||
Value string `json:"value"` // Can be JSONPath or Goja script.
|
Value string `json:"value"` // Can be JSONPath or Goja script.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Field description.
|
|
||||||
type Field struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
Type data.FieldType `json:"type"`
|
|
||||||
Value string `json:"value"` // Can be JSONPath or Goja script.
|
|
||||||
Labels []Label `json:"labels,omitempty"`
|
|
||||||
Config *data.FieldConfig `json:"config,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ChannelRuleGetter interface {
|
type ChannelRuleGetter interface {
|
||||||
Get(orgID int64, channel string) (*LiveChannelRule, bool, error)
|
Get(orgID int64, channel string) (*LiveChannelRule, bool, error)
|
||||||
}
|
}
|
||||||
|
384
pkg/services/live/pipeline/rule_builder_storage.go
Normal file
384
pkg/services/live/pipeline/rule_builder_storage.go
Normal file
@ -0,0 +1,384 @@
|
|||||||
|
package pipeline
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/centrifugal/centrifuge"
|
||||||
|
"github.com/grafana/grafana/pkg/services/live/managedstream"
|
||||||
|
"github.com/grafana/grafana/pkg/services/secrets"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StorageRuleBuilder struct {
|
||||||
|
Node *centrifuge.Node
|
||||||
|
ManagedStream *managedstream.Runner
|
||||||
|
FrameStorage *FrameStorage
|
||||||
|
Storage Storage
|
||||||
|
ChannelHandlerGetter ChannelHandlerGetter
|
||||||
|
SecretsService secrets.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscriber, error) {
|
||||||
|
if config == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
||||||
|
switch config.Type {
|
||||||
|
case SubscriberTypeBuiltin:
|
||||||
|
return NewBuiltinSubscriber(f.ChannelHandlerGetter), nil
|
||||||
|
case SubscriberTypeManagedStream:
|
||||||
|
return NewManagedStreamSubscriber(f.ManagedStream), nil
|
||||||
|
case SubscriberTypeMultiple:
|
||||||
|
if config.MultipleSubscriberConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
var subscribers []Subscriber
|
||||||
|
for _, outConf := range config.MultipleSubscriberConfig.Subscribers {
|
||||||
|
out := outConf
|
||||||
|
sub, err := f.extractSubscriber(&out)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
subscribers = append(subscribers, sub)
|
||||||
|
}
|
||||||
|
return NewMultipleSubscriber(subscribers...), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown subscriber type: %s", config.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converter, error) {
|
||||||
|
if config == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
||||||
|
switch config.Type {
|
||||||
|
case ConverterTypeJsonAuto:
|
||||||
|
if config.AutoJsonConverterConfig == nil {
|
||||||
|
config.AutoJsonConverterConfig = &AutoJsonConverterConfig{}
|
||||||
|
}
|
||||||
|
return NewAutoJsonConverter(*config.AutoJsonConverterConfig), nil
|
||||||
|
case ConverterTypeJsonExact:
|
||||||
|
if config.ExactJsonConverterConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewExactJsonConverter(*config.ExactJsonConverterConfig), nil
|
||||||
|
case ConverterTypeJsonFrame:
|
||||||
|
if config.JsonFrameConverterConfig == nil {
|
||||||
|
config.JsonFrameConverterConfig = &JsonFrameConverterConfig{}
|
||||||
|
}
|
||||||
|
return NewJsonFrameConverter(*config.JsonFrameConverterConfig), nil
|
||||||
|
case ConverterTypeInfluxAuto:
|
||||||
|
if config.AutoInfluxConverterConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewAutoInfluxConverter(*config.AutoInfluxConverterConfig), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown converter type: %s", config.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) extractFrameProcessor(config *FrameProcessorConfig) (FrameProcessor, error) {
|
||||||
|
if config == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
||||||
|
switch config.Type {
|
||||||
|
case FrameProcessorTypeDropFields:
|
||||||
|
if config.DropFieldsProcessorConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewDropFieldsFrameProcessor(*config.DropFieldsProcessorConfig), nil
|
||||||
|
case FrameProcessorTypeKeepFields:
|
||||||
|
if config.KeepFieldsProcessorConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewKeepFieldsFrameProcessor(*config.KeepFieldsProcessorConfig), nil
|
||||||
|
case FrameProcessorTypeMultiple:
|
||||||
|
if config.MultipleProcessorConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
var processors []FrameProcessor
|
||||||
|
for _, outConf := range config.MultipleProcessorConfig.Processors {
|
||||||
|
out := outConf
|
||||||
|
proc, err := f.extractFrameProcessor(&out)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
processors = append(processors, proc)
|
||||||
|
}
|
||||||
|
return NewMultipleFrameProcessor(processors...), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown processor type: %s", config.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) extractFrameConditionChecker(config *FrameConditionCheckerConfig) (FrameConditionChecker, error) {
|
||||||
|
if config == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
||||||
|
switch config.Type {
|
||||||
|
case FrameConditionCheckerTypeNumberCompare:
|
||||||
|
if config.NumberCompareConditionConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
c := *config.NumberCompareConditionConfig
|
||||||
|
return NewFrameNumberCompareCondition(c.FieldName, c.Op, c.Value), nil
|
||||||
|
case FrameConditionCheckerTypeMultiple:
|
||||||
|
var conditions []FrameConditionChecker
|
||||||
|
if config.MultipleConditionCheckerConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
for _, outConf := range config.MultipleConditionCheckerConfig.Conditions {
|
||||||
|
out := outConf
|
||||||
|
cond, err := f.extractFrameConditionChecker(&out)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
conditions = append(conditions, cond)
|
||||||
|
}
|
||||||
|
return NewMultipleFrameConditionChecker(config.MultipleConditionCheckerConfig.ConditionType, conditions...), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown condition type: %s", config.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) constructBasicAuth(writeConfig WriteConfig) (*BasicAuth, error) {
|
||||||
|
if writeConfig.Settings.BasicAuth == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
var password string
|
||||||
|
hasSecurePassword := len(writeConfig.SecureSettings["basicAuthPassword"]) > 0
|
||||||
|
if hasSecurePassword {
|
||||||
|
passwordBytes, err := f.SecretsService.Decrypt(context.Background(), writeConfig.SecureSettings["basicAuthPassword"])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("basicAuthPassword can't be decrypted: %w", err)
|
||||||
|
}
|
||||||
|
password = string(passwordBytes)
|
||||||
|
} else {
|
||||||
|
// Use plain text password (should be removed upon database integration).
|
||||||
|
if writeConfig.Settings.BasicAuth != nil {
|
||||||
|
password = writeConfig.Settings.BasicAuth.Password
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &BasicAuth{
|
||||||
|
User: writeConfig.Settings.BasicAuth.User,
|
||||||
|
Password: password,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, writeConfigs []WriteConfig) (FrameOutputter, error) {
|
||||||
|
if config == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
||||||
|
switch config.Type {
|
||||||
|
case FrameOutputTypeRedirect:
|
||||||
|
if config.RedirectOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewRedirectFrameOutput(*config.RedirectOutputConfig), nil
|
||||||
|
case FrameOutputTypeMultiple:
|
||||||
|
if config.MultipleOutputterConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
var outputters []FrameOutputter
|
||||||
|
for _, outConf := range config.MultipleOutputterConfig.Outputters {
|
||||||
|
out := outConf
|
||||||
|
outputter, err := f.extractFrameOutputter(&out, writeConfigs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
outputters = append(outputters, outputter)
|
||||||
|
}
|
||||||
|
return NewMultipleFrameOutput(outputters...), nil
|
||||||
|
case FrameOutputTypeManagedStream:
|
||||||
|
return NewManagedStreamFrameOutput(f.ManagedStream), nil
|
||||||
|
case FrameOutputTypeLocalSubscribers:
|
||||||
|
return NewLocalSubscribersFrameOutput(f.Node), nil
|
||||||
|
case FrameOutputTypeConditional:
|
||||||
|
if config.ConditionalOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
condition, err := f.extractFrameConditionChecker(config.ConditionalOutputConfig.Condition)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
outputter, err := f.extractFrameOutputter(config.ConditionalOutputConfig.Outputter, writeConfigs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return NewConditionalOutput(condition, outputter), nil
|
||||||
|
case FrameOutputTypeThreshold:
|
||||||
|
if config.ThresholdOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewThresholdOutput(f.FrameStorage, *config.ThresholdOutputConfig), nil
|
||||||
|
case FrameOutputTypeRemoteWrite:
|
||||||
|
if config.RemoteWriteOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
writeConfig, ok := f.getWriteConfig(config.RemoteWriteOutputConfig.UID, writeConfigs)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown write config uid: %s", config.RemoteWriteOutputConfig.UID)
|
||||||
|
}
|
||||||
|
basicAuth, err := f.constructBasicAuth(writeConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error getting password: %w", err)
|
||||||
|
}
|
||||||
|
return NewRemoteWriteFrameOutput(
|
||||||
|
writeConfig.Settings.Endpoint,
|
||||||
|
basicAuth,
|
||||||
|
config.RemoteWriteOutputConfig.SampleMilliseconds,
|
||||||
|
), nil
|
||||||
|
case FrameOutputTypeLoki:
|
||||||
|
if config.LokiOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
writeConfig, ok := f.getWriteConfig(config.LokiOutputConfig.UID, writeConfigs)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
||||||
|
}
|
||||||
|
basicAuth, err := f.constructBasicAuth(writeConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error getting password: %w", err)
|
||||||
|
}
|
||||||
|
return NewLokiFrameOutput(
|
||||||
|
writeConfig.Settings.Endpoint,
|
||||||
|
basicAuth,
|
||||||
|
), nil
|
||||||
|
case FrameOutputTypeChangeLog:
|
||||||
|
if config.ChangeLogOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewChangeLogFrameOutput(f.FrameStorage, *config.ChangeLogOutputConfig), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown output type: %s", config.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, writeConfigs []WriteConfig) (DataOutputter, error) {
|
||||||
|
if config == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
|
||||||
|
switch config.Type {
|
||||||
|
case DataOutputTypeRedirect:
|
||||||
|
if config.RedirectDataOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
return NewRedirectDataOutput(*config.RedirectDataOutputConfig), nil
|
||||||
|
case DataOutputTypeLoki:
|
||||||
|
if config.LokiOutputConfig == nil {
|
||||||
|
return nil, missingConfiguration
|
||||||
|
}
|
||||||
|
writeConfig, ok := f.getWriteConfig(config.LokiOutputConfig.UID, writeConfigs)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
|
||||||
|
}
|
||||||
|
basicAuth, err := f.constructBasicAuth(writeConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error constructing basicAuth: %w", err)
|
||||||
|
}
|
||||||
|
return NewLokiDataOutput(
|
||||||
|
writeConfig.Settings.Endpoint,
|
||||||
|
basicAuth,
|
||||||
|
), nil
|
||||||
|
case DataOutputTypeBuiltin:
|
||||||
|
return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil
|
||||||
|
case DataOutputTypeLocalSubscribers:
|
||||||
|
return NewLocalSubscribersDataOutput(f.Node), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown data output type: %s", config.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) getWriteConfig(uid string, writeConfigs []WriteConfig) (WriteConfig, bool) {
|
||||||
|
for _, rwb := range writeConfigs {
|
||||||
|
if rwb.UID == uid {
|
||||||
|
return rwb, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return WriteConfig{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error) {
|
||||||
|
channelRules, err := f.Storage.ListChannelRules(ctx, orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
writeConfigs, err := f.Storage.ListWriteConfigs(ctx, orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var rules []*LiveChannelRule
|
||||||
|
|
||||||
|
for _, ruleConfig := range channelRules {
|
||||||
|
rule := &LiveChannelRule{
|
||||||
|
OrgId: orgID,
|
||||||
|
Pattern: ruleConfig.Pattern,
|
||||||
|
}
|
||||||
|
|
||||||
|
if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Subscribe != nil {
|
||||||
|
rule.SubscribeAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Subscribe.RequireRole)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Publish != nil {
|
||||||
|
rule.PublishAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Publish.RequireRole)
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building converter for %s: %w", rule.Pattern, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var processors []FrameProcessor
|
||||||
|
for _, procConfig := range ruleConfig.Settings.FrameProcessors {
|
||||||
|
proc, err := f.extractFrameProcessor(procConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building processor for %s: %w", rule.Pattern, err)
|
||||||
|
}
|
||||||
|
processors = append(processors, proc)
|
||||||
|
}
|
||||||
|
rule.FrameProcessors = processors
|
||||||
|
|
||||||
|
var dataOutputters []DataOutputter
|
||||||
|
for _, outConfig := range ruleConfig.Settings.DataOutputters {
|
||||||
|
out, err := f.extractDataOutputter(outConfig, writeConfigs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err)
|
||||||
|
}
|
||||||
|
dataOutputters = append(dataOutputters, out)
|
||||||
|
}
|
||||||
|
rule.DataOutputters = dataOutputters
|
||||||
|
|
||||||
|
var outputters []FrameOutputter
|
||||||
|
for _, outConfig := range ruleConfig.Settings.FrameOutputters {
|
||||||
|
out, err := f.extractFrameOutputter(outConfig, writeConfigs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building frame outputter for %s: %w", rule.Pattern, err)
|
||||||
|
}
|
||||||
|
outputters = append(outputters, out)
|
||||||
|
}
|
||||||
|
rule.FrameOutputters = outputters
|
||||||
|
|
||||||
|
var subscribers []Subscriber
|
||||||
|
for _, subConfig := range ruleConfig.Settings.Subscribers {
|
||||||
|
sub, err := f.extractSubscriber(subConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building subscriber for %s: %w", rule.Pattern, err)
|
||||||
|
}
|
||||||
|
subscribers = append(subscribers, sub)
|
||||||
|
}
|
||||||
|
rule.Subscribers = subscribers
|
||||||
|
|
||||||
|
rules = append(rules, rule)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rules, nil
|
||||||
|
}
|
16
pkg/services/live/pipeline/storage.go
Normal file
16
pkg/services/live/pipeline/storage.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package pipeline
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// Storage describes all methods to manage Live pipeline persistent data.
|
||||||
|
type Storage interface {
|
||||||
|
ListWriteConfigs(_ context.Context, orgID int64) ([]WriteConfig, error)
|
||||||
|
GetWriteConfig(_ context.Context, orgID int64, cmd WriteConfigGetCmd) (WriteConfig, bool, error)
|
||||||
|
CreateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigCreateCmd) (WriteConfig, error)
|
||||||
|
UpdateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigUpdateCmd) (WriteConfig, error)
|
||||||
|
DeleteWriteConfig(_ context.Context, orgID int64, cmd WriteConfigDeleteCmd) error
|
||||||
|
ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)
|
||||||
|
CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error)
|
||||||
|
UpdateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error)
|
||||||
|
DeleteChannelRule(_ context.Context, orgID int64, cmd ChannelRuleDeleteCmd) error
|
||||||
|
}
|
132
public/app/features/live/pipeline/models.gen.ts
Normal file
132
public/app/features/live/pipeline/models.gen.ts
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
/* Do not change, this code is generated from Golang structs */
|
||||||
|
|
||||||
|
import { FieldConfig } from '@grafana/data';
|
||||||
|
|
||||||
|
export interface ChannelAuthCheckConfig {
|
||||||
|
role?: string;
|
||||||
|
}
|
||||||
|
export interface ChannelAuthConfig {
|
||||||
|
subscribe?: ChannelAuthCheckConfig;
|
||||||
|
publish?: ChannelAuthCheckConfig;
|
||||||
|
}
|
||||||
|
export interface ChangeLogOutputConfig {
|
||||||
|
fieldName: string;
|
||||||
|
channel: string;
|
||||||
|
}
|
||||||
|
export interface RemoteWriteOutputConfig {
|
||||||
|
uid: string;
|
||||||
|
sampleMilliseconds: number;
|
||||||
|
}
|
||||||
|
export interface ThresholdOutputConfig {
|
||||||
|
fieldName: string;
|
||||||
|
channel: string;
|
||||||
|
}
|
||||||
|
export interface NumberCompareFrameConditionConfig {
|
||||||
|
fieldName: string;
|
||||||
|
op: string;
|
||||||
|
value: number;
|
||||||
|
}
|
||||||
|
export interface MultipleFrameConditionCheckerConfig {
|
||||||
|
conditionType: string;
|
||||||
|
conditions: FrameConditionCheckerConfig[];
|
||||||
|
}
|
||||||
|
export interface FrameConditionCheckerConfig {
|
||||||
|
type: Omit<keyof FrameConditionCheckerConfig, 'type'>;
|
||||||
|
multiple?: MultipleFrameConditionCheckerConfig;
|
||||||
|
numberCompare?: NumberCompareFrameConditionConfig;
|
||||||
|
}
|
||||||
|
export interface ConditionalOutputConfig {
|
||||||
|
condition?: FrameConditionCheckerConfig;
|
||||||
|
output?: FrameOutputterConfig;
|
||||||
|
}
|
||||||
|
export interface RedirectOutputConfig {
|
||||||
|
channel: string;
|
||||||
|
}
|
||||||
|
export interface MultipleOutputterConfig {
|
||||||
|
outputs: FrameOutputterConfig[];
|
||||||
|
}
|
||||||
|
export interface ManagedStreamOutputConfig {}
|
||||||
|
export interface FrameOutputterConfig {
|
||||||
|
type: Omit<keyof FrameOutputterConfig, 'type'>;
|
||||||
|
managedStream?: ManagedStreamOutputConfig;
|
||||||
|
multiple?: MultipleOutputterConfig;
|
||||||
|
redirect?: RedirectOutputConfig;
|
||||||
|
conditional?: ConditionalOutputConfig;
|
||||||
|
threshold?: ThresholdOutputConfig;
|
||||||
|
remoteWrite?: RemoteWriteOutputConfig;
|
||||||
|
loki?: LokiOutputConfig;
|
||||||
|
changeLog?: ChangeLogOutputConfig;
|
||||||
|
}
|
||||||
|
export interface MultipleFrameProcessorConfig {
|
||||||
|
processors: FrameProcessorConfig[];
|
||||||
|
}
|
||||||
|
export interface KeepFieldsFrameProcessorConfig {
|
||||||
|
fieldNames: string[];
|
||||||
|
}
|
||||||
|
export interface DropFieldsFrameProcessorConfig {
|
||||||
|
fieldNames: string[];
|
||||||
|
}
|
||||||
|
export interface FrameProcessorConfig {
|
||||||
|
type: Omit<keyof FrameProcessorConfig, 'type'>;
|
||||||
|
dropFields?: DropFieldsFrameProcessorConfig;
|
||||||
|
keepFields?: KeepFieldsFrameProcessorConfig;
|
||||||
|
multiple?: MultipleFrameProcessorConfig;
|
||||||
|
}
|
||||||
|
export interface JsonFrameConverterConfig {}
|
||||||
|
export interface AutoInfluxConverterConfig {
|
||||||
|
frameFormat: string;
|
||||||
|
}
|
||||||
|
export interface ExactJsonConverterConfig {
|
||||||
|
fields: Field[];
|
||||||
|
}
|
||||||
|
export interface Label {
|
||||||
|
name: string;
|
||||||
|
value: string;
|
||||||
|
}
|
||||||
|
export interface Field {
|
||||||
|
name: string;
|
||||||
|
type: number;
|
||||||
|
value: string;
|
||||||
|
labels?: Label[];
|
||||||
|
config?: FieldConfig;
|
||||||
|
}
|
||||||
|
export interface AutoJsonConverterConfig {
|
||||||
|
fieldTips?: { [key: string]: Field };
|
||||||
|
}
|
||||||
|
export interface ConverterConfig {
|
||||||
|
type: Omit<keyof ConverterConfig, 'type'>;
|
||||||
|
jsonAuto?: AutoJsonConverterConfig;
|
||||||
|
jsonExact?: ExactJsonConverterConfig;
|
||||||
|
influxAuto?: AutoInfluxConverterConfig;
|
||||||
|
jsonFrame?: JsonFrameConverterConfig;
|
||||||
|
}
|
||||||
|
export interface LokiOutputConfig {
|
||||||
|
uid: string;
|
||||||
|
}
|
||||||
|
export interface RedirectDataOutputConfig {
|
||||||
|
channel: string;
|
||||||
|
}
|
||||||
|
export interface DataOutputterConfig {
|
||||||
|
type: Omit<keyof DataOutputterConfig, 'type'>;
|
||||||
|
redirect?: RedirectDataOutputConfig;
|
||||||
|
loki?: LokiOutputConfig;
|
||||||
|
}
|
||||||
|
export interface MultipleSubscriberConfig {
|
||||||
|
subscribers: SubscriberConfig[];
|
||||||
|
}
|
||||||
|
export interface SubscriberConfig {
|
||||||
|
type: Omit<keyof SubscriberConfig, 'type'>;
|
||||||
|
multiple?: MultipleSubscriberConfig;
|
||||||
|
}
|
||||||
|
export interface ChannelRuleSettings {
|
||||||
|
auth?: ChannelAuthConfig;
|
||||||
|
subscribers?: SubscriberConfig[];
|
||||||
|
dataOutputs?: DataOutputterConfig[];
|
||||||
|
converter?: ConverterConfig;
|
||||||
|
frameProcessors?: FrameProcessorConfig[];
|
||||||
|
frameOutputs?: FrameOutputterConfig[];
|
||||||
|
}
|
||||||
|
export interface ChannelRule {
|
||||||
|
pattern: string;
|
||||||
|
settings: ChannelRuleSettings;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user