Search: use SQL search as a fallback during bluge's initial indexing (#54095)

* Search: use SQL search as a fallback when bluge indexing is ongoing

* Search: lint

* Search: feedback fixes - return an empty frame with a special name

* Search: revert readiness check query type

* Search: remove println

* remove sleep, get coffee
This commit is contained in:
Artur Wierzbicki 2022-08-26 12:36:41 +04:00 committed by GitHub
parent 5a1b9d2283
commit 74158ed66b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 275 additions and 144 deletions

View File

@ -81,30 +81,78 @@ func (i *orgIndex) readerForIndex(idxType indexType) (*bluge.Reader, func(), err
}
// searchIndex owns the per-organisation search indexes and the bookkeeping
// used to report whether a given organisation can already be served.
type searchIndex struct {
	// mu is held while perOrgIndex is updated.
	mu          sync.RWMutex
	loader      dashboardLoader
	perOrgIndex map[int64]*orgIndex

	// initializedOrgs records the orgs whose index has been built at least once.
	initializedOrgs map[int64]bool
	// initialIndexingComplete is set once run() has finished its start-up
	// indexing and is about to enter its event loop.
	initialIndexingComplete bool
	// initializationMutex guards initializedOrgs and initialIndexingComplete.
	initializationMutex sync.RWMutex

	eventStore     eventStore
	logger         log.Logger
	buildSignals   chan buildSignal
	extender       DocumentExtender
	folderIdLookup folderUIDLookup
	syncCh         chan chan struct{}
}
// newSearchIndex builds an empty searchIndex wired to the given collaborators.
// No org is marked as initialized and initial indexing is reported as not yet
// complete until run() and buildOrgIndex() update that state.
func newSearchIndex(dashLoader dashboardLoader, evStore eventStore, extender DocumentExtender, folderIDs folderUIDLookup) *searchIndex {
	return &searchIndex{
		loader:          dashLoader,
		eventStore:      evStore,
		perOrgIndex:     map[int64]*orgIndex{},
		initializedOrgs: map[int64]bool{},
		logger:          log.New("searchIndex"),
		buildSignals:    make(chan buildSignal),
		extender:        extender,
		folderIdLookup:  folderIDs,
		syncCh:          make(chan chan struct{}),
	}
}
// isInitialized reports whether the index can serve search requests for orgId.
//
// While the start-up indexing has not finished it answers
// "initial-indexing-ongoing". Once it has, an org without a built index gets
// "org-indexing-ongoing" and a background build of that org's index is
// requested.
//
// NOTE(review): every call that ends in "org-indexing-ongoing" starts a new
// goroutine via triggerBuildingOrgIndex; confirm callers do not poll this in a
// tight loop.
func (i *searchIndex) isInitialized(_ context.Context, orgId int64) IsSearchReadyResponse {
	// Snapshot both flags under a single read lock so they are consistent.
	i.initializationMutex.RLock()
	bootstrapDone, orgReady := i.initialIndexingComplete, i.initializedOrgs[orgId]
	i.initializationMutex.RUnlock()

	switch {
	case !bootstrapDone:
		return IsSearchReadyResponse{IsReady: false, Reason: "initial-indexing-ongoing"}
	case orgReady:
		return IsSearchReadyResponse{IsReady: true}
	default:
		i.triggerBuildingOrgIndex(orgId)
		return IsSearchReadyResponse{IsReady: false, Reason: "org-indexing-ongoing"}
	}
}
// triggerBuildingOrgIndex asynchronously asks the index loop to build the
// index for orgId and logs the outcome. It returns immediately; the spawned
// goroutine gives up after five minutes, whether it is still waiting to hand
// over the build signal or waiting for the build result.
func (i *searchIndex) triggerBuildingOrgIndex(orgId int64) {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()

		// Buffered so the builder can report its result even after we timed out.
		result := make(chan error, 1)

		select {
		case <-ctx.Done():
			i.logger.Warn("Failed to send a build signal to initialize org index", "orgId", orgId)
			return
		case i.buildSignals <- buildSignal{orgID: orgId, done: result}:
		}

		select {
		case <-ctx.Done():
			i.logger.Warn("Building org index timeout", "orgId", orgId)
		case buildErr := <-result:
			if buildErr != nil {
				i.logger.Error("Failed to build org index", "orgId", orgId, "error", buildErr)
				return
			}
			i.logger.Debug("Successfully built org index", "orgId", orgId)
		}
	}()
}
func (i *searchIndex) sync(ctx context.Context) error {
doneCh := make(chan struct{}, 1)
select {
@ -149,6 +197,10 @@ func (i *searchIndex) run(ctx context.Context, orgIDs []int64, reIndexSignalCh c
// Channel to handle signals about asynchronous full re-indexing completion.
reIndexDoneCh := make(chan int64, 1)
i.initializationMutex.Lock()
i.initialIndexingComplete = true
i.initializationMutex.Unlock()
for {
select {
case doneCh := <-i.syncCh:
@ -421,6 +473,10 @@ func (i *searchIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, erro
i.perOrgIndex[orgID] = index
i.mu.Unlock()
i.initializationMutex.Lock()
i.initializedOrgs[orgID] = true
i.initializationMutex.Unlock()
if orgID == 1 {
go func() {
if reader, cancel, err := index.readerForIndex(indexTypeDashboard); err == nil {

View File

@ -45,6 +45,20 @@ func (_m *MockSearchService) IsDisabled() bool {
return r0
}
// IsReady provides a mock function with given fields: ctx, orgId
func (_m *MockSearchService) IsReady(ctx context.Context, orgId int64) IsSearchReadyResponse {
	ret := _m.Called(ctx, orgId)

	// A configured return function takes precedence over a static return value.
	if rf, ok := ret.Get(0).(func(context.Context, int64) IsSearchReadyResponse); ok {
		return rf(ctx, orgId)
	}

	return ret.Get(0).(IsSearchReadyResponse)
}
// RegisterDashboardIndexExtender provides a mock function with given fields: ext
func (_m *MockSearchService) RegisterDashboardIndexExtender(ext DashboardIndexExtender) {
_m.Called(ext)

View File

@ -64,6 +64,10 @@ type StandardSearchService struct {
reIndexCh chan struct{}
}
// IsReady reports whether the dashboard index can serve search requests for
// orgId by delegating to the index's own initialization check.
func (s *StandardSearchService) IsReady(ctx context.Context, orgId int64) IsSearchReadyResponse {
	readiness := s.dashboardIndex.isInitialized(ctx, orgId)
	return readiness
}
func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService, ac accesscontrol.Service) SearchService {
extender := &NoopExtender{}
s := &StandardSearchService{

View File

@ -10,6 +10,10 @@ import (
// stubSearchService is a stand-in search service that reports itself as disabled.
type stubSearchService struct{}

// IsReady always returns the zero IsSearchReadyResponse: not ready, with no reason.
func (s *stubSearchService) IsReady(ctx context.Context, orgId int64) IsSearchReadyResponse {
	var notReady IsSearchReadyResponse
	return notReady
}

// IsDisabled always reports true.
func (s *stubSearchService) IsDisabled() bool {
	const disabled = true
	return disabled
}

View File

@ -31,11 +31,17 @@ type DashboardQuery struct {
From int `json:"from,omitempty"` // for paging
}
// IsSearchReadyResponse is the answer to a search readiness check.
type IsSearchReadyResponse struct {
// IsReady is true when search requests can be served.
IsReady bool
// Reason says why search is not ready; it is left empty when IsReady is true.
Reason string // initial-indexing-ongoing, org-indexing-ongoing
}
// SearchService is the interface implemented by the search service. The
// directive below generates MockSearchService from it.
//
//go:generate mockery --name SearchService --structname MockSearchService --inpackage --filename search_service_mock.go
type SearchService interface {
registry.CanBeDisabled
registry.BackgroundService
// DoDashboardQuery runs query for the given user and org and returns the
// outcome as a data response.
DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse
// IsReady reports whether search can currently serve requests for orgId;
// when it cannot, the response carries the reason.
IsReady(ctx context.Context, orgId int64) IsSearchReadyResponse
// RegisterDashboardIndexExtender registers ext with the service.
// NOTE(review): presumably ext augments the indexed dashboard documents — confirm against the implementation.
RegisterDashboardIndexExtender(ext DashboardIndexExtender)
// TriggerReIndex presumably requests a rebuild of the search index — confirm against the implementation.
TriggerReIndex()
}

View File

@ -9,13 +9,15 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/searchV2"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/testdatasource"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// DatasourceName is the string constant used as the datasource name in requests
@ -34,8 +36,19 @@ const DatasourceUID = "grafana"
// This is important to do since otherwise we will only get a
// not implemented error response from plugin at runtime.
var (
	// Compile-time checks that *Service implements the plugin SDK handlers.
	_ backend.QueryDataHandler   = (*Service)(nil)
	_ backend.CheckHealthHandler = (*Service)(nil)

	// namespace and subsystem prefix the metrics registered by this package.
	namespace = "grafana"
	subsystem = "grafanads"

	// dashboardSearchNotServedRequestsCounter counts dashboard search requests
	// that were answered without results because the search engine was not
	// ready, labelled by the readiness check's reason.
	dashboardSearchNotServedRequestsCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "dashboard_search_requests_not_served_total",
			Help:      "A counter for dashboard search requests that could not be served due to an ongoing search engine indexing",
		},
		[]string{"reason"},
	)
)
func ProvideService(cfg *setting.Cfg, search searchV2.SearchService, store store.StorageService) *Service {
@ -46,6 +59,7 @@ func newService(cfg *setting.Cfg, search searchV2.SearchService, store store.Sto
s := &Service{
search: search,
store: store,
log: log.New("grafanads"),
}
return s
@ -55,6 +69,7 @@ func newService(cfg *setting.Cfg, search searchV2.SearchService, store store.Sto
// Service holds the dependencies used by this package's query handlers.
type Service struct {
// search answers dashboard search queries and readiness checks.
search searchV2.SearchService
// store is the storage service handed to newService.
store store.StorageService
// log is the package logger, created with the name "grafanads".
log log.Logger
}
func DataSourceModel(orgId int64) *datasources.DataSource {
@ -157,6 +172,21 @@ func (s *Service) doRandomWalk(query backend.DataQuery) backend.DataResponse {
}
func (s *Service) doSearchQuery(ctx context.Context, req *backend.QueryDataRequest, query backend.DataQuery) backend.DataResponse {
searchReadinessCheckResp := s.search.IsReady(ctx, req.PluginContext.OrgID)
if !searchReadinessCheckResp.IsReady {
dashboardSearchNotServedRequestsCounter.With(prometheus.Labels{
"reason": searchReadinessCheckResp.Reason,
}).Inc()
return backend.DataResponse{
Frames: data.Frames{
&data.Frame{
Name: "Loading",
},
},
}
}
m := requestModel{}
err := json.Unmarshal(query.JSON, &m)
if err != nil {

View File

@ -10,12 +10,18 @@ import { replaceCurrentFolderQuery } from './utils';
import { DashboardQueryResult, GrafanaSearcher, QueryResponse, SearchQuery, SearchResultMeta } from '.';
// The backend returns an empty frame with a special name to indicate that the indexing engine is being rebuilt
// and cannot serve any search requests. While that is the case, we temporarily fall back to the old SQL Search API.
const loadingFrameName = 'Loading';
export class BlugeSearcher implements GrafanaSearcher {
constructor(private fallbackSearcher: GrafanaSearcher) {}
/**
 * Runs a dashboard search.
 *
 * Facet queries are not supported and are rejected with an error. Everything
 * else goes through this instance's doSearchQuery, which hands the query to the
 * fallback searcher while the backend reports that it is still loading.
 */
async search(query: SearchQuery): Promise<QueryResponse> {
  if (query.facet?.length) {
    throw new Error('facets not supported!');
  }

  return this.doSearchQuery(query);
}
async starred(query: SearchQuery): Promise<QueryResponse> {
@ -28,7 +34,7 @@ export class BlugeSearcher implements GrafanaSearcher {
uid: starsUIDS,
query: query.query ?? '*',
};
return doSearchQuery(starredQuery);
return this.doSearchQuery(starredQuery);
}
async tags(query: SearchQuery): Promise<TermCount[]> {
@ -52,6 +58,11 @@ export class BlugeSearcher implements GrafanaSearcher {
} as any)
)
).data as DataFrame[];
if (data?.[0]?.name === loadingFrameName) {
return this.fallbackSearcher.tags(query);
}
for (const frame of data) {
if (frame.fields[0].name === 'tag') {
return getTermCountsFrom(frame);
@ -80,132 +91,137 @@ export class BlugeSearcher implements GrafanaSearcher {
return Promise.resolve(opts);
}
/**
 * Runs `query` through the Grafana datasource and wraps the first result frame
 * in a lazily paged QueryResponse.
 *
 * If the first frame carries the loading frame name, the (already
 * folder-resolved) query is handed to the fallback searcher instead.
 */
async doSearchQuery(query: SearchQuery): Promise<QueryResponse> {
query = await replaceCurrentFolderQuery(query);
const ds = await getGrafanaDatasource();
// Base target; follow-up page requests spread it and override from/limit/refId.
const target = {
refId: 'Search',
queryType: GrafanaQueryType.Search,
search: {
...query,
query: query.query ?? '*',
limit: query.limit ?? firstPageSize,
},
};
const rsp = await lastValueFrom(
ds.query({
targets: [target],
} as any)
);
// Use an empty frame when the response has no data at all.
const first = (rsp.data?.[0] as DataFrame) ?? { fields: [], length: 0 };
// The backend cannot serve the request yet: delegate to the fallback searcher.
if (first.name === loadingFrameName) {
return this.fallbackSearcher.search(query);
}
for (const field of first.fields) {
field.display = getDisplayProcessor({ field, theme: config.theme2 });
}
// Make sure the object exists
if (!first.meta?.custom) {
first.meta = {
...first.meta,
custom: {
count: first.length,
max_score: 1,
},
};
}
const meta = first.meta.custom as SearchResultMeta;
if (!meta.locationInfo) {
meta.locationInfo = {}; // always set it so we can append
}
// Set the field name to a better display name
if (meta.sortBy?.length) {
const field = first.fields.find((f) => f.name === meta.sortBy);
if (field) {
const name = getSortFieldDisplayName(field.name);
meta.sortBy = name;
field.name = name; // make it look nicer
}
}
// Highest row index requested so far via loadMoreItems.
let loadMax = 0;
// The in-flight page load, shared by concurrent loadMoreItems calls.
let pending: Promise<void> | undefined = undefined;
// Fetches further pages until the view holds loadMax rows or meta.count is reached.
// NOTE(review): the early returns below (and a rejected query) leave `pending` set,
// so later loadMoreItems calls get the same settled promise and request no more pages — confirm this is intended.
const getNextPage = async () => {
while (loadMax > view.dataFrame.length) {
const from = view.dataFrame.length;
if (from >= meta.count) {
return;
}
const frame = (
await lastValueFrom(
ds.query({
targets: [
{
...target,
search: {
...(target?.search ?? {}),
from,
limit: nextPageSizes,
},
refId: 'Page',
facet: undefined,
},
],
} as any)
)
).data?.[0] as DataFrame;
if (!frame) {
console.log('no results', frame);
return;
}
// A page can only be appended when it has the same number of fields as the first frame.
if (frame.fields.length !== view.dataFrame.fields.length) {
console.log('invalid shape', frame, view.dataFrame);
return;
}
// Append the raw values to the same array buffer
const length = frame.length + view.dataFrame.length;
for (let i = 0; i < frame.fields.length; i++) {
const values = (view.dataFrame.fields[i].values as ArrayVector).buffer;
values.push(...frame.fields[i].values.toArray());
}
view.dataFrame.length = length;
// Add all the location lookup info
const submeta = frame.meta?.custom as SearchResultMeta;
if (submeta?.locationInfo && meta) {
for (const [key, value] of Object.entries(submeta.locationInfo)) {
meta.locationInfo[key] = value;
}
}
}
pending = undefined;
};
// getNextPage only reads `view` when it is invoked, which happens after this line has run.
const view = new DataFrameView<DashboardQueryResult>(first);
return {
totalRows: meta.count ?? first.length,
view,
// Raises the requested high-water mark and starts a page load unless one is already recorded in `pending`.
loadMoreItems: async (startIndex: number, stopIndex: number): Promise<void> => {
loadMax = Math.max(loadMax, stopIndex);
if (!pending) {
pending = getNextPage();
}
return pending;
},
// A row counts as loaded once it lies within the rows appended to the view so far.
isItemLoaded: (index: number): boolean => {
return index < view.dataFrame.length;
},
};
}
}
// Default `limit` of the initial search request when the query sets none.
const firstPageSize = 50;
// `limit` used for every follow-up page request.
const nextPageSizes = 100;
/**
 * Runs `query` through the Grafana datasource and wraps the first result frame
 * in a lazily paged QueryResponse.
 *
 * NOTE(review): this has the same body as BlugeSearcher.doSearchQuery except that it
 * does not check for the loading frame and never uses a fallback searcher — confirm
 * whether this module-level copy is still needed.
 */
async function doSearchQuery(query: SearchQuery): Promise<QueryResponse> {
query = await replaceCurrentFolderQuery(query);
const ds = await getGrafanaDatasource();
// Base target; follow-up page requests spread it and override from/limit/refId.
const target = {
refId: 'Search',
queryType: GrafanaQueryType.Search,
search: {
...query,
query: query.query ?? '*',
limit: query.limit ?? firstPageSize,
},
};
const rsp = await lastValueFrom(
ds.query({
targets: [target],
} as any)
);
// Use an empty frame when the response has no data at all.
const first = (rsp.data?.[0] as DataFrame) ?? { fields: [], length: 0 };
for (const field of first.fields) {
field.display = getDisplayProcessor({ field, theme: config.theme2 });
}
// Make sure the object exists
if (!first.meta?.custom) {
first.meta = {
...first.meta,
custom: {
count: first.length,
max_score: 1,
},
};
}
const meta = first.meta.custom as SearchResultMeta;
if (!meta.locationInfo) {
meta.locationInfo = {}; // always set it so we can append
}
// Set the field name to a better display name
if (meta.sortBy?.length) {
const field = first.fields.find((f) => f.name === meta.sortBy);
if (field) {
const name = getSortFieldDisplayName(field.name);
meta.sortBy = name;
field.name = name; // make it look nicer
}
}
// Highest row index requested so far via loadMoreItems.
let loadMax = 0;
// The in-flight page load, shared by concurrent loadMoreItems calls.
let pending: Promise<void> | undefined = undefined;
// Fetches further pages until the view holds loadMax rows or meta.count is reached.
// NOTE(review): the early returns below leave `pending` set, so later loadMoreItems
// calls get the same settled promise and request no more pages — confirm this is intended.
const getNextPage = async () => {
while (loadMax > view.dataFrame.length) {
const from = view.dataFrame.length;
if (from >= meta.count) {
return;
}
const frame = (
await lastValueFrom(
ds.query({
targets: [
{
...target,
search: {
...(target?.search ?? {}),
from,
limit: nextPageSizes,
},
refId: 'Page',
facet: undefined,
},
],
} as any)
)
).data?.[0] as DataFrame;
if (!frame) {
console.log('no results', frame);
return;
}
// A page can only be appended when it has the same number of fields as the first frame.
if (frame.fields.length !== view.dataFrame.fields.length) {
console.log('invalid shape', frame, view.dataFrame);
return;
}
// Append the raw values to the same array buffer
const length = frame.length + view.dataFrame.length;
for (let i = 0; i < frame.fields.length; i++) {
const values = (view.dataFrame.fields[i].values as ArrayVector).buffer;
values.push(...frame.fields[i].values.toArray());
}
view.dataFrame.length = length;
// Add all the location lookup info
const submeta = frame.meta?.custom as SearchResultMeta;
if (submeta?.locationInfo && meta) {
for (const [key, value] of Object.entries(submeta.locationInfo)) {
meta.locationInfo[key] = value;
}
}
}
pending = undefined;
};
// getNextPage only reads `view` when it is invoked, which happens after this line has run.
const view = new DataFrameView<DashboardQueryResult>(first);
return {
totalRows: meta.count ?? first.length,
view,
// Raises the requested high-water mark and starts a page load unless one is already recorded in `pending`.
loadMoreItems: async (startIndex: number, stopIndex: number): Promise<void> => {
loadMax = Math.max(loadMax, stopIndex);
if (!pending) {
pending = getNextPage();
}
return pending;
},
// A row counts as loaded once it lies within the rows appended to the view so far.
isItemLoaded: (index: number): boolean => {
return index < view.dataFrame.length;
},
};
}
function getTermCountsFrom(frame: DataFrame): TermCount[] {
const keys = frame.fields[0].values;
const vals = frame.fields[1].values;

View File

@ -7,9 +7,10 @@ import { GrafanaSearcher } from './types';
let searcher: GrafanaSearcher | undefined = undefined;
/**
 * Returns the shared searcher instance, creating it on first use.
 *
 * With the `panelTitleSearch` feature toggle enabled this is a BlugeSearcher
 * that is given a SQLSearcher as its fallback; otherwise it is the SQLSearcher
 * itself.
 */
export function getGrafanaSearcher(): GrafanaSearcher {
  if (!searcher) {
    // Only build the SQL searcher when the cached instance still has to be created.
    const sqlSearcher = new SQLSearcher();
    const useBluge = config.featureToggles.panelTitleSearch;
    searcher = useBluge ? new BlugeSearcher(sqlSearcher) : sqlSearcher;
  }
  return searcher!;
}