Files
grafana/pkg/storage/unified/resource/index_server.go
owensmallwood 612b864772 Indexing PoC: Add search/browse (#94126)
* adds Filter gRPC and make protobuf

* adds route for querying the filter gRPC

* wires up Filter gRPC call

* [WIP] index from start

* renames gRPC endpoint to "Search"

* adds /apis/search route into k8s routes. Hacky for now.

* updates readme - wrong casing

* adds feature toggle for unified storage search

* hides US search behind feature flag. Clean up print statements.

* removes indexer - will be added in another PR

* Search: Add API Builder

* adds required method

* implementing UpdateAPIGroupInfo (WIP)

* adds groupversion

* commenting out for now

* remove unneeded code from experimenting and update register.go to match interface required

* list resources and load into index

* pass context

* namespaces search route

* lint

* watch

* add todo

* add todo

* merge

* cleanup

* add todo

* gen protobuf

* lint; fix migration issue

* Updates index mapping function to map unified storage object Value

* Changes Index() to pointer receiver - fixes panic

* add delete

* cleanup

* gets search/browse functioning. Results show up as base64 encoded. Still a WIP.

* Doesn't JSON re-encode gRPC response in search handler

* add kind to SearchRequest proto

* Updates query interface to be more generic. Make proto. Parses query params in api server.

* make protobuf

* removes unused method and imports

* Returns all indexed fields in search results. Adds pagination support (limit + offset).

* remove comment

* remove unused struct

* gets tenant in search k8s api handler

* adds hardcoded spec field mappings - starting with playlists

* adds all spec fields to search results

* moved helper function for field mappings into index

* only includes allowed spec fields in search results

* cleans up error handling

* removes debug log

---------

Co-authored-by: leonorfmartins <leonorfmartins@gmail.com>
Co-authored-by: Todd Treece <todd.treece@grafana.com>
Co-authored-by: Scott Lepper <scott.lepper@gmail.com>
2024-10-09 11:20:05 -06:00

227 lines
4.4 KiB
Go

package resource
import (
"context"
"encoding/json"
"errors"
"log"
"strings"
"google.golang.org/grpc"
)
// IndexServer answers search requests from an index and keeps that index
// current by watching the resource server for changes.
type IndexServer struct {
// ResourceServer is embedded, so its methods are promoted onto IndexServer.
ResourceServer
// s is the resource server assigned in Init; Load passes it to NewIndex and
// Watch calls its Watch method.
s *server
// index is created by Load and queried by Search.
index *Index
// ws is the stream passed to s.Watch; it is created in Init.
ws *indexWatchServer
}
// Search runs req.Query against the index for req.Tenant, applying req.Limit
// and req.Offset, and returns every hit JSON-encoded inside a ResourceWrapper.
// The first search or encoding error aborts the call and is returned as-is.
func (is *IndexServer) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) {
	limit, offset := int(req.Limit), int(req.Offset)
	hits, err := is.index.Search(ctx, req.Tenant, req.Query, limit, offset)
	if err != nil {
		return nil, err
	}

	out := new(SearchResponse)
	for _, hit := range hits {
		encoded, encErr := json.Marshal(hit)
		if encErr != nil {
			return nil, encErr
		}
		out.Items = append(out.Items, &ResourceWrapper{Value: encoded})
	}
	return out, nil
}
// History is a placeholder: it does no work and returns a nil response
// together with a nil error.
func (is *IndexServer) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) {
	var empty *HistoryResponse // intentionally nil; nothing is looked up yet
	return empty, nil
}
// Origin is a placeholder: it does no work and returns a nil response
// together with a nil error.
func (is *IndexServer) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) {
	var empty *OriginResponse // intentionally nil; nothing is looked up yet
	return empty, nil
}
// Load creates a new Index backed by the resource server set in Init, stores
// it on the IndexServer, and then initializes it. Any error from the index's
// Init is returned unchanged.
func (is *IndexServer) Load(ctx context.Context) error {
	is.index = NewIndex(is.s, Opts{})
	if err := is.index.Init(ctx); err != nil {
		return err
	}
	return nil
}
// Watch starts one background watch per entry returned by fetchResourceTypes.
// Each watch streams its events into is.ws. The goroutines are not awaited:
// Watch itself always returns nil, and a failing watch is only logged.
func (is *IndexServer) Watch(ctx context.Context) error {
	for _, opts := range fetchResourceTypes() {
		req := &WatchRequest{Options: opts}
		go func(req *WatchRequest) {
			// TODO: handle error
			if err := is.s.Watch(req, is.ws); err != nil {
				log.Printf("Error watching resource %v", err)
			}
		}(req)
	}
	return nil
}
// Init wires the index server to the resource server rs so the index server
// can call back into it, and creates the watch stream that will carry ctx.
// It always returns nil.
// TODO: a chicken and egg problem - index server needs the resource server but the resource server is created with the index server
func (is *IndexServer) Init(ctx context.Context, rs *server) error {
	stream := &indexWatchServer{
		context: ctx,
		is:      is,
	}
	is.s, is.ws = rs, stream
	return nil
}
// NewResourceIndexServer returns a zero-valued *IndexServer as a
// ResourceIndexServer. Init must be called on it to attach a resource server.
func NewResourceIndexServer() ResourceIndexServer {
	return new(IndexServer)
}
// ResourceIndexer is implemented by types that can produce an Index.
type ResourceIndexer interface {
// Index returns an *Index, or an error if one cannot be provided.
Index(ctx context.Context) (*Index, error)
}
// indexWatchServer is the stream handed to the resource server's Watch call.
// Its Send method applies each received WatchEvent to the owning
// IndexServer's index.
type indexWatchServer struct {
// ServerStream is embedded but not set by IndexServer.Init; this type
// defines its own RecvMsg, SendMsg and Context methods.
grpc.ServerStream
// context is returned by Context and passed to the index operations.
context context.Context
// is is the IndexServer whose index receives the updates.
is *IndexServer
}
// Send routes a watch event to the matching index operation: ADDED events go
// to Add, DELETED events to Delete and MODIFIED events to Update. Events of
// any other type are ignored and nil is returned.
func (f *indexWatchServer) Send(we *WatchEvent) error {
	switch we.Type {
	case WatchEvent_ADDED:
		return f.Add(we)
	case WatchEvent_DELETED:
		return f.Delete(we)
	case WatchEvent_MODIFIED:
		return f.Update(we)
	default:
		return nil
	}
}
// RecvMsg is a no-op: the argument is ignored and nil is always returned.
func (f *indexWatchServer) RecvMsg(_ interface{}) error {
	return nil
}
// SendMsg is unsupported on this stream: the argument is ignored and a
// "not implemented" error is always returned.
func (f *indexWatchServer) SendMsg(_ interface{}) error {
	unsupported := errors.New("not implemented")
	return unsupported
}
// Context returns the context stored on the stream. If none is set, it stores
// context.Background() on the stream first and returns that.
func (f *indexWatchServer) Context() context.Context {
	if existing := f.context; existing != nil {
		return existing
	}
	f.context = context.Background()
	return f.context
}
// Index returns the index currently held by the owning IndexServer.
func (f *indexWatchServer) Index() *Index {
	owner := f.is
	return owner.index
}
// Add converts the event's Resource into index data and indexes it.
// Errors from the conversion or from the index are returned unchanged.
func (f *indexWatchServer) Add(we *WatchEvent) error {
	entry, err := getData(we.Resource)
	if err != nil {
		return err
	}
	return f.Index().Index(f.context, entry)
}
// Delete removes the event's resource from the index. The resource is taken
// from the event via resource(), converted with getData, and then deleted by
// UID and key. The first error encountered is returned unchanged.
func (f *indexWatchServer) Delete(we *WatchEvent) error {
	src, err := resource(we)
	if err != nil {
		return err
	}
	entry, err := getData(src)
	if err != nil {
		return err
	}
	return f.Index().Delete(f.context, entry.Uid, entry.Key)
}
// Update replaces the event's resource in the index: the existing entry is
// deleted by UID and key, then the resource is indexed again. The first error
// encountered is returned unchanged, so a failed delete skips the re-index.
func (f *indexWatchServer) Update(we *WatchEvent) error {
	src, err := resource(we)
	if err != nil {
		return err
	}
	entry, err := getData(src)
	if err != nil {
		return err
	}
	if err = f.Index().Delete(f.context, entry.Uid, entry.Key); err != nil {
		return err
	}
	return f.Index().Index(f.context, entry)
}
// Data bundles what the index operations need for one resource: its key, its
// wrapped value and its UID.
type Data struct {
// Key identifies the resource by group, resource, namespace and name.
Key *ResourceKey
// Value carries the resource version and raw value taken from the watch event.
Value *ResourceWrapper
// Uid is copied from the resource's metadata.
Uid string
}
// getGroup returns the API group portion of r.ApiVersion, i.e. everything
// before the first "/". An apiVersion of the form "group/version" yields
// "group"; a version-only apiVersion such as "v1" (or an empty string) has no
// group and yields "".
func getGroup(r *Resource) string {
	// Without a separator the whole string is a version, not a group.
	sep := strings.IndexByte(r.ApiVersion, '/')
	if sep < 0 {
		return ""
	}
	return r.ApiVersion[:sep]
}
// getData builds the index Data for a watch-event resource.
//
// wr.Value is passed to getResource; the result supplies the key (group from
// getGroup, resource from Kind, namespace and name from Metadata) and the UID.
// The wrapped value keeps wr.Version and the original wr.Value.
//
// It returns an error when wr is nil or when getResource fails.
func getData(wr *WatchEvent_Resource) (*Data, error) {
	// An event may carry no resource at all; report that as an error instead
	// of dereferencing a nil pointer.
	if wr == nil {
		return nil, errors.New("resource not found")
	}

	r, err := getResource(wr.Value)
	if err != nil {
		return nil, err
	}

	key := &ResourceKey{
		Group:     getGroup(r),
		Resource:  r.Kind,
		Namespace: r.Metadata.Namespace,
		Name:      r.Metadata.Name,
	}
	value := &ResourceWrapper{
		ResourceVersion: wr.Version,
		Value:           wr.Value,
	}
	return &Data{Key: key, Value: value, Uid: r.Metadata.Uid}, nil
}
// resource picks the payload to use for an event: we.Resource when it is set
// and has a non-empty Value, otherwise we.Previous under the same condition
// (the fallback used for updates/deletes). If neither qualifies it returns a
// "resource not found" error.
func resource(we *WatchEvent) (*WatchEvent_Resource, error) {
	for _, candidate := range []*WatchEvent_Resource{we.Resource, we.Previous} {
		if candidate != nil && len(candidate.Value) > 0 {
			return candidate, nil
		}
	}
	return nil, errors.New("resource not found")
}