Storage: Add HTTP endpoint for object store service (#56214)

This commit is contained in:
Ryan McKinley 2022-10-04 09:40:15 -07:00 committed by GitHub
parent bba6eb1f2d
commit d5e2713168
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 309 additions and 1 deletions

View File

@ -261,7 +261,13 @@ func (hs *HTTPServer) registerRoutes() {
})
if hs.Features.IsEnabled(featuremgmt.FlagStorage) {
// Will eventually be replaced with the 'object' route
apiRoute.Group("/storage", hs.StorageService.RegisterHTTPRoutes)
// Allow HTTP access to the object storage feature (dev only for now)
if hs.Features.IsEnabled(featuremgmt.FlagGrpcServer) {
apiRoute.Group("/object", hs.httpObjectStore.RegisterHTTPRoutes)
}
}
if hs.Features.IsEnabled(featuremgmt.FlagPanelTitleSearch) {

View File

@ -16,6 +16,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/middleware/csrf"
"github.com/grafana/grafana/pkg/services/searchV2"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/services/userauth"
"github.com/prometheus/client_golang/prometheus"
@ -140,6 +141,7 @@ type HTTPServer struct {
ThumbService thumbs.Service
ExportService export.ExportService
StorageService store.StorageService
httpObjectStore object.HTTPObjectStore
SearchV2HTTPService searchV2.SearchHTTPService
ContextHandler *contexthandler.ContextHandler
SQLStore sqlstore.Store
@ -225,7 +227,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
pluginsUpdateChecker *updatechecker.PluginsService, searchUsersService searchusers.Service,
dataSourcesService datasources.DataSourceService, queryDataService *query.Service,
ldapGroups ldap.Groups, teamGuardian teamguardian.TeamGuardian, serviceaccountsService serviceaccounts.Service,
authInfoService login.AuthInfoService, storageService store.StorageService,
authInfoService login.AuthInfoService, storageService store.StorageService, httpObjectStore object.HTTPObjectStore,
notificationService *notifications.NotificationService, dashboardService dashboards.DashboardService,
dashboardProvisioningService dashboards.DashboardProvisioningService, folderService dashboards.FolderService,
datasourcePermissionsService permissions.DatasourcePermissionsService, alertNotificationService *alerting.AlertNotificationService,
@ -302,6 +304,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
secretsMigrator: secretsMigrator,
secretsPluginMigrator: secretsPluginMigrator,
secretsStore: secretsStore,
httpObjectStore: httpObjectStore,
DataSourcesService: dataSourcesService,
searchUsersService: searchUsersService,
ldapGroups: ldapGroups,

View File

@ -121,6 +121,7 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore/mockstore"
"github.com/grafana/grafana/pkg/services/star/starimpl"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/object"
objectdummyserver "github.com/grafana/grafana/pkg/services/store/object/dummy"
"github.com/grafana/grafana/pkg/services/store/sanitizer"
"github.com/grafana/grafana/pkg/services/tag"
@ -347,6 +348,7 @@ var wireBasicSet = wire.NewSet(
grpcserver.ProvideHealthService,
grpcserver.ProvideReflectionService,
objectdummyserver.ProvideDummyObjectServer,
object.ProvideHTTPObjectStore,
teamimpl.ProvideService,
tempuserimpl.ProvideService,
loginattemptimpl.ProvideService,

View File

@ -0,0 +1,196 @@
package object
import (
"io"
"net/http"
"strings"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/web"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/models"
)
type HTTPObjectStore interface {
// Register HTTP Access to the store
RegisterHTTPRoutes(routing.RouteRegister)
}
type httpObjectStore struct {
store ObjectStoreServer
log log.Logger
}
func ProvideHTTPObjectStore(store ObjectStoreServer) HTTPObjectStore {
return &httpObjectStore{
store: store,
log: log.New("http-object-store"),
}
}
// All registered under "api/object"
func (s *httpObjectStore) RegisterHTTPRoutes(route routing.RouteRegister) {
// For now, require admin for everything
reqGrafanaAdmin := middleware.ReqSignedIn //.ReqGrafanaAdmin
// Every * must parse to a GRN (uid+kind)
route.Get("/store/*", reqGrafanaAdmin, routing.Wrap(s.doGetObject))
route.Get("/raw/*", reqGrafanaAdmin, routing.Wrap(s.doGetRawObject))
route.Post("/store/*", reqGrafanaAdmin, routing.Wrap(s.doWriteObject))
route.Delete("/store/*", reqGrafanaAdmin, routing.Wrap(s.doDeleteObject))
route.Get("/history/*", reqGrafanaAdmin, routing.Wrap(s.doGetHistory))
route.Get("/list/*", reqGrafanaAdmin, routing.Wrap(s.doListFolder)) // Simplified version of search -- path is prefix
route.Get("/search", reqGrafanaAdmin, routing.Wrap(s.doSearch))
}
// This function will extract UID+Kind from the requested path "*" in our router
// This is far from ideal! but is at least consistent for these endpoints.
// This will quickly be revisited as we explore how to encode UID+Kind in a "GRN" format
func parseRequestParams(req *http.Request) (uid string, kind string, params map[string]string) {
params = web.Params(req)
path := params["*"]
idx := strings.LastIndex(path, ".")
if idx > 0 {
uid = path[:idx]
kind = path[idx:]
} else {
uid = path
kind = "?"
}
return
}
func (s *httpObjectStore) doGetObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
rsp, err := s.store.Read(c.Req.Context(), &ReadObjectRequest{
UID: uid,
Kind: kind,
Version: params["version"], // ?version = XYZ
WithBody: true, // ?? allow false?
WithSummary: true, // ?? allow false?
})
if err != nil {
return response.Error(500, "error fetching object", err)
}
if rsp.Object == nil {
return response.Error(404, "not found", nil)
}
// Configure etag support
currentEtag := rsp.Object.ETag
previousEtag := c.Req.Header.Get("If-None-Match")
if previousEtag == currentEtag {
return response.CreateNormalResponse(
http.Header{
"ETag": []string{rsp.Object.ETag},
},
[]byte{}, // nothing
http.StatusNotModified, // 304
)
}
c.Resp.Header().Set("ETag", currentEtag)
return response.JSON(200, rsp)
}
func (s *httpObjectStore) doGetRawObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
rsp, err := s.store.Read(c.Req.Context(), &ReadObjectRequest{
UID: uid,
Kind: kind,
Version: params["version"], // ?version = XYZ
WithBody: true,
WithSummary: false,
})
if err != nil {
return response.Error(500, "?", err)
}
if rsp.Object != nil && rsp.Object.Body != nil {
// Configure etag support
currentEtag := rsp.Object.ETag
previousEtag := c.Req.Header.Get("If-None-Match")
if previousEtag == currentEtag {
return response.CreateNormalResponse(
http.Header{
"ETag": []string{rsp.Object.ETag},
},
[]byte{}, // nothing
http.StatusNotModified, // 304
)
}
return response.CreateNormalResponse(
http.Header{
"Content-Type": []string{"application/json"}, // TODO, based on kind!!!
"ETag": []string{currentEtag},
},
rsp.Object.Body,
200,
)
}
return response.JSON(400, rsp) // ???
}
const MAX_UPLOAD_SIZE = 5 * 1024 * 1024 // 5MB
func (s *httpObjectStore) doWriteObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
// Cap the max size
c.Req.Body = http.MaxBytesReader(c.Resp, c.Req.Body, MAX_UPLOAD_SIZE)
b, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(400, "error reading body", err)
}
rsp, err := s.store.Write(c.Req.Context(), &WriteObjectRequest{
UID: uid,
Kind: kind,
Body: b,
Comment: params["comment"],
PreviousVersion: params["previous"],
})
if err != nil {
return response.Error(500, "?", err)
}
return response.JSON(200, rsp)
}
func (s *httpObjectStore) doDeleteObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
rsp, err := s.store.Delete(c.Req.Context(), &DeleteObjectRequest{
UID: uid,
Kind: kind,
PreviousVersion: params["previous"],
})
if err != nil {
return response.Error(500, "?", err)
}
return response.JSON(200, rsp)
}
func (s *httpObjectStore) doGetHistory(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
limit := int64(20) // params
rsp, err := s.store.History(c.Req.Context(), &ObjectHistoryRequest{
UID: uid,
Kind: kind,
Limit: limit,
NextPageToken: params["nextPageToken"],
})
if err != nil {
return response.Error(500, "?", err)
}
return response.JSON(200, rsp)
}
func (s *httpObjectStore) doListFolder(c *models.ReqContext) response.Response {
return response.JSON(501, "Not implemented yet")
}
func (s *httpObjectStore) doSearch(c *models.ReqContext) response.Response {
return response.JSON(501, "Not implemented yet")
}

