Storage: add support for snapshots, dataframes, and raw json objects (#57934)

This commit is contained in:
Ryan McKinley 2022-11-01 08:28:13 -07:00 committed by GitHub
parent 852d069a3c
commit 5736b46962
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 346 additions and 25 deletions

View File

@ -9,6 +9,7 @@ import "context"
const (
StandardKindDashboard = "dashboard"
StandardKindPlaylist = "playlist"
StandardKindSnapshot = "snapshot"
StandardKindFolder = "folder"
// StandardKindDataSource: not a real kind yet, but used to define references from dashboards
@ -28,6 +29,12 @@ const (
// StandardKindGeoJSON represents spatial data
StandardKindGeoJSON = "geojson"
// StandardKindDataFrame data frame
StandardKindDataFrame = "frame"
// StandardKindJSONObj generic json object
StandardKindJSONObj = "jsonobj"
// StandardKindQuery early development on panel query library
// the kind may need to change to better encapsulate { targets:[], transforms:[] }
StandardKindQuery = "query"

View File

@ -9,9 +9,11 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/dashboardsnapshots"
"github.com/grafana/grafana/pkg/services/playlist"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/kind/snapshot"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/services/user"
)
@ -26,16 +28,26 @@ type objectStoreJob struct {
cfg ExportConfig
broadcaster statusBroadcaster
stopRequested bool
user *user.SignedInUser
sess *session.SessionDB
playlistService playlist.Service
store object.ObjectStoreServer
sess *session.SessionDB
playlistService playlist.Service
store object.ObjectStoreServer
dashboardsnapshots dashboardsnapshots.Service
}
func startObjectStoreJob(cfg ExportConfig, broadcaster statusBroadcaster, db db.DB, playlistService playlist.Service, store object.ObjectStoreServer) (Job, error) {
func startObjectStoreJob(user *user.SignedInUser,
cfg ExportConfig,
broadcaster statusBroadcaster,
db db.DB,
playlistService playlist.Service,
store object.ObjectStoreServer,
dashboardsnapshots dashboardsnapshots.Service,
) (Job, error) {
job := &objectStoreJob{
logger: log.New("export_to_object_store_job"),
cfg: cfg,
user: user,
broadcaster: broadcaster,
status: ExportStatus{
Running: true,
@ -44,9 +56,10 @@ func startObjectStoreJob(cfg ExportConfig, broadcaster statusBroadcaster, db db.
Count: make(map[string]int, 10),
Index: 0,
},
sess: db.GetSqlxSession(),
playlistService: playlistService,
store: store,
sess: db.GetSqlxSession(),
playlistService: playlistService,
store: store,
dashboardsnapshots: dashboardsnapshots,
}
broadcaster(job.status)
@ -170,6 +183,70 @@ func (e *objectStoreJob) start() {
e.status.Last = fmt.Sprintf("ITEM: %s", playlist.Uid)
e.broadcaster(e.status)
}
// TODO.. query lookup
orgIDs := []int64{1}
what = "snapshot"
for _, orgId := range orgIDs {
cmd := &dashboardsnapshots.GetDashboardSnapshotsQuery{
OrgId: orgId,
Limit: 500000,
SignedInUser: e.user,
}
err := e.dashboardsnapshots.SearchDashboardSnapshots(ctx, cmd)
if err != nil {
e.status.Status = "error: " + err.Error()
return
}
for _, dto := range cmd.Result {
m := snapshot.Model{
Name: dto.Name,
ExternalURL: dto.ExternalUrl,
Expires: dto.Expires.UnixMilli(),
}
rowUser.OrgID = dto.OrgId
rowUser.UserID = dto.UserId
snapcmd := &dashboardsnapshots.GetDashboardSnapshotQuery{
Key: dto.Key,
}
err = e.dashboardsnapshots.GetDashboardSnapshot(ctx, snapcmd)
if err == nil {
res := snapcmd.Result
m.DeleteKey = res.DeleteKey
m.ExternalURL = res.ExternalUrl
snap := res.Dashboard
m.DashboardUID = snap.Get("uid").MustString("")
snap.Del("uid")
snap.Del("id")
b, _ := snap.MarshalJSON()
m.Snapshot = b
}
_, err = e.store.Write(ctx, &object.WriteObjectRequest{
GRN: &object.GRN{
Scope: models.ObjectStoreScopeEntity,
UID: dto.Key,
Kind: models.StandardKindSnapshot,
},
Body: prettyJSON(m),
Comment: "export from snapshtts",
})
if err != nil {
e.status.Status = "error: " + err.Error()
return
}
e.status.Changed = time.Now().UnixMilli()
e.status.Index++
e.status.Count[what] += 1
e.status.Last = fmt.Sprintf("ITEM: %s", dto.Name)
e.broadcaster(e.status)
}
}
}
type dashInfo struct {

View File

@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/playlist"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/setting"
)
@ -223,6 +224,7 @@ func (ex *StandardExport) HandleRequestExport(c *models.ReqContext) response.Res
return response.Error(http.StatusLocked, "export already running", nil)
}
user := store.UserFromContext(c.Req.Context())
var job Job
broadcast := func(s ExportStatus) {
ex.broadcastStatus(c.OrgID, s)
@ -231,7 +233,7 @@ func (ex *StandardExport) HandleRequestExport(c *models.ReqContext) response.Res
case "dummy":
job, err = startDummyExportJob(cfg, broadcast)
case "objectStore":
job, err = startObjectStoreJob(cfg, broadcast, ex.db, ex.playlistService, ex.store)
job, err = startObjectStoreJob(user, cfg, broadcast, ex.db, ex.playlistService, ex.store, ex.dashboardsnapshotsService)
case "git":
dir := filepath.Join(ex.dataDir, "export_git", fmt.Sprintf("git_%d", time.Now().Unix()))
if err := os.MkdirAll(dir, os.ModePerm); err != nil {

View File

@ -0,0 +1,50 @@
package dataframe
import (
"context"
"encoding/json"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/store"
)
func GetObjectKindInfo() models.ObjectKindInfo {
return models.ObjectKindInfo{
ID: models.StandardKindDataFrame,
Name: "Data frame",
Description: "Data frame",
}
}
func GetObjectSummaryBuilder() models.ObjectSummaryBuilder {
return func(ctx context.Context, uid string, body []byte) (*models.ObjectSummary, []byte, error) {
df := &data.Frame{}
err := json.Unmarshal(body, df)
if err != nil {
return nil, nil, err
}
rows, err := df.RowLen()
if err != nil {
return nil, nil, err
}
out, err := data.FrameToJSON(df, data.IncludeAll)
if err != nil {
return nil, nil, err
}
summary := &models.ObjectSummary{
Kind: models.StandardKindDataFrame,
Name: df.Name,
UID: uid,
Fields: map[string]interface{}{
"rows": rows,
"cols": len(df.Fields),
},
}
if summary.Name == "" {
summary.Name = store.GuessNameFromUID(uid)
}
return summary, out, err
}
}

View File

@ -0,0 +1,42 @@
package dataframe
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
func TestDataFrameSummary(t *testing.T) {
df := data.NewFrame("http_requests_total",
data.NewField("timestamp", nil, []time.Time{time.Now(), time.Now(), time.Now()}).SetConfig(&data.FieldConfig{
DisplayName: "A time Column.",
}),
data.NewField("value", data.Labels{"service": "auth"}, []float64{1.0, 2.0, 3.0}),
data.NewField("category", data.Labels{"service": "auth"}, []string{"foo", "bar", "test"}),
data.NewField("valid", data.Labels{"service": "auth"}, []bool{true, false, true}),
)
in, err := data.FrameToJSON(df, data.IncludeAll)
require.NoError(t, err)
summary, out, err := GetObjectSummaryBuilder()(context.Background(), "somthing", in)
require.NoError(t, err)
require.Equal(t, in, out) // same json
asjson, err := json.MarshalIndent(summary, "", " ")
// fmt.Printf(string(asjson))
require.NoError(t, err)
require.JSONEq(t, `{
"uid": "somthing",
"kind": "frame",
"name": "http_requests_total",
"fields": {
"cols": 4,
"rows": 3
}
}`, string(asjson))
}

View File

@ -0,0 +1,37 @@
package jsonobj
import (
"context"
"encoding/json"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/store"
)
func GetObjectKindInfo() models.ObjectKindInfo {
return models.ObjectKindInfo{
ID: models.StandardKindJSONObj,
Name: "JSON Object",
Description: "JSON Object",
}
}
func GetObjectSummaryBuilder() models.ObjectSummaryBuilder {
return func(ctx context.Context, uid string, body []byte) (*models.ObjectSummary, []byte, error) {
v := make(map[string]interface{})
err := json.Unmarshal(body, &v)
if err != nil {
return nil, nil, err
}
out, err := json.MarshalIndent(v, "", " ")
if err != nil {
return nil, nil, err
}
return &models.ObjectSummary{
Kind: models.StandardKindJSONObj,
Name: store.GuessNameFromUID(uid),
UID: uid,
}, out, err
}
}

View File

@ -0,0 +1,38 @@
package jsonobj
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
func TestDataFrameSummary(t *testing.T) {
// Just creating a JSON blob
df := data.NewFrame("http_requests_total",
data.NewField("timestamp", nil, []time.Time{time.Now(), time.Now(), time.Now()}).SetConfig(&data.FieldConfig{
DisplayName: "A time Column.",
}),
data.NewField("value", data.Labels{"service": "auth"}, []float64{1.0, 2.0, 3.0}),
data.NewField("category", data.Labels{"service": "auth"}, []string{"foo", "bar", "test"}),
data.NewField("valid", data.Labels{"service": "auth"}, []bool{true, false, true}),
)
in, err := data.FrameToJSON(df, data.IncludeAll)
require.NoError(t, err)
summary, out, err := GetObjectSummaryBuilder()(context.Background(), "path/to/item", in)
require.NoError(t, err)
require.JSONEq(t, string(in), string(out)) // same json
asjson, err := json.MarshalIndent(summary, "", " ")
// fmt.Printf(string(asjson))
require.NoError(t, err)
require.JSONEq(t, `{
"name": "item",
"uid": "path/to/item",
"kind": "jsonobj"
}`, string(asjson))
}

View File

@ -8,11 +8,13 @@ import (
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/rendering"
"github.com/grafana/grafana/pkg/services/store/kind/dashboard"
"github.com/grafana/grafana/pkg/services/store/kind/dummy"
"github.com/grafana/grafana/pkg/services/store/kind/dataframe"
"github.com/grafana/grafana/pkg/services/store/kind/folder"
"github.com/grafana/grafana/pkg/services/store/kind/geojson"
"github.com/grafana/grafana/pkg/services/store/kind/jsonobj"
"github.com/grafana/grafana/pkg/services/store/kind/playlist"
"github.com/grafana/grafana/pkg/services/store/kind/png"
"github.com/grafana/grafana/pkg/services/store/kind/snapshot"
"github.com/grafana/grafana/pkg/services/store/kind/svg"
"github.com/grafana/grafana/pkg/setting"
)
@ -35,6 +37,10 @@ func NewKindRegistry() KindRegistry {
info: dashboard.GetObjectKindInfo(),
builder: dashboard.GetObjectSummaryBuilder(),
}
kinds[models.StandardKindSnapshot] = &kindValues{
info: snapshot.GetObjectKindInfo(),
builder: snapshot.GetObjectSummaryBuilder(),
}
kinds[models.StandardKindFolder] = &kindValues{
info: folder.GetObjectKindInfo(),
builder: folder.GetObjectSummaryBuilder(),
@ -47,13 +53,13 @@ func NewKindRegistry() KindRegistry {
info: geojson.GetObjectKindInfo(),
builder: geojson.GetObjectSummaryBuilder(),
}
// FIXME -- these are registered because existing tests use them
for _, k := range []string{"dummy", "kind1", "kind2", "kind3"} {
kinds[k] = &kindValues{
info: dummy.GetObjectKindInfo(k),
builder: dummy.GetObjectSummaryBuilder(k),
}
kinds[models.StandardKindDataFrame] = &kindValues{
info: dataframe.GetObjectKindInfo(),
builder: dataframe.GetObjectSummaryBuilder(),
}
kinds[models.StandardKindJSONObj] = &kindValues{
info: jsonobj.GetObjectKindInfo(),
builder: jsonobj.GetObjectSummaryBuilder(),
}
// create a registry

View File

@ -20,14 +20,13 @@ func TestKindRegistry(t *testing.T) {
}
require.Equal(t, []string{
"dashboard",
"dummy",
"folder",
"frame",
"geojson",
"kind1",
"kind2",
"kind3",
"jsonobj",
"playlist",
"png",
"snapshot",
"test",
}, ids)

View File

@ -0,0 +1,62 @@
package snapshot
import (
"context"
"encoding/json"
"fmt"
"github.com/grafana/grafana/pkg/models"
)
// A snapshot is a dashboard with no external queries and a few additional properties
type Model struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
DeleteKey string `json:"deleteKey"`
ExternalURL string `json:"externalURL"`
Expires int64 `json:"expires,omitempty"` // time that this expires
DashboardUID string `json:"dashboard,omitempty"`
Snapshot json.RawMessage `json:"snapshot,omitempty"`
}
func GetObjectKindInfo() models.ObjectKindInfo {
return models.ObjectKindInfo{
ID: models.StandardKindSnapshot,
Name: "Snapshot",
}
}
func GetObjectSummaryBuilder() models.ObjectSummaryBuilder {
return func(ctx context.Context, uid string, body []byte) (*models.ObjectSummary, []byte, error) {
obj := &Model{}
err := json.Unmarshal(body, obj)
if err != nil {
return nil, nil, err // unable to read object
}
if obj.Name == "" {
return nil, nil, fmt.Errorf("expected snapshot name")
}
if obj.DeleteKey == "" {
return nil, nil, fmt.Errorf("expected delete key")
}
summary := &models.ObjectSummary{
Kind: models.StandardKindFolder,
Name: obj.Name,
Description: obj.Description,
UID: uid,
Fields: map[string]interface{}{
"deleteKey": obj.DeleteKey,
"externalURL": obj.ExternalURL,
"expires": obj.Expires,
},
References: []*models.ObjectExternalReference{
{Kind: models.StandardKindDashboard, UID: obj.DashboardUID},
},
}
// Keep the original body
return summary, body, err
}
}

View File

@ -32,7 +32,7 @@ type RawObjectWithHistory struct {
var (
// increment when RawObject changes
rawObjectVersion = 7
rawObjectVersion = 8
)
func ProvideDummyObjectServer(cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry) object.ObjectStoreServer {

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
@ -115,7 +116,7 @@ func TestObjectServer(t *testing.T) {
fakeUser := fmt.Sprintf("user:%d:%s", testCtx.user.UserID, testCtx.user.Login)
firstVersion := "1"
kind := "dummy"
kind := models.StandardKindJSONObj
grn := &object.GRN{
Kind: kind,
UID: "my-test-entity",
@ -292,7 +293,7 @@ func TestObjectServer(t *testing.T) {
uid2 := "uid2"
uid3 := "uid3"
uid4 := "uid4"
kind2 := "kind2"
kind2 := models.StandardKindPlaylist
w1, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
GRN: grn,
Body: body,
@ -342,7 +343,7 @@ func TestObjectServer(t *testing.T) {
version = append(version, res.Version)
}
require.Equal(t, []string{"my-test-entity", "uid2", "uid3", "uid4"}, uids)
require.Equal(t, []string{"dummy", "dummy", "kind2", "kind2"}, kinds)
require.Equal(t, []string{"jsonobj", "jsonobj", "playlist", "playlist"}, kinds)
require.Equal(t, []string{
w1.Object.Version,
w2.Object.Version,
@ -364,7 +365,7 @@ func TestObjectServer(t *testing.T) {
version = append(version, res.Version)
}
require.Equal(t, []string{"my-test-entity", "uid2"}, uids)
require.Equal(t, []string{"dummy", "dummy"}, kinds)
require.Equal(t, []string{"jsonobj", "jsonobj"}, kinds)
require.Equal(t, []string{
w1.Object.Version,
w2.Object.Version,