K8s: Implement playlist api with k8s client (#77405)

This commit is contained in:
Ryan McKinley
2023-10-31 10:26:39 -07:00
committed by GitHub
parent 254648b96b
commit dd773e74f1
15 changed files with 294 additions and 53 deletions

View File

@@ -499,14 +499,7 @@ func (hs *HTTPServer) registerRoutes() {
})
// Playlist
apiRoute.Group("/playlists", func(playlistRoute routing.RouteRegister) {
playlistRoute.Get("/", routing.Wrap(hs.SearchPlaylists))
playlistRoute.Get("/:uid", hs.ValidateOrgPlaylist, routing.Wrap(hs.GetPlaylist))
playlistRoute.Get("/:uid/items", hs.ValidateOrgPlaylist, routing.Wrap(hs.GetPlaylistItems))
playlistRoute.Delete("/:uid", reqEditorRole, hs.ValidateOrgPlaylist, routing.Wrap(hs.DeletePlaylist))
playlistRoute.Put("/:uid", reqEditorRole, hs.ValidateOrgPlaylist, routing.Wrap(hs.UpdatePlaylist))
playlistRoute.Post("/", reqEditorRole, routing.Wrap(hs.CreatePlaylist))
})
hs.registerPlaylistAPI(apiRoute)
// Search
apiRoute.Get("/search/sorting", routing.Wrap(hs.ListSortOptions))

View File

