Streaming: support streaming and a javascript test datasource (#16729)

This commit is contained in:
Ryan McKinley 2019-04-25 14:01:02 -04:00 committed by GitHub
parent ab3860a334
commit 470634e2d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1100 additions and 177 deletions

View File

@ -105,7 +105,6 @@ export class Graph extends PureComponent<GraphProps> {
};
try {
console.log('Graph render');
$.plot(this.element, series, flotOptions);
} catch (err) {
console.log('Graph rendering error', err, flotOptions, series);

View File

@ -1,6 +1,7 @@
export enum LoadingState {
NotStarted = 'NotStarted',
Loading = 'Loading',
Streaming = 'Streaming',
Done = 'Done',
Error = 'Error',
}

View File

@ -1,7 +1,7 @@
import { ComponentClass } from 'react';
import { TimeRange } from './time';
import { PluginMeta } from './plugin';
import { TableData, TimeSeries, SeriesData } from './data';
import { TableData, TimeSeries, SeriesData, LoadingState } from './data';
import { PanelData } from './panel';
export interface DataSourcePluginOptionsEditorProps<TOptions> {
@ -105,7 +105,7 @@ export interface DataSourceApi<TQuery extends DataQuery = DataQuery> {
/**
* Main metrics / data query action
*/
query(options: DataQueryRequest<TQuery>): Promise<DataQueryResponse>;
query(options: DataQueryRequest<TQuery>, observer?: DataStreamObserver): Promise<DataQueryResponse>;
/**
* Test & verify datasource settings & connection details
@ -183,6 +183,47 @@ export type LegacyResponseData = TimeSeries | TableData | any;
export type DataQueryResponseData = SeriesData | LegacyResponseData;
export type DataStreamObserver = (event: DataStreamState) => void;
export interface DataStreamState {
/**
* when Done or Error no more events will be processed
*/
state: LoadingState;
/**
* Consistent key across events.
*/
key: string;
/**
* The stream request. The properties of this request will be examined
* to determine if the stream matches the original query. If not, it
* will be unsubscribed.
*/
request: DataQueryRequest;
/**
* Series data may not be known yet
*/
series?: SeriesData[];
/**
* Error in stream (but may still be running)
*/
error?: DataQueryError;
/**
* Optionally return only the rows that changed in this event
*/
delta?: SeriesData[];
/**
* Stop listening to this stream
*/
unsubscribe: () => void;
}
export interface DataQueryResponse {
data: DataQueryResponseData[];
}

View File

@ -225,6 +225,15 @@ func init() {
},
})
registerScenario(&Scenario{
Id: "streaming_client",
Name: "Streaming Client",
Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult {
// Real work is in javascript client
return tsdb.NewQueryResult()
},
})
registerScenario(&Scenario{
Id: "table_static",
Name: "Table Static",

View File

@ -4,6 +4,7 @@ const backendSrv = {
getDashboardByUid: jest.fn(),
getFolderByUid: jest.fn(),
post: jest.fn(),
resolveCancelerIfExists: jest.fn(),
};
export function getBackendSrv() {

View File

@ -22,7 +22,7 @@ import { ScopedVars } from '@grafana/ui';
import templateSrv from 'app/features/templating/template_srv';
import { getProcessedSeriesData } from '../state/PanelQueryRunner';
import { getProcessedSeriesData } from '../state/PanelQueryState';
import { Unsubscribable } from 'rxjs';
const DEFAULT_PLUGIN_ERROR = 'Error in plugin';
@ -48,6 +48,7 @@ export interface State {
export class PanelChrome extends PureComponent<Props, State> {
timeSrv: TimeSrv = getTimeSrv();
querySubscription: Unsubscribable;
delayedStateUpdate: Partial<State>;
constructor(props: Props) {
super(props);
@ -118,7 +119,15 @@ export class PanelChrome extends PureComponent<Props, State> {
}
}
this.setState({ isFirstLoad, errorMessage, data });
const stateUpdate = { isFirstLoad, errorMessage, data };
if (this.isVisible) {
this.setState(stateUpdate);
} else {
// if we are getting data while another panel is in fullscreen / edit mode
// we need to store the data but not update state yet
this.delayedStateUpdate = stateUpdate;
}
},
};
@ -162,9 +171,15 @@ export class PanelChrome extends PureComponent<Props, State> {
};
onRender = () => {
this.setState({
renderCounter: this.state.renderCounter + 1,
});
const stateUpdate = { renderCounter: this.state.renderCounter + 1 };
// If we have received a data update while hidden copy over that state as well
if (this.delayedStateUpdate) {
Object.assign(stateUpdate, this.delayedStateUpdate);
this.delayedStateUpdate = null;
}
this.setState(stateUpdate);
};
onOptionsChange = (options: any) => {

View File

@ -1,42 +1,15 @@
import { getProcessedSeriesData, PanelQueryRunner } from './PanelQueryRunner';
import { PanelData, DataQueryRequest } from '@grafana/ui/src/types';
import { PanelQueryRunner } from './PanelQueryRunner';
import {
PanelData,
DataQueryRequest,
DataStreamObserver,
DataStreamState,
LoadingState,
ScopedVars,
} from '@grafana/ui/src/types';
import moment from 'moment';
describe('PanelQueryRunner', () => {
it('converts timeseries to table skipping nulls', () => {
const input1 = {
target: 'Field Name',
datapoints: [[100, 1], [200, 2]],
};
const input2 = {
// without target
target: '',
datapoints: [[100, 1], [200, 2]],
};
const data = getProcessedSeriesData([null, input1, input2, null, null]);
expect(data.length).toBe(2);
expect(data[0].fields[0].name).toBe(input1.target);
expect(data[0].rows).toBe(input1.datapoints);
// Default name
expect(data[1].fields[0].name).toEqual('Value');
// Every colun should have a name and a type
for (const table of data) {
for (const column of table.fields) {
expect(column.name).toBeDefined();
expect(column.type).toBeDefined();
}
}
});
it('supports null values from query OK', () => {
expect(getProcessedSeriesData([null, null, null, null])).toEqual([]);
expect(getProcessedSeriesData(undefined)).toEqual([]);
expect(getProcessedSeriesData((null as unknown) as any[])).toEqual([]);
expect(getProcessedSeriesData([])).toEqual([]);
});
});
jest.mock('app/core/services/backend_srv');
interface ScenarioContext {
setup: (fn: () => void) => void;
@ -47,6 +20,9 @@ interface ScenarioContext {
events?: PanelData[];
res?: PanelData;
queryCalledWith?: DataQueryRequest;
observer: DataStreamObserver;
runner: PanelQueryRunner;
scopedVars: ScopedVars;
}
type ScenarioFn = (ctx: ScenarioContext) => void;
@ -57,12 +33,16 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
const ctx: ScenarioContext = {
widthPixels: 200,
scopedVars: {
server: { text: 'Server1', value: 'server-1' },
},
runner: new PanelQueryRunner(),
observer: (args: any) => {},
setup: (fn: () => void) => {
setupFn = fn;
},
};
let runner: PanelQueryRunner;
const response: any = {
data: [{ target: 'hello', datapoints: [] }],
};
@ -71,9 +51,11 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
setupFn();
const datasource: any = {
name: 'TestDB',
interval: ctx.dsInterval,
query: (options: DataQueryRequest) => {
query: (options: DataQueryRequest, observer: DataStreamObserver) => {
ctx.queryCalledWith = options;
ctx.observer = observer;
return Promise.resolve(response);
},
testDatasource: jest.fn(),
@ -81,6 +63,7 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
const args: any = {
datasource,
scopedVars: ctx.scopedVars,
minInterval: ctx.minInterval,
widthPixels: ctx.widthPixels,
maxDataPoints: ctx.maxDataPoints,
@ -93,15 +76,15 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
queries: [{ refId: 'A', test: 1 }],
};
runner = new PanelQueryRunner();
runner.subscribe({
ctx.runner = new PanelQueryRunner();
ctx.runner.subscribe({
next: (data: PanelData) => {
ctx.events.push(data);
},
});
ctx.events = [];
ctx.res = await runner.run(args);
ctx.res = await ctx.runner.run(args);
});
scenarioFn(ctx);
@ -109,6 +92,22 @@ function describeQueryRunnerScenario(description: string, scenarioFn: ScenarioFn
}
describe('PanelQueryRunner', () => {
describeQueryRunnerScenario('simple scenario', ctx => {
it('should set requestId on request', async () => {
expect(ctx.queryCalledWith.requestId).toBe('Q100');
});
it('should set datasource name on request', async () => {
expect(ctx.queryCalledWith.targets[0].datasource).toBe('TestDB');
});
it('should pass scopedVars to datasource with interval props', async () => {
expect(ctx.queryCalledWith.scopedVars.server.text).toBe('Server1');
expect(ctx.queryCalledWith.scopedVars.__interval.text).toBe('5m');
expect(ctx.queryCalledWith.scopedVars.__interval_ms.text).toBe('300000');
});
});
describeQueryRunnerScenario('with no maxDataPoints or minInterval', ctx => {
ctx.setup(() => {
ctx.maxDataPoints = null;
@ -165,4 +164,47 @@ describe('PanelQueryRunner', () => {
expect(ctx.queryCalledWith.maxDataPoints).toBe(10);
});
});
describeQueryRunnerScenario('when datasource is streaming data', ctx => {
let streamState: DataStreamState;
let isUnsubbed = false;
beforeEach(() => {
streamState = {
state: LoadingState.Streaming,
key: 'test-stream-1',
series: [
{
rows: [],
fields: [],
name: 'I am a magic stream',
},
],
request: {
requestId: ctx.queryCalledWith.requestId,
} as any,
unsubscribe: () => {
isUnsubbed = true;
},
};
ctx.observer(streamState);
});
it('should push another update to subscriber', async () => {
expect(ctx.events.length).toBe(2);
});
it('should set state to streaming', async () => {
expect(ctx.events[1].state).toBe(LoadingState.Streaming);
});
it('should not unsubscribe', async () => {
expect(isUnsubbed).toBe(false);
});
it('destroy should unsubscribe streams', async () => {
ctx.runner.destroy();
expect(isUnsubbed).toBe(true);
});
});
});

View File

@ -1,27 +1,17 @@
// Libraries
import cloneDeep from 'lodash/cloneDeep';
import throttle from 'lodash/throttle';
import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
// Services & Utils
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import kbn from 'app/core/utils/kbn';
import templateSrv from 'app/features/templating/template_srv';
// Components & Types
import {
guessFieldTypes,
toSeriesData,
PanelData,
LoadingState,
DataQuery,
TimeRange,
ScopedVars,
DataQueryRequest,
SeriesData,
DataSourceApi,
} from '@grafana/ui';
import { PanelQueryState } from './PanelQueryState';
// Types
import { PanelData, DataQuery, TimeRange, ScopedVars, DataQueryRequest, DataSourceApi } from '@grafana/ui';
export interface QueryRunnerOptions<TQuery extends DataQuery = DataQuery> {
datasource: string | DataSourceApi<TQuery>;
queries: TQuery[];
@ -54,6 +44,10 @@ export class PanelQueryRunner {
private state = new PanelQueryState();
constructor() {
this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
}
/**
* Listen for updates to the PanelData. If a query has already run for this panel,
* the results will be immediatly passed to the observer
@ -73,7 +67,7 @@ export class PanelQueryRunner {
}
// Send the last result
if (this.state.data.state !== LoadingState.NotStarted) {
if (this.state.isStarted()) {
observer.next(this.state.getDataAfterCheckingFormats());
}
@ -103,6 +97,9 @@ export class PanelQueryRunner {
delayStateNotification,
} = options;
// filter out hidden queries & deep clone them
const clonedAndFilteredQueries = cloneDeep(queries.filter(q => !q.hide));
const request: DataQueryRequest = {
requestId: getNextRequestId(),
timezone,
@ -112,26 +109,20 @@ export class PanelQueryRunner {
timeInfo,
interval: '',
intervalMs: 0,
targets: cloneDeep(
queries.filter(q => {
return !q.hide; // Skip any hidden queries
})
),
targets: clonedAndFilteredQueries,
maxDataPoints: maxDataPoints || widthPixels,
scopedVars: scopedVars || {},
cacheTimeout,
startTime: Date.now(),
};
// Deprecated
// Add deprecated property
(request as any).rangeRaw = timeRange.raw;
let loadingStateTimeoutId = 0;
try {
const ds =
datasource && (datasource as any).query
? (datasource as DataSourceApi)
: await getDatasourceSrv().get(datasource as string, request.scopedVars);
const ds = await getDataSource(datasource, request.scopedVars);
// Attach the datasource name to each query
request.targets = request.targets.map(query => {
@ -148,17 +139,19 @@ export class PanelQueryRunner {
// and add built in variables interval and interval_ms
request.scopedVars = Object.assign({}, request.scopedVars, {
__interval: { text: norm.interval, value: norm.interval },
__interval_ms: { text: norm.intervalMs, value: norm.intervalMs },
__interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
});
request.interval = norm.interval;
request.intervalMs = norm.intervalMs;
// Check if we can reuse the already issued query
if (state.isRunning()) {
const active = state.getActiveRunner();
if (active) {
if (state.isSameQuery(ds, request)) {
// TODO? maybe cancel if it has run too long?
return state.getCurrentExecutor();
// Maybe cancel if it has run too long?
console.log('Trying to execute query while last one has yet to complete, returning same promise');
return active;
} else {
state.cancel('Query Changed while running');
}
@ -166,8 +159,8 @@ export class PanelQueryRunner {
// Send a loading status event on slower queries
loadingStateTimeoutId = window.setTimeout(() => {
if (this.state.isRunning()) {
this.subject.next(this.state.data);
if (state.getActiveRunner()) {
this.subject.next(this.state.validateStreamsAndGetPanelData());
}
}, delayStateNotification || 500);
@ -188,6 +181,18 @@ export class PanelQueryRunner {
}
}
/**
* Called after every streaming event. This should be throttled so we
* avoid accidentally overwhelming the browser
*/
onStreamingDataUpdated = throttle(
() => {
this.subject.next(this.state.validateStreamsAndGetPanelData());
},
50,
{ trailing: true, leading: true }
);
/**
* Called when the panel is closed
*/
@ -202,22 +207,12 @@ export class PanelQueryRunner {
}
}
/**
* All panels will be passed tables that have our best guess at colum type set
*
* This is also used by PanelChrome for snapshot support
*/
export function getProcessedSeriesData(results?: any[]): SeriesData[] {
if (!results) {
return [];
async function getDataSource(
datasource: string | DataSourceApi | null,
scopedVars: ScopedVars
): Promise<DataSourceApi> {
if (datasource && (datasource as any).query) {
return datasource as DataSourceApi;
}
const series: SeriesData[] = [];
for (const r of results) {
if (r) {
series.push(guessFieldTypes(toSeriesData(r)));
}
}
return series;
return await getDatasourceSrv().get(datasource as string, scopedVars);
}

View File

@ -1,6 +1,6 @@
import { toDataQueryError, PanelQueryState } from './PanelQueryState';
import { toDataQueryError, PanelQueryState, getProcessedSeriesData } from './PanelQueryState';
import { MockDataSourceApi } from 'test/mocks/datasource_srv';
import { DataQueryResponse } from '@grafana/ui';
import { DataQueryResponse, LoadingState } from '@grafana/ui';
import { getQueryOptions } from 'test/helpers/getQueryOptions';
describe('PanelQueryState', () => {
@ -17,11 +17,11 @@ describe('PanelQueryState', () => {
it('keeps track of running queries', async () => {
const state = new PanelQueryState();
expect(state.isRunning()).toBeFalsy();
expect(state.getActiveRunner()).toBeFalsy();
let hasRun = false;
const dsRunner = new Promise<DataQueryResponse>((resolve, reject) => {
// The status should be running when we get here
expect(state.isRunning()).toBeTruthy();
expect(state.getActiveRunner()).toBeTruthy();
resolve({ data: ['x', 'y'] });
hasRun = true;
});
@ -30,7 +30,7 @@ describe('PanelQueryState', () => {
// should not actually run for an empty query
let empty = await state.execute(ds, getQueryOptions({}));
expect(state.isRunning()).toBeFalsy();
expect(state.getActiveRunner()).toBeFalsy();
expect(empty.series.length).toBe(0);
expect(hasRun).toBeFalsy();
@ -39,8 +39,162 @@ describe('PanelQueryState', () => {
getQueryOptions({ targets: [{ hide: true, refId: 'X' }, { hide: true, refId: 'Y' }, { hide: true, refId: 'Z' }] })
);
// should not run any hidden queries'
expect(state.isRunning()).toBeFalsy();
expect(state.getActiveRunner()).toBeFalsy();
expect(empty.series.length).toBe(0);
expect(hasRun).toBeFalsy();
});
});
describe('getProcessedSeriesData', () => {
it('converts timeseries to table skipping nulls', () => {
const input1 = {
target: 'Field Name',
datapoints: [[100, 1], [200, 2]],
};
const input2 = {
// without target
target: '',
datapoints: [[100, 1], [200, 2]],
};
const data = getProcessedSeriesData([null, input1, input2, null, null]);
expect(data.length).toBe(2);
expect(data[0].fields[0].name).toBe(input1.target);
expect(data[0].rows).toBe(input1.datapoints);
// Default name
expect(data[1].fields[0].name).toEqual('Value');
// Every colun should have a name and a type
for (const table of data) {
for (const column of table.fields) {
expect(column.name).toBeDefined();
expect(column.type).toBeDefined();
}
}
});
it('supports null values from query OK', () => {
expect(getProcessedSeriesData([null, null, null, null])).toEqual([]);
expect(getProcessedSeriesData(undefined)).toEqual([]);
expect(getProcessedSeriesData((null as unknown) as any[])).toEqual([]);
expect(getProcessedSeriesData([])).toEqual([]);
});
});
function makeSeriesStub(refId: string) {
return {
fields: [{ name: 'a' }],
rows: [],
refId,
};
}
describe('stream handling', () => {
const state = new PanelQueryState();
state.onStreamingDataUpdated = () => {
// nothing
};
state.request = {
requestId: '123',
range: {
raw: {
from: 123, // if string it gets revaluated
},
},
} as any;
state.response = {
state: LoadingState.Done,
series: [makeSeriesStub('A'), makeSeriesStub('B')],
};
it('gets the response', () => {
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(2);
expect(data.state).toBe(LoadingState.Done);
expect(data.series[0].refId).toBe('A');
});
it('adds a stream event', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'C',
request: state.request, // From the same request
series: [makeSeriesStub('C')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(1);
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(3);
expect(data.state).toBe(LoadingState.Streaming);
expect(data.series[2].refId).toBe('C');
});
it('add another stream event (with a differnet key)', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'D',
request: state.request, // From the same request
series: [makeSeriesStub('D')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(2);
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(4);
expect(data.state).toBe(LoadingState.Streaming);
expect(data.series[3].refId).toBe('D');
});
it('replace the first stream value, but keep the order', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'C', // The key to replace previous index 2
request: state.request, // From the same request
series: [makeSeriesStub('X')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(2);
const data = state.validateStreamsAndGetPanelData();
expect(data.series[2].refId).toBe('X');
});
it('ignores streams from a differnet request', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Loading,
key: 'Z', // Note with key 'A' it would still overwrite
request: {
...state.request,
requestId: 'XXX', // Different request and id
} as any,
series: [makeSeriesStub('C')],
unsubscribe: () => {},
});
expect(state.streams.length).toBe(2); // no change
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(4);
});
it('removes streams when the query changes', () => {
state.request = {
...state.request,
requestId: 'somethine else',
} as any;
state.response = {
state: LoadingState.Done,
series: [makeSeriesStub('F')],
};
expect(state.streams.length).toBe(2); // unchanged
const data = state.validateStreamsAndGetPanelData();
expect(data.series.length).toBe(1);
expect(data.series[0].refId).toBe('F');
expect(state.streams.length).toBe(0); // no streams
});
});

View File

@ -1,16 +1,25 @@
// Libraries
import isString from 'lodash/isString';
import isEqual from 'lodash/isEqual';
// Utils & Services
import { getBackendSrv } from 'app/core/services/backend_srv';
import * as dateMath from 'app/core/utils/datemath';
import { guessFieldTypes, toSeriesData, isSeriesData } from '@grafana/ui/src/utils';
// Types
import {
DataSourceApi,
DataQueryRequest,
PanelData,
LoadingState,
toLegacyResponseData,
isSeriesData,
toSeriesData,
DataQueryError,
DataStreamObserver,
DataStreamState,
SeriesData,
DataQueryResponseData,
} from '@grafana/ui';
import { getProcessedSeriesData } from './PanelQueryRunner';
import { getBackendSrv } from 'app/core/services/backend_srv';
import isEqual from 'lodash/isEqual';
export class PanelQueryState {
// The current/last running request
@ -19,26 +28,33 @@ export class PanelQueryState {
endTime: 1000, // Somethign not zero
} as DataQueryRequest;
// The best known state of data
data = {
// The result back from the datasource query
response = {
state: LoadingState.NotStarted,
series: [],
} as PanelData;
// Active stream results
streams: DataStreamState[] = [];
sendSeries = false;
sendLegacy = false;
// A promise for the running query
private executor: Promise<PanelData> = {} as any;
private executor?: Promise<PanelData>;
private rejector = (reason?: any) => {};
private datasource: DataSourceApi = {} as any;
isRunning() {
return this.data.state === LoadingState.Loading; //
isFinished(state: LoadingState) {
return state === LoadingState.Done || state === LoadingState.Error;
}
isStarted() {
return this.response.state !== LoadingState.NotStarted;
}
isSameQuery(ds: DataSourceApi, req: DataQueryRequest) {
if (this.datasource !== this.datasource) {
if (ds !== this.datasource) {
return false;
}
@ -46,16 +62,22 @@ export class PanelQueryState {
return isEqual(this.request.targets, req.targets);
}
getCurrentExecutor() {
/**
* Return the currently running query
*/
getActiveRunner(): Promise<PanelData> | undefined {
return this.executor;
}
cancel(reason: string) {
const { request } = this;
this.executor = null;
try {
// If no endTime the call to datasource.query did not complete
// call rejector to reject the executor promise
if (!request.endTime) {
request.endTime = Date.now();
this.rejector('Canceled:' + reason);
}
@ -64,8 +86,11 @@ export class PanelQueryState {
getBackendSrv().resolveCancelerIfExists(request.requestId);
}
} catch (err) {
console.log('Error canceling request');
console.log('Error canceling request', err);
}
// Close any open streams
this.closeStreams(true);
}
execute(ds: DataSourceApi, req: DataQueryRequest): Promise<PanelData> {
@ -75,85 +100,214 @@ export class PanelQueryState {
if (!req.targets.length) {
console.log('No queries, so return early');
this.request.endTime = Date.now();
this.closeStreams();
return Promise.resolve(
(this.data = {
(this.response = {
state: LoadingState.Done,
series: [], // Clear the data
legacy: [],
request: req,
})
);
}
// Set the loading state immediatly
this.data.state = LoadingState.Loading;
return (this.executor = new Promise<PanelData>((resolve, reject) => {
this.response.state = LoadingState.Loading;
this.executor = new Promise<PanelData>((resolve, reject) => {
this.rejector = reject;
return ds
.query(this.request)
.query(this.request, this.dataStreamObserver)
.then(resp => {
this.request.endTime = Date.now();
this.executor = null;
// Make sure we send something back -- called run() w/o subscribe!
if (!(this.sendSeries || this.sendLegacy)) {
this.sendSeries = true;
}
// Make sure the response is in a supported format
const series = this.sendSeries ? getProcessedSeriesData(resp.data) : [];
const legacy = this.sendLegacy
? resp.data.map(v => {
if (isSeriesData(v)) {
return toLegacyResponseData(v);
}
return v;
})
: undefined;
resolve(
(this.data = {
state: LoadingState.Done,
request: this.request,
series,
legacy,
})
);
// Save the result state
this.response = {
state: LoadingState.Done,
request: this.request,
series: this.sendSeries ? getProcessedSeriesData(resp.data) : [],
legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined,
};
resolve(this.validateStreamsAndGetPanelData());
})
.catch(err => {
this.executor = null;
resolve(this.setError(err));
});
}));
});
return this.executor;
}
// Send a notice when the stream has updated the current model
onStreamingDataUpdated: () => void;
// This gets all stream events and keeps track of them
// it will then delegate real changes to the PanelQueryRunner
dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => {
// Streams only work with the 'series' format
this.sendSeries = true;
// Add the stream to our list
let found = false;
const active = this.streams.map(s => {
if (s.key === stream.key) {
found = true;
return stream;
}
return s;
});
if (!found) {
if (shouldDisconnect(this.request, stream)) {
console.log('Got stream update from old stream, unsubscribing');
stream.unsubscribe();
return;
}
active.push(stream);
}
this.streams = active;
this.onStreamingDataUpdated();
};
closeStreams(keepSeries = false) {
if (!this.streams.length) {
return;
}
const series: SeriesData[] = [];
for (const stream of this.streams) {
if (stream.series) {
series.push.apply(series, stream.series);
}
try {
stream.unsubscribe();
} catch {
console.log('Failed to unsubscribe to stream');
}
}
this.streams = [];
// Move the series from streams to the resposne
if (keepSeries) {
const { response } = this;
this.response = {
...response,
series: [
...response.series,
...series, // Append the streamed series
],
};
}
}
/**
* This is called before broadcasting data to listeners. Given that
* stream events can happen at any point, we need to make sure to
* only return data from active streams.
*/
validateStreamsAndGetPanelData(): PanelData {
const { response, streams, request } = this;
// When not streaming, return the response + request
if (!streams.length) {
return {
...response,
request: request,
};
}
let done = this.isFinished(response.state);
const series = [...response.series];
const active: DataStreamState[] = [];
for (const stream of this.streams) {
if (shouldDisconnect(request, stream)) {
console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam');
stream.unsubscribe();
continue;
}
active.push(stream);
series.push.apply(series, stream.series);
if (!this.isFinished(stream.state)) {
done = false;
}
}
this.streams = active;
// Update the time range
let timeRange = this.request.range;
if (isString(timeRange.raw.from)) {
timeRange = {
from: dateMath.parse(timeRange.raw.from, false),
to: dateMath.parse(timeRange.raw.to, true),
raw: timeRange.raw,
};
}
return {
state: done ? LoadingState.Done : LoadingState.Streaming,
series, // Union of series from response and all streams
legacy: this.sendLegacy ? translateToLegacyData(series) : undefined,
request: {
...this.request,
range: timeRange, // update the time range
},
};
}
/**
* Make sure all requested formats exist on the data
*/
getDataAfterCheckingFormats(): PanelData {
const { data, sendLegacy, sendSeries } = this;
if (sendLegacy && (!data.legacy || !data.legacy.length)) {
data.legacy = data.series.map(v => toLegacyResponseData(v));
const { response, sendLegacy, sendSeries } = this;
if (sendLegacy && (!response.legacy || !response.legacy.length)) {
response.legacy = response.series.map(v => toLegacyResponseData(v));
}
if (sendSeries && !data.series.length && data.legacy) {
data.series = data.legacy.map(v => toSeriesData(v));
if (sendSeries && !response.series.length && response.legacy) {
response.series = response.legacy.map(v => toSeriesData(v));
}
return this.data;
return this.validateStreamsAndGetPanelData();
}
setError(err: any): PanelData {
if (!this.request.endTime) {
this.request.endTime = Date.now();
}
return (this.data = {
...this.data, // Keep any existing data
this.closeStreams(true);
this.response = {
...this.response, // Keep any existing data
state: LoadingState.Error,
error: toDataQueryError(err),
request: this.request,
});
};
return this.validateStreamsAndGetPanelData();
}
}
export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) {
// It came from the same the same request, so keep it
if (source === state.request || state.request.requestId.startsWith(source.requestId)) {
return false;
}
// We should be able to check that it is the same query regardless of
// if it came from the same request. This will be important for #16676
return true;
}
export function toDataQueryError(err: any): DataQueryError {
const error = (err || {}) as DataQueryError;
if (!error.message) {
@ -175,3 +329,32 @@ export function toDataQueryError(err: any): DataQueryError {
}
return error;
}
function translateToLegacyData(data: DataQueryResponseData) {
return data.map(v => {
if (isSeriesData(v)) {
return toLegacyResponseData(v);
}
return v;
});
}
/**
* All panels will be passed tables that have our best guess at colum type set
*
* This is also used by PanelChrome for snapshot support
*/
export function getProcessedSeriesData(results?: any[]): SeriesData[] {
if (!results) {
return [];
}
const series: SeriesData[] = [];
for (const r of results) {
if (r) {
series.push(guessFieldTypes(toSeriesData(r)));
}
}
return series;
}

View File

@ -0,0 +1,400 @@
import defaults from 'lodash/defaults';
import {
DataQueryRequest,
FieldType,
SeriesData,
DataQueryResponse,
DataQueryError,
DataStreamObserver,
DataStreamState,
LoadingState,
} from '@grafana/ui';
import { TestDataQuery, StreamingQuery } from './types';
export const defaultQuery: StreamingQuery = {
type: 'signal',
speed: 250, // ms
spread: 3.5,
noise: 2.2,
};
type StreamWorkers = {
[key: string]: StreamWorker;
};
export class StreamHandler {
workers: StreamWorkers = {};
process(req: DataQueryRequest<TestDataQuery>, observer: DataStreamObserver): DataQueryResponse | undefined {
let resp: DataQueryResponse;
for (const query of req.targets) {
if ('streaming_client' !== query.scenarioId) {
continue;
}
if (!resp) {
resp = { data: [] };
}
// set stream option defaults
query.stream = defaults(query.stream, defaultQuery);
// create stream key
const key = req.dashboardId + '/' + req.panelId + '/' + query.refId;
if (this.workers[key]) {
const existing = this.workers[key];
if (existing.update(query, req)) {
continue;
}
existing.unsubscribe();
delete this.workers[key];
}
const type = query.stream.type;
if (type === 'signal') {
this.workers[key] = new SignalWorker(key, query, req, observer);
} else if (type === 'logs') {
this.workers[key] = new LogsWorker(key, query, req, observer);
} else {
throw {
message: 'Unknown Stream type: ' + type,
refId: query.refId,
} as DataQueryError;
}
}
return resp;
}
}
/**
* Manages a single stream request
*/
export class StreamWorker {
query: StreamingQuery;
stream: DataStreamState;
observer: DataStreamObserver;
last = -1;
timeoutId = 0;
constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) {
this.stream = {
key,
state: LoadingState.Streaming,
request,
unsubscribe: this.unsubscribe,
};
this.query = query.stream;
this.last = Date.now();
this.observer = observer;
console.log('Creating Test Stream: ', this);
}
unsubscribe = () => {
this.observer = null;
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = 0;
}
};
update(query: TestDataQuery, request: DataQueryRequest): boolean {
// Check if stream has been unsubscribed or query changed type
if (this.observer === null || this.query.type !== query.stream.type) {
return false;
}
this.query = query.stream;
this.stream.request = request; // OK?
console.log('Reuse Test Stream: ', this);
return true;
}
appendRows(append: any[][]) {
// Trim the maximum row count
const { query, stream } = this;
const maxRows = query.buffer ? query.buffer : stream.request.maxDataPoints;
// Edit the first series
const series = stream.series[0];
let rows = series.rows.concat(append);
const extra = maxRows - rows.length;
if (extra < 0) {
rows = rows.slice(extra * -1);
}
series.rows = rows;
// Tell the event about only the rows that changed (it may want to process them)
stream.delta = [{ ...series, rows: append }];
// Broadcast the changes
if (this.observer) {
this.observer(stream);
} else {
console.log('StreamWorker working without any observer');
}
this.last = Date.now();
}
}
export class SignalWorker extends StreamWorker {
value: number;
constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) {
super(key, query, request, observer);
setTimeout(() => {
this.stream.series = [this.initBuffer(query.refId)];
this.looper();
}, 10);
}
nextRow = (time: number) => {
const { spread, noise } = this.query;
this.value += (Math.random() - 0.5) * spread;
return [
time,
this.value, // Value
this.value - Math.random() * noise, // MIN
this.value + Math.random() * noise, // MAX
];
};
initBuffer(refId: string): SeriesData {
const { speed, buffer } = this.query;
const data = {
fields: [
{ name: 'Time', type: FieldType.time },
{ name: 'Value', type: FieldType.number },
{ name: 'Min', type: FieldType.number },
{ name: 'Max', type: FieldType.number },
],
rows: [],
refId,
name: 'Signal ' + refId,
} as SeriesData;
const request = this.stream.request;
this.value = Math.random() * 100;
const maxRows = buffer ? buffer : request.maxDataPoints;
let time = Date.now() - maxRows * speed;
for (let i = 0; i < maxRows; i++) {
data.rows.push(this.nextRow(time));
time += speed;
}
return data;
}
looper = () => {
if (!this.observer) {
const request = this.stream.request;
const elapsed = request.startTime - Date.now();
if (elapsed > 1000) {
console.log('Stop looping');
return;
}
}
// Make sure it has a minimum speed
const { query } = this;
if (query.speed < 5) {
query.speed = 5;
}
this.appendRows([this.nextRow(Date.now())]);
this.timeoutId = window.setTimeout(this.looper, query.speed);
};
}
export class LogsWorker extends StreamWorker {
index = 0;
constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) {
super(key, query, request, observer);
window.setTimeout(() => {
this.stream.series = [this.initBuffer(query.refId)];
this.looper();
}, 10);
}
getNextWord() {
this.index = (this.index + Math.floor(Math.random() * 5)) % words.length;
return words[this.index];
}
getRandomLine() {
let line = this.getNextWord();
while (line.length < 80) {
line += ' ' + this.getNextWord();
}
return line;
}
nextRow = (time: number) => {
return [time, this.getRandomLine()];
};
initBuffer(refId: string): SeriesData {
const { speed, buffer } = this.query;
const data = {
fields: [{ name: 'Time', type: FieldType.time }, { name: 'Line', type: FieldType.string }],
rows: [],
refId,
name: 'Logs ' + refId,
} as SeriesData;
const request = this.stream.request;
const maxRows = buffer ? buffer : request.maxDataPoints;
let time = Date.now() - maxRows * speed;
for (let i = 0; i < maxRows; i++) {
data.rows.push(this.nextRow(time));
time += speed;
}
return data;
}
looper = () => {
if (!this.observer) {
const request = this.stream.request;
const elapsed = request.startTime - Date.now();
if (elapsed > 1000) {
console.log('Stop looping');
return;
}
}
// Make sure it has a minimum speed
const { query } = this;
if (query.speed < 5) {
query.speed = 5;
}
this.appendRows([this.nextRow(Date.now())]);
this.timeoutId = window.setTimeout(this.looper, query.speed);
};
}
const words = [
'At',
'vero',
'eos',
'et',
'accusamus',
'et',
'iusto',
'odio',
'dignissimos',
'ducimus',
'qui',
'blanditiis',
'praesentium',
'voluptatum',
'deleniti',
'atque',
'corrupti',
'quos',
'dolores',
'et',
'quas',
'molestias',
'excepturi',
'sint',
'occaecati',
'cupiditate',
'non',
'provident',
'similique',
'sunt',
'in',
'culpa',
'qui',
'officia',
'deserunt',
'mollitia',
'animi',
'id',
'est',
'laborum',
'et',
'dolorum',
'fuga',
'Et',
'harum',
'quidem',
'rerum',
'facilis',
'est',
'et',
'expedita',
'distinctio',
'Nam',
'libero',
'tempore',
'cum',
'soluta',
'nobis',
'est',
'eligendi',
'optio',
'cumque',
'nihil',
'impedit',
'quo',
'minus',
'id',
'quod',
'maxime',
'placeat',
'facere',
'possimus',
'omnis',
'voluptas',
'assumenda',
'est',
'omnis',
'dolor',
'repellendus',
'Temporibus',
'autem',
'quibusdam',
'et',
'aut',
'officiis',
'debitis',
'aut',
'rerum',
'necessitatibus',
'saepe',
'eveniet',
'ut',
'et',
'voluptates',
'repudiandae',
'sint',
'et',
'molestiae',
'non',
'recusandae',
'Itaque',
'earum',
'rerum',
'hic',
'tenetur',
'a',
'sapiente',
'delectus',
'ut',
'aut',
'reiciendis',
'voluptatibus',
'maiores',
'alias',
'consequatur',
'aut',
'perferendis',
'doloribus',
'asperiores',
'repellat',
];

View File

@ -1,6 +1,15 @@
import _ from 'lodash';
import { DataSourceApi, DataQueryRequest, TableData, TimeSeries } from '@grafana/ui';
import {
DataSourceApi,
DataQueryRequest,
TableData,
TimeSeries,
DataSourceInstanceSettings,
DataStreamObserver,
} from '@grafana/ui';
import { TestDataQuery, Scenario } from './types';
import { getBackendSrv } from 'app/core/services/backend_srv';
import { StreamHandler } from './StreamHandler';
type TestData = TimeSeries | TableData;
@ -10,16 +19,15 @@ export interface TestDataRegistry {
export class TestDataDatasource implements DataSourceApi<TestDataQuery> {
id: number;
streams = new StreamHandler();
/** @ngInject */
constructor(instanceSettings, private backendSrv, private $q) {
constructor(instanceSettings: DataSourceInstanceSettings) {
this.id = instanceSettings.id;
}
query(options: DataQueryRequest<TestDataQuery>) {
const queries = _.filter(options.targets, item => {
return item.hide !== true;
}).map(item => {
query(options: DataQueryRequest<TestDataQuery>, observer: DataStreamObserver) {
const queries = options.targets.map(item => {
return {
refId: item.refId,
scenarioId: item.scenarioId,
@ -33,10 +41,16 @@ export class TestDataDatasource implements DataSourceApi<TestDataQuery> {
});
if (queries.length === 0) {
return this.$q.when({ data: [] });
return Promise.resolve({ data: [] });
}
return this.backendSrv
// Currently we do not support mixed with client only streaming
const resp = this.streams.process(options, observer);
if (resp) {
return Promise.resolve(resp);
}
return getBackendSrv()
.datasourceRequest({
method: 'POST',
url: '/api/tsdb/query',
@ -76,7 +90,7 @@ export class TestDataDatasource implements DataSourceApi<TestDataQuery> {
});
}
annotationQuery(options) {
annotationQuery(options: any) {
let timeWalker = options.range.from.valueOf();
const to = options.range.to.valueOf();
const events = [];
@ -92,7 +106,7 @@ export class TestDataDatasource implements DataSourceApi<TestDataQuery> {
});
timeWalker += step;
}
return this.$q.when(events);
return Promise.resolve(events);
}
getQueryDisplayText(query: TestDataQuery) {
@ -110,6 +124,6 @@ export class TestDataDatasource implements DataSourceApi<TestDataQuery> {
}
getScenarios(): Promise<Scenario[]> {
return this.backendSrv.get('/api/tsdb/testdata/scenarios');
return getBackendSrv().get('/api/tsdb/testdata/scenarios');
}
}

View File

@ -36,4 +36,50 @@
<div class="gf-form-label gf-form-label--grow"></div>
</div>
</div>
<div class="gf-form-inline" ng-if="ctrl.scenario.id === 'streaming_client'">
<div class="gf-form gf-form">
<label class="gf-form-label query-keyword width-7">Type</label>
<div class="gf-form-select-wrapper">
<select
ng-model="ctrl.target.stream.type"
class="gf-form-input"
ng-options="type for type in ['signal','logs']"
ng-change="ctrl.streamChanged()" />
</select>
</div>
</div>
<div class="gf-form">
<label class="gf-form-label query-keyword">Speed (ms)</label>
<input type="number"
class="gf-form-input width-5"
placeholder="value"
ng-model="ctrl.target.stream.speed"
min="10"
step="10"
ng-change="ctrl.streamChanged()" />
</div>
<div class="gf-form">
<label class="gf-form-label query-keyword">Spread</label>
<input type="number"
class="gf-form-input width-5"
placeholder="value"
ng-model="ctrl.target.stream.spread"
min="0.5"
step="0.1"
ng-change="ctrl.streamChanged()" />
</div>
<div class="gf-form">
<label class="gf-form-label query-keyword">Noise</label>
<input type="number"
class="gf-form-input width-5"
placeholder="value"
ng-model="ctrl.target.stream.noise"
min="0"
step="0.1"
ng-change="ctrl.streamChanged()" />
</div>
<div class="gf-form gf-form--grow">
<div class="gf-form-label gf-form-label--grow"></div>
</div>
</div>
</query-editor-row>

View File

@ -2,6 +2,8 @@ import _ from 'lodash';
import { QueryCtrl } from 'app/plugins/sdk';
import moment from 'moment';
import { defaultQuery } from './StreamHandler';
import { getBackendSrv } from 'app/core/services/backend_srv';
export class TestDataQueryCtrl extends QueryCtrl {
static templateUrl = 'partials/query.editor.html';
@ -13,7 +15,7 @@ export class TestDataQueryCtrl extends QueryCtrl {
selectedPoint: any;
/** @ngInject */
constructor($scope, $injector, private backendSrv) {
constructor($scope: any, $injector: any) {
super($scope, $injector);
this.target.scenarioId = this.target.scenarioId || 'random_walk';
@ -49,10 +51,12 @@ export class TestDataQueryCtrl extends QueryCtrl {
}
$onInit() {
return this.backendSrv.get('/api/tsdb/testdata/scenarios').then(res => {
this.scenarioList = res;
this.scenario = _.find(this.scenarioList, { id: this.target.scenarioId });
});
return getBackendSrv()
.get('/api/tsdb/testdata/scenarios')
.then(res => {
this.scenarioList = res;
this.scenario = _.find(this.scenarioList, { id: this.target.scenarioId });
});
}
scenarioChanged() {
@ -65,6 +69,16 @@ export class TestDataQueryCtrl extends QueryCtrl {
delete this.target.points;
}
if (this.target.scenarioId === 'streaming_client') {
this.target.stream = _.defaults(this.target.stream || {}, defaultQuery);
} else {
delete this.target.stream;
}
this.refresh();
}
streamChanged() {
this.refresh();
}
}

View File

@ -1,13 +1,22 @@
import { DataQuery } from '@grafana/ui/src/types';
export interface TestDataQuery extends DataQuery {
alias?: string;
scenarioId: string;
stringInput: string;
points: any;
}
export interface Scenario {
id: string;
name: string;
}
export interface TestDataQuery extends DataQuery {
alias?: string;
scenarioId: string;
stringInput: string;
points?: any[];
stream?: StreamingQuery;
}
export interface StreamingQuery {
type: 'signal' | 'logs';
speed: number;
spread: number;
noise: number; // wiggle around the signal for min/max
buffer?: number;
}