diff --git a/packages/grafana-data/src/types/live.test.ts b/packages/grafana-data/src/types/live.test.ts new file mode 100644 index 00000000000..e0481a3b90a --- /dev/null +++ b/packages/grafana-data/src/types/live.test.ts @@ -0,0 +1,17 @@ +import { LiveChannelScope, parseLiveChannelAddress } from './live'; + +describe('parse address', () => { + it('simple address', () => { + const addr = parseLiveChannelAddress('plugin/testdata/random-flakey-stream'); + expect(addr?.scope).toBe(LiveChannelScope.Plugin); + expect(addr?.namespace).toBe('testdata'); + expect(addr?.path).toBe('random-flakey-stream'); + }); + + it('suppors full path', () => { + const addr = parseLiveChannelAddress('plugin/testdata/a/b/c/d '); + expect(addr?.scope).toBe(LiveChannelScope.Plugin); + expect(addr?.namespace).toBe('testdata'); + expect(addr?.path).toBe('a/b/c/d'); + }); +}); diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index 999be33591e..0bdc3edb7dd 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -6,6 +6,8 @@ import { Observable } from 'rxjs'; * ${scope}/${namespace}/${path} * * The scope drives how the namespace is used and controlled + * + * @alpha */ export enum LiveChannelScope { DataSource = 'ds', // namespace = data source ID @@ -16,7 +18,7 @@ export enum LiveChannelScope { /** * @alpha -- experimental */ -export interface LiveChannelConfig { +export interface LiveChannelConfig { /** * The path definition. either static, or it may contain variables identifed with {varname} */ @@ -37,12 +39,6 @@ export interface LiveChannelConfig { * The function will return true/false if the current user can publish */ canPublish?: () => boolean; - - /** convert the raw stream message into a message that should be broadcast */ - processMessage?: (msg: any) => TMessage; - - /** some channels are managed by an explicit interface */ - getController?: () => TController; } export enum LiveChannelConnectionState { @@ -88,6 +84,11 @@ export interface LiveChannelStatusEvent { */ state: LiveChannelConnectionState; + /** + * When joining a channel, there may be an initial packet in the subscribe method + */ + message?: any; + /** * The last error. * @@ -149,6 +150,25 @@ export interface LiveChannelAddress { path: string; } +/** + * Return an address from a string + * + * @alpha -- experimental + */ +export function parseLiveChannelAddress(id: string): LiveChannelAddress | undefined { + if (id?.length) { + let parts = id.trim().split('/'); + if (parts.length >= 3) { + return { + scope: parts[0] as LiveChannelScope, + namespace: parts[1], + path: parts.slice(2).join('/'), + }; + } + } + return undefined; +} + /** * Check if the address has a scope, namespace, and path */ @@ -173,7 +193,7 @@ export interface LiveChannel { config?: LiveChannelConfig; /** - * Watch all events in this channel + * Watch for messages in a channel */ getStream: () => Observable>; @@ -192,7 +212,7 @@ export interface LiveChannel { publish?: (msg: TPublish) => Promise; /** - * This will close and terminate this channel + * Close and terminate the channel for everyone */ disconnect: () => void; } diff --git a/packages/grafana-runtime/src/measurement/collector.test.ts b/packages/grafana-runtime/src/measurement/collector.test.ts deleted file mode 100644 index 4ed440e2bb0..00000000000 --- a/packages/grafana-runtime/src/measurement/collector.test.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { FieldType } from '@grafana/data'; -import { MeasurementCollector } from './collector'; - -describe('MeasurementCollector', () => { - it('should collect values', () => { - const collector = new MeasurementCollector(); - collector.addBatch({ - batch: [ - { - key: 'aaa', - schema: { - fields: [ - { name: 'time', type: FieldType.time }, - { name: 'value', type: FieldType.number }, - ], - }, - data: { - values: [ - [100, 200], - [1, 2], - ], - }, - }, - { - key: 'aaa', - data: { values: [[300], [3]] }, - }, - { - key: 'aaa', - data: { values: [[400], [4]] }, - }, - ], - }); - - const frames = collector.getData(); - expect(frames.length).toEqual(1); - expect(frames[0]).toMatchInlineSnapshot(` - StreamingDataFrame { - "fields": Array [ - Object { - "config": Object {}, - "labels": undefined, - "name": "time", - "type": "time", - "values": Array [ - 100, - 200, - 300, - 400, - ], - }, - Object { - "config": Object {}, - "labels": undefined, - "name": "value", - "type": "number", - "values": Array [ - 1, - 2, - 3, - 4, - ], - }, - ], - "length": 4, - "meta": undefined, - "name": undefined, - "options": Object { - "maxDelta": Infinity, - "maxLength": 600, - }, - "refId": undefined, - "timeFieldIndex": 0, - } - `); - }); -}); diff --git a/packages/grafana-runtime/src/measurement/collector.ts b/packages/grafana-runtime/src/measurement/collector.ts deleted file mode 100644 index c8e16b4285e..00000000000 --- a/packages/grafana-runtime/src/measurement/collector.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { DataFrame, DataFrameJSON, StreamingDataFrame, StreamingFrameOptions } from '@grafana/data'; -import { MeasurementBatch, LiveMeasurements, MeasurementsQuery } from './types'; - -/** - * This will collect - * - * @alpha -- experimental - */ -export class MeasurementCollector implements LiveMeasurements { - measurements = new Map(); - config: StreamingFrameOptions = { - maxLength: 600, // Default capacity 10min @ 1hz - }; - - //------------------------------------------------------ - // Public - //------------------------------------------------------ - - getData(query?: MeasurementsQuery): DataFrame[] { - const { key, fields } = query || {}; - - // Find the data - let data: StreamingDataFrame[] = []; - if (key) { - const f = this.measurements.get(key); - if (!f) { - return []; - } - data.push(f); - } else { - // Add all frames - for (const f of this.measurements.values()) { - data.push(f); - } - } - - // Filter the fields we want - if (fields && fields.length) { - let filtered: DataFrame[] = []; - for (const frame of data) { - const match = frame.fields.filter((f) => fields.includes(f.name)); - if (match.length > 0) { - filtered.push({ ...frame, fields: match, length: frame.length }); // Copy the frame with fewer fields - } - } - if (filtered.length) { - return filtered; - } - } - return data; - } - - getKeys(): string[] { - return Object.keys(this.measurements); - } - - ensureCapacity(size: number) { - // TODO... - } - - //------------------------------------------------------ - // Collector - //------------------------------------------------------ - - addBatch = (msg: MeasurementBatch) => { - // HACK! sending one message from the backend, not a batch - if (!msg.batch) { - const df: DataFrameJSON = msg as any; - msg = { batch: [df] }; - console.log('NOTE converting message to batch'); - } - - for (const measure of msg.batch) { - const key = measure.key ?? measure.schema?.name ?? ''; - - let s = this.measurements.get(key); - if (s) { - s.push(measure); - } else { - s = new StreamingDataFrame(measure, this.config); // - this.measurements.set(key, s); - } - } - return this; - }; -} diff --git a/packages/grafana-runtime/src/measurement/index.ts b/packages/grafana-runtime/src/measurement/index.ts index 56180b6f43b..a38910e42af 100644 --- a/packages/grafana-runtime/src/measurement/index.ts +++ b/packages/grafana-runtime/src/measurement/index.ts @@ -1,3 +1 @@ -export * from './types'; -export * from './collector'; export * from './query'; diff --git a/packages/grafana-runtime/src/measurement/query.ts b/packages/grafana-runtime/src/measurement/query.ts index e4cbf0f5908..14a850dd42a 100644 --- a/packages/grafana-runtime/src/measurement/query.ts +++ b/packages/grafana-runtime/src/measurement/query.ts @@ -1,77 +1,114 @@ import { + DataFrame, + DataFrameJSON, DataQueryResponse, isLiveChannelMessageEvent, isLiveChannelStatusEvent, isValidLiveChannelAddress, LiveChannelAddress, + LiveChannelConnectionState, + LiveChannelEvent, LoadingState, + StreamingDataFrame, + StreamingFrameOptions, } from '@grafana/data'; -import { LiveMeasurements, MeasurementsQuery } from './types'; import { getGrafanaLiveSrv } from '../services/live'; import { Observable, of } from 'rxjs'; -import { map } from 'rxjs/operators'; +import { toDataQueryError } from '../utils/queryResponse'; -/** - * @alpha -- experimental - */ -export function getLiveMeasurements(addr: LiveChannelAddress): LiveMeasurements | undefined { - if (!isValidLiveChannelAddress(addr)) { - return undefined; - } - - const live = getGrafanaLiveSrv(); - if (!live) { - return undefined; - } - - const channel = live.getChannel(addr); - const getController = channel?.config?.getController; - return getController ? getController() : undefined; +export interface LiveDataFilter { + fields?: string[]; } /** - * When you know the stream will be managed measurements + * @alpha + */ +export interface LiveDataStreamOptions { + key?: string; + addr: LiveChannelAddress; + buffer?: StreamingFrameOptions; + filter?: LiveDataFilter; +} + +/** + * Continue executing requests as long as `getNextQuery` returns a query * - * @alpha -- experimental + * @alpha */ -export function getLiveMeasurementsObserver( - addr: LiveChannelAddress, - requestId: string, - query?: MeasurementsQuery -): Observable { - const rsp: DataQueryResponse = { data: [] }; - if (!addr || !addr.path) { - return of(rsp); // Address not configured yet +export function getLiveDataStream(options: LiveDataStreamOptions): Observable { + if (!isValidLiveChannelAddress(options.addr)) { + return of({ error: toDataQueryError('invalid address'), data: [] }); } - const live = getGrafanaLiveSrv(); if (!live) { - // This will only happen with the feature flag is not enabled - rsp.error = { message: 'Grafana live is not initalized' }; - return of(rsp); + return of({ error: toDataQueryError('grafana live is not initalized'), data: [] }); } - rsp.key = requestId; - return live - .getChannel(addr) - .getStream() - .pipe( - map((evt) => { - if (isLiveChannelMessageEvent(evt)) { - rsp.data = evt.message.getData(query); - if (!rsp.data.length) { - // ?? skip when data is empty ??? + return new Observable((subscriber) => { + let data: StreamingDataFrame | undefined = undefined; + let state = LoadingState.Loading; + const { key, filter } = options; + + const process = (msg: DataFrameJSON) => { + if (!data) { + data = new StreamingDataFrame(msg, options.buffer); + } else { + data.push(msg); + } + state = LoadingState.Streaming; + + // TODO? this *coud* happen only when the schema changes + let filtered = data as DataFrame; + if (filter?.fields && filter.fields.length) { + filtered = { + ...data, + fields: data.fields.filter((f) => filter.fields!.includes(f.name)), + }; + } + + subscriber.next({ state, data: [filtered], key }); + }; + + const sub = live + .getChannel(options.addr) + .getStream() + .subscribe({ + error: (err: any) => { + state = LoadingState.Error; + subscriber.next({ state, data: [data], key }); + sub.unsubscribe(); // close after error + }, + complete: () => { + if (state !== LoadingState.Error) { + state = LoadingState.Done; } - delete rsp.error; - rsp.state = LoadingState.Streaming; - } else if (isLiveChannelStatusEvent(evt)) { - if (evt.error != null) { - rsp.error = rsp.error; - rsp.state = LoadingState.Error; + subscriber.next({ state, data: [data], key }); + subscriber.complete(); + sub.unsubscribe(); + }, + next: (evt: LiveChannelEvent) => { + if (isLiveChannelMessageEvent(evt)) { + process(evt.message); + return; } - } - return { ...rsp }; // send event on all status messages - }) - ); + if (isLiveChannelStatusEvent(evt)) { + if ( + evt.state === LiveChannelConnectionState.Connected || + evt.state === LiveChannelConnectionState.Pending + ) { + if (evt.message) { + process(evt.message); + } + return; + } + console.log('ignore state', evt); + } + }, + }); + + return () => { + sub.unsubscribe(); + }; + }); } diff --git a/packages/grafana-runtime/src/measurement/types.ts b/packages/grafana-runtime/src/measurement/types.ts deleted file mode 100644 index 7727b3173f0..00000000000 --- a/packages/grafana-runtime/src/measurement/types.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { DataFrame, DataFrameJSON } from '@grafana/data'; - -/** - * List of Measurements sent in a batch - * - * @alpha -- experimental - */ -export interface MeasurementBatch { - /** - * List of measurements to process - */ - batch: DataFrameJSON[]; -} - -/** - * @alpha -- experimental - */ -export interface MeasurementsQuery { - key?: string; - fields?: string[]; // only include the fields with these names -} - -/** - * Channels that receive Measurements can collect them into frames - * - * @alpha -- experimental - */ -export interface LiveMeasurements { - getData(query?: MeasurementsQuery): DataFrame[]; - getKeys(): string[]; - ensureCapacity(size: number): void; -} diff --git a/pkg/tsdb/testdatasource/stream_handler.go b/pkg/tsdb/testdatasource/stream_handler.go index beb2cc9d02a..87ef99beeb8 100644 --- a/pkg/tsdb/testdatasource/stream_handler.go +++ b/pkg/tsdb/testdatasource/stream_handler.go @@ -35,12 +35,12 @@ func (p *testStreamHandler) RunStream(ctx context.Context, request *backend.RunS switch request.Path { case "random-2s-stream": conf = testStreamConfig{ - Interval: 200 * time.Millisecond, + Interval: 2 * time.Second, } case "random-flakey-stream": conf = testStreamConfig{ - Interval: 200 * time.Millisecond, - Drop: 0.6, + Interval: 100 * time.Millisecond, + Drop: 0.75, // keep 25% } case "random-20Hz-stream": conf = testStreamConfig{ diff --git a/public/app/features/live/channel.ts b/public/app/features/live/channel.ts index b6187be9576..1b9d199249d 100644 --- a/public/app/features/live/channel.ts +++ b/public/app/features/live/channel.ts @@ -53,17 +53,15 @@ export class CentrifugeLiveChannel implements Li throw new Error('Channel already initalized: ' + this.id); } this.config = config; - const prepare = config.processMessage ? config.processMessage : (v: any) => v; const events: SubscriptionEvents = { - // This means a message was received from the server + // Called when a message is recieved from the socket publish: (ctx: PublicationContext) => { try { - const message = prepare(ctx.data); - if (message) { + if (ctx.data) { this.stream.next({ type: LiveChannelEventType.Message, - message, + message: ctx.data, }); } @@ -117,8 +115,12 @@ export class CentrifugeLiveChannel implements Li return events; } - private sendStatus() { - this.stream.next({ ...this.currentStatus }); + private sendStatus(message?: any) { + const copy = { ...this.currentStatus }; + if (message) { + copy.message = message; + } + this.stream.next(copy); } /** diff --git a/public/app/features/live/features.ts b/public/app/features/live/features.ts index 79a40951a13..b2d868f93da 100644 --- a/public/app/features/live/features.ts +++ b/public/app/features/live/features.ts @@ -1,31 +1,21 @@ import { LiveChannelConfig } from '@grafana/data'; -import { MeasurementCollector } from '@grafana/runtime'; import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher'; import { LiveMeasurementsSupport } from './measurements/measurementsSupport'; import { grafanaLiveCoreFeatures } from './scopes'; export function registerLiveFeatures() { - const random2s = new MeasurementCollector(); - const randomFlakey = new MeasurementCollector(); - const random20Hz = new MeasurementCollector(); const channels: LiveChannelConfig[] = [ { path: 'random-2s-stream', description: 'Random stream with points every 2s', - getController: () => random2s, - processMessage: random2s.addBatch, }, { path: 'random-flakey-stream', description: 'Random stream with flakey data points', - getController: () => randomFlakey, - processMessage: randomFlakey.addBatch, }, { path: 'random-20Hz-stream', description: 'Random stream with points in 20Hz', - getController: () => random20Hz, - processMessage: random20Hz.addBatch, }, ]; diff --git a/public/app/features/live/measurements/measurementsSupport.ts b/public/app/features/live/measurements/measurementsSupport.ts index fe090499230..642c30e454b 100644 --- a/public/app/features/live/measurements/measurementsSupport.ts +++ b/public/app/features/live/measurements/measurementsSupport.ts @@ -1,13 +1,7 @@ import { LiveChannelSupport, LiveChannelConfig } from '@grafana/data'; -import { MeasurementCollector } from '@grafana/runtime'; - -interface MeasurementChannel { - config: LiveChannelConfig; - collector: MeasurementCollector; -} export class LiveMeasurementsSupport implements LiveChannelSupport { - private cache: Record = {}; + private cache: Record = {}; /** * Get the channel handler for the path, or throw an error if invalid @@ -15,19 +9,11 @@ export class LiveMeasurementsSupport implements LiveChannelSupport { getChannelConfig(path: string): LiveChannelConfig | undefined { let c = this.cache[path]; if (!c) { - // Create a new cache for each path - const collector = new MeasurementCollector(); - c = this.cache[path] = { - collector, - config: { - path, - processMessage: collector.addBatch, // << this converts the stream from a single event to the whole cache - getController: () => collector, - canPublish: () => true, - }, + c = { + path, }; } - return c.config; + return c; } /** diff --git a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx index 54b7d602d50..41efbff9291 100644 --- a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx +++ b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx @@ -2,8 +2,7 @@ import defaults from 'lodash/defaults'; import React, { PureComponent } from 'react'; import { InlineField, Select, FeatureInfoBox } from '@grafana/ui'; -import { QueryEditorProps, SelectableValue, LiveChannelScope, FeatureState } from '@grafana/data'; -import { getLiveMeasurements, LiveMeasurements } from '@grafana/runtime'; +import { QueryEditorProps, SelectableValue, FeatureState, getFrameDisplayName } from '@grafana/data'; import { GrafanaDatasource } from '../datasource'; import { defaultQuery, GrafanaQuery, GrafanaQueryType } from '../types'; @@ -37,21 +36,38 @@ export class QueryEditor extends PureComponent { onRunQuery(); }; - onMeasurementNameChanged = (sel: SelectableValue) => { + onFieldNamesChange = (item: SelectableValue) => { const { onChange, query, onRunQuery } = this.props; + let fields: string[] = []; + if (Array.isArray(item)) { + fields = item.map((v) => v.value); + } else if (item.value) { + fields = [item.value]; + } + onChange({ ...query, - measurements: { - ...query.measurements, - key: sel?.value, + filter: { + ...query.filter, + fields, }, }); onRunQuery(); }; renderMeasurementsQuery() { - let { channel, measurements } = this.props.query; - const channels: Array> = []; + const { data } = this.props; + let { channel, filter } = this.props.query; + const channels: Array> = [ + { + value: 'plugin/testdata/random-2s-stream', + label: 'plugin/testdata/random-2s-stream', + }, + { + value: 'plugin/testdata/random-flakey-stream', + label: 'plugin/testdata/random-flakey-stream', + }, + ]; let currentChannel = channels.find((c) => c.value === channel); if (channel && !currentChannel) { currentChannel = { @@ -62,42 +78,33 @@ export class QueryEditor extends PureComponent { channels.push(currentChannel); } - if (!measurements) { - measurements = {}; - } - const names: Array> = [ - { value: '', label: 'All measurements', description: 'Show every measurement streamed to this channel' }, - ]; - - let info: LiveMeasurements | undefined = undefined; - if (channel) { - info = getLiveMeasurements({ - scope: LiveChannelScope.Grafana, - namespace: 'measurements', - path: channel, - }); - - let foundName = false; - if (info) { - for (const name of info.getKeys()) { - names.push({ - value: name, - label: name, - }); - if (name === measurements.key) { - foundName = true; + const distinctFields = new Set(); + const fields: Array> = []; + if (data && data.series?.length) { + for (const frame of data.series) { + for (const field of frame.fields) { + if (distinctFields.has(field.name) || !field.name) { + continue; } + fields.push({ + value: field.name, + label: field.name, + description: `(${getFrameDisplayName(frame)} / ${field.type})`, + }); + distinctFields.add(field.name); } - } else { - console.log('NO INFO for', channel); } - - if (measurements.key && !foundName) { - names.push({ - label: measurements.key, - value: measurements.key, - description: `Frames with key ${measurements.key}`, - }); + } + if (filter?.fields) { + for (const f of filter.fields) { + if (!distinctFields.has(f)) { + fields.push({ + value: f, + label: `${f} (not loaded)`, + description: `Configured, but not found in the query results`, + }); + distinctFields.add(f); + } } } @@ -120,18 +127,19 @@ export class QueryEditor extends PureComponent { {channel && (
- +