Files
grafana/pkg/storage/unified/sqlstash/sql_storage_server.go
2024-06-14 14:24:26 +03:00

205 lines
5.3 KiB
Go

package sqlstash
import (
"context"
"errors"
"fmt"
"strings"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// ErrNotImplementedYet is the sentinel error returned by the store methods in
// this file whose storage logic has not been written yet.
var ErrNotImplementedYet = errors.New("not implemented yet")
// ProvideSQLResourceServer builds a resource.ResourceServer backed by a
// sqlResourceStore that uses the given entity database and tracer.
//
// The sqlstash storage metrics are registered through prometheus.Register; a
// registration failure is returned unchanged. The same store instance is
// handed to the server as its Store, Diagnostics and Lifecycle implementation.
//
// On every error path the store's context is cancelled before returning,
// because no caller ever receives the store and so nothing could call Stop.
func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (resource.ResourceServer, error) {
	ctx, cancel := context.WithCancel(context.Background())
	store := &sqlResourceStore{
		db:     db,
		log:    log.New("sql-resource-server"),
		ctx:    ctx,
		cancel: cancel,
		tracer: tracer,
	}

	if err := prometheus.Register(sqlstash.NewStorageMetrics()); err != nil {
		cancel() // release the context; the store is discarded
		return nil, err
	}

	server, err := resource.NewResourceServer(resource.ResourceServerOptions{
		Tracer:      tracer,
		Store:       store,
		NodeID:      234, // from config? used for snowflake ID
		Diagnostics: store,
		Lifecycle:   store,
	})
	if err != nil {
		cancel() // release the context; the store is discarded
		return nil, err
	}
	return server, nil
}
// sqlResourceStore is the SQL-backed store that ProvideSQLResourceServer hands
// to the resource server as its Store, Diagnostics and Lifecycle implementation.
type sqlResourceStore struct {
	log    log.Logger
	tracer trace.Tracer // starts a span in each storage method

	// Lifetime handling: ctx is passed to the broadcaster, cancel is called by Stop.
	ctx    context.Context // TODO: remove
	cancel context.CancelFunc

	// Database handles. db is supplied at construction; the rest are set by Init.
	db         db.EntityDBInterface // needed to keep xorm engine in scope
	sess       *session.SessionDB
	dialect    migrator.Dialect
	sqlDB      db.DB
	sqlDialect sqltemplate.Dialect

	// Watch plumbing, set up by Init.
	broadcaster sqlstash.Broadcaster[*resource.WatchResponse]
	stream      chan *resource.WatchResponse
}
// Init prepares the store for use. It initializes the underlying entity
// database, selects the sqltemplate dialect that matches the driver name,
// stores the session and migrator dialect, and creates the watch broadcaster,
// whose connect callback records the stream and starts the poller goroutine.
//
// Init returns nil immediately once a session has been stored, so it may be
// called repeatedly. It returns an error when the db is missing, when any of
// the database accessors fails, when the driver has no known dialect, or when
// the broadcaster cannot be created.
func (s *sqlResourceStore) Init() error {
	if s.sess != nil {
		return nil
	}

	if s.db == nil {
		return errors.New("missing db")
	}

	err := s.db.Init()
	if err != nil {
		return err
	}

	sqlDB, err := s.db.GetDB()
	if err != nil {
		return err
	}
	s.sqlDB = sqlDB

	// The driver name may carry a "WithHooks" suffix; strip it before matching.
	driverName := sqlDB.DriverName()
	driverName = strings.TrimSuffix(driverName, "WithHooks")
	switch driverName {
	case db.DriverMySQL:
		s.sqlDialect = sqltemplate.MySQL
	case db.DriverPostgres:
		s.sqlDialect = sqltemplate.PostgreSQL
	case db.DriverSQLite, db.DriverSQLite3:
		s.sqlDialect = sqltemplate.SQLite
	default:
		return fmt.Errorf("no dialect for driver %q", driverName)
	}

	sess, err := s.db.GetSession()
	if err != nil {
		return err
	}

	engine, err := s.db.GetEngine()
	if err != nil {
		return err
	}

	s.sess = sess
	s.dialect = migrator.NewDialect(engine.DriverName())

	// set up the broadcaster
	s.broadcaster, err = sqlstash.NewBroadcaster(s.ctx, func(stream chan *resource.WatchResponse) error {
		s.stream = stream

		// start the poller
		go s.poller(stream)

		return nil
	})
	if err != nil {
		// s.sess doubles as the "already initialized" marker checked at the
		// top of this method. Clear it so a later Init call retries instead of
		// reporting success while the broadcaster is missing.
		s.sess = nil
		return err
	}

	return nil
}
// IsHealthy reports the store as SERVING when the SQL database answers a ping
// issued with ctx. A failed ping is returned as the error together with a nil
// response. The request r is not inspected.
func (s *sqlResourceStore) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
	// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))

	pingErr := s.sqlDB.PingContext(ctx)
	if pingErr != nil {
		return nil, pingErr
	}

	// TODO: check the status of the watcher implementation as well
	healthy := &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}
	return healthy, nil
}
// Stop cancels the store's context by invoking the cancel function held on the
// struct.
func (s *sqlResourceStore) Stop() {
	s.cancel()
}
// WriteEvent is a placeholder: it opens a tracing span for the call and then
// returns a zero result with ErrNotImplementedYet. The event is not persisted.
func (s *sqlResourceStore) WriteEvent(ctx context.Context, event *resource.WriteEvent) (int64, error) {
	_, span := s.tracer.Start(ctx, "storage_server.WriteEvent")
	defer span.End()

	// TODO: actually write the event!
	return 0, ErrNotImplementedYet
}
// Read validates the requested key and is otherwise a placeholder.
//
// A request with no key, an empty group, or an empty resource is answered
// with a ReadResponse carrying a bad-request status and a nil error. Any
// other request prints the key to standard output and returns
// ErrNotImplementedYet.
func (s *sqlResourceStore) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
	_, span := s.tracer.Start(ctx, "storage_server.GetResource")
	defer span.End()

	// Guard the dereferences below: a request without a key would otherwise
	// panic instead of producing a bad-request status.
	if req == nil || req.Key == nil {
		return &resource.ReadResponse{Status: badRequest("missing key")}, nil
	}
	if req.Key.Group == "" {
		return &resource.ReadResponse{Status: badRequest("missing group")}, nil
	}
	if req.Key.Resource == "" {
		return &resource.ReadResponse{Status: badRequest("missing resource")}, nil
	}

	fmt.Printf("TODO, GET: %+v", req.Key)

	return nil, ErrNotImplementedYet
}
// List is a placeholder: it opens a tracing span, prints the key found in
// req.Options to standard output, and returns ErrNotImplementedYet.
func (s *sqlResourceStore) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
	_, span := s.tracer.Start(ctx, "storage_server.List")
	defer span.End()

	listKey := req.Options.Key
	fmt.Printf("TODO, LIST: %+v", listKey)

	return nil, ErrNotImplementedYet
}
// GetBlob is meant to return the raw blob bytes and metadata. For now it is a
// placeholder: it opens a tracing span, prints the requested key to standard
// output, and returns ErrNotImplementedYet.
func (s *sqlResourceStore) GetBlob(ctx context.Context, req *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
	// The span is named after this method so that blob reads are not reported
	// as List calls in traces.
	_, span := s.tracer.Start(ctx, "storage_server.GetBlob")
	defer span.End()

	fmt.Printf("TODO, GET BLOB: %+v", req.Key)

	return nil, ErrNotImplementedYet
}
// History is meant to show resource history (and trash). For now it is a
// placeholder: it opens a tracing span, prints the requested key to standard
// output, and returns ErrNotImplementedYet.
func (s *sqlResourceStore) History(ctx context.Context, req *resource.HistoryRequest) (*resource.HistoryResponse, error) {
	_, span := s.tracer.Start(ctx, "storage_server.History")
	defer span.End()

	historyKey := req.Key
	fmt.Printf("TODO, GET History: %+v", historyKey)

	return nil, ErrNotImplementedYet
}
// Origin is used for efficient provisioning. For now it is a placeholder: it
// opens a tracing span, prints the requested key to standard output, and
// returns ErrNotImplementedYet.
func (s *sqlResourceStore) Origin(ctx context.Context, req *resource.OriginRequest) (*resource.OriginResponse, error) {
	// Span name and debug message identify this method, so Origin calls are
	// not mistaken for History calls.
	_, span := s.tracer.Start(ctx, "storage_server.Origin")
	defer span.End()

	fmt.Printf("TODO, GET Origin: %+v", req.Key)

	return nil, ErrNotImplementedYet
}