Live: Simplify live interface (#33196)

This commit is contained in:
Ryan McKinley 2021-04-23 14:21:38 -07:00 committed by GitHub
parent 1dd9e9b184
commit 693915de35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 405 additions and 382 deletions

View File

@ -1,5 +1,3 @@
import { Observable } from 'rxjs';
/**
* The channel id is defined as:
*
@ -16,30 +14,40 @@ export enum LiveChannelScope {
Stream = 'stream', // namespace = id for the managed data stream
}
/**
* The type of data to expect in a given channel
*
* @alpha
*/
export enum LiveChannelType {
DataStream = 'stream', // each message contains a batch of rows that will be appened to previous values
DataFrame = 'frame', // each message is an entire data frame and should *replace* previous content
JSON = 'json', // arbitray json message
}
/**
* @alpha -- experimental
*/
export interface LiveChannelConfig {
/**
* The path definition. either static, or it may contain variables identifed with {varname}
*/
path: string;
/**
* An optional description for the channel
*/
description?: string;
/**
* What kind of data do you expect
*/
type?: LiveChannelType;
/**
* The channel keeps track of who else is connected to the same channel
*/
hasPresence?: boolean;
/**
* This method will be defined if it is possible to publish in this channel.
* The function will return true/false if the current user can publish
* Allow users to write to the connection
*/
canPublish?: () => boolean;
canPublish?: boolean;
}
export enum LiveChannelConnectionState {
@ -172,50 +180,31 @@ export function parseLiveChannelAddress(id?: string): LiveChannelAddress | undef
/**
* Check if the address has a scope, namespace, and path
*
* @alpha -- experimental
*/
export function isValidLiveChannelAddress(addr?: LiveChannelAddress): addr is LiveChannelAddress {
return !!(addr?.path && addr.namespace && addr.scope);
}
/**
* Convert the address to an explicit channel path
*
* @alpha -- experimental
*/
export interface LiveChannel<TMessage = any, TPublish = any> {
/** The fully qualified channel id: ${scope}/${namespace}/${path} */
id: string;
/** The channel address */
addr: LiveChannelAddress;
/** Unix timestamp for when the channel connected */
opened: number;
/** Static definition of the channel definition. This may describe the channel usage */
config?: LiveChannelConfig;
/**
* Watch for messages in a channel
*/
getStream: () => Observable<LiveChannelEvent<TMessage>>;
/**
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresence?: () => Promise<LiveChannelPresenceStatus>;
/**
* Write a message into the channel
*
* NOTE: This feature is supported by a limited set of channels
*/
publish?: (msg: TPublish) => Promise<any>;
/**
* Close and terminate the channel for everyone
*/
disconnect: () => void;
export function toLiveChannelId(addr: LiveChannelAddress): string {
if (!addr.scope) {
return '';
}
let id = addr.scope as string;
if (!addr.namespace) {
return id;
}
id += '/' + addr.namespace;
if (!addr.path) {
return id;
}
return id + '/' + addr.path;
}
/**
@ -226,9 +215,4 @@ export interface LiveChannelSupport {
* Get the channel handler for the path, or throw an error if invalid
*/
getChannelConfig(path: string): LiveChannelConfig | undefined;
/**
* Return a list of supported channels
*/
getSupportedPaths(): LiveChannelConfig[];
}

View File

@ -6,7 +6,6 @@
export * from './services';
export * from './config';
export * from './types';
export * from './utils/liveQuery';
export { loadPluginCss, SystemJS, PluginCssOptions } from './utils/plugin';
export { reportMetaAnalytics } from './utils/analytics';
export { logInfo, logDebug, logWarning, logError } from './utils/logging';

View File

@ -1,15 +1,36 @@
import { LiveChannel, LiveChannelAddress } from '@grafana/data';
import {
DataFrame,
DataQueryResponse,
LiveChannelAddress,
LiveChannelConfig,
LiveChannelEvent,
LiveChannelPresenceStatus,
StreamingFrameOptions,
} from '@grafana/data';
import { Observable } from 'rxjs';
/**
* @alpha -- experimental
*/
export interface GrafanaLiveSrv {
/**
* Is the server currently connected
*/
isConnected(): boolean;
export interface LiveDataFilter {
fields?: string[];
}
/**
* @alpha
*/
export interface LiveDataStreamOptions {
addr: LiveChannelAddress;
frame?: DataFrame; // initial results
key?: string;
buffer?: StreamingFrameOptions;
filter?: LiveDataFilter;
}
/**
* @alpha -- experimental
*/
export interface GrafanaLiveSrv {
/**
* Listen for changes to the main service
*/
@ -23,7 +44,24 @@ export interface GrafanaLiveSrv {
* Multiple requests for this channel will return the same object until
* the channel is shutdown
*/
getChannel<TMessage, TPublish = any>(address: LiveChannelAddress): LiveChannel<TMessage, TPublish>;
getChannelInfo(address: LiveChannelAddress): Promise<LiveChannelConfig>;
/**
* Watch for messages in a channel
*/
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>>;
/**
* Connect to a channel and return results as DataFrames
*/
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>;
/**
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus>;
}
let singletonInstance: GrafanaLiveSrv;

View File

@ -25,6 +25,7 @@ const backendSrv = ({
} as unknown) as BackendSrv;
jest.mock('../services', () => ({
...(jest.requireActual('../services') as any),
getBackendSrv: () => backendSrv,
getDataSourceSrv: () => {
return {

View File

@ -13,9 +13,8 @@ import {
} from '@grafana/data';
import { merge, Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
import { getBackendSrv, getDataSourceSrv } from '../services';
import { getBackendSrv, getDataSourceSrv, getGrafanaLiveSrv } from '../services';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
import { getLiveDataStream } from './liveQuery';
const ExpressionDatasourceID = '__expr__';
@ -223,6 +222,11 @@ export function toStreamingDataResponse(
request: DataQueryRequest,
rsp: DataQueryResponse
): Observable<DataQueryResponse> {
const live = getGrafanaLiveSrv();
if (!live) {
return of(rsp); // add warning?
}
const buffer: StreamingFrameOptions = {
maxLength: request.maxDataPoints ?? 500,
};
@ -238,7 +242,7 @@ export function toStreamingDataResponse(
const addr = parseLiveChannelAddress(frame.meta?.channel);
if (addr) {
streams.push(
getLiveDataStream({
live.getDataStream({
addr,
buffer,
frame: frame as DataFrame,

View File

@ -1,152 +0,0 @@
import {
DataFrame,
DataFrameJSON,
dataFrameToJSON,
DataQueryResponse,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
isValidLiveChannelAddress,
LiveChannelAddress,
LiveChannelConnectionState,
LiveChannelEvent,
LoadingState,
StreamingDataFrame,
StreamingFrameOptions,
} from '@grafana/data';
import { getGrafanaLiveSrv } from '../services/live';
import { Observable, of } from 'rxjs';
import { toDataQueryError } from './queryResponse';
import { perf } from './perf';
export interface LiveDataFilter {
fields?: string[];
}
/**
* @alpha
*/
export interface LiveDataStreamOptions {
key?: string;
addr: LiveChannelAddress;
frame?: DataFrame; // initial results
buffer?: StreamingFrameOptions;
filter?: LiveDataFilter;
}
/**
* Continue executing requests as long as `getNextQuery` returns a query
*
* @alpha
*/
export function getLiveDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
if (!isValidLiveChannelAddress(options.addr)) {
return of({
error: toDataQueryError(`invalid channel address: ${JSON.stringify(options.addr)}`),
state: LoadingState.Error,
data: options.frame ? [options.frame] : [],
});
}
const live = getGrafanaLiveSrv();
if (!live) {
return of({ error: toDataQueryError('grafana live is not initalized'), data: [] });
}
return new Observable<DataQueryResponse>((subscriber) => {
let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined;
let state = LoadingState.Loading;
let { key } = options;
let last = perf.last;
if (options.frame) {
const msg = dataFrameToJSON(options.frame);
data = new StreamingDataFrame(msg, options.buffer);
state = LoadingState.Streaming;
}
if (!key) {
key = `xstr/${streamCounter++}`;
}
const process = (msg: DataFrameJSON) => {
if (!data) {
data = new StreamingDataFrame(msg, options.buffer);
} else {
data.push(msg);
}
state = LoadingState.Streaming;
// Filter out fields
if (!filtered || msg.schema) {
filtered = data;
if (options.filter) {
const { fields } = options.filter;
if (fields?.length) {
filtered = {
...data,
fields: data.fields.filter((f) => fields.includes(f.name)),
};
}
}
}
const elapsed = perf.last - last;
if (elapsed > 1000 || perf.ok) {
filtered.length = data.length; // make sure they stay up-to-date
subscriber.next({ state, data: [filtered], key });
last = perf.last;
}
};
const sub = live
.getChannel<DataFrameJSON>(options.addr)
.getStream()
.subscribe({
error: (err: any) => {
console.log('LiveQuery [error]', { err }, options.addr);
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
sub.unsubscribe(); // close after error
},
complete: () => {
console.log('LiveQuery [complete]', options.addr);
if (state !== LoadingState.Error) {
state = LoadingState.Done;
}
// or track errors? subscriber.next({ state, data: [data], key });
subscriber.complete();
sub.unsubscribe();
},
next: (evt: LiveChannelEvent) => {
if (isLiveChannelMessageEvent(evt)) {
process(evt.message);
return;
}
if (isLiveChannelStatusEvent(evt)) {
if (evt.error) {
let error = toDataQueryError(evt.error);
error.message = `Streaming channel error: ${error.message}`;
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error });
return;
} else if (
evt.state === LiveChannelConnectionState.Connected ||
evt.state === LiveChannelConnectionState.Pending
) {
if (evt.message) {
process(evt.message);
}
}
console.log('ignore state', evt);
}
},
});
return () => {
sub.unsubscribe();
};
});
}
// incremet the stream ids
let streamCounter = 10;

View File

@ -456,7 +456,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub
return response.Error(http.StatusBadRequest, "Bad channel address", nil)
}
logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "cmd", cmd)
logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel)
channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel)
if err != nil {

View File

@ -5,9 +5,10 @@ import (
"strings"
)
// StableSchemaFromValues assumes stable
func StableSchemaFromValues(values url.Values) bool {
key := "gf_live_stable_schema"
return strings.ToLower(values.Get(key)) == "true" || values.Get(key) == "1"
return !(strings.ToLower(values.Get(key)) == "false" || values.Get(key) == "0")
}
func FrameFormatFromValues(values url.Values) string {

View File

@ -1,12 +1,12 @@
import {
LiveChannelConfig,
LiveChannel,
LiveChannelStatusEvent,
LiveChannelEvent,
LiveChannelEventType,
LiveChannelConnectionState,
LiveChannelPresenceStatus,
LiveChannelAddress,
DataFrameJSON,
} from '@grafana/data';
import Centrifuge, {
JoinLeaveContext,
@ -22,14 +22,17 @@ import { Subject, of, Observable } from 'rxjs';
/**
* Internal class that maps Centrifuge support to GrafanaLive
*/
export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements LiveChannel<TMessage, TPublish> {
export class CentrifugeLiveChannel<T = any> {
readonly currentStatus: LiveChannelStatusEvent;
readonly opened = Date.now();
readonly id: string;
readonly addr: LiveChannelAddress;
readonly stream = new Subject<LiveChannelEvent<TMessage>>();
readonly stream = new Subject<LiveChannelEvent<T>>();
// Hold on to the last header with schema
lastMessageWithSchema?: DataFrameJSON;
/** Static definition of the channel definition. This may describe the channel usage */
config?: LiveChannelConfig;
@ -59,6 +62,10 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
publish: (ctx: PublicationContext) => {
try {
if (ctx.data) {
if (ctx.data.schema) {
this.lastMessageWithSchema = ctx.data as DataFrameJSON;
}
this.stream.next({
type: LiveChannelEventType.Message,
message: ctx.data,
@ -72,7 +79,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
this.sendStatus();
}
} catch (err) {
console.log('publish error', config.path, err);
console.log('publish error', this.addr, err);
this.currentStatus.error = err;
this.currentStatus.timestamp = Date.now();
this.sendStatus();
@ -87,6 +94,11 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
this.currentStatus.timestamp = Date.now();
this.currentStatus.state = LiveChannelConnectionState.Connected;
delete this.currentStatus.error;
if (ctx.data?.schema) {
this.lastMessageWithSchema = ctx.data as DataFrameJSON;
}
this.sendStatus(ctx.data);
},
unsubscribe: (ctx: UnsubscribeContext) => {
@ -103,14 +115,6 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
events.leave = (ctx: JoinLeaveContext) => {
this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
};
this.getPresence = () => {
return this.subscription!.presence().then((v) => {
return {
users: Object.keys(v.presence),
};
});
};
}
return events;
}
@ -123,6 +127,13 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
this.stream.next(copy);
}
disconnectIfNoListeners = () => {
const count = this.stream.observers.length;
if (count === 0) {
this.disconnect();
}
};
/**
* Get the stream of events and
*/
@ -133,25 +144,29 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
return () => {
sub.unsubscribe();
const count = this.stream.observers.length;
console.log('unsubscribe stream', this.addr, count);
// Fully disconnect when no more listeners
// Wait 1/4 second to fully disconnect
if (count === 0) {
this.disconnect();
setTimeout(this.disconnectIfNoListeners, 250);
}
};
}) as Observable<LiveChannelEvent<TMessage>>;
}) as Observable<LiveChannelEvent<T>>;
}
/**
* This is configured by the server when the config supports presence
*/
getPresence?: () => Promise<LiveChannelPresenceStatus>;
async getPresence(): Promise<LiveChannelPresenceStatus> {
if (!this.subscription) {
return Promise.reject('not subscribed');
}
/**
* This is configured by the server when config supports writing
*/
publish?: (msg: TPublish) => Promise<any>;
return this.subscription!.presence().then((v) => {
return {
users: Object.keys(v.presence),
};
});
}
/**
* This will close and terminate all streams for this channel
@ -183,7 +198,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
}
}
export function getErrorChannel(msg: string, id: string, addr: LiveChannelAddress): LiveChannel {
export function getErrorChannel<TMessage>(msg: string, id: string, addr: LiveChannelAddress) {
return {
id,
opened: Date.now(),

View File

@ -5,26 +5,26 @@ import {
AppEvents,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
LiveChannel,
LiveChannelConfig,
LiveChannelConnectionState,
LiveChannelEvent,
LiveChannelScope,
toLiveChannelId,
} from '@grafana/data';
import { DashboardChangedModal } from './DashboardChangedModal';
import { DashboardEvent, DashboardEventAction } from './types';
import { CoreGrafanaLiveFeature } from '../scopes';
import { sessionId } from '../live';
import { ShowModalReactEvent } from '../../../types/events';
import { Unsubscribable } from 'rxjs';
import { getBackendSrv } from 'app/core/services/backend_srv';
class DashboardWatcher {
channel?: LiveChannel<DashboardEvent>;
channel?: string; // path to the channel
uid?: string;
ignoreSave?: boolean;
editing = false;
lastEditing?: DashboardEvent;
subscription?: Unsubscribable;
setEditingState(state: boolean) {
const changed = (this.editing = state);
@ -38,7 +38,7 @@ class DashboardWatcher {
private sendEditingState() {
if (this.channel && this.uid) {
getBackendSrv().post(`api/live/publish`, {
channel: this.channel.id,
channel: this.channel,
data: {
sessionId,
uid: this.uid,
@ -57,23 +57,25 @@ class DashboardWatcher {
// Check for changes
if (uid !== this.uid) {
const addr = {
scope: LiveChannelScope.Grafana,
namespace: 'dashboard',
path: `uid/${uid}`,
};
this.leave();
if (uid) {
this.channel = live.getChannel({
scope: LiveChannelScope.Grafana,
namespace: 'dashboard',
path: `uid/${uid}`,
});
this.channel.getStream().subscribe(this.observer);
this.subscription = live.getStream(addr).subscribe(this.observer);
}
this.uid = uid;
this.channel = toLiveChannelId(addr);
}
}
leave() {
if (this.channel) {
this.channel.disconnect();
if (this.subscription) {
this.subscription.unsubscribe();
}
this.subscription = undefined;
this.uid = undefined;
}
@ -161,23 +163,13 @@ class DashboardWatcher {
export const dashboardWatcher = new DashboardWatcher();
export function getDashboardChannelsFeature(): CoreGrafanaLiveFeature {
const dashboardConfig: LiveChannelConfig = {
path: '${uid}',
description: 'Dashboard change events',
hasPresence: true,
canPublish: () => true,
};
return {
name: 'dashboard',
support: {
getChannelConfig: (path: string) => {
return {
...dashboardConfig,
path, // set the real path
};
},
getSupportedPaths: () => [dashboardConfig],
getChannelConfig: (path: string) => ({
description: 'Dashboard change events',
hasPresence: true,
}),
},
description: 'Dashboard listener',
};

View File

@ -1,58 +1,34 @@
import { LiveChannelConfig } from '@grafana/data';
import { LiveChannelType } from '@grafana/data';
import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher';
import { LiveMeasurementsSupport } from './measurements/measurementsSupport';
import { grafanaLiveCoreFeatures } from './scopes';
export function registerLiveFeatures() {
const channels: LiveChannelConfig[] = [
{
path: 'random-2s-stream',
description: 'Random stream with points every 2s',
},
{
path: 'random-flakey-stream',
description: 'Random stream with flakey data points',
},
{
path: 'random-20Hz-stream',
description: 'Random stream with points in 20Hz',
},
];
grafanaLiveCoreFeatures.register({
name: 'testdata',
support: {
getChannelConfig: (path: string) => {
return channels.find((c) => c.path === path);
return {
type: LiveChannelType.DataStream,
};
},
getSupportedPaths: () => channels,
},
description: 'Test data generations',
});
const broadcastConfig: LiveChannelConfig = {
path: '${path}',
description: 'Broadcast any messages to a channel',
canPublish: () => true,
};
grafanaLiveCoreFeatures.register({
name: 'broadcast',
support: {
getChannelConfig: (path: string) => {
return broadcastConfig;
return {
type: LiveChannelType.JSON,
canPublish: true,
description: 'Broadcast any messages to a channel',
};
},
getSupportedPaths: () => [broadcastConfig],
},
description: 'Broadcast will send/receive any JSON object in a channel',
});
grafanaLiveCoreFeatures.register({
name: 'measurements',
support: new LiveMeasurementsSupport(),
description: 'These channels listen for measurements and produce DataFrames',
});
// dashboard/*
grafanaLiveCoreFeatures.register(getDashboardChannelsFeature());
}

View File

@ -1,7 +1,30 @@
import Centrifuge from 'centrifuge/dist/centrifuge';
import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime';
import { BehaviorSubject } from 'rxjs';
import { LiveChannel, LiveChannelScope, LiveChannelAddress, LiveChannelConnectionState } from '@grafana/data';
import {
GrafanaLiveSrv,
setGrafanaLiveSrv,
getGrafanaLiveSrv,
config,
LiveDataStreamOptions,
toDataQueryError,
} from '@grafana/runtime';
import { BehaviorSubject, Observable, of } from 'rxjs';
import {
LiveChannelScope,
LiveChannelAddress,
LiveChannelConnectionState,
LiveChannelConfig,
LiveChannelEvent,
DataQueryResponse,
LiveChannelPresenceStatus,
isValidLiveChannelAddress,
LoadingState,
DataFrameJSON,
StreamingDataFrame,
DataFrame,
dataFrameToJSON,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
} from '@grafana/data';
import { CentrifugeLiveChannel, getErrorChannel } from './channel';
import {
GrafanaLiveScope,
@ -11,6 +34,7 @@ import {
GrafanaLiveStreamScope,
} from './scopes';
import { registerLiveFeatures } from './features';
import { perf } from './perf';
export const sessionId =
(window as any)?.grafanaBootData?.user?.id +
@ -21,7 +45,6 @@ export const sessionId =
export class CentrifugeSrv implements GrafanaLiveSrv {
readonly open = new Map<string, CentrifugeLiveChannel>();
readonly centrifuge: Centrifuge;
readonly connectionState: BehaviorSubject<boolean>;
readonly connectionBlocker: Promise<void>;
@ -84,7 +107,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
* channel will be returned with an error state indicated in its status
*/
getChannel<TMessage, TPublish = any>(addr: LiveChannelAddress): LiveChannel<TMessage, TPublish> {
getChannel<TMessage>(addr: LiveChannelAddress): CentrifugeLiveChannel<TMessage> {
const id = `${addr.scope}/${addr.namespace}/${addr.path}`;
let channel = this.open.get(id);
if (channel != null) {
@ -93,7 +116,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
const scope = this.scopes[addr.scope];
if (!scope) {
return getErrorChannel('invalid scope', id, addr);
return getErrorChannel<TMessage>('invalid scope', id, addr) as any;
}
channel = new CentrifugeLiveChannel(id, addr);
@ -125,9 +148,6 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
if (!config) {
throw new Error('unknown path: ' + addr.path);
}
if (config.canPublish?.()) {
channel.publish = (data: any) => this.centrifuge.publish(channel.id, data);
}
const events = channel.initalize(config);
if (!this.centrifuge.isConnected()) {
await this.connectionBlocker;
@ -140,21 +160,164 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
// Exported functions
//----------------------------------------------------------
/**
* Is the server currently connected
*/
isConnected() {
return this.centrifuge.isConnected();
}
/**
* Listen for changes to the connection state
*/
getConnectionState() {
return this.connectionState.asObservable();
}
/**
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
* channel will be returned with an error state indicated in its status.
*
* This is a singleton instance that stays active until explicitly shutdown.
* Multiple requests for this channel will return the same object until
* the channel is shutdown
*/
async getChannelInfo(addr: LiveChannelAddress): Promise<LiveChannelConfig> {
const scope = this.scopes[addr.scope];
if (!scope) {
return Promise.reject('invalid scope');
}
const support = await scope.getChannelSupport(addr.namespace);
if (!support) {
return Promise.reject(addr.namespace + ' does not support streaming');
}
return support.getChannelConfig(addr.path)!;
}
/**
* Watch for messages in a channel
*/
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
return this.getChannel<T>(address).getStream();
}
/**
* Connect to a channel and return results as DataFrames
*/
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
if (!isValidLiveChannelAddress(options.addr)) {
return of({
error: toDataQueryError(`invalid channel address: ${JSON.stringify(options.addr)}`),
state: LoadingState.Error,
data: options.frame ? [options.frame] : [],
});
}
return new Observable<DataQueryResponse>((subscriber) => {
const channel = this.getChannel(options.addr);
let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined;
let state = LoadingState.Loading;
let { key } = options;
let last = perf.last;
if (options.frame) {
const msg = dataFrameToJSON(options.frame);
data = new StreamingDataFrame(msg, options.buffer);
state = LoadingState.Streaming;
}
if (channel.lastMessageWithSchema && !data) {
data = new StreamingDataFrame(channel.lastMessageWithSchema, options.buffer);
}
if (!key) {
key = `xstr/${streamCounter++}`;
}
const process = (msg: DataFrameJSON) => {
if (!data) {
data = new StreamingDataFrame(msg, options.buffer);
} else {
data.push(msg);
}
state = LoadingState.Streaming;
// Filter out fields
if (!filtered || msg.schema) {
filtered = data;
if (options.filter) {
const { fields } = options.filter;
if (fields?.length) {
filtered = {
...data,
fields: data.fields.filter((f) => fields.includes(f.name)),
};
}
}
}
const elapsed = perf.last - last;
if (elapsed > 1000 || perf.ok) {
filtered.length = data.length; // make sure they stay up-to-date
subscriber.next({ state, data: [filtered], key });
last = perf.last;
}
};
const sub = channel.getStream().subscribe({
error: (err: any) => {
console.log('LiveQuery [error]', { err }, options.addr);
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
sub.unsubscribe(); // close after error
},
complete: () => {
console.log('LiveQuery [complete]', options.addr);
if (state !== LoadingState.Error) {
state = LoadingState.Done;
}
// or track errors? subscriber.next({ state, data: [data], key });
subscriber.complete();
sub.unsubscribe();
},
next: (evt: LiveChannelEvent) => {
if (isLiveChannelMessageEvent(evt)) {
process(evt.message);
return;
}
if (isLiveChannelStatusEvent(evt)) {
if (evt.error) {
let error = toDataQueryError(evt.error);
error.message = `Streaming channel error: ${error.message}`;
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error });
return;
} else if (
evt.state === LiveChannelConnectionState.Connected ||
evt.state === LiveChannelConnectionState.Pending
) {
if (evt.message) {
process(evt.message);
}
return;
}
console.log('ignore state', evt);
}
},
});
return () => {
sub.unsubscribe();
};
});
}
/**
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
return this.getChannel(address).getPresence();
}
}
// This is used to give a unique key for each stream. The actual value does not matter
let streamCounter = 0;
export function getGrafanaLiveCentrifugeSrv() {
return getGrafanaLiveSrv() as CentrifugeSrv;
}

View File

@ -1,26 +1,17 @@
import { LiveChannelSupport, LiveChannelConfig } from '@grafana/data';
import { LiveChannelSupport, LiveChannelConfig, LiveChannelType } from '@grafana/data';
/**
* Generic description of channels that support streams
*
* @alpha
*/
export class LiveMeasurementsSupport implements LiveChannelSupport {
private cache: Record<string, LiveChannelConfig> = {};
/**
* Get the channel handler for the path, or throw an error if invalid
*/
getChannelConfig(path: string): LiveChannelConfig | undefined {
let c = this.cache[path];
if (!c) {
c = {
path,
};
}
return c;
}
/**
* Return a list of supported channels
*/
getSupportedPaths(): LiveChannelConfig[] {
// this should ask the server what channels it has seen
return [];
return {
type: LiveChannelType.DataStream,
};
}
}

View File

@ -190,12 +190,11 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
mergeMap((response: TSDBResponse) => {
const dataQueryResponse = toDataQueryResponse({ data: response }, options.targets);
const channelName: string = dataQueryResponse.data[0].meta.custom.channelName;
const channel = getGrafanaLiveSrv().getChannel({
return getGrafanaLiveSrv().getStream({
scope: LiveChannelScope.Plugin,
namespace: 'cloudwatch',
path: channelName,
});
return channel.getStream();
}),
filter((e: LiveChannelEvent<any>) => e.type === 'message'),
map(({ message }: LiveChannelMessageEvent<TSDBResponse>) => {

View File

@ -7,6 +7,7 @@ import { CloudWatchJsonData, CloudWatchQuery } from './types';
import { CloudWatchLogsQueryEditor } from './components/LogsQueryEditor';
import { PanelQueryEditor } from './components/PanelQueryEditor';
import LogsCheatSheet from './components/LogsCheatSheet';
import { LiveMeasurementsSupport } from 'app/features/live/measurements/measurementsSupport';
export const plugin = new DataSourcePlugin<CloudWatchDatasource, CloudWatchQuery, CloudWatchJsonData>(
CloudWatchDatasource
@ -17,9 +18,4 @@ export const plugin = new DataSourcePlugin<CloudWatchDatasource, CloudWatchQuery
.setExploreMetricsQueryField(PanelQueryEditor)
.setExploreLogsQueryField(CloudWatchLogsQueryEditor)
.setAnnotationQueryCtrl(CloudWatchAnnotationsQueryCtrl)
.setChannelSupport({
getChannelConfig: (path: string) => ({
path,
}),
getSupportedPaths: () => [],
});
.setChannelSupport(new LiveMeasurementsSupport());

View File

@ -11,7 +11,7 @@ import {
} from '@grafana/data';
import { GrafanaQuery, GrafanaAnnotationQuery, GrafanaAnnotationType, GrafanaQueryType } from './types';
import { getBackendSrv, getTemplateSrv, toDataQueryResponse, getLiveDataStream } from '@grafana/runtime';
import { getBackendSrv, getGrafanaLiveSrv, getTemplateSrv, toDataQueryResponse } from '@grafana/runtime';
import { Observable, of, merge } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
@ -53,7 +53,7 @@ export class GrafanaDatasource extends DataSourceApi<GrafanaQuery> {
}
queries.push(
getLiveDataStream({
getGrafanaLiveSrv().getDataStream({
key: `${request.requestId}.${counter++}`,
addr: addr!,
filter,

View File

@ -14,7 +14,7 @@ import {
TimeRange,
} from '@grafana/data';
import { Scenario, TestDataQuery } from './types';
import { DataSourceWithBackend, getBackendSrv, getLiveDataStream, getTemplateSrv, TemplateSrv } from '@grafana/runtime';
import { DataSourceWithBackend, getBackendSrv, getGrafanaLiveSrv, getTemplateSrv, TemplateSrv } from '@grafana/runtime';
import { queryMetricTree } from './metricTree';
import { runStream } from './runStreams';
import { getSearchFilterScopedVar } from 'app/features/variables/utils';
@ -188,7 +188,7 @@ function runGrafanaLiveQuery(
if (!target.channel) {
throw new Error(`Missing channel config`);
}
return getLiveDataStream({
return getGrafanaLiveSrv().getDataStream({
addr: {
scope: LiveChannelScope.Plugin,
namespace: 'testdata',

View File

@ -16,7 +16,7 @@ import {
import { TestDataQuery, StreamingQuery } from './types';
import { getRandomLine } from './LogIpsum';
import { perf } from '@grafana/runtime/src/utils/perf'; // not exported
import { perf } from 'app/features/live/perf';
export const defaultStreamQuery: StreamingQuery = {
type: 'signal',

View File

@ -51,18 +51,11 @@ export class LiveChannelEditor extends PureComponent<Props, State> {
const srv = getGrafanaLiveCentrifugeSrv();
const namespaces = await srv.scopes[scope].listNamespaces();
const support = namespace ? await srv.scopes[scope].getChannelSupport(namespace) : undefined;
const paths = support ? await support.getSupportedPaths() : undefined;
this.setState({
namespaces,
support,
paths: paths
? paths.map((p) => ({
label: p.path,
value: p.path,
description: p.description,
}))
: [],
paths: [],
});
}

View File

@ -6,7 +6,6 @@ import {
PanelProps,
LiveChannelStatusEvent,
isValidLiveChannelAddress,
LiveChannel,
LiveChannelEvent,
isLiveChannelStatusEvent,
isLiveChannelMessageEvent,
@ -15,17 +14,22 @@ import {
LoadingState,
applyFieldOverrides,
StreamingDataFrame,
LiveChannelAddress,
LiveChannelConfig,
toLiveChannelId,
} from '@grafana/data';
import { TablePanel } from '../table/TablePanel';
import { LivePanelOptions, MessageDisplayMode } from './types';
import { config, getGrafanaLiveSrv } from '@grafana/runtime';
import { config, getBackendSrv, getGrafanaLiveSrv } from '@grafana/runtime';
import { css, cx } from '@emotion/css';
import { isEqual } from 'lodash';
interface Props extends PanelProps<LivePanelOptions> {}
interface State {
error?: any;
channel?: LiveChannel;
addr?: LiveChannelAddress;
info?: LiveChannelConfig;
status?: LiveChannelStatusEvent;
message?: any;
changed: number;
@ -84,26 +88,37 @@ export class LivePanel extends PureComponent<Props, State> {
console.log('INVALID', addr);
this.unsubscribe();
this.setState({
channel: undefined,
addr: undefined,
info: undefined,
});
return;
}
const channel = getGrafanaLiveSrv().getChannel(addr);
const changed = channel.id !== this.state.channel?.id;
console.log('LOAD', addr, changed, channel);
if (changed) {
this.unsubscribe();
if (isEqual(addr, this.state.addr)) {
console.log('Same channel', this.state.addr);
return;
}
// Subscribe to new events
try {
this.subscription = channel.getStream().subscribe(this.streamObserver);
this.setState({ channel, error: undefined });
} catch (err) {
this.setState({ channel: undefined, error: err });
}
} else {
console.log('Same channel', channel);
const live = getGrafanaLiveSrv();
if (!live) {
console.log('INVALID', addr);
this.unsubscribe();
this.setState({
addr: undefined,
info: undefined,
});
return;
}
this.unsubscribe();
console.log('LOAD', addr);
// Subscribe to new events
try {
this.subscription = live.getStream(addr).subscribe(this.streamObserver);
this.setState({ addr, error: undefined });
} catch (err) {
this.setState({ addr: undefined, error: err });
}
}
@ -137,18 +152,23 @@ export class LivePanel extends PureComponent<Props, State> {
};
onPublishClicked = async () => {
const { channel } = this.state;
if (!channel?.publish) {
const { addr, info } = this.state;
if (!info?.canPublish || !addr) {
console.log('channel does not support publishing');
return;
}
const json = this.props.options?.json;
if (json) {
const rsp = await channel.publish(json);
console.log('onPublishClicked (response from publish)', rsp);
} else {
const data = this.props.options?.json;
if (!data) {
console.log('nothing to publish');
return;
}
const rsp = await getBackendSrv().post(`api/live/publish`, {
channel: toLiveChannelId(addr),
data,
});
console.log('onPublishClicked (response from publish)', rsp);
};
renderMessage(height: number) {
@ -194,8 +214,11 @@ export class LivePanel extends PureComponent<Props, State> {
}
renderPublish(height: number) {
const { channel } = this.state;
if (!channel?.publish) {
const { info } = this.state;
if (!info) {
return <div>No info</div>;
}
if (!info.canPublish) {
return <div>This channel does not support publishing</div>;
}
@ -272,8 +295,8 @@ export class LivePanel extends PureComponent<Props, State> {
if (!this.isValid) {
return this.renderNotEnabled();
}
const { channel, error } = this.state;
if (!channel) {
const { addr, error } = this.state;
if (!addr) {
return (
<FeatureInfoBox
title="Grafana Live"