From 2aafa3987960952cd558fc727978646f59faacc9 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Thu, 22 Oct 2020 00:10:26 -0700 Subject: [PATCH] Live: support real time measurements (alpha) (#28022) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * improve reduce transformer * add measurment classes * sync with new grafana measure format * use address for live * use plural in URL * set the field name * fix build * find changes * POST http to channel * Yarn: Update lock file (#28014) * Loki: Run instant query only in Explore (#27974) * Run instant query only in Explore * Replace forEach with for loop * don't cast * Docs: Fixed row display in table (#28031) * Plugins: Let descendant plugins inherit their root's signature (#27970) * plugins: Let descendant plugins inherit their root's signature Signed-off-by: Arve Knudsen * Registry: Fix service shutdown mode trigger location (#28025) * Add Alex Khomenko as member (#28032) * show history * fix confirm * fix confirm * add tests * fix lint * add more errors * set values * remove unrelated changes * unrelated changes * Update pkg/models/live.go Co-authored-by: Arve Knudsen * Update pkg/models/live.go Co-authored-by: Arve Knudsen * Update pkg/services/live/live.go Co-authored-by: Arve Knudsen * Update pkg/services/live/pluginHandler.go Co-authored-by: Arve Knudsen * Update pkg/services/live/pluginHandler.go Co-authored-by: Arve Knudsen * Update pkg/services/live/pluginHandler.go Co-authored-by: Arve Knudsen * use measurments for testdata endpoints * add live to testdata * add live to testdata * Update pkg/services/live/channel.go Co-authored-by: Arve Knudsen * Apply suggestions from code review Co-authored-by: Arve Knudsen * update comment formats * uprevert testdata * Apply suggestions from code review Co-authored-by: Will Browne Co-authored-by: Ryan McKinley Co-authored-by: Hugo Häggmark * Apply suggestions from code review * CloudWatch: Add EC2CapacityReservations Namespace (#28309) * API: Fix short URLs (#28300) * API: Fix short URLs Signed-off-by: Arve Knudsen * Chore: Add cloud-middleware as code owners (#28310) Signed-off-by: Arve Knudsen * SQLStore: Run tests as integration tests (#28265) * sqlstore: Run tests as integration tests * Truncate database instead of re-creating it on each test * Fix test description See https://github.com/grafana/grafana/pull/12129 * Fix lint issues * Fix postgres dialect after review suggestion * Rename and document functions after review suggestion * Add periods * Fix auto-increment value for mysql dialect Co-authored-by: Emil Tullstedt * Drone: Fix grafana-mixin linting (#28308) * Drone: Fix Starlark script Signed-off-by: Arve Knudsen * grafana-mixin: Move build logic to scripts Signed-off-by: Arve Knudsen * Drone: Use mixin scripts Signed-off-by: Arve Knudsen * CI build image: Install jsonnetfmt and mixtool Signed-off-by: Arve Knudsen * Makefile: Print commands Signed-off-by: Arve Knudsen * should only ignore the file in the grafana mixin root folder (#28306) Signed-off-by: bergquist * fix: for graph size not taking up full height or width * Graph NG: fix toggling queries and extract Graph component from graph3 panel (#28290) * Fix issue when data and config is not in sync * Extract GraphNG component from graph panel and add some tests coverage * Update packages/grafana-ui/src/components/uPlot/hooks.test.ts * Update packages/grafana-ui/src/components/uPlot/hooks.test.ts * Update packages/grafana-ui/src/components/uPlot/hooks.test.ts * Fix grid color and annotations refresh * Drone: Use ${DRONE_TAG} in release pipelines, since it should work (#28299) Signed-off-by: Arve Knudsen * Explore: respect min_refresh_interval (#27988) * Explore: respect min_refresh_interval Fixes #27494 * fixup! Explore: respect min_refresh_interval * fixup! Explore: respect min_refresh_interval * UI: export defaultIntervals from refresh picker * fixup! Explore: respect min_refresh_interval Co-authored-by: Zoltán Bedi * Loki: Base maxDataPoints limits on query type (#28298) * Base maxLines and maxDataPoints based on query type * Allow overriding the limit to higher value * Bump tree-kill from 1.2.1 to 1.2.2 (#27405) Bumps [tree-kill](https://github.com/pkrumins/node-tree-kill) from 1.2.1 to 1.2.2. - [Release notes](https://github.com/pkrumins/node-tree-kill/releases) - [Commits](https://github.com/pkrumins/node-tree-kill/compare/v1.2.1...v1.2.2) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump handlebars from 4.4.3 to 4.7.6 (#27416) Bumps [handlebars](https://github.com/wycats/handlebars.js) from 4.4.3 to 4.7.6. - [Release notes](https://github.com/wycats/handlebars.js/releases) - [Changelog](https://github.com/handlebars-lang/handlebars.js/blob/master/release-notes.md) - [Commits](https://github.com/wycats/handlebars.js/compare/v4.4.3...v4.7.6) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Build(deps): Bump http-proxy from 1.18.0 to 1.18.1 (#27507) Bumps [http-proxy](https://github.com/http-party/node-http-proxy) from 1.18.0 to 1.18.1. - [Release notes](https://github.com/http-party/node-http-proxy/releases) - [Changelog](https://github.com/http-party/node-http-proxy/blob/master/CHANGELOG.md) - [Commits](https://github.com/http-party/node-http-proxy/compare/1.18.0...1.18.1) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Automation: Add backport github action (#28318) * BackendSrv: Fixes queue countdown when unsubscribe is before response (#28323) * GraphNG: Use AxisSide enum (#28320) * IssueTriage: Needs more info automation and messages (#28137) * IssueTriage: Needs more info automation and messages * Updated * Updated * Updated wording * SAML: IdP-initiated SSO docs (#28280) * SAML: IdP-initiated SSO docs * Update docs/sources/enterprise/saml.md Co-authored-by: Emil Tullstedt * Apply suggestions from code review Co-authored-by: Emil Tullstedt Co-authored-by: Emil Tullstedt * Loki: Run instant query only when doing metric query (#28325) * Run instant query only when doing metric query * Update public/app/plugins/datasource/loki/datasource.ts Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> * Automation: Tweaks to more info message (#28332) * AlertingNG: remove warn/crit from eval prototype (#28334) and misc cleanup * area/grafana/toolkit: update e2e docker image (#28335) * add xvfb to image * comment out toolkit inclusion * add latest tag * update packages for cypress * cleanup script * Update auth-proxy.md (#28339) Fix a minor grammar mistake: 'handling' to 'handle'. * Git: Create .gitattributes for windows line endings (#28340) With this set, Windows users will have text files converted from Windows style line endings (\r\n) to Unix style line endings (\n) when they’re added to the repository. https://www.edwardthomson.com/blog/git_for_windows_line_endings.html * Docs: Add docs for valuepicker (#28327) * Templating: Replace all '$tag' in tag values query (#28343) * Docs: Add missing records from grafana-ui 7.2.1 CHANGELOG (#28302) * Dashboard links: Places drop down list so it's always visible (#28330) * calculating whether to place the list on the right or left edge of the parent * change naming and add import of createRef * Automation: Update backport github action trigger (#28352) It seems like GitHub has solved the problem of running github actions on PRs from forks with access to secrets. https://github.blog/2020-08-03-github-actions-improvements-for-fork-and-pull-request-workflows/#improvements-for-public-repository-forks If I change the event that triggers it to pull_request_target the action is run in the context of the base instead of the merged PR branch * ColorSchemes: Adds more color schemes and text colors that depend on the background (#28305) * Adding more color modes and text colors that depend on the background color * Updates * Updated * Another big value fix * Fixing unit tests * Updated * Updated test * Update * Updated * Updated * Updated * Updated * Added new demo dashboard * Updated * updated * Updated * Updateed * added beta notice * Fixed e2e test * Fix typos Signed-off-by: Arve Knudsen * revert pseduo code * apply feedback * remove HTTP for now * fix backend test * change to datasource * clear input for streams * fix docs? * consistent measure vs measurements * better jsdocs * fix a few jsdoc errors * fix comment style * Remove commented out code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Clean up code Signed-off-by: Arve Knudsen * Update pkg/models/live.go Co-authored-by: Arve Knudsen * Fix build Signed-off-by: Arve Knudsen * set the stringField Co-authored-by: Torkel Ödegaard Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Co-authored-by: ozhuang Co-authored-by: Arve Knudsen Co-authored-by: Amos Law Co-authored-by: Alex Khomenko Co-authored-by: Will Browne Co-authored-by: Hugo Häggmark Co-authored-by: The Rock Guy Co-authored-by: Sofia Papagiannaki Co-authored-by: Emil Tullstedt Co-authored-by: Carl Bergquist Co-authored-by: Jack Westbrook Co-authored-by: Dominik Prokop Co-authored-by: Elliot Pryde Co-authored-by: Zoltán Bedi Co-authored-by: Andrej Ocenas Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Alexander Zobnin Co-authored-by: Kyle Brandt Co-authored-by: Brian Gann Co-authored-by: J-F-Far Co-authored-by: acoder77 <73009264+acoder77@users.noreply.github.com> Co-authored-by: Peter Holmberg Co-authored-by: Krzysztof Dąbrowski Co-authored-by: maknik --- .../transformations/transformers/concat.ts | 4 +- packages/grafana-data/src/types/live.ts | 11 +- packages/grafana-runtime/src/index.ts | 1 + .../src/measurement/collector.test.ts | 250 ++++++++++++++++++ .../src/measurement/collector.ts | 209 +++++++++++++++ .../grafana-runtime/src/measurement/index.ts | 3 + .../grafana-runtime/src/measurement/query.ts | 77 ++++++ .../grafana-runtime/src/measurement/types.ts | 73 +++++ .../src/services/LocationSrv.ts | 3 + packages/grafana-runtime/src/services/live.ts | 2 +- pkg/models/live.go | 9 +- pkg/models/measurement.go | 50 ++++ pkg/services/live/channel.go | 53 ++-- pkg/services/live/channel_test.go | 24 +- pkg/services/live/features/broadcast.go | 15 +- pkg/services/live/features/dashboard.go | 11 +- pkg/services/live/features/measurements.go | 41 +++ pkg/services/live/features/testdata.go | 78 +++--- pkg/services/live/live.go | 61 +++-- pkg/tsdb/testdatasource/scenarios.go | 9 + .../live/dashboard/dashboardWatcher.ts | 1 - public/app/features/live/features.ts | 18 +- .../live/measurements/measurementsSupport.ts | 40 +++ .../grafana/components/QueryEditor.tsx | 149 ++++++++++- .../datasource/grafana/datasource.test.ts | 13 +- .../plugins/datasource/grafana/datasource.ts | 102 ++++--- .../app/plugins/datasource/grafana/types.ts | 6 +- .../datasource/testdata/QueryEditor.tsx | 24 +- .../testdata/components/GrafanaLiveEditor.tsx | 37 +++ .../plugins/datasource/testdata/datasource.ts | 31 ++- .../app/plugins/datasource/testdata/types.ts | 1 + .../plugins/panel/live/LiveChannelEditor.tsx | 33 ++- public/app/plugins/panel/live/LivePanel.tsx | 211 +++++++++++++-- public/app/plugins/panel/live/module.tsx | 25 +- public/app/plugins/panel/live/types.ts | 10 + 35 files changed, 1471 insertions(+), 214 deletions(-) create mode 100644 packages/grafana-runtime/src/measurement/collector.test.ts create mode 100644 packages/grafana-runtime/src/measurement/collector.ts create mode 100644 packages/grafana-runtime/src/measurement/index.ts create mode 100644 packages/grafana-runtime/src/measurement/query.ts create mode 100644 packages/grafana-runtime/src/measurement/types.ts create mode 100644 pkg/models/measurement.go create mode 100644 pkg/services/live/features/measurements.go create mode 100644 public/app/features/live/measurements/measurementsSupport.ts create mode 100644 public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx diff --git a/packages/grafana-data/src/transformations/transformers/concat.ts b/packages/grafana-data/src/transformations/transformers/concat.ts index f6ede040060..09c5bf8be30 100644 --- a/packages/grafana-data/src/transformations/transformers/concat.ts +++ b/packages/grafana-data/src/transformations/transformers/concat.ts @@ -2,7 +2,7 @@ import { map } from 'rxjs/operators'; import { DataTransformerID } from './ids'; import { DataTransformerInfo } from '../../types/transformations'; -import { DataFrame, Field } from '../../types/dataFrame'; +import { DataFrame, Field, TIME_SERIES_VALUE_FIELD_NAME } from '../../types/dataFrame'; import { ArrayVector } from '../../vector'; export enum ConcatenateFrameNameMode { @@ -73,7 +73,7 @@ export function concatenateFields(data: DataFrame[], opts: ConcatenateTransforme } else if (opts.frameNameMode === ConcatenateFrameNameMode.Label) { copy.labels = { ...f.labels }; copy.labels[frameNameLabel] = frame.name; - } else if (!copy.name || copy.name === 'Value') { + } else if (!copy.name || copy.name === TIME_SERIES_VALUE_FIELD_NAME) { copy.name = frame.name; } else { copy.name = `${frame.name} · ${f.name}`; diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index e64d92c2878..999be33591e 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -1,4 +1,3 @@ -import { SelectableValue } from './select'; import { Observable } from 'rxjs'; /** @@ -17,7 +16,7 @@ export enum LiveChannelScope { /** * @alpha -- experimental */ -export interface LiveChannelConfig { +export interface LiveChannelConfig { /** * The path definition. either static, or it may contain variables identifed with {varname} */ @@ -28,11 +27,6 @@ export interface LiveChannelConfig { */ description?: string; - /** - * When variables exist, this list will identify each one - */ - variables?: Array>; - /** * The channel keeps track of who else is connected to the same channel */ @@ -46,6 +40,9 @@ export interface LiveChannelConfig { /** convert the raw stream message into a message that should be broadcast */ processMessage?: (msg: any) => TMessage; + + /** some channels are managed by an explicit interface */ + getController?: () => TController; } export enum LiveChannelConnectionState { diff --git a/packages/grafana-runtime/src/index.ts b/packages/grafana-runtime/src/index.ts index 5f1438c7fea..22d865593cb 100644 --- a/packages/grafana-runtime/src/index.ts +++ b/packages/grafana-runtime/src/index.ts @@ -6,6 +6,7 @@ export * from './services'; export * from './config'; export * from './types'; +export * from './measurement'; export { loadPluginCss, SystemJS, PluginCssOptions } from './utils/plugin'; export { reportMetaAnalytics } from './utils/analytics'; export { DataSourceWithBackend, HealthCheckResult, HealthStatus } from './utils/DataSourceWithBackend'; diff --git a/packages/grafana-runtime/src/measurement/collector.test.ts b/packages/grafana-runtime/src/measurement/collector.test.ts new file mode 100644 index 00000000000..3800e8c520d --- /dev/null +++ b/packages/grafana-runtime/src/measurement/collector.test.ts @@ -0,0 +1,250 @@ +import { MeasurementCollector } from './collector'; +import { MeasurementAction } from './types'; + +describe('MeasurementCollector', () => { + it('should collect values', () => { + const collector = new MeasurementCollector(); + collector.addBatch({ + measurements: [ + { + name: 'test', + labels: { host: 'a' }, + time: 100, + values: { + f0: 0, + f1: 1, + f2: 'hello', + }, + }, + { + name: 'test', + labels: { host: 'b' }, + time: 101, + values: { + f0: 0, + f1: 1, + f2: 'hello', + }, + config: { + f2: { + unit: 'mph', + }, + }, + }, + { + name: 'test', + time: 102, + labels: { host: 'a' }, // should append to first value + values: { + // note the missing values for f0/1 + f2: 'world', + }, + }, + ], + }); + + const frames = collector.getData(); + expect(frames.length).toEqual(2); + expect(frames[0]).toMatchInlineSnapshot(` + Object { + "fields": Array [ + Object { + "config": Object {}, + "labels": undefined, + "name": "time", + "type": "time", + "values": Array [ + 100, + 102, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "a", + }, + "name": "f0", + "type": "number", + "values": Array [ + 0, + null, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "a", + }, + "name": "f1", + "type": "number", + "values": Array [ + 1, + null, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "a", + }, + "name": "f2", + "type": "string", + "values": Array [ + "hello", + "world", + ], + }, + ], + "meta": Object { + "custom": Object { + "labels": Object { + "host": "a", + }, + }, + }, + "name": "test", + "refId": undefined, + } + `); + expect(frames[1]).toMatchInlineSnapshot(` + Object { + "fields": Array [ + Object { + "config": Object {}, + "labels": undefined, + "name": "time", + "type": "time", + "values": Array [ + 101, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "b", + }, + "name": "f0", + "type": "number", + "values": Array [ + 0, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "b", + }, + "name": "f1", + "type": "number", + "values": Array [ + 1, + ], + }, + Object { + "config": Object { + "unit": "mph", + }, + "labels": Object { + "host": "b", + }, + "name": "f2", + "type": "string", + "values": Array [ + "hello", + ], + }, + ], + "meta": Object { + "custom": Object { + "labels": Object { + "host": "b", + }, + }, + }, + "name": "test", + "refId": undefined, + } + `); + + collector.addBatch({ + action: MeasurementAction.Replace, + measurements: [ + { + name: 'test', + time: 105, + labels: { host: 'a' }, + values: { + f1: 10, + }, + }, + ], + }); + + const frames2 = collector.getData(); + expect(frames2.length).toEqual(2); + expect(frames2[0].length).toEqual(1); // not three! + expect(frames2[0]).toMatchInlineSnapshot(` + Object { + "fields": Array [ + Object { + "config": Object {}, + "labels": undefined, + "name": "time", + "type": "time", + "values": Array [ + 105, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "a", + }, + "name": "f0", + "type": "number", + "values": Array [ + null, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "a", + }, + "name": "f1", + "type": "number", + "values": Array [ + 10, + ], + }, + Object { + "config": Object {}, + "labels": Object { + "host": "a", + }, + "name": "f2", + "type": "string", + "values": Array [ + null, + ], + }, + ], + "meta": Object { + "custom": Object { + "labels": Object { + "host": "a", + }, + }, + }, + "name": "test", + "refId": undefined, + } + `); + + collector.addBatch({ + action: MeasurementAction.Clear, + measurements: [], + }); + expect(collector.getData().length).toEqual(0); + }); +}); diff --git a/packages/grafana-runtime/src/measurement/collector.ts b/packages/grafana-runtime/src/measurement/collector.ts new file mode 100644 index 00000000000..4bc34af7b57 --- /dev/null +++ b/packages/grafana-runtime/src/measurement/collector.ts @@ -0,0 +1,209 @@ +import { + CircularDataFrame, + Labels, + formatLabels, + FieldType, + DataFrame, + matchAllLabels, + parseLabels, + CircularVector, + ArrayVector, +} from '@grafana/data'; +import { Measurement, MeasurementBatch, LiveMeasurements, MeasurementsQuery, MeasurementAction } from './types'; + +interface MeasurementCacheConfig { + append?: 'head' | 'tail'; + capacity?: number; +} + +/** This is a cache scoped to a the measurement name + * + * @alpha -- experimental + */ +export class MeasurementCache { + readonly frames: Record = {}; // key is the labels + + constructor(public name: string, private config: MeasurementCacheConfig) { + if (!this.config) { + this.config = { + append: 'tail', + capacity: 600, // Default capacity 10min @ 1hz + }; + } + } + + getFrames(match?: Labels): DataFrame[] { + const frames = Object.values(this.frames); + if (!match) { + return frames; + } + return frames.filter(f => { + return matchAllLabels(match, f.meta?.custom?.labels); + }); + } + + addMeasurement(m: Measurement, action: MeasurementAction): DataFrame { + const key = m.labels ? formatLabels(m.labels) : ''; + let frame = this.frames[key]; + if (!frame) { + frame = new CircularDataFrame(this.config); + frame.name = this.name; + frame.addField({ + name: 'time', + type: FieldType.time, + }); + for (const [key, value] of Object.entries(m.values)) { + frame.addFieldFor(value, key).labels = m.labels; + } + frame.meta = { + custom: { + labels: m.labels, + }, + }; + this.frames[key] = frame; + } + + // Clear existing values + if (action === MeasurementAction.Replace) { + for (const field of frame.fields) { + (field.values as ArrayVector).buffer.length = 0; // same buffer, but reset to empty length + } + } + + // Add the timestamp + frame.values['time'].add(m.time || Date.now()); + + // Attach field config to the current fields + if (m.config) { + for (const [key, value] of Object.entries(m.config)) { + const f = frame.fields.find(f => f.name === key); + if (f) { + f.config = value; + } + } + } + + // Append all values (a row) + for (const [key, value] of Object.entries(m.values)) { + let v = frame.values[key]; + if (!v) { + const f = frame.addFieldFor(value, key); + f.labels = m.labels; + v = f.values; + } + v.add(value); + } + + // Make sure all fields have the same length + frame.validate(); + return frame; + } +} + +/** + * @alpha -- experimental + */ +export class MeasurementCollector implements LiveMeasurements { + measurements = new Map(); + config: MeasurementCacheConfig = { + append: 'tail', + capacity: 600, // Default capacity 10min @ 1hz + }; + + //------------------------------------------------------ + // Public + //------------------------------------------------------ + + getData(query?: MeasurementsQuery): DataFrame[] { + const { name, labels, fields } = query || {}; + + let data: DataFrame[] = []; + if (name) { + // for now we only match exact names + const m = this.measurements.get(name); + if (m) { + data = m.getFrames(labels); + } + } else { + for (const f of this.measurements.values()) { + data.push.apply(data, f.getFrames(labels)); + } + } + + if (fields && fields.length) { + let filtered: DataFrame[] = []; + for (const frame of data) { + const match = frame.fields.filter(f => fields.includes(f.name)); + if (match.length > 0) { + filtered.push({ ...frame, fields: match }); // Copy the frame with fewer fields + } + } + } + return data; + } + + getDistinctNames(): string[] { + return Object.keys(this.measurements); + } + + getDistinctLabels(name: string): Labels[] { + const m = this.measurements.get(name); + if (m) { + return Object.keys(m.frames).map(k => parseLabels(k)); + } + return []; + } + + setCapacity(size: number) { + this.config.capacity = size; + + // Now update all the circular buffers + for (const wrap of this.measurements.values()) { + for (const frame of Object.values(wrap.frames)) { + for (const field of frame.fields) { + (field.values as CircularVector).setCapacity(size); + } + } + } + } + + getCapacity() { + return this.config.capacity!; + } + + clear() { + this.measurements.clear(); + } + + //------------------------------------------------------ + // Collector + //------------------------------------------------------ + + addBatch = (batch: MeasurementBatch) => { + let action = batch.action ?? MeasurementAction.Append; + if (action === MeasurementAction.Clear) { + this.measurements.clear(); + action = MeasurementAction.Append; + } + + // Change the local buffer size + if (batch.capacity && batch.capacity !== this.config.capacity) { + this.setCapacity(batch.capacity); + } + + for (const measure of batch.measurements) { + const name = measure.name || ''; + let m = this.measurements.get(name); + if (!m) { + m = new MeasurementCache(name, this.config); + this.measurements.set(name, m); + } + if (measure.values) { + m.addMeasurement(measure, action); + } else { + console.log('invalid measurement', measure); + } + } + return this; + }; +} diff --git a/packages/grafana-runtime/src/measurement/index.ts b/packages/grafana-runtime/src/measurement/index.ts new file mode 100644 index 00000000000..56180b6f43b --- /dev/null +++ b/packages/grafana-runtime/src/measurement/index.ts @@ -0,0 +1,3 @@ +export * from './types'; +export * from './collector'; +export * from './query'; diff --git a/packages/grafana-runtime/src/measurement/query.ts b/packages/grafana-runtime/src/measurement/query.ts new file mode 100644 index 00000000000..51610251ae2 --- /dev/null +++ b/packages/grafana-runtime/src/measurement/query.ts @@ -0,0 +1,77 @@ +import { + DataQueryResponse, + isLiveChannelMessageEvent, + isLiveChannelStatusEvent, + isValidLiveChannelAddress, + LiveChannelAddress, + LoadingState, +} from '@grafana/data'; +import { LiveMeasurements, MeasurementsQuery } from './types'; +import { getGrafanaLiveSrv } from '../services/live'; + +import { Observable, of } from 'rxjs'; +import { map } from 'rxjs/operators'; + +/** + * @alpha -- experimental + */ +export function getLiveMeasurements(addr: LiveChannelAddress): LiveMeasurements | undefined { + if (!isValidLiveChannelAddress(addr)) { + return undefined; + } + + const live = getGrafanaLiveSrv(); + if (!live) { + return undefined; + } + + const channel = live.getChannel(addr); + const getController = channel?.config?.getController; + return getController ? getController() : undefined; +} + +/** + * When you know the stream will be managed measurements + * + * @alpha -- experimental + */ +export function getLiveMeasurementsObserver( + addr: LiveChannelAddress, + requestId: string, + query?: MeasurementsQuery +): Observable { + const rsp: DataQueryResponse = { data: [] }; + if (!addr || !addr.path) { + return of(rsp); // Address not configured yet + } + + const live = getGrafanaLiveSrv(); + if (!live) { + // This will only happen with the feature flag is not enabled + rsp.error = { message: 'Grafana live is not initalized' }; + return of(rsp); + } + + rsp.key = requestId; + return live + .getChannel(addr) + .getStream() + .pipe( + map(evt => { + if (isLiveChannelMessageEvent(evt)) { + rsp.data = evt.message.getData(query); + if (!rsp.data.length) { + // ?? skip when data is empty ??? + } + delete rsp.error; + rsp.state = LoadingState.Streaming; + } else if (isLiveChannelStatusEvent(evt)) { + if (evt.error != null) { + rsp.error = rsp.error; + rsp.state = LoadingState.Error; + } + } + return { ...rsp }; // send event on all status messages + }) + ); +} diff --git a/packages/grafana-runtime/src/measurement/types.ts b/packages/grafana-runtime/src/measurement/types.ts new file mode 100644 index 00000000000..687e4947346 --- /dev/null +++ b/packages/grafana-runtime/src/measurement/types.ts @@ -0,0 +1,73 @@ +import { DataFrame, Labels, FieldConfig } from '@grafana/data'; + +/** + * the raw channel events are batches of Measurements + * + * @alpha -- experimental + */ +export interface Measurement { + name: string; + time?: number; // Missing will use the browser time + values: Record; + config?: Record; + labels?: Labels; +} + +/** + * @alpha -- experimental + */ +export enum MeasurementAction { + /** The measurements will be added to the client buffer */ + Append = 'append', + + /** The measurements will replace the client buffer */ + Replace = 'replace', + + /** All measurements will be removed from the client buffer before processing */ + Clear = 'clear', +} + +/** + * List of Measurements sent in a batch + * + * @alpha -- experimental + */ +export interface MeasurementBatch { + /** + * The default action is to append values to the client buffer + */ + action?: MeasurementAction; + + /** + * List of measurements to process + */ + measurements: Measurement[]; + + /** + * This will set the capacity on the client buffer for everything + * in the measurement channel + */ + capacity?: number; +} + +/** + * @alpha -- experimental + */ +export interface MeasurementsQuery { + name?: string; + labels?: Labels; + fields?: string[]; // only include the fields with these names +} + +/** + * Channels that receive Measurements can collect them into frames + * + * @alpha -- experimental + */ +export interface LiveMeasurements { + getData(query?: MeasurementsQuery): DataFrame[]; + getDistinctNames(): string[]; + getDistinctLabels(name: string): Labels[]; + setCapacity(size: number): void; + getCapacity(): number; +} diff --git a/packages/grafana-runtime/src/services/LocationSrv.ts b/packages/grafana-runtime/src/services/LocationSrv.ts index 37c0ee0818b..c1cdd03144c 100644 --- a/packages/grafana-runtime/src/services/LocationSrv.ts +++ b/packages/grafana-runtime/src/services/LocationSrv.ts @@ -6,6 +6,9 @@ */ import { UrlQueryMap } from '@grafana/data'; +/** + * @public + */ export interface LocationUpdate { /** * Target path where you automatically wants to navigate the user. diff --git a/packages/grafana-runtime/src/services/live.ts b/packages/grafana-runtime/src/services/live.ts index c57a4a1777f..db01d7ab0ed 100644 --- a/packages/grafana-runtime/src/services/live.ts +++ b/packages/grafana-runtime/src/services/live.ts @@ -39,7 +39,7 @@ export const setGrafanaLiveSrv = (instance: GrafanaLiveSrv) => { }; /** - * Used to retrieve the {@link GrafanaLiveSrv} that allows you to subscribe to + * Used to retrieve the GrafanaLiveSrv that allows you to subscribe to * server side events and streams * * @alpha -- experimental diff --git a/pkg/models/live.go b/pkg/models/live.go index 9042968b56e..c42f15ce7b5 100644 --- a/pkg/models/live.go +++ b/pkg/models/live.go @@ -2,7 +2,7 @@ package models import "github.com/centrifugal/centrifuge" -// ChannelPublisher writes data into a channel +// ChannelPublisher writes data into a channel. Note that pemissions are not checked. type ChannelPublisher func(channel string, data []byte) error // ChannelHandler defines the core channel behavior @@ -17,9 +17,10 @@ type ChannelHandler interface { OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) } -// ChannelHandlerProvider -- this should be implemented by any core feature -type ChannelHandlerProvider interface { - // This is called fast and often -- it must be synchrnozed +// ChannelHandlerFactory should be implemented by all core features. +type ChannelHandlerFactory interface { + // GetHandlerForPath gets a ChannelHandler for a path. + // This is called fast and often -- it must be synchronized GetHandlerForPath(path string) (ChannelHandler, error) } diff --git a/pkg/models/measurement.go b/pkg/models/measurement.go new file mode 100644 index 00000000000..1b381005d80 --- /dev/null +++ b/pkg/models/measurement.go @@ -0,0 +1,50 @@ +package models + +import "github.com/grafana/grafana-plugin-sdk-go/data" + +// NOTE: +// this likely should go in the Plugin SDK since it will be useful from plugins + +// Measurement is a single measurement value. +type Measurement struct { + // Name of the measurement. + Name string `json:"name,omitempty"` + + // Time is the measurement time. Units are usually ms, but depends on the channel + Time int64 `json:"time,omitempty"` + + // Values is the measurement's values. The value type is typically number or string. + Values map[string]interface{} `json:"values,omitempty"` + + // Config is an optional list of field configs. + Config map[string]data.FieldConfig `json:"config,omitempty"` + + // Labels are applied to all values. + Labels map[string]string `json:"labels,omitempty"` +} + +// MeasurementAction defines what should happen when you send a list of measurements. +type MeasurementAction string + +const ( + // MeasurementActionAppend means new values should be added to a client buffer. This is the default action + MeasurementActionAppend MeasurementAction = "append" + + // MeasurementActionReplace means new values should replace any existing values. + MeasurementActionReplace MeasurementAction = "replace" + + // MeasurementActionClear means all existing values should be remoed before adding. + MeasurementActionClear MeasurementAction = "clear" +) + +// MeasurementBatch is a collection of measurements all sent at once. +type MeasurementBatch struct { + // Action is the action in question, the default is append. + Action MeasurementAction `json:"action,omitempty"` + + // Measurements is the array of measurements. + Measurements []Measurement `json:"measurements,omitempty"` + + // Capacity is the suggested size of the client buffer + Capacity int64 `json:"capacity,omitempty"` +} diff --git a/pkg/services/live/channel.go b/pkg/services/live/channel.go index c901438e456..f3379777c8c 100644 --- a/pkg/services/live/channel.go +++ b/pkg/services/live/channel.go @@ -1,27 +1,48 @@ package live import ( - "fmt" "strings" ) -// ChannelIdentifier is the channel id split by parts -type ChannelIdentifier struct { - Scope string // grafana, ds, or plugin - Namespace string // feature, id, or name - Path string // path within the channel handler +// ChannelAddress is the channel ID split by parts. +type ChannelAddress struct { + // Scope is "grafana", "ds", or "plugin". + Scope string `json:"scope,omitempty"` + + // Namespace meaning depends on the scope. + // * when grafana, namespace is a "feature" + // * when ds, namespace is the datasource id + // * when plugin, namespace is the plugin name + Namespace string `json:"namespace,omitempty"` + + // Within each namespace, the handler can process the path as needed. + Path string `json:"path,omitempty"` } -// ParseChannelIdentifier parses the parts from a channel id: -// ${scope} / ${namespace} / ${path} -func ParseChannelIdentifier(id string) (ChannelIdentifier, error) { +// ParseChannelAddress parses the parts from a channel ID: +// ${scope} / ${namespace} / ${path}. +func ParseChannelAddress(id string) ChannelAddress { + addr := ChannelAddress{} parts := strings.SplitN(id, "/", 3) - if len(parts) == 3 { - return ChannelIdentifier{ - Scope: parts[0], - Namespace: parts[1], - Path: parts[2], - }, nil + length := len(parts) + if length > 0 { + addr.Scope = parts[0] } - return ChannelIdentifier{}, fmt.Errorf("Invalid channel id: %s", id) + if length > 1 { + addr.Namespace = parts[1] + } + if length > 2 { + addr.Path = parts[2] + } + return addr +} + +// IsValid checks if all parts of the address are valid. +func (ca *ChannelAddress) IsValid() bool { + return ca.Scope != "" && ca.Namespace != "" && ca.Path != "" +} + +// ToChannelID converts this to a single string. +func (ca *ChannelAddress) ToChannelID() string { + return ca.Scope + "/" + ca.Namespace + "/" + ca.Path } diff --git a/pkg/services/live/channel_test.go b/pkg/services/live/channel_test.go index 7d6600ddd82..2b73148d135 100644 --- a/pkg/services/live/channel_test.go +++ b/pkg/services/live/channel_test.go @@ -4,27 +4,25 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" ) -func TestParseChannelIdentifier(t *testing.T) { - ident, err := ParseChannelIdentifier("aaa/bbb/ccc/ddd") - if err != nil { - t.FailNow() - } +func TestParseChannelAddress_Valid(t *testing.T) { + addr := ParseChannelAddress("aaa/bbb/ccc/ddd") + require.True(t, addr.IsValid()) - ex := ChannelIdentifier{ + ex := ChannelAddress{ Scope: "aaa", Namespace: "bbb", Path: "ccc/ddd", } - if diff := cmp.Diff(ident, ex); diff != "" { + if diff := cmp.Diff(addr, ex); diff != "" { t.Fatalf("Result mismatch (-want +got):\n%s", diff) } - - // Check an invalid identifier - _, err = ParseChannelIdentifier("aaa/bbb") - if err == nil { - t.FailNow() - } +} + +func TestParseChannelAddress_Invalid(t *testing.T) { + addr := ParseChannelAddress("aaa/bbb") + require.False(t, addr.IsValid()) } diff --git a/pkg/services/live/features/broadcast.go b/pkg/services/live/features/broadcast.go index 7359b0e6947..93f0c8ba37c 100644 --- a/pkg/services/live/features/broadcast.go +++ b/pkg/services/live/features/broadcast.go @@ -6,27 +6,28 @@ import ( ) // BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels -// This makes no assumptions about the shape of the data and will broadcast it to anyone listening -type BroadcastRunner struct{} +// This assumes that data is a JSON object +type BroadcastRunner struct { +} // GetHandlerForPath called on init -func (g *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { - return g, nil // for now all channels share config +func (b *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { + return b, nil // for now all channels share config } // GetChannelOptions called fast and often -func (g *BroadcastRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { +func (b *BroadcastRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { return centrifuge.ChannelOptions{} } // OnSubscribe for now allows anyone to subscribe to any dashboard -func (g *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { +func (b *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { // anyone can subscribe return nil } // OnPublish called when an event is received from the websocket -func (g *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { +func (b *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { // expect the data to be the right shape? return e.Data, nil } diff --git a/pkg/services/live/features/dashboard.go b/pkg/services/live/features/dashboard.go index 819b6664030..569f6279717 100644 --- a/pkg/services/live/features/dashboard.go +++ b/pkg/services/live/features/dashboard.go @@ -17,14 +17,7 @@ type dashboardEvent struct { // DashboardHandler manages all the `grafana/dashboard/*` channels type DashboardHandler struct { - publisher models.ChannelPublisher -} - -// CreateDashboardHandler Initialize a dashboard handler -func CreateDashboardHandler(p models.ChannelPublisher) DashboardHandler { - return DashboardHandler{ - publisher: p, - } + Publisher models.ChannelPublisher } // GetHandlerForPath called on init @@ -58,7 +51,7 @@ func (g *DashboardHandler) publish(event dashboardEvent) error { if err != nil { return err } - return g.publisher("grafana/dashboard/"+event.UID, msg) + return g.Publisher("grafana/dashboard/"+event.UID, msg) } // DashboardSaved will broadcast to all connected dashboards diff --git a/pkg/services/live/features/measurements.go b/pkg/services/live/features/measurements.go new file mode 100644 index 00000000000..6c78ef90353 --- /dev/null +++ b/pkg/services/live/features/measurements.go @@ -0,0 +1,41 @@ +package features + +import ( + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" +) + +var ( + logger = log.New("live.features") // scoped to all features? +) + +// MeasurementsRunner will simply broadcast all events to `grafana/broadcast/*` channels. +// This makes no assumptions about the shape of the data and will broadcast it to anyone listening +type MeasurementsRunner struct { +} + +// GetHandlerForPath gets the handler for a path. +// It's called on init. +func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { + return m, nil // for now all channels share config +} + +// GetChannelOptions gets channel options. +// It gets called fast and often. +func (m *MeasurementsRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { + return centrifuge.ChannelOptions{} +} + +// OnSubscribe for now allows anyone to subscribe to any dashboard. +func (m *MeasurementsRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { + // anyone can subscribe + return nil +} + +// OnPublish is called when an event is received from the websocket. +func (m *MeasurementsRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { + // currently generic... but should be stricter + // logger.Debug("Measurements runner got event on channel", "channel", e.Channel) + return e.Data, nil +} diff --git a/pkg/services/live/features/testdata.go b/pkg/services/live/features/testdata.go index 1b7fe527d45..fdbe393adad 100644 --- a/pkg/services/live/features/testdata.go +++ b/pkg/services/live/features/testdata.go @@ -7,48 +7,43 @@ import ( "time" "github.com/centrifugal/centrifuge" - "github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" "github.com/grafana/grafana/pkg/models" ) -// TestdataRunner manages all the `grafana/dashboard/*` channels -type testdataRunner struct { +// testDataRunner manages all the `grafana/dashboard/*` channels. +type testDataRunner struct { publisher models.ChannelPublisher running bool speedMillis int dropPercent float64 channel string + name string } -// TestdataSupplier manages all the `grafana/testdata/*` channels -type TestdataSupplier struct { - publisher models.ChannelPublisher +// TestDataSupplier manages all the `grafana/testdata/*` channels. +type TestDataSupplier struct { + Publisher models.ChannelPublisher } -// CreateTestdataSupplier Initialize a dashboard handler -func CreateTestdataSupplier(p models.ChannelPublisher) TestdataSupplier { - return TestdataSupplier{ - publisher: p, - } -} - -// GetHandlerForPath called on init -func (g *TestdataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) { +// GetHandlerForPath gets the channel handler for a path. +// Called on init. +func (g *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) { channel := "grafana/testdata/" + path if path == "random-2s-stream" { - return &testdataRunner{ - publisher: g.publisher, + return &testDataRunner{ + publisher: g.Publisher, running: false, speedMillis: 2000, dropPercent: 0, channel: channel, + name: path, }, nil } if path == "random-flakey-stream" { - return &testdataRunner{ - publisher: g.publisher, + return &testDataRunner{ + publisher: g.Publisher, running: false, speedMillis: 400, dropPercent: .6, @@ -59,13 +54,14 @@ func (g *TestdataSupplier) GetHandlerForPath(path string) (models.ChannelHandler return nil, fmt.Errorf("unknown channel") } -// GetChannelOptions called fast and often -func (g *testdataRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { +// GetChannelOptions gets channel options. +// Called fast and often. +func (g *testDataRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { return centrifuge.ChannelOptions{} } -// OnSubscribe for now allows anyone to subscribe to any dashboard -func (g *testdataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { +// OnSubscribe for now allows anyone to subscribe to any dashboard. +func (g *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { if !g.running { g.running = true @@ -77,26 +73,26 @@ func (g *testdataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib return nil } -// OnPublish called when an event is received from the websocket -func (g *testdataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { +// OnPublish is called when an event is received from the websocket. +func (g *testDataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { return nil, fmt.Errorf("can not publish to testdata") } -type randomWalkMessage struct { - Time int64 - Value float64 - Min float64 - Max float64 -} - -// RunRandomCSV just for an example -func (g *testdataRunner) runRandomCSV() { +// runRandomCSV is just for an example. +func (g *testDataRunner) runRandomCSV() { spread := 50.0 walker := rand.Float64() * 100 ticker := time.NewTicker(time.Duration(g.speedMillis) * time.Millisecond) - line := randomWalkMessage{} + measurement := models.Measurement{ + Name: g.name, + Time: 0, + Values: make(map[string]interface{}, 5), + } + msg := models.MeasurementBatch{ + Measurements: []models.Measurement{measurement}, // always a single measurement + } for t := range ticker.C { if rand.Float64() <= g.dropPercent { @@ -105,12 +101,12 @@ func (g *testdataRunner) runRandomCSV() { delta := rand.Float64() - 0.5 walker += delta - line.Time = t.UnixNano() / int64(time.Millisecond) - line.Value = walker - line.Min = walker - ((rand.Float64() * spread) + 0.01) - line.Max = walker + ((rand.Float64() * spread) + 0.01) + measurement.Time = t.UnixNano() / int64(time.Millisecond) + measurement.Values["value"] = walker + measurement.Values["min"] = walker - ((rand.Float64() * spread) + 0.01) + measurement.Values["max"] = walker + ((rand.Float64() * spread) + 0.01) - bytes, err := json.Marshal(&line) + bytes, err := json.Marshal(&msg) if err != nil { logger.Warn("unable to marshal line", "error", err) continue @@ -118,7 +114,7 @@ func (g *testdataRunner) runRandomCSV() { err = g.publisher(g.channel, bytes) if err != nil { - logger.Warn("write", "channel", g.channel, "line", line) + logger.Warn("write", "channel", g.channel, "measurement", measurement) } } } diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 28e9175323d..9d9fe981f11 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -20,7 +20,7 @@ var ( // CoreGrafanaScope list of core features type CoreGrafanaScope struct { - Features map[string]models.ChannelHandlerProvider + Features map[string]models.ChannelHandlerFactory // The generic service to advertise dashboard changes Dashboards models.DashboardActivityChannel @@ -47,7 +47,7 @@ func InitializeBroker() (*GrafanaLive, error) { channels: make(map[string]models.ChannelHandler), channelsMu: sync.RWMutex{}, GrafanaScope: CoreGrafanaScope{ - Features: make(map[string]models.ChannelHandlerProvider), + Features: make(map[string]models.ChannelHandlerFactory), }, } @@ -83,13 +83,17 @@ func InitializeBroker() (*GrafanaLive, error) { glive.node = node // Initialize the main features - dash := features.CreateDashboardHandler(glive.Publish) - tds := features.CreateTestdataSupplier(glive.Publish) + dash := &features.DashboardHandler{ + Publisher: glive.Publish, + } - glive.GrafanaScope.Dashboards = &dash - glive.GrafanaScope.Features["dashboard"] = &dash - glive.GrafanaScope.Features["testdata"] = &tds + glive.GrafanaScope.Dashboards = dash + glive.GrafanaScope.Features["dashboard"] = dash + glive.GrafanaScope.Features["testdata"] = &features.TestDataSupplier{ + Publisher: glive.Publish, + } glive.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{} + glive.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{} // Set ConnectHandler called when client successfully connected to Node. Your code // inside handler must be synchronized since it will be called concurrently from @@ -232,11 +236,11 @@ func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, } // Parse the identifier ${scope}/${namespace}/${path} - id, err := ParseChannelIdentifier(channel) - if err != nil { - return nil, err + addr := ParseChannelAddress(channel) + if !addr.IsValid() { + return nil, fmt.Errorf("invalid channel: %q", channel) } - logger.Info("initChannel", "channel", channel, "id", id) + logger.Info("initChannel", "channel", channel, "address", addr) g.channelsMu.Lock() defer g.channelsMu.Unlock() @@ -245,39 +249,48 @@ func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, return c, nil } - c, err = g.initChannel(id) + getter, err := g.GetChannelHandlerFactory(addr.Scope, addr.Namespace) if err != nil { return nil, err } + + // First access will initialize + c, err = getter.GetHandlerForPath(addr.Path) + if err != nil { + return nil, err + } + g.channels[channel] = c return c, nil } -func (g *GrafanaLive) initChannel(id ChannelIdentifier) (models.ChannelHandler, error) { - if id.Scope == "grafana" { - p, ok := g.GrafanaScope.Features[id.Namespace] +// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace. +// It gives threadsafe access to the channel. +func (g *GrafanaLive) GetChannelHandlerFactory(scope string, name string) (models.ChannelHandlerFactory, error) { + if scope == "grafana" { + p, ok := g.GrafanaScope.Features[name] if ok { - return p.GetHandlerForPath(id.Path) + return p, nil } - return nil, fmt.Errorf("Unknown feature: %s", id.Namespace) + return nil, fmt.Errorf("unknown feature: %q", name) } - if id.Scope == "ds" { - return nil, fmt.Errorf("todo... look up datasource: %s", id.Namespace) + if scope == "ds" { + return nil, fmt.Errorf("todo... look up datasource: %q", name) } - if id.Scope == "plugin" { - p, ok := plugins.Plugins[id.Namespace] + if scope == "plugin" { + p, ok := plugins.Plugins[name] if ok { h := &PluginHandler{ Plugin: p, } - return h.GetHandlerForPath(id.Path) + return h, nil } - return nil, fmt.Errorf("unknown plugin: %s", id.Namespace) + return nil, fmt.Errorf("unknown plugin: %q", name) } - return nil, fmt.Errorf("invalid scope: %s", id.Scope) + return nil, fmt.Errorf("invalid scope: %q", scope) } // Publish sends the data to the channel without checking permissions etc diff --git a/pkg/tsdb/testdatasource/scenarios.go b/pkg/tsdb/testdatasource/scenarios.go index 12503168404..81da5217259 100644 --- a/pkg/tsdb/testdatasource/scenarios.go +++ b/pkg/tsdb/testdatasource/scenarios.go @@ -259,6 +259,15 @@ func init() { }, }) + registerScenario(&Scenario{ + Id: "live", + Name: "Grafana Live", + Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult { + // Real work is in javascript client + return tsdb.NewQueryResult() + }, + }) + registerScenario(&Scenario{ Id: "grafana_api", Name: "Grafana API", diff --git a/public/app/features/live/dashboard/dashboardWatcher.ts b/public/app/features/live/dashboard/dashboardWatcher.ts index 0522e0245fe..0dcac078f1d 100644 --- a/public/app/features/live/dashboard/dashboardWatcher.ts +++ b/public/app/features/live/dashboard/dashboardWatcher.ts @@ -168,7 +168,6 @@ export function getDashboardChannelsFeature(): CoreGrafanaLiveFeature { const dashboardConfig: LiveChannelConfig = { path: '${uid}', description: 'Dashboard change events', - variables: [{ value: 'uid', label: '${uid}', description: 'unique id for a dashboard' }], hasPresence: true, canPublish: () => true, }; diff --git a/public/app/features/live/features.ts b/public/app/features/live/features.ts index 29c76da5128..63de8a347be 100644 --- a/public/app/features/live/features.ts +++ b/public/app/features/live/features.ts @@ -1,16 +1,24 @@ import { LiveChannelConfig } from '@grafana/data'; +import { MeasurementCollector } from '@grafana/runtime'; import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher'; +import { LiveMeasurementsSupport } from './measurements/measurementsSupport'; import { grafanaLiveCoreFeatures } from './scopes'; export function registerLiveFeatures() { - const channels = [ + const random2s = new MeasurementCollector(); + const randomFlakey = new MeasurementCollector(); + const channels: LiveChannelConfig[] = [ { path: 'random-2s-stream', description: 'Random stream with points every 2s', + getController: () => random2s, + processMessage: random2s.addBatch, }, { path: 'random-flakey-stream', description: 'Random stream with flakey data points', + getController: () => randomFlakey, + processMessage: randomFlakey.addBatch, }, ]; @@ -39,7 +47,13 @@ export function registerLiveFeatures() { }, getSupportedPaths: () => [broadcastConfig], }, - description: 'Broadcast will send/recieve any events on a channel', + description: 'Broadcast will send/receive any JSON object in a channel', + }); + + grafanaLiveCoreFeatures.register({ + name: 'measurements', + support: new LiveMeasurementsSupport(), + description: 'These channels listen for measurements and produce DataFrames', }); // dashboard/* diff --git a/public/app/features/live/measurements/measurementsSupport.ts b/public/app/features/live/measurements/measurementsSupport.ts new file mode 100644 index 00000000000..fe090499230 --- /dev/null +++ b/public/app/features/live/measurements/measurementsSupport.ts @@ -0,0 +1,40 @@ +import { LiveChannelSupport, LiveChannelConfig } from '@grafana/data'; +import { MeasurementCollector } from '@grafana/runtime'; + +interface MeasurementChannel { + config: LiveChannelConfig; + collector: MeasurementCollector; +} + +export class LiveMeasurementsSupport implements LiveChannelSupport { + private cache: Record = {}; + + /** + * Get the channel handler for the path, or throw an error if invalid + */ + getChannelConfig(path: string): LiveChannelConfig | undefined { + let c = this.cache[path]; + if (!c) { + // Create a new cache for each path + const collector = new MeasurementCollector(); + c = this.cache[path] = { + collector, + config: { + path, + processMessage: collector.addBatch, // << this converts the stream from a single event to the whole cache + getController: () => collector, + canPublish: () => true, + }, + }; + } + return c.config; + } + + /** + * Return a list of supported channels + */ + getSupportedPaths(): LiveChannelConfig[] { + // this should ask the server what channels it has seen + return []; + } +} diff --git a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx index 6393b5038ad..de86597e5cb 100644 --- a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx +++ b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx @@ -1,13 +1,16 @@ import defaults from 'lodash/defaults'; import React, { PureComponent } from 'react'; -import { InlineField, Select } from '@grafana/ui'; -import { QueryEditorProps, SelectableValue } from '@grafana/data'; +import { InlineField, Select, FeatureInfoBox } from '@grafana/ui'; +import { QueryEditorProps, SelectableValue, LiveChannelScope, FeatureState } from '@grafana/data'; +import { getLiveMeasurements, LiveMeasurements } from '@grafana/runtime'; import { GrafanaDatasource } from '../datasource'; import { defaultQuery, GrafanaQuery, GrafanaQueryType } from '../types'; type Props = QueryEditorProps; +const labelWidth = 12; + export class QueryEditor extends PureComponent { queryTypes: Array> = [ { @@ -15,6 +18,11 @@ export class QueryEditor extends PureComponent { value: GrafanaQueryType.RandomWalk, description: 'Random signal within the selected time rage', }, + { + label: 'Live Measurements', + value: GrafanaQueryType.LiveMeasurements, + description: 'Stream real-time measurements from Grafana', + }, ]; onQueryTypeChange = (sel: SelectableValue) => { @@ -23,18 +31,137 @@ export class QueryEditor extends PureComponent { onRunQuery(); }; + onChannelChange = (sel: SelectableValue) => { + const { onChange, query, onRunQuery } = this.props; + onChange({ ...query, channel: sel?.value }); + onRunQuery(); + }; + + onMeasurementNameChanged = (sel: SelectableValue) => { + const { onChange, query, onRunQuery } = this.props; + onChange({ + ...query, + measurements: { + ...query.measurements, + name: sel?.value, + }, + }); + onRunQuery(); + }; + + renderMeasurementsQuery() { + let { channel, measurements } = this.props.query; + const channels: Array> = []; + let currentChannel = channels.find(c => c.value === channel); + if (channel && !currentChannel) { + currentChannel = { + value: channel, + label: channel, + description: `Connected to ${channel}`, + }; + channels.push(currentChannel); + } + + if (!measurements) { + measurements = {}; + } + const names: Array> = [ + { value: '', label: 'All measurements', description: 'Show every measurement streamed to this channel' }, + ]; + + let info: LiveMeasurements | undefined = undefined; + if (channel) { + info = getLiveMeasurements({ + scope: LiveChannelScope.Grafana, + namespace: 'measurements', + path: channel, + }); + + let foundName = false; + if (info) { + for (const name of info.getDistinctNames()) { + names.push({ + value: name, + label: name, + }); + if (name === measurements.name) { + foundName = true; + } + } + } else { + console.log('NO INFO for', channel); + } + + if (measurements.name && !foundName) { + names.push({ + label: measurements.name, + value: measurements.name, + description: `Frames with name ${measurements.name}`, + }); + } + } + + return ( + <> +
+ + v.value === measurements?.name) || names[0]} + onChange={this.onMeasurementNameChanged} + allowCustomValue={true} + backspaceRemovesValue={true} + placeholder="Filter by name" + isClearable={true} + noOptionsMessage="Filter by name" + formatCreateLabel={(input: string) => `Show: ${input}`} + isSearchable={true} + /> + +
+ )} + + +