View File

@ -0,0 +1,101 @@
package object
import (
"encoding/json"
"unsafe"
jsoniter "github.com/json-iterator/go"
)
func init() { //nolint:gochecknoinits
//jsoniter.RegisterTypeEncoder("object.ReadObjectResponse", &readObjectResponseCodec{})
jsoniter.RegisterTypeEncoder("object.RawObject", &rawObjectCodec{})
}
// Unlike the standard JSON marshal, this will write bytes as JSON when it can
type rawObjectCodec struct{}
// Custom marshal for RawObject (if JSON body)
func (obj *RawObject) MarshalJSON() ([]byte, error) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
return json.Marshal(obj)
}
func (codec *rawObjectCodec) IsEmpty(ptr unsafe.Pointer) bool {
f := (*RawObject)(ptr)
return f.UID == "" && f.Body == nil
}
func (codec *rawObjectCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
obj := (*RawObject)(ptr)
stream.WriteObjectStart()
stream.WriteObjectField("UID")
stream.WriteString(obj.UID)
if obj.Kind != "" {
stream.WriteMore()
stream.WriteObjectField("kind")
stream.WriteString(obj.Kind)
}
if obj.Created > 0 {
stream.WriteMore()
stream.WriteObjectField("created")
stream.WriteInt64(obj.Created)
}
if obj.CreatedBy != nil {
stream.WriteMore()
stream.WriteObjectField("createdBy")
stream.WriteVal(obj.CreatedBy)
}
if obj.Modified > 0 {
stream.WriteMore()
stream.WriteObjectField("modified")
stream.WriteInt64(obj.Modified)
}
if obj.ModifiedBy != nil {
stream.WriteMore()
stream.WriteObjectField("modifiedBy")
stream.WriteVal(obj.ModifiedBy)
}
if obj.Size > 0 {
stream.WriteMore()
stream.WriteObjectField("size")
stream.WriteInt64(obj.Size)
}
if obj.ETag != "" {
stream.WriteMore()
stream.WriteObjectField("etag")
stream.WriteString(obj.ETag)
}
if obj.Version != "" {
stream.WriteMore()
stream.WriteObjectField("version")
stream.WriteString(obj.Version)
}
// The one real difference (encodes JSON things directly)
if obj.Body != nil {
stream.WriteMore()
stream.WriteObjectField("body")
if json.Valid(obj.Body) {
stream.WriteRaw(string(obj.Body)) // works for strings
} else {
stream.WriteString("// link to raw bytes //")
//stream.WriteVal(obj.Body)
}
}
if obj.SyncSrc != "" {
stream.WriteMore()
stream.WriteObjectField("syncSrc")
stream.WriteString(obj.SyncSrc)
}
if obj.SyncTime > 0 {
stream.WriteMore()
stream.WriteObjectField("syncTime")
stream.WriteInt64(obj.SyncTime)
}
stream.WriteObjectEnd()
}