mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Live: improved channel state (#27672)
This commit is contained in:
@@ -9,6 +9,7 @@ import { DataFrame, DataFrameDTO } from './dataFrame';
|
|||||||
import { RawTimeRange, TimeRange } from './time';
|
import { RawTimeRange, TimeRange } from './time';
|
||||||
import { ScopedVars } from './ScopedVars';
|
import { ScopedVars } from './ScopedVars';
|
||||||
import { CoreApp } from './app';
|
import { CoreApp } from './app';
|
||||||
|
import { LiveChannelSupport } from './live';
|
||||||
|
|
||||||
export interface DataSourcePluginOptionsEditorProps<JSONData = DataSourceJsonData, SecureJSONData = {}> {
|
export interface DataSourcePluginOptionsEditorProps<JSONData = DataSourceJsonData, SecureJSONData = {}> {
|
||||||
options: DataSourceSettings<JSONData, SecureJSONData>;
|
options: DataSourceSettings<JSONData, SecureJSONData>;
|
||||||
@@ -284,6 +285,15 @@ export abstract class DataSourceApi<
|
|||||||
* @deprecated -- prefer using {@link AnnotationSupport}
|
* @deprecated -- prefer using {@link AnnotationSupport}
|
||||||
*/
|
*/
|
||||||
annotationQuery?(options: AnnotationQueryRequest<TQuery>): Promise<AnnotationEvent[]>;
|
annotationQuery?(options: AnnotationQueryRequest<TQuery>): Promise<AnnotationEvent[]>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Define live streaming behavior within this datasource settings
|
||||||
|
*
|
||||||
|
* Note: `plugin.json` must also define `live: true`
|
||||||
|
*
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
channelSupport?: LiveChannelSupport;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface MetadataInspectorProps<
|
export interface MetadataInspectorProps<
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ export * from './orgs';
|
|||||||
export * from './flot';
|
export * from './flot';
|
||||||
export * from './trace';
|
export * from './trace';
|
||||||
export * from './explore';
|
export * from './explore';
|
||||||
|
export * from './live';
|
||||||
|
|
||||||
import * as AppEvents from './appEvents';
|
import * as AppEvents from './appEvents';
|
||||||
import { AppEvent } from './appEvents';
|
import { AppEvent } from './appEvents';
|
||||||
|
|||||||
172
packages/grafana-data/src/types/live.ts
Normal file
172
packages/grafana-data/src/types/live.ts
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
import { SelectableValue } from './select';
|
||||||
|
import { Observable } from 'rxjs';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The channel id is defined as:
|
||||||
|
*
|
||||||
|
* ${scope}/${namespace}/${path}
|
||||||
|
*
|
||||||
|
* The scope drives how the namespace is used and controlled
|
||||||
|
*/
|
||||||
|
export enum LiveChannelScope {
|
||||||
|
DataSource = 'ds', // namespace = data source ID
|
||||||
|
Plugin = 'plugin', // namespace = plugin name (singleton works for apps too)
|
||||||
|
Grafana = 'grafana', // namespace = feature
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
export interface LiveChannelConfig<TMessage = any> {
|
||||||
|
/**
|
||||||
|
* The path definition. either static, or it may contain variables identifed with {varname}
|
||||||
|
*/
|
||||||
|
path: string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An optional description for the channel
|
||||||
|
*/
|
||||||
|
description?: string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When variables exist, this list will identify each one
|
||||||
|
*/
|
||||||
|
variables?: Array<SelectableValue<string>>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The channel keeps track of who else is connected to the same channel
|
||||||
|
*/
|
||||||
|
hasPresense?: boolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method will be defined if it is possible to publish in this channel.
|
||||||
|
* The function will return true/false if the current user can publish
|
||||||
|
*/
|
||||||
|
canPublish?: () => boolean;
|
||||||
|
|
||||||
|
/** convert the raw stream message into a message that should be broadcast */
|
||||||
|
processMessage?: (msg: any) => TMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
export enum LiveChannelConnectionState {
|
||||||
|
/** The connection is not yet established */
|
||||||
|
Pending = 'pending',
|
||||||
|
/** Connected to the channel */
|
||||||
|
Connected = 'connected',
|
||||||
|
/** Disconnected from the channel. The channel will reconnect when possible */
|
||||||
|
Disconnected = 'disconnected',
|
||||||
|
/** Was at some point connected, and will not try to reconnect */
|
||||||
|
Shutdown = 'shutdown',
|
||||||
|
/** Channel configuraiton was invalid and will not connect */
|
||||||
|
Invalid = 'invalid',
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
export interface LiveChannelStatus {
|
||||||
|
/**
|
||||||
|
* {scope}/{namespace}/{path}
|
||||||
|
*/
|
||||||
|
id: string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* unix millies timestamp for the last status change
|
||||||
|
*/
|
||||||
|
timestamp: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* flag if the channel is activly connected to the channel.
|
||||||
|
* This may be false while the connections get established or if the network is lost
|
||||||
|
* As long as the `shutdown` flag is not set, the connection will try to reestablish
|
||||||
|
*/
|
||||||
|
state: LiveChannelConnectionState;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The last error.
|
||||||
|
*
|
||||||
|
* This will remain in the status until a new message is succesfully recieved from the channel
|
||||||
|
*/
|
||||||
|
error?: any;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
export interface LiveChannelJoinLeave {
|
||||||
|
user: any;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
export interface LiveChannelPresense {
|
||||||
|
users: any;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface LiveChannelMessage<TMessage = any> {
|
||||||
|
type: 'status' | 'message' | 'join' | 'leave';
|
||||||
|
message: TMessage | LiveChannelStatus | LiveChannelJoinLeave;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
export interface LiveChannel<TMessage = any, TPublish = any> {
|
||||||
|
/** The fully qualified channel id: ${scope}/${namespace}/${path} */
|
||||||
|
id: string;
|
||||||
|
|
||||||
|
/** The scope for this channel */
|
||||||
|
scope: LiveChannelScope;
|
||||||
|
|
||||||
|
/** datasourceId/plugin name/feature depending on scope */
|
||||||
|
namespace: string;
|
||||||
|
|
||||||
|
/** additional qualifier */
|
||||||
|
path: string;
|
||||||
|
|
||||||
|
/** Unix timestamp for when the channel connected */
|
||||||
|
opened: number;
|
||||||
|
|
||||||
|
/** Static definition of the channel definition. This may describe the channel usage */
|
||||||
|
config?: LiveChannelConfig;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watch all events in this channel
|
||||||
|
*/
|
||||||
|
getStream: () => Observable<LiveChannelMessage<TMessage>>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For channels that support presense, this will request the current state from the server.
|
||||||
|
*
|
||||||
|
* Join and leave messages will be sent to the open stream
|
||||||
|
*/
|
||||||
|
getPresense?: () => Promise<LiveChannelPresense>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write a message into the channel
|
||||||
|
*
|
||||||
|
* NOTE: This feature is supported by a limited set of channels
|
||||||
|
*/
|
||||||
|
publish?: (msg: TPublish) => Promise<any>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will close and terminate this channel
|
||||||
|
*/
|
||||||
|
disconnect: () => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental
|
||||||
|
*/
|
||||||
|
export interface LiveChannelSupport {
|
||||||
|
/**
|
||||||
|
* Get the channel handler for the path, or throw an error if invalid
|
||||||
|
*/
|
||||||
|
getChannelConfig(path: string): LiveChannelConfig | undefined;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a list of supported channels
|
||||||
|
*/
|
||||||
|
getSupportedPaths(): LiveChannelConfig[];
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
import { ComponentClass } from 'react';
|
import { ComponentClass } from 'react';
|
||||||
import { KeyValue } from './data';
|
import { KeyValue } from './data';
|
||||||
|
import { LiveChannelSupport } from './live';
|
||||||
|
|
||||||
export enum PluginState {
|
export enum PluginState {
|
||||||
alpha = 'alpha', // Only included it `enable_alpha` is true
|
alpha = 'alpha', // Only included it `enable_alpha` is true
|
||||||
@@ -47,6 +48,7 @@ export interface PluginMeta<T extends KeyValue = {}> {
|
|||||||
latestVersion?: string;
|
latestVersion?: string;
|
||||||
pinned?: boolean;
|
pinned?: boolean;
|
||||||
signature?: PluginSignatureStatus;
|
signature?: PluginSignatureStatus;
|
||||||
|
live?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface PluginDependencyInfo {
|
interface PluginDependencyInfo {
|
||||||
@@ -139,6 +141,13 @@ export class GrafanaPlugin<T extends PluginMeta = PluginMeta> {
|
|||||||
// This is set if the plugin system had errors loading the plugin
|
// This is set if the plugin system had errors loading the plugin
|
||||||
loadError?: boolean;
|
loadError?: boolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Live streaming support
|
||||||
|
*
|
||||||
|
* Note: `plugin.json` must also define `live: true`
|
||||||
|
*/
|
||||||
|
channelSupport?: LiveChannelSupport;
|
||||||
|
|
||||||
// Config control (app/datasource)
|
// Config control (app/datasource)
|
||||||
angularConfigCtrl?: any;
|
angularConfigCtrl?: any;
|
||||||
|
|
||||||
@@ -154,6 +163,14 @@ export class GrafanaPlugin<T extends PluginMeta = PluginMeta> {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify how the plugin should support paths within the live streaming environment
|
||||||
|
*/
|
||||||
|
setChannelSupport(support: LiveChannelSupport) {
|
||||||
|
this.channelSupport = support;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.meta = {} as T;
|
this.meta = {} as T;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,25 +1,6 @@
|
|||||||
|
import { LiveChannel, LiveChannelScope } from '@grafana/data';
|
||||||
import { Observable } from 'rxjs';
|
import { Observable } from 'rxjs';
|
||||||
|
|
||||||
/**
|
|
||||||
* @experimental
|
|
||||||
*/
|
|
||||||
export interface ChannelHandler<T = any> {
|
|
||||||
/**
|
|
||||||
* Process the raw message from the server before broadcasting it
|
|
||||||
* to all subscribeers on this channel
|
|
||||||
*/
|
|
||||||
onPublish(msg: any): T;
|
|
||||||
}
|
|
||||||
|
|
||||||
// export interface SubscriptionEvents {
|
|
||||||
// publish?: (ctx: PublicationContext) => void;
|
|
||||||
// join?: (ctx: JoinLeaveContext) => void;
|
|
||||||
// leave?: (ctx: JoinLeaveContext) => void;
|
|
||||||
// subscribe?: (ctx: SubscribeSuccessContext) => void;
|
|
||||||
// error?: (ctx: SubscribeErrorContext) => void;
|
|
||||||
// unsubscribe?: (ctx: UnsubscribeContext) => void;
|
|
||||||
// }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @experimental
|
* @experimental
|
||||||
*/
|
*/
|
||||||
@@ -30,24 +11,23 @@ export interface GrafanaLiveSrv {
|
|||||||
isConnected(): boolean;
|
isConnected(): boolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Listen for changes to the connection state
|
* Listen for changes to the main service
|
||||||
*/
|
*/
|
||||||
getConnectionState(): Observable<boolean>;
|
getConnectionState(): Observable<boolean>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure a channel with the given setup
|
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
|
||||||
|
* channel will be returned with an error state indicated in its status.
|
||||||
|
*
|
||||||
|
* This is a singleton instance that stays active until explicitly shutdown.
|
||||||
|
* Multiple requests for this channel will return the same object until
|
||||||
|
* the channel is shutdown
|
||||||
*/
|
*/
|
||||||
initChannel<T>(channel: string, handler: ChannelHandler<T>): void;
|
getChannel<TMessage, TPublish>(
|
||||||
|
scope: LiveChannelScope,
|
||||||
/**
|
namespace: string,
|
||||||
* Subscribe to activity on a given channel
|
path: string
|
||||||
*/
|
): LiveChannel<TMessage, TPublish>;
|
||||||
getChannelStream<T>(channel: string): Observable<T>;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send data to a channel. This feature is disabled for most channels and will return an error
|
|
||||||
*/
|
|
||||||
publish<T>(channel: string, data: any): Promise<T>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let singletonInstance: GrafanaLiveSrv;
|
let singletonInstance: GrafanaLiveSrv;
|
||||||
|
|||||||
@@ -86,8 +86,8 @@ func (hs *HTTPServer) Init() error {
|
|||||||
hs.Live = node
|
hs.Live = node
|
||||||
|
|
||||||
// Spit random walk to example
|
// Spit random walk to example
|
||||||
go live.RunRandomCSV(hs.Live, "random-2s-stream", 2000, 0)
|
go live.RunRandomCSV(hs.Live, "grafana/testdata/random-2s-stream", 2000, 0)
|
||||||
go live.RunRandomCSV(hs.Live, "random-flakey-stream", 400, .6)
|
go live.RunRandomCSV(hs.Live, "grafana/testdata/random-flakey-stream", 400, .6)
|
||||||
}
|
}
|
||||||
|
|
||||||
hs.macaron = hs.newMacaron()
|
hs.macaron = hs.newMacaron()
|
||||||
|
|||||||
@@ -1,85 +1,137 @@
|
|||||||
import React, { PureComponent } from 'react';
|
import React, { PureComponent } from 'react';
|
||||||
import { hot } from 'react-hot-loader';
|
import { hot } from 'react-hot-loader';
|
||||||
import { connect } from 'react-redux';
|
import { connect } from 'react-redux';
|
||||||
|
import { css } from 'emotion';
|
||||||
import { StoreState } from 'app/types';
|
import { StoreState } from 'app/types';
|
||||||
import { getNavModel } from 'app/core/selectors/navModel';
|
import { getNavModel } from 'app/core/selectors/navModel';
|
||||||
import Page from 'app/core/components/Page/Page';
|
import Page from 'app/core/components/Page/Page';
|
||||||
import { NavModel, SelectableValue, FeatureState } from '@grafana/data';
|
import {
|
||||||
|
NavModel,
|
||||||
|
SelectableValue,
|
||||||
|
FeatureState,
|
||||||
|
LiveChannelScope,
|
||||||
|
LiveChannelConfig,
|
||||||
|
LiveChannelSupport,
|
||||||
|
} from '@grafana/data';
|
||||||
import { LivePanel } from './LivePanel';
|
import { LivePanel } from './LivePanel';
|
||||||
import { Select, Input, Button, FeatureInfoBox, Container } from '@grafana/ui';
|
import { Select, FeatureInfoBox, Container } from '@grafana/ui';
|
||||||
import { getGrafanaLiveSrv } from '@grafana/runtime';
|
import { getGrafanaLiveCentrifugeSrv } from '../live/live';
|
||||||
|
|
||||||
interface Props {
|
interface Props {
|
||||||
navModel: NavModel;
|
navModel: NavModel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const scopes: Array<SelectableValue<LiveChannelScope>> = [
|
||||||
|
{ label: 'Grafana', value: LiveChannelScope.Grafana, description: 'Core grafana live features' },
|
||||||
|
{ label: 'Data Sources', value: LiveChannelScope.DataSource, description: 'Data sources with live support' },
|
||||||
|
{ label: 'Plugins', value: LiveChannelScope.Plugin, description: 'Plugins with live support' },
|
||||||
|
];
|
||||||
|
|
||||||
interface State {
|
interface State {
|
||||||
channel: string;
|
scope: LiveChannelScope;
|
||||||
text: string;
|
namespace?: string;
|
||||||
|
path?: string;
|
||||||
|
|
||||||
|
namespaces: Array<SelectableValue<string>>;
|
||||||
|
paths: Array<SelectableValue<string>>;
|
||||||
|
support?: LiveChannelSupport;
|
||||||
|
config?: LiveChannelConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
export class LiveAdmin extends PureComponent<Props, State> {
|
export class LiveAdmin extends PureComponent<Props, State> {
|
||||||
state: State = {
|
state: State = {
|
||||||
channel: 'random-2s-stream',
|
scope: LiveChannelScope.Grafana,
|
||||||
text: '', // publish text to a channel
|
namespace: 'testdata',
|
||||||
|
path: 'random-2s-stream',
|
||||||
|
namespaces: [],
|
||||||
|
paths: [],
|
||||||
};
|
};
|
||||||
|
// onTextChanged: ((event: FormEvent<HTMLInputElement>) => void) | undefined;
|
||||||
|
// onPublish: ((event: MouseEvent<HTMLButtonElement, MouseEvent>) => void) | undefined;
|
||||||
|
|
||||||
onChannelChanged = (v: SelectableValue<string>) => {
|
async componentDidMount() {
|
||||||
|
const { scope, namespace, path } = this.state;
|
||||||
|
const srv = getGrafanaLiveCentrifugeSrv();
|
||||||
|
const namespaces = await srv.scopes[scope].listNamespaces();
|
||||||
|
const support = namespace ? await srv.scopes[scope].getChannelSupport(namespace) : undefined;
|
||||||
|
const paths = support ? await support.getSupportedPaths() : undefined;
|
||||||
|
const config = support && path ? await support.getChannelConfig(path) : undefined;
|
||||||
|
|
||||||
|
this.setState({
|
||||||
|
namespaces,
|
||||||
|
support,
|
||||||
|
paths: paths
|
||||||
|
? paths.map(p => ({
|
||||||
|
label: p.path,
|
||||||
|
value: p.path,
|
||||||
|
description: p.description,
|
||||||
|
}))
|
||||||
|
: [],
|
||||||
|
config,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
onScopeChanged = async (v: SelectableValue<LiveChannelScope>) => {
|
||||||
if (v.value) {
|
if (v.value) {
|
||||||
|
const srv = getGrafanaLiveCentrifugeSrv();
|
||||||
|
|
||||||
this.setState({
|
this.setState({
|
||||||
channel: v.value,
|
scope: v.value,
|
||||||
|
namespace: undefined,
|
||||||
|
path: undefined,
|
||||||
|
namespaces: await srv.scopes[v.value!].listNamespaces(),
|
||||||
|
paths: [],
|
||||||
|
support: undefined,
|
||||||
|
config: undefined,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
onTextChanged = (event: React.ChangeEvent<HTMLInputElement>) => {
|
onNamespaceChanged = async (v: SelectableValue<string>) => {
|
||||||
this.setState({ text: event.target.value });
|
if (v.value) {
|
||||||
};
|
const namespace = v.value;
|
||||||
|
const srv = getGrafanaLiveCentrifugeSrv();
|
||||||
|
const support = await srv.scopes[this.state.scope].getChannelSupport(namespace);
|
||||||
|
|
||||||
onPublish = () => {
|
this.setState({
|
||||||
const { text, channel } = this.state;
|
namespace: v.value,
|
||||||
if (text) {
|
paths: support!.getSupportedPaths().map(p => ({
|
||||||
const msg = {
|
label: p.path,
|
||||||
line: text,
|
value: p.path,
|
||||||
};
|
description: p.description,
|
||||||
|
})),
|
||||||
const srv = getGrafanaLiveSrv();
|
path: undefined,
|
||||||
srv.publish(channel, msg).then(v => {
|
config: undefined,
|
||||||
console.log('PUBLISHED', text, v);
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
onPathChanged = async (v: SelectableValue<string>) => {
|
||||||
|
if (v.value) {
|
||||||
|
const path = v.value;
|
||||||
|
const srv = getGrafanaLiveCentrifugeSrv();
|
||||||
|
const support = await srv.scopes[this.state.scope].getChannelSupport(this.state.namespace!);
|
||||||
|
if (!support) {
|
||||||
|
this.setState({
|
||||||
|
namespace: undefined,
|
||||||
|
paths: [],
|
||||||
|
config: undefined,
|
||||||
|
support,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.setState({
|
||||||
|
path,
|
||||||
|
support,
|
||||||
|
config: support.getChannelConfig(path),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
this.setState({ text: '' });
|
|
||||||
};
|
};
|
||||||
|
|
||||||
render() {
|
render() {
|
||||||
const { navModel } = this.props;
|
const { navModel } = this.props;
|
||||||
const { channel, text } = this.state;
|
const { scope, namespace, namespaces, path, paths, config } = this.state;
|
||||||
|
|
||||||
const channels: Array<SelectableValue<string>> = [
|
|
||||||
{
|
|
||||||
label: 'random-2s-stream',
|
|
||||||
value: 'random-2s-stream',
|
|
||||||
description: 'Random stream that updates every 2s',
|
|
||||||
},
|
|
||||||
{
|
|
||||||
label: 'random-flakey-stream',
|
|
||||||
value: 'random-flakey-stream',
|
|
||||||
description: 'Random stream with intermittent updates',
|
|
||||||
},
|
|
||||||
{
|
|
||||||
label: 'example-chat',
|
|
||||||
value: 'example-chat',
|
|
||||||
description: 'A channel that expects chat messages',
|
|
||||||
},
|
|
||||||
];
|
|
||||||
let current = channels.find(f => f.value === channel);
|
|
||||||
if (!current) {
|
|
||||||
current = {
|
|
||||||
label: channel,
|
|
||||||
value: channel,
|
|
||||||
};
|
|
||||||
channels.push(current);
|
|
||||||
}
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<Page navModel={navModel}>
|
<Page navModel={navModel}>
|
||||||
@@ -99,19 +151,36 @@ export class LiveAdmin extends PureComponent<Props, State> {
|
|||||||
<br />
|
<br />
|
||||||
</Container>
|
</Container>
|
||||||
|
|
||||||
<h2>Channels</h2>
|
<div
|
||||||
<Select options={channels} value={current} onChange={this.onChannelChanged} allowCustomValue={true} />
|
className={css`
|
||||||
<br />
|
width: 100%;
|
||||||
|
display: flex;
|
||||||
<LivePanel channel={channel} />
|
> div {
|
||||||
|
margin-right: 8px;
|
||||||
|
min-width: 150px;
|
||||||
|
}
|
||||||
|
`}
|
||||||
|
>
|
||||||
|
<div>
|
||||||
|
<h5>Scope</h5>
|
||||||
|
<Select options={scopes} value={scopes.find(s => s.value === scope)} onChange={this.onScopeChanged} />
|
||||||
|
</div>
|
||||||
|
<div>
|
||||||
|
<h5>Namespace</h5>
|
||||||
|
<Select
|
||||||
|
options={namespaces}
|
||||||
|
value={namespaces.find(s => s.value === namespace) || ''}
|
||||||
|
onChange={this.onNamespaceChanged}
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
<div>
|
||||||
|
<h5>Path</h5>
|
||||||
|
<Select options={paths} value={paths.find(s => s.value === path) || ''} onChange={this.onPathChanged} />
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
<br />
|
<br />
|
||||||
<br />
|
<br />
|
||||||
<h3>Write to channel</h3>
|
{scope && namespace && path && <LivePanel scope={scope} namespace={namespace} path={path} config={config} />}
|
||||||
<Input value={text} onChange={this.onTextChanged} />
|
|
||||||
<Button onClick={this.onPublish} variant={text ? 'primary' : 'secondary'}>
|
|
||||||
Publish
|
|
||||||
</Button>
|
|
||||||
</Page.Contents>
|
</Page.Contents>
|
||||||
</Page>
|
</Page>
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,52 +1,71 @@
|
|||||||
import React, { PureComponent } from 'react';
|
import React, { PureComponent } from 'react';
|
||||||
import { Unsubscribable, PartialObserver } from 'rxjs';
|
import { Unsubscribable, PartialObserver } from 'rxjs';
|
||||||
import { getGrafanaLiveSrv } from '@grafana/runtime';
|
import { getGrafanaLiveSrv } from '@grafana/runtime';
|
||||||
|
import {
|
||||||
|
AppEvents,
|
||||||
|
LiveChannel,
|
||||||
|
LiveChannelConfig,
|
||||||
|
LiveChannelConnectionState,
|
||||||
|
LiveChannelMessage,
|
||||||
|
LiveChannelScope,
|
||||||
|
LiveChannelStatus,
|
||||||
|
} from '@grafana/data';
|
||||||
|
import { Input, Button } from '@grafana/ui';
|
||||||
|
import { appEvents } from 'app/core/core';
|
||||||
|
|
||||||
interface Props {
|
interface Props {
|
||||||
channel: string;
|
scope: LiveChannelScope;
|
||||||
|
namespace: string;
|
||||||
|
path: string;
|
||||||
|
config?: LiveChannelConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface State {
|
interface State {
|
||||||
connected: boolean;
|
channel?: LiveChannel;
|
||||||
|
status: LiveChannelStatus;
|
||||||
count: number;
|
count: number;
|
||||||
lastTime: number;
|
lastTime: number;
|
||||||
lastBody: string;
|
lastBody: string;
|
||||||
|
text: string; // for publish!
|
||||||
}
|
}
|
||||||
|
|
||||||
export class LivePanel extends PureComponent<Props, State> {
|
export class LivePanel extends PureComponent<Props, State> {
|
||||||
state: State = {
|
state: State = {
|
||||||
connected: false,
|
status: { id: '?', state: LiveChannelConnectionState.Pending, timestamp: Date.now() },
|
||||||
count: 0,
|
count: 0,
|
||||||
lastTime: 0,
|
lastTime: 0,
|
||||||
lastBody: '',
|
lastBody: '',
|
||||||
|
text: '',
|
||||||
};
|
};
|
||||||
subscription?: Unsubscribable;
|
subscription?: Unsubscribable;
|
||||||
|
|
||||||
observer: PartialObserver<any> = {
|
streamObserver: PartialObserver<LiveChannelMessage> = {
|
||||||
next: (msg: any) => {
|
next: (msg: LiveChannelMessage) => {
|
||||||
this.setState({
|
if (msg.type === 'status') {
|
||||||
count: this.state.count + 1,
|
this.setState({ status: msg.message as LiveChannelStatus });
|
||||||
lastTime: Date.now(),
|
} else {
|
||||||
lastBody: JSON.stringify(msg),
|
this.setState({
|
||||||
});
|
count: this.state.count + 1,
|
||||||
|
lastTime: Date.now(),
|
||||||
|
lastBody: JSON.stringify(msg),
|
||||||
|
});
|
||||||
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
startSubscription = () => {
|
startSubscription = () => {
|
||||||
if (this.subscription) {
|
const { scope, namespace, path } = this.props;
|
||||||
this.subscription.unsubscribe();
|
const channel = getGrafanaLiveSrv().getChannel(scope, namespace, path);
|
||||||
this.subscription = undefined;
|
if (this.state.channel === channel) {
|
||||||
|
return; // no change!
|
||||||
}
|
}
|
||||||
|
|
||||||
const srv = getGrafanaLiveSrv();
|
if (this.subscription) {
|
||||||
if (srv.isConnected()) {
|
this.subscription.unsubscribe();
|
||||||
const stream = srv.getChannelStream(this.props.channel);
|
|
||||||
this.subscription = stream.subscribe(this.observer);
|
|
||||||
this.setState({ connected: true, count: 0, lastTime: 0, lastBody: '' });
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
console.log('Not yet connected... try again...');
|
|
||||||
setTimeout(this.startSubscription, 200);
|
this.subscription = channel.getStream().subscribe(this.streamObserver);
|
||||||
|
this.setState({ channel });
|
||||||
};
|
};
|
||||||
|
|
||||||
componentDidMount = () => {
|
componentDidMount = () => {
|
||||||
@@ -56,21 +75,47 @@ export class LivePanel extends PureComponent<Props, State> {
|
|||||||
componentWillUnmount() {
|
componentWillUnmount() {
|
||||||
if (this.subscription) {
|
if (this.subscription) {
|
||||||
this.subscription.unsubscribe();
|
this.subscription.unsubscribe();
|
||||||
this.subscription = undefined;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
componentDidUpdate(oldProps: Props) {
|
componentDidUpdate(oldProps: Props) {
|
||||||
if (oldProps.channel !== this.props.channel) {
|
if (oldProps.config !== this.props.config) {
|
||||||
this.startSubscription();
|
this.startSubscription();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onTextChanged = (event: React.ChangeEvent<HTMLInputElement>) => {
|
||||||
|
this.setState({ text: event.target.value });
|
||||||
|
};
|
||||||
|
|
||||||
|
onPublish = () => {
|
||||||
|
const { text, channel } = this.state;
|
||||||
|
if (text && channel) {
|
||||||
|
const msg = {
|
||||||
|
line: text,
|
||||||
|
};
|
||||||
|
|
||||||
|
channel.publish!(msg)
|
||||||
|
.then(v => {
|
||||||
|
console.log('PUBLISHED', text, v);
|
||||||
|
})
|
||||||
|
.catch(err => {
|
||||||
|
appEvents.emit(AppEvents.alertError, ['Publish error', `${err}`]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.setState({ text: '' });
|
||||||
|
};
|
||||||
|
|
||||||
render() {
|
render() {
|
||||||
const { lastBody, lastTime, count } = this.state;
|
const { lastBody, lastTime, count, status, text } = this.state;
|
||||||
|
const { config } = this.props;
|
||||||
|
const showPublish = config && config.canPublish && config.canPublish();
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div>
|
<div>
|
||||||
|
<h5>Status: {config ? '' : '(no config)'}</h5>
|
||||||
|
<pre>{JSON.stringify(status)}</pre>
|
||||||
|
|
||||||
<h5>Count: {count}</h5>
|
<h5>Count: {count}</h5>
|
||||||
{lastTime > 0 && (
|
{lastTime > 0 && (
|
||||||
<>
|
<>
|
||||||
@@ -82,6 +127,16 @@ export class LivePanel extends PureComponent<Props, State> {
|
|||||||
)}
|
)}
|
||||||
</>
|
</>
|
||||||
)}
|
)}
|
||||||
|
|
||||||
|
{showPublish && (
|
||||||
|
<div>
|
||||||
|
<h3>Write to channel</h3>
|
||||||
|
<Input value={text} onChange={this.onTextChanged} />
|
||||||
|
<Button onClick={this.onPublish} variant={text ? 'primary' : 'secondary'}>
|
||||||
|
Publish
|
||||||
|
</Button>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
209
public/app/features/live/channel.ts
Normal file
209
public/app/features/live/channel.ts
Normal file
@@ -0,0 +1,209 @@
|
|||||||
|
import {
|
||||||
|
LiveChannelConfig,
|
||||||
|
LiveChannel,
|
||||||
|
LiveChannelScope,
|
||||||
|
LiveChannelStatus,
|
||||||
|
LiveChannelPresense,
|
||||||
|
LiveChannelJoinLeave,
|
||||||
|
LiveChannelMessage,
|
||||||
|
LiveChannelConnectionState,
|
||||||
|
} from '@grafana/data';
|
||||||
|
import Centrifuge, {
|
||||||
|
JoinLeaveContext,
|
||||||
|
PublicationContext,
|
||||||
|
SubscribeErrorContext,
|
||||||
|
SubscribeSuccessContext,
|
||||||
|
SubscriptionEvents,
|
||||||
|
UnsubscribeContext,
|
||||||
|
} from 'centrifuge/dist/centrifuge.protobuf';
|
||||||
|
import { Subject, of, merge } from 'rxjs';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal class that maps Centrifuge support to GrafanaLive
|
||||||
|
*/
|
||||||
|
export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements LiveChannel<TMessage, TPublish> {
|
||||||
|
readonly currentStatus: LiveChannelStatus;
|
||||||
|
|
||||||
|
readonly opened = Date.now();
|
||||||
|
readonly id: string;
|
||||||
|
readonly scope: LiveChannelScope;
|
||||||
|
readonly namespace: string;
|
||||||
|
readonly path: string;
|
||||||
|
|
||||||
|
readonly stream = new Subject<LiveChannelMessage<TMessage>>();
|
||||||
|
|
||||||
|
// When presense is enabled (rarely), this will be initalized
|
||||||
|
private presense?: Subject<LiveChannelPresense>;
|
||||||
|
|
||||||
|
/** Static definition of the channel definition. This may describe the channel usage */
|
||||||
|
config?: LiveChannelConfig;
|
||||||
|
subscription?: Centrifuge.Subscription;
|
||||||
|
shutdownCallback?: () => void;
|
||||||
|
|
||||||
|
constructor(id: string, scope: LiveChannelScope, namespace: string, path: string) {
|
||||||
|
this.id = id;
|
||||||
|
this.scope = scope;
|
||||||
|
this.namespace = namespace;
|
||||||
|
this.path = path;
|
||||||
|
this.currentStatus = {
|
||||||
|
id,
|
||||||
|
timestamp: this.opened,
|
||||||
|
state: LiveChannelConnectionState.Pending,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should only be called when centrifuge is connected
|
||||||
|
initalize(config: LiveChannelConfig): SubscriptionEvents {
|
||||||
|
if (this.config) {
|
||||||
|
throw new Error('Channel already initalized: ' + this.id);
|
||||||
|
}
|
||||||
|
this.config = config;
|
||||||
|
const prepare = config.processMessage ? config.processMessage : (v: any) => v;
|
||||||
|
|
||||||
|
const events: SubscriptionEvents = {
|
||||||
|
// This means a message was recieved from the server
|
||||||
|
publish: (ctx: PublicationContext) => {
|
||||||
|
this.stream.next(prepare(ctx.data));
|
||||||
|
|
||||||
|
// Clear any error messages
|
||||||
|
if (this.currentStatus.error) {
|
||||||
|
this.currentStatus.timestamp = Date.now();
|
||||||
|
delete this.currentStatus.error;
|
||||||
|
this.sendStatus();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
error: (ctx: SubscribeErrorContext) => {
|
||||||
|
this.currentStatus.timestamp = Date.now();
|
||||||
|
this.currentStatus.error = ctx.error;
|
||||||
|
this.sendStatus();
|
||||||
|
},
|
||||||
|
subscribe: (ctx: SubscribeSuccessContext) => {
|
||||||
|
this.currentStatus.timestamp = Date.now();
|
||||||
|
this.currentStatus.state = LiveChannelConnectionState.Connected;
|
||||||
|
this.sendStatus();
|
||||||
|
},
|
||||||
|
unsubscribe: (ctx: UnsubscribeContext) => {
|
||||||
|
this.currentStatus.timestamp = Date.now();
|
||||||
|
this.currentStatus.state = LiveChannelConnectionState.Disconnected;
|
||||||
|
this.sendStatus();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if (config.hasPresense) {
|
||||||
|
events.join = (ctx: JoinLeaveContext) => {
|
||||||
|
const message: LiveChannelJoinLeave = {
|
||||||
|
user: ctx.info.user,
|
||||||
|
};
|
||||||
|
this.stream.next({
|
||||||
|
type: 'join',
|
||||||
|
message,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
events.leave = (ctx: JoinLeaveContext) => {
|
||||||
|
const message: LiveChannelJoinLeave = {
|
||||||
|
user: ctx.info.user,
|
||||||
|
};
|
||||||
|
this.stream.next({
|
||||||
|
type: 'leave',
|
||||||
|
message,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
this.getPresense = () => {
|
||||||
|
return this.subscription!.presence().then(v => {
|
||||||
|
return {
|
||||||
|
users: Object.keys(v.presence),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sendStatus() {
|
||||||
|
this.stream.next({ type: 'status', message: { ...this.currentStatus } });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the stream of events and
|
||||||
|
*/
|
||||||
|
getStream() {
|
||||||
|
const status: LiveChannelMessage<TMessage> = { type: 'status', message: { ...this.currentStatus } };
|
||||||
|
return merge(of(status), this.stream.asObservable());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is configured by the server when the config supports presense
|
||||||
|
*/
|
||||||
|
getPresense?: () => Promise<LiveChannelPresense>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is configured by the server when config supports writing
|
||||||
|
*/
|
||||||
|
publish?: (msg: TPublish) => Promise<any>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will close and terminate all streams for this channel
|
||||||
|
*/
|
||||||
|
disconnect() {
|
||||||
|
this.currentStatus.state = LiveChannelConnectionState.Shutdown;
|
||||||
|
this.currentStatus.timestamp = Date.now();
|
||||||
|
|
||||||
|
if (this.subscription) {
|
||||||
|
this.subscription.unsubscribe();
|
||||||
|
this.subscription.removeAllListeners(); // they keep all listeners attached after unsubscribe
|
||||||
|
this.subscription = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.stream.complete();
|
||||||
|
|
||||||
|
if (this.presense) {
|
||||||
|
this.presense.complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.stream.next({ type: 'status', message: { ...this.currentStatus } });
|
||||||
|
this.stream.complete();
|
||||||
|
|
||||||
|
if (this.shutdownCallback) {
|
||||||
|
this.shutdownCallback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdownWithError(err: string) {
|
||||||
|
this.currentStatus.error = err;
|
||||||
|
this.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getErrorChannel(
|
||||||
|
msg: string,
|
||||||
|
id: string,
|
||||||
|
scope: LiveChannelScope,
|
||||||
|
namespace: string,
|
||||||
|
path: string
|
||||||
|
): LiveChannel {
|
||||||
|
const errorStatus: LiveChannelStatus = {
|
||||||
|
id,
|
||||||
|
timestamp: Date.now(),
|
||||||
|
state: LiveChannelConnectionState.Invalid,
|
||||||
|
error: msg,
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
opened: Date.now(),
|
||||||
|
scope,
|
||||||
|
namespace,
|
||||||
|
path,
|
||||||
|
|
||||||
|
// return an error
|
||||||
|
getStream: () =>
|
||||||
|
of({
|
||||||
|
type: 'status',
|
||||||
|
message: errorStatus,
|
||||||
|
}),
|
||||||
|
|
||||||
|
// already disconnected
|
||||||
|
disconnect: () => {},
|
||||||
|
};
|
||||||
|
}
|
||||||
47
public/app/features/live/features.ts
Normal file
47
public/app/features/live/features.ts
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
import { LiveChannelConfig } from '@grafana/data';
|
||||||
|
import { grafanaLiveCoreFeatures } from './scopes';
|
||||||
|
|
||||||
|
export function registerLiveFeatures() {
|
||||||
|
const channels = [
|
||||||
|
{
|
||||||
|
path: 'random-2s-stream',
|
||||||
|
description: 'Random stream with points every 2s',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
path: 'random-flakey-stream',
|
||||||
|
description: 'Random stream with flakey data points',
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
grafanaLiveCoreFeatures.register(
|
||||||
|
'testdata',
|
||||||
|
{
|
||||||
|
getChannelConfig: (path: string) => {
|
||||||
|
return channels.find(c => c.path === path);
|
||||||
|
},
|
||||||
|
getSupportedPaths: () => channels,
|
||||||
|
},
|
||||||
|
'Test data generations'
|
||||||
|
);
|
||||||
|
|
||||||
|
const chatConfig: LiveChannelConfig = {
|
||||||
|
path: 'chat',
|
||||||
|
description: 'Broadcast text messages to a channel',
|
||||||
|
canPublish: () => true,
|
||||||
|
hasPresense: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
grafanaLiveCoreFeatures.register(
|
||||||
|
'experimental',
|
||||||
|
{
|
||||||
|
getChannelConfig: (path: string) => {
|
||||||
|
if ('chat' === path) {
|
||||||
|
return chatConfig;
|
||||||
|
}
|
||||||
|
throw new Error('invalid path: ' + path);
|
||||||
|
},
|
||||||
|
getSupportedPaths: () => [chatConfig],
|
||||||
|
},
|
||||||
|
'Experimental features'
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -1,26 +1,24 @@
|
|||||||
import Centrifuge, {
|
import Centrifuge from 'centrifuge/dist/centrifuge.protobuf';
|
||||||
PublicationContext,
|
|
||||||
SubscriptionEvents,
|
|
||||||
SubscribeSuccessContext,
|
|
||||||
UnsubscribeContext,
|
|
||||||
JoinLeaveContext,
|
|
||||||
SubscribeErrorContext,
|
|
||||||
} from 'centrifuge/dist/centrifuge.protobuf';
|
|
||||||
import SockJS from 'sockjs-client';
|
import SockJS from 'sockjs-client';
|
||||||
import { GrafanaLiveSrv, setGrafanaLiveSrv, ChannelHandler, config } from '@grafana/runtime';
|
import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime';
|
||||||
import { Observable, Subject, BehaviorSubject } from 'rxjs';
|
import { BehaviorSubject } from 'rxjs';
|
||||||
import { KeyValue } from '@grafana/data';
|
import { LiveChannel, LiveChannelScope } from '@grafana/data';
|
||||||
|
import { CentrifugeLiveChannel, getErrorChannel } from './channel';
|
||||||
|
import {
|
||||||
|
GrafanaLiveScope,
|
||||||
|
grafanaLiveCoreFeatures,
|
||||||
|
GrafanaLiveDataSourceScope,
|
||||||
|
GrafanaLivePluginScope,
|
||||||
|
} from './scopes';
|
||||||
|
import { registerLiveFeatures } from './features';
|
||||||
|
|
||||||
interface Channel<T = any> {
|
export class CentrifugeSrv implements GrafanaLiveSrv {
|
||||||
subject: Subject<T>;
|
readonly open = new Map<string, CentrifugeLiveChannel>();
|
||||||
subscription?: Centrifuge.Subscription;
|
|
||||||
}
|
|
||||||
|
|
||||||
class CentrifugeSrv implements GrafanaLiveSrv {
|
readonly centrifuge: Centrifuge;
|
||||||
centrifuge: Centrifuge;
|
readonly connectionState: BehaviorSubject<boolean>;
|
||||||
channels: KeyValue<Channel> = {};
|
readonly connectionBlocker: Promise<void>;
|
||||||
connectionState: BehaviorSubject<boolean>;
|
readonly scopes: Record<LiveChannelScope, GrafanaLiveScope>;
|
||||||
standardCallbacks: SubscriptionEvents;
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.centrifuge = new Centrifuge(`${config.appUrl}live/sockjs`, {
|
this.centrifuge = new Centrifuge(`${config.appUrl}live/sockjs`, {
|
||||||
@@ -29,19 +27,27 @@ class CentrifugeSrv implements GrafanaLiveSrv {
|
|||||||
});
|
});
|
||||||
this.centrifuge.connect(); // do connection
|
this.centrifuge.connect(); // do connection
|
||||||
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
|
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
|
||||||
|
this.connectionBlocker = new Promise<void>(resolve => {
|
||||||
|
if (this.centrifuge.isConnected()) {
|
||||||
|
return resolve();
|
||||||
|
}
|
||||||
|
const connectListener = () => {
|
||||||
|
resolve();
|
||||||
|
this.centrifuge.removeListener('connect', connectListener);
|
||||||
|
};
|
||||||
|
this.centrifuge.addListener('connect', connectListener);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.scopes = {
|
||||||
|
[LiveChannelScope.Grafana]: grafanaLiveCoreFeatures,
|
||||||
|
[LiveChannelScope.DataSource]: new GrafanaLiveDataSourceScope(),
|
||||||
|
[LiveChannelScope.Plugin]: new GrafanaLivePluginScope(),
|
||||||
|
};
|
||||||
|
|
||||||
// Register global listeners
|
// Register global listeners
|
||||||
this.centrifuge.on('connect', this.onConnect);
|
this.centrifuge.on('connect', this.onConnect);
|
||||||
this.centrifuge.on('disconnect', this.onDisconnect);
|
this.centrifuge.on('disconnect', this.onDisconnect);
|
||||||
this.centrifuge.on('publish', this.onServerSideMessage);
|
this.centrifuge.on('publish', this.onServerSideMessage);
|
||||||
|
|
||||||
this.standardCallbacks = {
|
|
||||||
subscribe: this.onSubscribe,
|
|
||||||
unsubscribe: this.onUnsubscribe,
|
|
||||||
join: this.onJoin,
|
|
||||||
leave: this.onLeave,
|
|
||||||
error: this.onError,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------------------------
|
//----------------------------------------------------------
|
||||||
@@ -62,38 +68,61 @@ class CentrifugeSrv implements GrafanaLiveSrv {
|
|||||||
console.log('Publication from server-side channel', context);
|
console.log('Publication from server-side channel', context);
|
||||||
};
|
};
|
||||||
|
|
||||||
//----------------------------------------------------------
|
/**
|
||||||
// Channel functions
|
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
|
||||||
//----------------------------------------------------------
|
* channel will be returned with an error state indicated in its status
|
||||||
|
*/
|
||||||
|
getChannel<TMessage, TPublish>(
|
||||||
|
scopeId: LiveChannelScope,
|
||||||
|
namespace: string,
|
||||||
|
path: string
|
||||||
|
): LiveChannel<TMessage, TPublish> {
|
||||||
|
const id = `${scopeId}/${namespace}/${path}`;
|
||||||
|
let channel = this.open.get(id);
|
||||||
|
if (channel != null) {
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
// export interface SubscriptionEvents {
|
const scope = this.scopes[scopeId];
|
||||||
// publish?: (ctx: PublicationContext) => void;
|
if (!scope) {
|
||||||
// join?: (ctx: JoinLeaveContext) => void;
|
return getErrorChannel('invalid scope', id, scopeId, namespace, path);
|
||||||
// leave?: (ctx: JoinLeaveContex) => void;
|
}
|
||||||
// subscribe?: (ctx: SubscribeSuccessContext) => void;
|
|
||||||
// error?: (ctx: SubscribeErrorContext) => void;
|
|
||||||
// unsubscribe?: (ctx: UnsubscribeContext) => void;
|
|
||||||
// }
|
|
||||||
|
|
||||||
onSubscribe = (context: SubscribeSuccessContext) => {
|
channel = new CentrifugeLiveChannel(id, scopeId, namespace, path);
|
||||||
console.log('onSubscribe', context);
|
channel.shutdownCallback = () => {
|
||||||
};
|
this.open.delete(id); // remove it from the list of open channels
|
||||||
|
};
|
||||||
|
this.open.set(id, channel);
|
||||||
|
|
||||||
onUnsubscribe = (context: UnsubscribeContext) => {
|
// Initalize the channel in the bacground
|
||||||
console.log('onUnsubscribe', context);
|
this.initChannel(scope, channel).catch(err => {
|
||||||
};
|
channel?.shutdownWithError(err);
|
||||||
|
this.open.delete(id);
|
||||||
|
});
|
||||||
|
|
||||||
onJoin = (context: JoinLeaveContext) => {
|
// return the not-yet initalized channel
|
||||||
console.log('onJoin', context);
|
return channel;
|
||||||
};
|
}
|
||||||
|
|
||||||
onLeave = (context: JoinLeaveContext) => {
|
private async initChannel(scope: GrafanaLiveScope, channel: CentrifugeLiveChannel): Promise<void> {
|
||||||
console.log('onLeave', context);
|
const support = await scope.getChannelSupport(channel.namespace);
|
||||||
};
|
if (!support) {
|
||||||
|
throw new Error(channel.namespace + 'does not support streaming');
|
||||||
onError = (context: SubscribeErrorContext) => {
|
}
|
||||||
console.log('onError', context);
|
const config = support.getChannelConfig(channel.path);
|
||||||
};
|
if (!config) {
|
||||||
|
throw new Error('unknown path: ' + channel.path);
|
||||||
|
}
|
||||||
|
const events = channel.initalize(config);
|
||||||
|
if (!this.centrifuge.isConnected()) {
|
||||||
|
await this.connectionBlocker;
|
||||||
|
}
|
||||||
|
if (config.canPublish && config.canPublish()) {
|
||||||
|
channel.publish = (data: any) => this.centrifuge.publish(channel.id, data);
|
||||||
|
}
|
||||||
|
channel.subscription = this.centrifuge.subscribe(channel.id, events);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
//----------------------------------------------------------
|
//----------------------------------------------------------
|
||||||
// Exported functions
|
// Exported functions
|
||||||
@@ -112,52 +141,13 @@ class CentrifugeSrv implements GrafanaLiveSrv {
|
|||||||
getConnectionState() {
|
getConnectionState() {
|
||||||
return this.connectionState.asObservable();
|
return this.connectionState.asObservable();
|
||||||
}
|
}
|
||||||
|
|
||||||
initChannel<T>(path: string, handler: ChannelHandler<T>) {
|
|
||||||
if (this.channels[path]) {
|
|
||||||
console.log('Already connected to:', path);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const c: Channel = {
|
|
||||||
subject: new Subject<T>(),
|
|
||||||
};
|
|
||||||
this.channels[path] = c;
|
|
||||||
|
|
||||||
console.log('initChannel', this.centrifuge.isConnected(), path, handler);
|
|
||||||
const callbacks: SubscriptionEvents = {
|
|
||||||
...this.standardCallbacks,
|
|
||||||
publish: (ctx: PublicationContext) => {
|
|
||||||
// console.log('GOT', JSON.stringify(ctx.data), ctx);
|
|
||||||
const v = handler.onPublish(ctx.data);
|
|
||||||
c.subject.next(v);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
c.subscription = this.centrifuge.subscribe(path, callbacks);
|
|
||||||
}
|
|
||||||
|
|
||||||
getChannelStream<T>(path: string): Observable<T> {
|
|
||||||
let c = this.channels[path];
|
|
||||||
if (!c) {
|
|
||||||
this.initChannel(path, noopChannelHandler);
|
|
||||||
c = this.channels[path];
|
|
||||||
}
|
|
||||||
return c!.subject.asObservable();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send data to a channel. This feature is disabled for most channels and will return an error
|
|
||||||
*/
|
|
||||||
publish<T>(channel: string, data: any): Promise<T> {
|
|
||||||
return this.centrifuge.publish(channel, data);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const noopChannelHandler: ChannelHandler = {
|
export function getGrafanaLiveCentrifugeSrv() {
|
||||||
onPublish: (v: any) => {
|
return getGrafanaLiveSrv() as CentrifugeSrv;
|
||||||
return v; // Just pass the object along
|
}
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
export function initGrafanaLive() {
|
export function initGrafanaLive() {
|
||||||
setGrafanaLiveSrv(new CentrifugeSrv());
|
setGrafanaLiveSrv(new CentrifugeSrv());
|
||||||
|
registerLiveFeatures();
|
||||||
}
|
}
|
||||||
|
|||||||
146
public/app/features/live/scopes.ts
Normal file
146
public/app/features/live/scopes.ts
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
|
||||||
|
import { getDataSourceSrv } from '@grafana/runtime';
|
||||||
|
import { config } from 'app/core/config';
|
||||||
|
import { loadPlugin } from '../plugins/PluginPage';
|
||||||
|
|
||||||
|
export abstract class GrafanaLiveScope {
|
||||||
|
constructor(protected scope: LiveChannelScope) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the real namespaces
|
||||||
|
*/
|
||||||
|
abstract async getChannelSupport(namespace: string): Promise<LiveChannelSupport | undefined>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List the possible values within this scope
|
||||||
|
*/
|
||||||
|
abstract async listNamespaces(): Promise<Array<SelectableValue<string>>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
class GrafanaLiveCoreScope extends GrafanaLiveScope {
|
||||||
|
readonly features = new Map<string, LiveChannelSupport>();
|
||||||
|
readonly namespaces: Array<SelectableValue<string>> = [];
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
super(LiveChannelScope.Grafana);
|
||||||
|
}
|
||||||
|
|
||||||
|
register(feature: string, support: LiveChannelSupport, description: string): GrafanaLiveCoreScope {
|
||||||
|
this.features.set(feature, support);
|
||||||
|
this.namespaces.push({
|
||||||
|
value: feature,
|
||||||
|
label: feature,
|
||||||
|
description,
|
||||||
|
});
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the real namespaces
|
||||||
|
*/
|
||||||
|
async getChannelSupport(namespace: string) {
|
||||||
|
const v = this.features.get(namespace);
|
||||||
|
if (v) {
|
||||||
|
return Promise.resolve(v);
|
||||||
|
}
|
||||||
|
throw new Error('unknown feature: ' + namespace);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List the possible values within this scope
|
||||||
|
*/
|
||||||
|
listNamespaces() {
|
||||||
|
return Promise.resolve(this.namespaces);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
export const grafanaLiveCoreFeatures = new GrafanaLiveCoreScope();
|
||||||
|
|
||||||
|
export class GrafanaLiveDataSourceScope extends GrafanaLiveScope {
|
||||||
|
names?: Array<SelectableValue<string>>;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
super(LiveChannelScope.DataSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the real namespaces
|
||||||
|
*/
|
||||||
|
async getChannelSupport(namespace: string) {
|
||||||
|
const ds = await getDataSourceSrv().get(namespace);
|
||||||
|
return ds.channelSupport;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List the possible values within this scope
|
||||||
|
*/
|
||||||
|
async listNamespaces() {
|
||||||
|
if (this.names) {
|
||||||
|
return Promise.resolve(this.names);
|
||||||
|
}
|
||||||
|
const names: Array<SelectableValue<string>> = [];
|
||||||
|
for (const [key, ds] of Object.entries(config.datasources)) {
|
||||||
|
if (ds.meta.live) {
|
||||||
|
try {
|
||||||
|
const s = this.getChannelSupport(key); // ds.name or ID?
|
||||||
|
if (s) {
|
||||||
|
names.push({
|
||||||
|
label: ds.name,
|
||||||
|
value: ds.type,
|
||||||
|
description: ds.type,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
err.isHandled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (this.names = names);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class GrafanaLivePluginScope extends GrafanaLiveScope {
|
||||||
|
names?: Array<SelectableValue<string>>;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
super(LiveChannelScope.Plugin);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the real namespaces
|
||||||
|
*/
|
||||||
|
async getChannelSupport(namespace: string) {
|
||||||
|
const plugin = await loadPlugin(namespace);
|
||||||
|
if (!plugin.channelSupport) {
|
||||||
|
throw new Error('Unknown plugin: ' + namespace);
|
||||||
|
}
|
||||||
|
return plugin.channelSupport;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List the possible values within this scope
|
||||||
|
*/
|
||||||
|
async listNamespaces() {
|
||||||
|
if (this.names) {
|
||||||
|
return Promise.resolve(this.names);
|
||||||
|
}
|
||||||
|
const names: Array<SelectableValue<string>> = [];
|
||||||
|
// TODO add list to config
|
||||||
|
for (const [key, panel] of Object.entries(config.panels)) {
|
||||||
|
if (panel.live) {
|
||||||
|
try {
|
||||||
|
const s = this.getChannelSupport(key); // ds.name or ID?
|
||||||
|
if (s) {
|
||||||
|
names.push({
|
||||||
|
label: panel.name,
|
||||||
|
value: key,
|
||||||
|
description: panel.info?.description,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
err.isHandled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (this.names = names);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -42,7 +42,7 @@ export function getLoadingNav(): NavModel {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function loadPlugin(pluginId: string): Promise<GrafanaPlugin> {
|
export function loadPlugin(pluginId: string): Promise<GrafanaPlugin> {
|
||||||
return getPluginSettings(pluginId).then(info => {
|
return getPluginSettings(pluginId).then(info => {
|
||||||
if (info.type === PluginType.app) {
|
if (info.type === PluginType.app) {
|
||||||
return importAppPlugin(info);
|
return importAppPlugin(info);
|
||||||
|
|||||||
Reference in New Issue
Block a user