Live: remove measurement controller (#32622)

This commit is contained in:
Ryan McKinley 2021-04-01 22:32:56 -07:00 committed by GitHub
parent db12818d25
commit d2afcdd415
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 238 additions and 372 deletions

View File

@ -0,0 +1,17 @@
import { LiveChannelScope, parseLiveChannelAddress } from './live';
describe('parse address', () => {
it('simple address', () => {
const addr = parseLiveChannelAddress('plugin/testdata/random-flakey-stream');
expect(addr?.scope).toBe(LiveChannelScope.Plugin);
expect(addr?.namespace).toBe('testdata');
expect(addr?.path).toBe('random-flakey-stream');
});
it('suppors full path', () => {
const addr = parseLiveChannelAddress('plugin/testdata/a/b/c/d ');
expect(addr?.scope).toBe(LiveChannelScope.Plugin);
expect(addr?.namespace).toBe('testdata');
expect(addr?.path).toBe('a/b/c/d');
});
});

View File

@ -6,6 +6,8 @@ import { Observable } from 'rxjs';
* ${scope}/${namespace}/${path}
*
* The scope drives how the namespace is used and controlled
*
* @alpha
*/
export enum LiveChannelScope {
DataSource = 'ds', // namespace = data source ID
@ -16,7 +18,7 @@ export enum LiveChannelScope {
/**
* @alpha -- experimental
*/
export interface LiveChannelConfig<TMessage = any, TController = any> {
export interface LiveChannelConfig {
/**
* The path definition. either static, or it may contain variables identifed with {varname}
*/
@ -37,12 +39,6 @@ export interface LiveChannelConfig<TMessage = any, TController = any> {
* The function will return true/false if the current user can publish
*/
canPublish?: () => boolean;
/** convert the raw stream message into a message that should be broadcast */
processMessage?: (msg: any) => TMessage;
/** some channels are managed by an explicit interface */
getController?: () => TController;
}
export enum LiveChannelConnectionState {
@ -88,6 +84,11 @@ export interface LiveChannelStatusEvent {
*/
state: LiveChannelConnectionState;
/**
* When joining a channel, there may be an initial packet in the subscribe method
*/
message?: any;
/**
* The last error.
*
@ -149,6 +150,25 @@ export interface LiveChannelAddress {
path: string;
}
/**
* Return an address from a string
*
* @alpha -- experimental
*/
export function parseLiveChannelAddress(id: string): LiveChannelAddress | undefined {
if (id?.length) {
let parts = id.trim().split('/');
if (parts.length >= 3) {
return {
scope: parts[0] as LiveChannelScope,
namespace: parts[1],
path: parts.slice(2).join('/'),
};
}
}
return undefined;
}
/**
* Check if the address has a scope, namespace, and path
*/
@ -173,7 +193,7 @@ export interface LiveChannel<TMessage = any, TPublish = any> {
config?: LiveChannelConfig;
/**
* Watch all events in this channel
* Watch for messages in a channel
*/
getStream: () => Observable<LiveChannelEvent<TMessage>>;
@ -192,7 +212,7 @@ export interface LiveChannel<TMessage = any, TPublish = any> {
publish?: (msg: TPublish) => Promise<any>;
/**
* This will close and terminate this channel
* Close and terminate the channel for everyone
*/
disconnect: () => void;
}

View File

@ -1,77 +0,0 @@
import { FieldType } from '@grafana/data';
import { MeasurementCollector } from './collector';
describe('MeasurementCollector', () => {
it('should collect values', () => {
const collector = new MeasurementCollector();
collector.addBatch({
batch: [
{
key: 'aaa',
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200],
[1, 2],
],
},
},
{
key: 'aaa',
data: { values: [[300], [3]] },
},
{
key: 'aaa',
data: { values: [[400], [4]] },
},
],
});
const frames = collector.getData();
expect(frames.length).toEqual(1);
expect(frames[0]).toMatchInlineSnapshot(`
StreamingDataFrame {
"fields": Array [
Object {
"config": Object {},
"labels": undefined,
"name": "time",
"type": "time",
"values": Array [
100,
200,
300,
400,
],
},
Object {
"config": Object {},
"labels": undefined,
"name": "value",
"type": "number",
"values": Array [
1,
2,
3,
4,
],
},
],
"length": 4,
"meta": undefined,
"name": undefined,
"options": Object {
"maxDelta": Infinity,
"maxLength": 600,
},
"refId": undefined,
"timeFieldIndex": 0,
}
`);
});
});

View File

@ -1,86 +0,0 @@
import { DataFrame, DataFrameJSON, StreamingDataFrame, StreamingFrameOptions } from '@grafana/data';
import { MeasurementBatch, LiveMeasurements, MeasurementsQuery } from './types';
/**
* This will collect
*
* @alpha -- experimental
*/
export class MeasurementCollector implements LiveMeasurements {
measurements = new Map<string, StreamingDataFrame>();
config: StreamingFrameOptions = {
maxLength: 600, // Default capacity 10min @ 1hz
};
//------------------------------------------------------
// Public
//------------------------------------------------------
getData(query?: MeasurementsQuery): DataFrame[] {
const { key, fields } = query || {};
// Find the data
let data: StreamingDataFrame[] = [];
if (key) {
const f = this.measurements.get(key);
if (!f) {
return [];
}
data.push(f);
} else {
// Add all frames
for (const f of this.measurements.values()) {
data.push(f);
}
}
// Filter the fields we want
if (fields && fields.length) {
let filtered: DataFrame[] = [];
for (const frame of data) {
const match = frame.fields.filter((f) => fields.includes(f.name));
if (match.length > 0) {
filtered.push({ ...frame, fields: match, length: frame.length }); // Copy the frame with fewer fields
}
}
if (filtered.length) {
return filtered;
}
}
return data;
}
getKeys(): string[] {
return Object.keys(this.measurements);
}
ensureCapacity(size: number) {
// TODO...
}
//------------------------------------------------------
// Collector
//------------------------------------------------------
addBatch = (msg: MeasurementBatch) => {
// HACK! sending one message from the backend, not a batch
if (!msg.batch) {
const df: DataFrameJSON = msg as any;
msg = { batch: [df] };
console.log('NOTE converting message to batch');
}
for (const measure of msg.batch) {
const key = measure.key ?? measure.schema?.name ?? '';
let s = this.measurements.get(key);
if (s) {
s.push(measure);
} else {
s = new StreamingDataFrame(measure, this.config); //
this.measurements.set(key, s);
}
}
return this;
};
}

View File

@ -1,3 +1 @@
export * from './types';
export * from './collector';
export * from './query';

View File

@ -1,77 +1,114 @@
import {
DataFrame,
DataFrameJSON,
DataQueryResponse,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
isValidLiveChannelAddress,
LiveChannelAddress,
LiveChannelConnectionState,
LiveChannelEvent,
LoadingState,
StreamingDataFrame,
StreamingFrameOptions,
} from '@grafana/data';
import { LiveMeasurements, MeasurementsQuery } from './types';
import { getGrafanaLiveSrv } from '../services/live';
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { toDataQueryError } from '../utils/queryResponse';
/**
* @alpha -- experimental
*/
export function getLiveMeasurements(addr: LiveChannelAddress): LiveMeasurements | undefined {
if (!isValidLiveChannelAddress(addr)) {
return undefined;
}
const live = getGrafanaLiveSrv();
if (!live) {
return undefined;
}
const channel = live.getChannel<LiveMeasurements>(addr);
const getController = channel?.config?.getController;
return getController ? getController() : undefined;
export interface LiveDataFilter {
fields?: string[];
}
/**
* When you know the stream will be managed measurements
* @alpha
*/
export interface LiveDataStreamOptions {
key?: string;
addr: LiveChannelAddress;
buffer?: StreamingFrameOptions;
filter?: LiveDataFilter;
}
/**
* Continue executing requests as long as `getNextQuery` returns a query
*
* @alpha -- experimental
* @alpha
*/
export function getLiveMeasurementsObserver(
addr: LiveChannelAddress,
requestId: string,
query?: MeasurementsQuery
): Observable<DataQueryResponse> {
const rsp: DataQueryResponse = { data: [] };
if (!addr || !addr.path) {
return of(rsp); // Address not configured yet
export function getLiveDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
if (!isValidLiveChannelAddress(options.addr)) {
return of({ error: toDataQueryError('invalid address'), data: [] });
}
const live = getGrafanaLiveSrv();
if (!live) {
// This will only happen with the feature flag is not enabled
rsp.error = { message: 'Grafana live is not initalized' };
return of(rsp);
return of({ error: toDataQueryError('grafana live is not initalized'), data: [] });
}
rsp.key = requestId;
return live
.getChannel<LiveMeasurements>(addr)
.getStream()
.pipe(
map((evt) => {
if (isLiveChannelMessageEvent(evt)) {
rsp.data = evt.message.getData(query);
if (!rsp.data.length) {
// ?? skip when data is empty ???
return new Observable<DataQueryResponse>((subscriber) => {
let data: StreamingDataFrame | undefined = undefined;
let state = LoadingState.Loading;
const { key, filter } = options;
const process = (msg: DataFrameJSON) => {
if (!data) {
data = new StreamingDataFrame(msg, options.buffer);
} else {
data.push(msg);
}
state = LoadingState.Streaming;
// TODO? this *coud* happen only when the schema changes
let filtered = data as DataFrame;
if (filter?.fields && filter.fields.length) {
filtered = {
...data,
fields: data.fields.filter((f) => filter.fields!.includes(f.name)),
};
}
subscriber.next({ state, data: [filtered], key });
};
const sub = live
.getChannel<DataFrameJSON>(options.addr)
.getStream()
.subscribe({
error: (err: any) => {
state = LoadingState.Error;
subscriber.next({ state, data: [data], key });
sub.unsubscribe(); // close after error
},
complete: () => {
if (state !== LoadingState.Error) {
state = LoadingState.Done;
}
delete rsp.error;
rsp.state = LoadingState.Streaming;
} else if (isLiveChannelStatusEvent(evt)) {
if (evt.error != null) {
rsp.error = rsp.error;
rsp.state = LoadingState.Error;
subscriber.next({ state, data: [data], key });
subscriber.complete();
sub.unsubscribe();
},
next: (evt: LiveChannelEvent) => {
if (isLiveChannelMessageEvent(evt)) {
process(evt.message);
return;
}
}
return { ...rsp }; // send event on all status messages
})
);
if (isLiveChannelStatusEvent(evt)) {
if (
evt.state === LiveChannelConnectionState.Connected ||
evt.state === LiveChannelConnectionState.Pending
) {
if (evt.message) {
process(evt.message);
}
return;
}
console.log('ignore state', evt);
}
},
});
return () => {
sub.unsubscribe();
};
});
}

View File

@ -1,32 +0,0 @@
import { DataFrame, DataFrameJSON } from '@grafana/data';
/**
* List of Measurements sent in a batch
*
* @alpha -- experimental
*/
export interface MeasurementBatch {
/**
* List of measurements to process
*/
batch: DataFrameJSON[];
}
/**
* @alpha -- experimental
*/
export interface MeasurementsQuery {
key?: string;
fields?: string[]; // only include the fields with these names
}
/**
* Channels that receive Measurements can collect them into frames
*
* @alpha -- experimental
*/
export interface LiveMeasurements {
getData(query?: MeasurementsQuery): DataFrame[];
getKeys(): string[];
ensureCapacity(size: number): void;
}

View File

@ -35,12 +35,12 @@ func (p *testStreamHandler) RunStream(ctx context.Context, request *backend.RunS
switch request.Path {
case "random-2s-stream":
conf = testStreamConfig{
Interval: 200 * time.Millisecond,
Interval: 2 * time.Second,
}
case "random-flakey-stream":
conf = testStreamConfig{
Interval: 200 * time.Millisecond,
Drop: 0.6,
Interval: 100 * time.Millisecond,
Drop: 0.75, // keep 25%
}
case "random-20Hz-stream":
conf = testStreamConfig{

View File

@ -53,17 +53,15 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
throw new Error('Channel already initalized: ' + this.id);
}
this.config = config;
const prepare = config.processMessage ? config.processMessage : (v: any) => v;
const events: SubscriptionEvents = {
// This means a message was received from the server
// Called when a message is recieved from the socket
publish: (ctx: PublicationContext) => {
try {
const message = prepare(ctx.data);
if (message) {
if (ctx.data) {
this.stream.next({
type: LiveChannelEventType.Message,
message,
message: ctx.data,
});
}
@ -117,8 +115,12 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
return events;
}
private sendStatus() {
this.stream.next({ ...this.currentStatus });
private sendStatus(message?: any) {
const copy = { ...this.currentStatus };
if (message) {
copy.message = message;
}
this.stream.next(copy);
}
/**

View File

@ -1,31 +1,21 @@
import { LiveChannelConfig } from '@grafana/data';
import { MeasurementCollector } from '@grafana/runtime';
import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher';
import { LiveMeasurementsSupport } from './measurements/measurementsSupport';
import { grafanaLiveCoreFeatures } from './scopes';
export function registerLiveFeatures() {
const random2s = new MeasurementCollector();
const randomFlakey = new MeasurementCollector();
const random20Hz = new MeasurementCollector();
const channels: LiveChannelConfig[] = [
{
path: 'random-2s-stream',
description: 'Random stream with points every 2s',
getController: () => random2s,
processMessage: random2s.addBatch,
},
{
path: 'random-flakey-stream',
description: 'Random stream with flakey data points',
getController: () => randomFlakey,
processMessage: randomFlakey.addBatch,
},
{
path: 'random-20Hz-stream',
description: 'Random stream with points in 20Hz',
getController: () => random20Hz,
processMessage: random20Hz.addBatch,
},
];

View File

@ -1,13 +1,7 @@
import { LiveChannelSupport, LiveChannelConfig } from '@grafana/data';
import { MeasurementCollector } from '@grafana/runtime';
interface MeasurementChannel {
config: LiveChannelConfig;
collector: MeasurementCollector;
}
export class LiveMeasurementsSupport implements LiveChannelSupport {
private cache: Record<string, MeasurementChannel> = {};
private cache: Record<string, LiveChannelConfig> = {};
/**
* Get the channel handler for the path, or throw an error if invalid
@ -15,19 +9,11 @@ export class LiveMeasurementsSupport implements LiveChannelSupport {
getChannelConfig(path: string): LiveChannelConfig | undefined {
let c = this.cache[path];
if (!c) {
// Create a new cache for each path
const collector = new MeasurementCollector();
c = this.cache[path] = {
collector,
config: {
path,
processMessage: collector.addBatch, // << this converts the stream from a single event to the whole cache
getController: () => collector,
canPublish: () => true,
},
c = {
path,
};
}
return c.config;
return c;
}
/**

View File

@ -2,8 +2,7 @@ import defaults from 'lodash/defaults';
import React, { PureComponent } from 'react';
import { InlineField, Select, FeatureInfoBox } from '@grafana/ui';
import { QueryEditorProps, SelectableValue, LiveChannelScope, FeatureState } from '@grafana/data';
import { getLiveMeasurements, LiveMeasurements } from '@grafana/runtime';
import { QueryEditorProps, SelectableValue, FeatureState, getFrameDisplayName } from '@grafana/data';
import { GrafanaDatasource } from '../datasource';
import { defaultQuery, GrafanaQuery, GrafanaQueryType } from '../types';
@ -37,21 +36,38 @@ export class QueryEditor extends PureComponent<Props> {
onRunQuery();
};
onMeasurementNameChanged = (sel: SelectableValue<string>) => {
onFieldNamesChange = (item: SelectableValue<string>) => {
const { onChange, query, onRunQuery } = this.props;
let fields: string[] = [];
if (Array.isArray(item)) {
fields = item.map((v) => v.value);
} else if (item.value) {
fields = [item.value];
}
onChange({
...query,
measurements: {
...query.measurements,
key: sel?.value,
filter: {
...query.filter,
fields,
},
});
onRunQuery();
};
renderMeasurementsQuery() {
let { channel, measurements } = this.props.query;
const channels: Array<SelectableValue<string>> = [];
const { data } = this.props;
let { channel, filter } = this.props.query;
const channels: Array<SelectableValue<string>> = [
{
value: 'plugin/testdata/random-2s-stream',
label: 'plugin/testdata/random-2s-stream',
},
{
value: 'plugin/testdata/random-flakey-stream',
label: 'plugin/testdata/random-flakey-stream',
},
];
let currentChannel = channels.find((c) => c.value === channel);
if (channel && !currentChannel) {
currentChannel = {
@ -62,42 +78,33 @@ export class QueryEditor extends PureComponent<Props> {
channels.push(currentChannel);
}
if (!measurements) {
measurements = {};
}
const names: Array<SelectableValue<string>> = [
{ value: '', label: 'All measurements', description: 'Show every measurement streamed to this channel' },
];
let info: LiveMeasurements | undefined = undefined;
if (channel) {
info = getLiveMeasurements({
scope: LiveChannelScope.Grafana,
namespace: 'measurements',
path: channel,
});
let foundName = false;
if (info) {
for (const name of info.getKeys()) {
names.push({
value: name,
label: name,
});
if (name === measurements.key) {
foundName = true;
const distinctFields = new Set<string>();
const fields: Array<SelectableValue<string>> = [];
if (data && data.series?.length) {
for (const frame of data.series) {
for (const field of frame.fields) {
if (distinctFields.has(field.name) || !field.name) {
continue;
}
fields.push({
value: field.name,
label: field.name,
description: `(${getFrameDisplayName(frame)} / ${field.type})`,
});
distinctFields.add(field.name);
}
} else {
console.log('NO INFO for', channel);
}
if (measurements.key && !foundName) {
names.push({
label: measurements.key,
value: measurements.key,
description: `Frames with key ${measurements.key}`,
});
}
if (filter?.fields) {
for (const f of filter.fields) {
if (!distinctFields.has(f)) {
fields.push({
value: f,
label: `${f} (not loaded)`,
description: `Configured, but not found in the query results`,
});
distinctFields.add(f);
}
}
}
@ -120,18 +127,19 @@ export class QueryEditor extends PureComponent<Props> {
</div>
{channel && (
<div className="gf-form">
<InlineField label="Measurement" grow={true} labelWidth={labelWidth}>
<InlineField label="Fields" grow={true} labelWidth={labelWidth}>
<Select
options={names}
value={names.find((v) => v.value === measurements?.key) || names[0]}
onChange={this.onMeasurementNameChanged}
options={fields}
value={filter?.fields || []}
onChange={this.onFieldNamesChange}
allowCustomValue={true}
backspaceRemovesValue={true}
placeholder="Filter by name"
placeholder="All fields"
isClearable={true}
noOptionsMessage="Filter by name"
formatCreateLabel={(input: string) => `Show: ${input}`}
noOptionsMessage="Unable to list all fields"
formatCreateLabel={(input: string) => `Field: ${input}`}
isSearchable={true}
isMulti={true}
/>
</InlineField>
</div>

View File

@ -6,11 +6,12 @@ import {
DataQueryResponse,
DataSourceApi,
DataSourceInstanceSettings,
LiveChannelScope,
parseLiveChannelAddress,
StreamingFrameOptions,
} from '@grafana/data';
import { GrafanaQuery, GrafanaAnnotationQuery, GrafanaAnnotationType, GrafanaQueryType } from './types';
import { getBackendSrv, getTemplateSrv, toDataQueryResponse, getLiveMeasurementsObserver } from '@grafana/runtime';
import { getBackendSrv, getTemplateSrv, toDataQueryResponse, getLiveDataStream } from '@grafana/runtime';
import { Observable, of, merge } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
@ -22,24 +23,31 @@ export class GrafanaDatasource extends DataSourceApi<GrafanaQuery> {
}
query(request: DataQueryRequest<GrafanaQuery>): Observable<DataQueryResponse> {
const buffer: StreamingFrameOptions = {
maxLength: request.maxDataPoints ?? 500,
};
if (request.rangeRaw?.to === 'now') {
const elapsed = request.range.to.valueOf() - request.range.from.valueOf();
buffer.maxDelta = elapsed;
}
const queries: Array<Observable<DataQueryResponse>> = [];
for (const target of request.targets) {
if (target.hide) {
continue;
}
if (target.queryType === GrafanaQueryType.LiveMeasurements) {
const { channel, measurements } = target;
const { channel, filter } = target;
if (channel) {
const addr = parseLiveChannelAddress(channel);
queries.push(
getLiveMeasurementsObserver(
{
scope: LiveChannelScope.Grafana,
namespace: 'measurements',
path: channel,
},
`${request.requestId}.${counter++}`,
measurements
)
getLiveDataStream({
key: `${request.requestId}.${counter++}`,
addr: addr!,
filter,
buffer,
})
);
}
} else {

View File

@ -1,5 +1,5 @@
import { AnnotationQuery, DataQuery } from '@grafana/data';
import { MeasurementsQuery } from '@grafana/runtime';
import { LiveDataFilter } from '@grafana/runtime';
//----------------------------------------------
// Query
@ -13,7 +13,7 @@ export enum GrafanaQueryType {
export interface GrafanaQuery extends DataQuery {
queryType: GrafanaQueryType; // RandomWalk by default
channel?: string;
measurements?: MeasurementsQuery;
filter?: LiveDataFilter;
}
export const defaultQuery: GrafanaQuery = {

View File

@ -14,13 +14,7 @@ import {
TimeRange,
} from '@grafana/data';
import { Scenario, TestDataQuery } from './types';
import {
DataSourceWithBackend,
getBackendSrv,
getLiveMeasurementsObserver,
getTemplateSrv,
TemplateSrv,
} from '@grafana/runtime';
import { DataSourceWithBackend, getBackendSrv, getLiveDataStream, getTemplateSrv, TemplateSrv } from '@grafana/runtime';
import { queryMetricTree } from './metricTree';
import { runStream } from './runStreams';
import { getSearchFilterScopedVar } from 'app/features/variables/utils';
@ -194,12 +188,12 @@ function runGrafanaLiveQuery(
if (!target.channel) {
throw new Error(`Missing channel config`);
}
return getLiveMeasurementsObserver(
{
return getLiveDataStream({
addr: {
scope: LiveChannelScope.Plugin,
namespace: 'testdata',
path: target.channel,
},
`testStream.${liveQueryCounter++}`
);
key: `testStream.${liveQueryCounter++}`,
});
}

View File

@ -14,10 +14,11 @@ import {
PanelData,
LoadingState,
applyFieldOverrides,
StreamingDataFrame,
} from '@grafana/data';
import { TablePanel } from '../table/TablePanel';
import { LivePanelOptions, MessageDisplayMode } from './types';
import { config, getGrafanaLiveSrv, MeasurementCollector } from '@grafana/runtime';
import { config, getGrafanaLiveSrv } from '@grafana/runtime';
import { css, cx } from '@emotion/css';
interface Props extends PanelProps<LivePanelOptions> {}
@ -168,10 +169,10 @@ export class LivePanel extends PureComponent<Props, State> {
}
if (options.message === MessageDisplayMode.Auto) {
if (message instanceof MeasurementCollector) {
if (message instanceof StreamingDataFrame) {
const data: PanelData = {
series: applyFieldOverrides({
data: message.getData(),
data: [message],
theme: config.theme,
replaceVariables: (v: string) => v,
fieldConfig: {