@@ -16,6 +16,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
grafanaapiserver "github.com/grafana/grafana/pkg/services/grafana-apiserver"
"github.com/grafana/grafana/pkg/api/avatar"
"github.com/grafana/grafana/pkg/api/routing"
httpstatic "github.com/grafana/grafana/pkg/api/static"
@@ -205,6 +207,7 @@ type HTTPServer struct {
authnService authn.Service
starApi *starApi.API
promRegister prometheus.Registerer
clientConfigProvider grafanaapiserver.DirectRestConfigProvider
}
type ServerOptions struct {
@@ -246,8 +249,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
accesscontrolService accesscontrol.Service, navTreeService navtree.Service,
annotationRepo annotations.Repository, tagService tag.Service, searchv2HTTPService searchV2.SearchHTTPService, oauthTokenService oauthtoken.OAuthTokenService,
statsService stats.Service, authnService authn.Service, pluginsCDNService *pluginscdn.Service,
starApi *starApi.API, promRegister prometheus.Registerer,
starApi *starApi.API, promRegister prometheus.Registerer, clientConfigProvider grafanaapiserver.DirectRestConfigProvider,
) (*HTTPServer, error) {
web.Env = cfg.Env
m := web.New()
@@ -348,6 +350,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
pluginsCDNService: pluginsCDNService,
starApi: starApi,
promRegister: promRegister,
clientConfigProvider: clientConfigProvider,
}
if hs.Listener != nil {
hs.log.Debug("Using provided listener")

View File

@@ -2,15 +2,145 @@ package api
import (
"net/http"
"strings"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/apis/playlist/v0alpha1"
"github.com/grafana/grafana/pkg/middleware"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/grafana-apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/playlist"
"github.com/grafana/grafana/pkg/util/errutil/errhttp"
"github.com/grafana/grafana/pkg/web"
)
func (hs *HTTPServer) ValidateOrgPlaylist(c *contextmodel.ReqContext) {
type playlistAPIHandler struct {
SearchPlaylists []web.Handler
GetPlaylist []web.Handler
GetPlaylistItems []web.Handler
DeletePlaylist []web.Handler
UpdatePlaylist []web.Handler
CreatePlaylist []web.Handler
}
func chainHandlers(h ...web.Handler) []web.Handler {
return h
}
func (hs *HTTPServer) registerPlaylistAPI(apiRoute routing.RouteRegister) {
handler := playlistAPIHandler{
SearchPlaylists: chainHandlers(routing.Wrap(hs.SearchPlaylists)),
GetPlaylist: chainHandlers(hs.validateOrgPlaylist, routing.Wrap(hs.GetPlaylist)),
GetPlaylistItems: chainHandlers(hs.validateOrgPlaylist, routing.Wrap(hs.GetPlaylistItems)),
DeletePlaylist: chainHandlers(middleware.ReqEditorRole, hs.validateOrgPlaylist, routing.Wrap(hs.DeletePlaylist)),
UpdatePlaylist: chainHandlers(middleware.ReqEditorRole, hs.validateOrgPlaylist, routing.Wrap(hs.UpdatePlaylist)),
CreatePlaylist: chainHandlers(middleware.ReqEditorRole, routing.Wrap(hs.CreatePlaylist)),
}
// Alternative implementations for k8s
if hs.Features.IsEnabled(featuremgmt.FlagKubernetesPlaylistsAPI) {
namespacer := request.GetNamespaceMapper(hs.Cfg)
gvr := schema.GroupVersionResource{
Group: v0alpha1.GroupName,
Version: v0alpha1.VersionID,
Resource: "playlists",
}
clientGetter := func(c *contextmodel.ReqContext) (dynamic.ResourceInterface, bool) {
dyn, err := dynamic.NewForConfig(hs.clientConfigProvider.GetDirectRestConfig(c))
if err != nil {
c.JsonApiErr(500, "client", err)
return nil, false
}
return dyn.Resource(gvr).Namespace(namespacer(c.OrgID)), true
}
errorWriter := func(c *contextmodel.ReqContext, err error) {
//nolint:errorlint
statusError, ok := err.(*errors.StatusError)
if ok {
c.JsonApiErr(int(statusError.Status().Code),
statusError.Status().Message, err)
return
}
errhttp.Write(c.Req.Context(), err, c.Resp)
}
handler.SearchPlaylists = []web.Handler{func(c *contextmodel.ReqContext) {
client, ok := clientGetter(c)
if !ok {
return // error is already sent
}
out, err := client.List(c.Req.Context(), v1.ListOptions{})
if err != nil {
errorWriter(c, err)
return
}
query := strings.ToUpper(c.Query("query"))
playlists := []playlist.Playlist{}
for _, item := range out.Items {
p := v0alpha1.UnstructuredToLegacyPlaylist(item)
if p == nil {
continue
}
if query != "" && !strings.Contains(strings.ToUpper(p.Name), query) {
continue // query filter
}
playlists = append(playlists, *p)
}
c.JSON(http.StatusOK, playlists)
}}
handler.GetPlaylist = []web.Handler{func(c *contextmodel.ReqContext) {
client, ok := clientGetter(c)
if !ok {
return // error is already sent
}
uid := web.Params(c.Req)[":uid"]
out, err := client.Get(c.Req.Context(), uid, v1.GetOptions{})
if err != nil {
errorWriter(c, err)
return
}
c.JSON(http.StatusOK, v0alpha1.UnstructuredToLegacyPlaylistDTO(*out))
}}
handler.GetPlaylistItems = []web.Handler{func(c *contextmodel.ReqContext) {
client, ok := clientGetter(c)
if !ok {
return // error is already sent
}
uid := web.Params(c.Req)[":uid"]
out, err := client.Get(c.Req.Context(), uid, v1.GetOptions{})
if err != nil {
errorWriter(c, err)
return
}
c.JSON(http.StatusOK, v0alpha1.UnstructuredToLegacyPlaylistDTO(*out).Items)
}}
}
// Register the actual handlers
apiRoute.Group("/playlists", func(playlistRoute routing.RouteRegister) {
playlistRoute.Get("/", handler.SearchPlaylists...)
playlistRoute.Get("/:uid", handler.GetPlaylist...)
playlistRoute.Get("/:uid/items", handler.GetPlaylistItems...)
playlistRoute.Delete("/:uid", handler.DeletePlaylist...)
playlistRoute.Put("/:uid", handler.UpdatePlaylist...)
playlistRoute.Post("/", handler.CreatePlaylist...)
})
}
func (hs *HTTPServer) validateOrgPlaylist(c *contextmodel.ReqContext) {
uid := web.Params(c.Req)[":uid"]
query := playlist.GetPlaylistByUidQuery{UID: uid, OrgId: c.SignedInUser.GetOrgID()}
p, err := hs.playlistService.GetWithoutItems(c.Req.Context(), &query)

View File

@@ -1,16 +1,48 @@
package v0alpha1
import (
"encoding/json"
"fmt"
"strconv"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"github.com/grafana/grafana/pkg/kinds"
"github.com/grafana/grafana/pkg/services/grafana-apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/playlist"
)
func UnstructuredToLegacyPlaylist(item unstructured.Unstructured) *playlist.Playlist {
spec := item.Object["spec"].(map[string]any)
return &playlist.Playlist{
UID: item.GetName(),
Name: spec["title"].(string),
Interval: spec["interval"].(string),
Id: getLegacyID(&item),
}
}
func UnstructuredToLegacyPlaylistDTO(item unstructured.Unstructured) *playlist.PlaylistDTO {
spec := item.Object["spec"].(map[string]any)
dto := &playlist.PlaylistDTO{
Uid: item.GetName(),
Name: spec["title"].(string),
Interval: spec["interval"].(string),
Id: getLegacyID(&item),
}
items := spec["items"]
if items != nil {
b, err := json.Marshal(items)
if err == nil {
_ = json.Unmarshal(b, &dto.Items)
}
}
return dto
}
func convertToK8sResource(v *playlist.PlaylistDTO, namespacer request.NamespaceMapper) *Playlist {
spec := Spec{
Title: v.Name,
@@ -22,6 +54,15 @@ func convertToK8sResource(v *playlist.PlaylistDTO, namespacer request.NamespaceM
Value: item.Value,
})
}
meta := kinds.GrafanaResourceMetadata{}
meta.SetUpdatedTimestampMillis(v.UpdatedAt)
if v.Id > 0 {
meta.SetOriginInfo(&kinds.ResourceOriginInfo{
Name: "SQL",
Key: fmt.Sprintf("%d", v.Id),
})
}
return &Playlist{
ObjectMeta: metav1.ObjectMeta{
Name: v.Uid,
@@ -29,7 +70,23 @@ func convertToK8sResource(v *playlist.PlaylistDTO, namespacer request.NamespaceM
ResourceVersion: fmt.Sprintf("%d", v.UpdatedAt),
CreationTimestamp: metav1.NewTime(time.UnixMilli(v.CreatedAt)),
Namespace: namespacer(v.OrgID),
Annotations: meta.Annotations,
},
Spec: spec,
}
}
// Read legacy ID from metadata annotations
func getLegacyID(item *unstructured.Unstructured) int64 {
meta := kinds.GrafanaResourceMetadata{
Annotations: item.GetAnnotations(),
}
info := meta.GetOriginInfo()
if info != nil && info.Name == "SQL" {
i, err := strconv.ParseInt(info.Key, 10, 64)
if err == nil {
return i
}
}
return 0
}

View File

@@ -12,6 +12,7 @@ import (
func TestPlaylistConversion(t *testing.T) {
src := &playlist.PlaylistDTO{
Id: 123,
OrgID: 3,
Uid: "abc", // becomes k8s name
Name: "MyPlaylists", // becomes title
@@ -32,14 +33,19 @@ func TestPlaylistConversion(t *testing.T) {
out, err := json.MarshalIndent(dst, "", " ")
require.NoError(t, err)
//fmt.Printf("%s", string(out))
// fmt.Printf("%s", string(out))
require.JSONEq(t, `{
"metadata": {
"name": "abc",
"namespace": "org-3",
"uid": "abc",
"resourceVersion": "54321",
"creationTimestamp": "1970-01-01T00:00:12Z"
"creationTimestamp": "1970-01-01T00:00:12Z",
"annotations": {
"grafana.app/originKey": "123",
"grafana.app/originName": "SQL",
"grafana.app/updatedTimestamp": "1970-01-01T00:00:54Z"
}
},
"spec": {
"title": "MyPlaylists",

View File

@@ -60,10 +60,15 @@ const annoKeyOriginTimestamp = "grafana.app/originTimestamp"
func (m *GrafanaResourceMetadata) set(key string, val string) {
if val == "" {
delete(m.Annotations, key)
} else {
m.Annotations[key] = val
if m.Annotations != nil {
delete(m.Annotations, key)
}
return
}
if m.Annotations == nil {
m.Annotations = make(map[string]string)
}
m.Annotations[key] = val
}
func (m *GrafanaResourceMetadata) GetUpdatedTimestamp() *time.Time {
@@ -77,14 +82,23 @@ func (m *GrafanaResourceMetadata) GetUpdatedTimestamp() *time.Time {
return nil
}
func (m *GrafanaResourceMetadata) SetUpdatedTimestamp(v *time.Time) {
if v == nil {
delete(m.Annotations, annoKeyUpdatedTimestamp)
func (m *GrafanaResourceMetadata) SetUpdatedTimestampMillis(v int64) {
if v > 0 {
t := time.UnixMilli(v)
m.SetUpdatedTimestamp(&t)
} else {
m.Annotations[annoKeyUpdatedTimestamp] = v.Format(time.RFC3339)
m.SetUpdatedTimestamp(nil)
}
}
func (m *GrafanaResourceMetadata) SetUpdatedTimestamp(v *time.Time) {
txt := ""
if v != nil {
txt = v.UTC().Format(time.RFC3339)
}
m.set(annoKeyUpdatedTimestamp, txt)
}
func (m *GrafanaResourceMetadata) GetCreatedBy() string {
return m.Annotations[annoKeyCreatedBy]
}
@@ -123,13 +137,9 @@ func (m *GrafanaResourceMetadata) SetOriginInfo(info *ResourceOriginInfo) {
delete(m.Annotations, annoKeyOriginKey)
delete(m.Annotations, annoKeyOriginTimestamp)
if info != nil || info.Name != "" {
m.Annotations[annoKeyOriginName] = info.Name
if info.Path != "" {
m.Annotations[annoKeyOriginPath] = info.Path
}
if info.Key != "" {
m.Annotations[annoKeyOriginKey] = info.Key
}
m.set(annoKeyOriginName, info.Name)
m.set(annoKeyOriginKey, info.Key)
m.set(annoKeyOriginPath, info.Path)
if info.Timestamp != nil {
m.Annotations[annoKeyOriginTimestamp] = info.Timestamp.Format(time.RFC3339)
}

View File

@@ -850,6 +850,13 @@ var (
Stage: FeatureStageExperimental,
Owner: grafanaAppPlatformSquad,
},
{
Name: "kubernetesPlaylistsAPI",
Description: "Route /api/playlist API to k8s handlers",
Stage: FeatureStageExperimental,
Owner: grafanaAppPlatformSquad,
RequiresRestart: true, // changes the API routing
},
{
Name: "cloudWatchBatchQueries",
Description: "Runs CloudWatch metrics queries as separate batches",

View File

@@ -120,6 +120,7 @@ enableNativeHTTPHistogram,experimental,@grafana/hosted-grafana-team,false,false,
formatString,experimental,@grafana/grafana-bi-squad,false,false,false,true
transformationsVariableSupport,experimental,@grafana/grafana-bi-squad,false,false,false,true
kubernetesPlaylists,experimental,@grafana/grafana-app-platform-squad,false,false,false,true
kubernetesPlaylistsAPI,experimental,@grafana/grafana-app-platform-squad,false,false,true,false
cloudWatchBatchQueries,preview,@grafana/aws-datasources,false,false,false,false
navAdminSubsections,experimental,@grafana/grafana-frontend-platform,false,false,false,false
recoveryThreshold,experimental,@grafana/alerting-squad,false,false,true,false
1 Name Stage Owner requiresDevMode RequiresLicense RequiresRestart FrontendOnly
120 formatString experimental @grafana/grafana-bi-squad false false false true
121 transformationsVariableSupport experimental @grafana/grafana-bi-squad false false false true
122 kubernetesPlaylists experimental @grafana/grafana-app-platform-squad false false false true
123 kubernetesPlaylistsAPI experimental @grafana/grafana-app-platform-squad false false true false
124 cloudWatchBatchQueries preview @grafana/aws-datasources false false false false
125 navAdminSubsections experimental @grafana/grafana-frontend-platform false false false false
126 recoveryThreshold experimental @grafana/alerting-squad false false true false

View File

@@ -491,6 +491,10 @@ const (
// Use the kubernetes API in the frontend for playlists
FlagKubernetesPlaylists = "kubernetesPlaylists"
// FlagKubernetesPlaylistsAPI
// Route /api/playlist API to k8s handlers
FlagKubernetesPlaylistsAPI = "kubernetesPlaylistsAPI"
// FlagCloudWatchBatchQueries
// Runs CloudWatch metrics queries as separate batches
FlagCloudWatchBatchQueries = "cloudWatchBatchQueries"

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"path"
"strconv"
@@ -35,7 +36,6 @@ import (
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
filestorage "github.com/grafana/grafana/pkg/services/grafana-apiserver/storage/file"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/web"
)
type StorageType string
@@ -86,6 +86,13 @@ type RestConfigProvider interface {
GetRestConfig() *clientrest.Config
}
type DirectRestConfigProvider interface {
// GetDirectRestConfig returns a k8s client configuration that will use the same
// logged logged in user as the current request context. This is useful when
// creating clients that map legacy API handlers to k8s backed services
GetDirectRestConfig(c *contextmodel.ReqContext) *clientrest.Config
}
type service struct {
*services.BasicService
@@ -96,7 +103,7 @@ type service struct {
stoppedCh chan error
rr routing.RouteRegister
handler web.Handler
handler http.Handler
builders []APIGroupBuilder
tracing *tracing.TracingService
@@ -133,10 +140,24 @@ func ProvideService(
return
}
if handle, ok := s.handler.(func(c *contextmodel.ReqContext)); ok {
handle(c)
return
req := c.Req
if req.URL.Path == "" {
req.URL.Path = "/"
}
//TODO: add support for the existing MetricsEndpointBasicAuth config option
if req.URL.Path == "/apiserver-metrics" {
req.URL.Path = "/metrics"
}
ctx := req.Context()
signedInUser := appcontext.MustUser(ctx)
req.Header.Set("X-Remote-User", strconv.FormatInt(signedInUser.UserID, 10))
req.Header.Set("X-Remote-Group", "grafana")
resp := responsewriter.WrapForHTTP1Or2(c.Resp)
s.handler.ServeHTTP(resp, req)
}
k8sRoute.Any("/", middleware.ReqSignedIn, handler)
k8sRoute.Any("/*", middleware.ReqSignedIn, handler)
@@ -301,27 +322,8 @@ func (s *service) start(ctx context.Context) error {
}
}
// TODO: this is a hack. see note in ProvideService
s.handler = func(c *contextmodel.ReqContext) {
req := c.Req
if req.URL.Path == "" {
req.URL.Path = "/"
}
//TODO: add support for the existing MetricsEndpointBasicAuth config option
if req.URL.Path == "/apiserver-metrics" {
req.URL.Path = "/metrics"
}
ctx := req.Context()
signedInUser := appcontext.MustUser(ctx)
req.Header.Set("X-Remote-User", strconv.FormatInt(signedInUser.UserID, 10))
req.Header.Set("X-Remote-Group", "grafana")
resp := responsewriter.WrapForHTTP1Or2(c.Resp)
server.Handler.ServeHTTP(resp, req)
}
// Used by the proxy wrapper registered in ProvideService
s.handler = server.Handler
// skip starting the server in prod mode
if !s.config.devMode {
@@ -335,6 +337,19 @@ func (s *service) start(ctx context.Context) error {
return nil
}
func (s *service) GetDirectRestConfig(c *contextmodel.ReqContext) *clientrest.Config {
return &clientrest.Config{
Transport: &roundTripperFunc{
fn: func(req *http.Request) (*http.Response, error) {
ctx := appcontext.WithUser(req.Context(), c.SignedInUser)
w := httptest.NewRecorder()
s.handler.ServeHTTP(w, req.WithContext(ctx))
return w.Result(), nil
},
},
}
}
func (s *service) running(ctx context.Context) error {
// skip waiting for the server in prod mode
if !s.config.devMode {
@@ -383,3 +398,11 @@ func (s *service) ensureKubeConfig() error {
return clientcmd.WriteToFile(clientConfig, path.Join(s.config.dataPath, "grafana.kubeconfig"))
}
type roundTripperFunc struct {
fn func(req *http.Request) (*http.Response, error)
}
func (f *roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f.fn(req)
}

View File

@@ -11,5 +11,6 @@ var WireSet = wire.NewSet(
wire.Bind(new(RestConfigProvider), new(*service)),
wire.Bind(new(Service), new(*service)),
wire.Bind(new(APIRegistrar), new(*service)),
wire.Bind(new(DirectRestConfigProvider), new(*service)),
authorizer.WireSet,
)

View File

@@ -47,6 +47,9 @@ type PlaylistDTO struct {
// Returned for k8s
OrgID int64 `json:"-"`
// Returned for k8s and added as an annotation
Id int64 `json:"-"`
}
type PlaylistItemDTO struct {

View File

@@ -68,6 +68,7 @@ func (s *Service) Get(ctx context.Context, q *playlist.GetPlaylistByUidQuery) (*
}
}
return &playlist.PlaylistDTO{
Id: v.Id,
Uid: v.UID,
Name: v.Name,
Interval: v.Interval,