+ This supports real-time event streams in Grafana core. This feature is under heavy development. Expect the + interfaces and structures to change as this becomes more production ready. +

+
+ + ); + } + render() { const query = defaults(this.props.query, defaultQuery); return ( -
- - v.value === query.queryType) || this.queryTypes[0]} + onChange={this.onQueryTypeChange} + /> + +
+ {query.queryType === GrafanaQueryType.LiveMeasurements && this.renderMeasurementsQuery()} + ); } } diff --git a/public/app/plugins/datasource/grafana/datasource.test.ts b/public/app/plugins/datasource/grafana/datasource.test.ts index 8c7550a0c10..c107003dd37 100644 --- a/public/app/plugins/datasource/grafana/datasource.test.ts +++ b/public/app/plugins/datasource/grafana/datasource.test.ts @@ -7,6 +7,11 @@ import { GrafanaQuery, GrafanaAnnotationQuery, GrafanaAnnotationType } from './t jest.mock('@grafana/runtime', () => ({ ...((jest.requireActual('@grafana/runtime') as unknown) as object), getBackendSrv: () => backendSrv, + getTemplateSrv: () => ({ + replace: (val: string) => { + return val.replace('$var2', 'replaced__delimiter__replaced2').replace('$var', 'replaced'); + }, + }), })); describe('grafana data source', () => { @@ -25,13 +30,7 @@ describe('grafana data source', () => { return Promise.resolve([]); }); - const templateSrvStub = { - replace: (val: string) => { - return val.replace('$var2', 'replaced__delimiter__replaced2').replace('$var', 'replaced'); - }, - }; - - ds = new GrafanaDatasource({} as DataSourceInstanceSettings, templateSrvStub as any); + ds = new GrafanaDatasource({} as DataSourceInstanceSettings); }); describe('with tags that have template variables', () => { diff --git a/public/app/plugins/datasource/grafana/datasource.ts b/public/app/plugins/datasource/grafana/datasource.ts index 2eed2b85771..4b071a7b302 100644 --- a/public/app/plugins/datasource/grafana/datasource.ts +++ b/public/app/plugins/datasource/grafana/datasource.ts @@ -6,47 +6,55 @@ import { DataQueryResponse, DataSourceApi, DataSourceInstanceSettings, + LiveChannelScope, } from '@grafana/data'; -import { GrafanaQuery, GrafanaAnnotationQuery, GrafanaAnnotationType } from './types'; -import { getBackendSrv, getTemplateSrv, TemplateSrv, toDataQueryResponse } from '@grafana/runtime'; +import { GrafanaQuery, GrafanaAnnotationQuery, GrafanaAnnotationType, GrafanaQueryType } from './types'; +import { getBackendSrv, getTemplateSrv, toDataQueryResponse, getLiveMeasurementsObserver } from '@grafana/runtime'; import { Observable, of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; +let counter = 100; + export class GrafanaDatasource extends DataSourceApi { - constructor( - instanceSettings: DataSourceInstanceSettings, - private readonly templateSrv: TemplateSrv = getTemplateSrv() - ) { + constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); } query(request: DataQueryRequest): Observable { - const { intervalMs, maxDataPoints, range, requestId } = request; - - // Yes, this implementaiton ignores multiple targets! But that matches exisitng behavior - const params: Record = { - intervalMs, - maxDataPoints, - from: range.from.valueOf(), - to: range.to.valueOf(), - }; - - return getBackendSrv() - .fetch({ - url: '/api/tsdb/testdata/random-walk', - method: 'GET', - params, - requestId, - }) - .pipe( - map((rsp: any) => { - return toDataQueryResponse(rsp); - }), - catchError(err => { - return of(toDataQueryResponse(err)); - }) - ); + const queries: Array> = []; + for (const target of request.targets) { + if (target.hide) { + continue; + } + if (target.queryType === GrafanaQueryType.LiveMeasurements) { + const { channel, measurements } = target; + if (channel) { + queries.push( + getLiveMeasurementsObserver( + { + scope: LiveChannelScope.Grafana, + namespace: 'measurements', + path: channel, + }, + `${request.requestId}.${counter++}`, + measurements + ) + ); + } + } else { + queries.push(getRandomWalk(request)); + } + } + // With a single query just return the results + if (queries.length === 1) { + return queries[0]; + } + if (queries.length > 1) { + // HELP!!! + return queries[0]; + } + return of(); // nothing } metricFindQuery(options: any) { @@ -54,6 +62,7 @@ export class GrafanaDatasource extends DataSourceApi { } annotationQuery(options: AnnotationQueryRequest): Promise { + const templateSrv = getTemplateSrv(); const annotation = (options.annotation as unknown) as GrafanaAnnotationQuery; const params: any = { from: options.range.from.valueOf(), @@ -80,7 +89,7 @@ export class GrafanaDatasource extends DataSourceApi { const delimiter = '__delimiter__'; const tags = []; for (const t of params.tags) { - const renderedValues = this.templateSrv.replace(t, {}, (value: any) => { + const renderedValues = templateSrv.replace(t, {}, (value: any) => { if (typeof value === 'string') { return value; } @@ -105,3 +114,32 @@ export class GrafanaDatasource extends DataSourceApi { return Promise.resolve(); } } + +// Note that the query does not actually matter +function getRandomWalk(request: DataQueryRequest): Observable { + const { intervalMs, maxDataPoints, range, requestId } = request; + + // Yes, this implementation ignores multiple targets! But that matches existing behavior + const params: Record = { + intervalMs, + maxDataPoints, + from: range.from.valueOf(), + to: range.to.valueOf(), + }; + + return getBackendSrv() + .fetch({ + url: '/api/tsdb/testdata/random-walk', + method: 'GET', + params, + requestId, + }) + .pipe( + map((rsp: any) => { + return toDataQueryResponse(rsp); + }), + catchError(err => { + return of(toDataQueryResponse(err)); + }) + ); +} diff --git a/public/app/plugins/datasource/grafana/types.ts b/public/app/plugins/datasource/grafana/types.ts index 2a3af073f96..e9fe0abb58c 100644 --- a/public/app/plugins/datasource/grafana/types.ts +++ b/public/app/plugins/datasource/grafana/types.ts @@ -1,4 +1,5 @@ import { AnnotationQuery, DataQuery } from '@grafana/data'; +import { MeasurementsQuery } from '@grafana/runtime'; //---------------------------------------------- // Query @@ -6,12 +7,13 @@ import { AnnotationQuery, DataQuery } from '@grafana/data'; export enum GrafanaQueryType { RandomWalk = 'randomWalk', - RandomStream = 'randomStream', - HostMetrics = 'hostmetrics', + LiveMeasurements = 'measurements', } export interface GrafanaQuery extends DataQuery { queryType: GrafanaQueryType; // RandomWalk by default + channel?: string; + measurements?: MeasurementsQuery; } export const defaultQuery: GrafanaQuery = { diff --git a/public/app/plugins/datasource/testdata/QueryEditor.tsx b/public/app/plugins/datasource/testdata/QueryEditor.tsx index 9e8697dce94..be6e2be7556 100644 --- a/public/app/plugins/datasource/testdata/QueryEditor.tsx +++ b/public/app/plugins/datasource/testdata/QueryEditor.tsx @@ -14,6 +14,7 @@ import { TestDataQuery, Scenario } from './types'; import { PredictablePulseEditor } from './components/PredictablePulseEditor'; import { CSVWaveEditor } from './components/CSVWaveEditor'; import { defaultQuery } from './constants'; +import { GrafanaLiveEditor } from './components/GrafanaLiveEditor'; const showLabelsFor = ['random_walk', 'predictable_pulse', 'predictable_csv_wave']; const endpoints = [ @@ -59,17 +60,23 @@ export const QueryEditor = ({ query, datasource, onChange, onRunQuery }: Props) return; } - let stringInput = scenario.stringInput ?? ''; + const update = { ...query, scenarioId: item.value! }; - if (scenario.id === 'grafana_api') { - stringInput = 'datasources'; + if (scenario.stringInput) { + update.stringInput = scenario.stringInput; } - onUpdate({ - ...query, - scenarioId: item.value!, - stringInput, - }); + if (scenario.id === 'grafana_api') { + update.stringInput = 'datasources'; + } else if (scenario.id === 'streaming_client') { + update.stringInput = ''; + } else if (scenario.id === 'live') { + if (!update.channel) { + update.channel = 'random-2s-stream'; // default stream + } + } + + onUpdate(update); }; const onInputChange = (e: FormEvent) => { @@ -179,6 +186,7 @@ export const QueryEditor = ({ query, datasource, onChange, onRunQuery }: Props) {scenarioId === 'manual_entry' && } {scenarioId === 'random_walk' && } {scenarioId === 'streaming_client' && } + {scenarioId === 'live' && } {scenarioId === 'logs' && ( diff --git a/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx b/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx new file mode 100644 index 00000000000..ba9a038d0a5 --- /dev/null +++ b/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx @@ -0,0 +1,37 @@ +import React from 'react'; +import { InlineField, InlineFieldRow, Select } from '@grafana/ui'; +import { SelectableValue } from '@grafana/data'; +import { EditorProps } from '../QueryEditor'; + +const liveTestDataChannels = [ + { + label: 'random-2s-stream', + value: 'random-2s-stream', + description: 'Random stream with points every 2s', + }, + { + label: 'random-flakey-stream', + value: 'random-flakey-stream', + description: 'Stream that returns data in random intervals', + }, +]; + +export const GrafanaLiveEditor = ({ onChange, query }: EditorProps) => { + const onChannelChange = ({ value }: SelectableValue) => { + onChange({ ...query, channel: value }); + }; + + return ( + + + s.value === namespace) || namespace || ''} + value={ + namespaces.find(s => s.value === namespace) ?? + (namespace ? { label: namespace, value: namespace } : undefined) + } onChange={this.onNamespaceChanged} allowCustomValue={true} backspaceRemovesValue={true} @@ -135,7 +141,7 @@ export class LiveChannelEditor extends PureComponent {