mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Live: Extract scopes
from CentrifugeSrv (#41051)
* Refactor: remove `scopes` from CentrifugeSrv, remove dependencies on window/config/user context * Refactor: add GrafanaLiveChannelConfigService to wrap direct access to *Scope classes * Refactor: added GrafanaLiveService acting like a proxy to GrafanaLiveSrv + LiveChannelConfigSrv * Refactor: live module instantiation * Refactor: import fixes * Fix: URL construction in centrifugeSrv
This commit is contained in:
parent
ff8f98170c
commit
e8a30f651e
248
public/app/features/live/centrifuge/service.ts
Normal file
248
public/app/features/live/centrifuge/service.ts
Normal file
@ -0,0 +1,248 @@
|
||||
import Centrifuge from 'centrifuge/dist/centrifuge';
|
||||
import { LiveDataStreamOptions, toDataQueryError } from '@grafana/runtime';
|
||||
import { BehaviorSubject, Observable } from 'rxjs';
|
||||
import {
|
||||
DataFrame,
|
||||
DataFrameJSON,
|
||||
dataFrameToJSON,
|
||||
DataQueryResponse,
|
||||
isLiveChannelMessageEvent,
|
||||
isLiveChannelStatusEvent,
|
||||
LiveChannelAddress,
|
||||
LiveChannelConfig,
|
||||
LiveChannelConnectionState,
|
||||
LiveChannelEvent,
|
||||
LiveChannelPresenceStatus,
|
||||
LoadingState,
|
||||
StreamingDataFrame,
|
||||
} from '@grafana/data';
|
||||
import { CentrifugeLiveChannel } from './channel';
|
||||
import { liveTimer } from 'app/features/dashboard/dashgrid/liveTimer';
|
||||
|
||||
type CentrifugeSrvDeps = {
|
||||
appUrl: string;
|
||||
orgId: number;
|
||||
orgRole: string;
|
||||
sessionId: string;
|
||||
liveEnabled: boolean;
|
||||
};
|
||||
|
||||
export class CentrifugeSrv {
|
||||
readonly open = new Map<string, CentrifugeLiveChannel>();
|
||||
readonly centrifuge: Centrifuge;
|
||||
readonly connectionState: BehaviorSubject<boolean>;
|
||||
readonly connectionBlocker: Promise<void>;
|
||||
|
||||
constructor(private deps: CentrifugeSrvDeps) {
|
||||
const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`;
|
||||
this.centrifuge = new Centrifuge(liveUrl, {});
|
||||
this.centrifuge.setConnectData({
|
||||
sessionId: deps.sessionId,
|
||||
orgId: deps.orgId,
|
||||
});
|
||||
// orgRole is set when logged in *or* anonomus users can use grafana
|
||||
if (deps.liveEnabled && deps.orgRole !== '') {
|
||||
this.centrifuge.connect(); // do connection
|
||||
}
|
||||
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
|
||||
this.connectionBlocker = new Promise<void>((resolve) => {
|
||||
if (this.centrifuge.isConnected()) {
|
||||
return resolve();
|
||||
}
|
||||
const connectListener = () => {
|
||||
resolve();
|
||||
this.centrifuge.removeListener('connect', connectListener);
|
||||
};
|
||||
this.centrifuge.addListener('connect', connectListener);
|
||||
});
|
||||
|
||||
// Register global listeners
|
||||
this.centrifuge.on('connect', this.onConnect);
|
||||
this.centrifuge.on('disconnect', this.onDisconnect);
|
||||
this.centrifuge.on('publish', this.onServerSideMessage);
|
||||
}
|
||||
|
||||
//----------------------------------------------------------
|
||||
// Internal functions
|
||||
//----------------------------------------------------------
|
||||
|
||||
onConnect = (context: any) => {
|
||||
this.connectionState.next(true);
|
||||
};
|
||||
|
||||
onDisconnect = (context: any) => {
|
||||
this.connectionState.next(false);
|
||||
};
|
||||
|
||||
onServerSideMessage = (context: any) => {
|
||||
console.log('Publication from server-side channel', context);
|
||||
};
|
||||
|
||||
/**
|
||||
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
|
||||
* channel will be returned with an error state indicated in its status
|
||||
*/
|
||||
getChannel<TMessage>(addr: LiveChannelAddress, config: LiveChannelConfig): CentrifugeLiveChannel<TMessage> {
|
||||
const id = `${this.deps.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`;
|
||||
let channel = this.open.get(id);
|
||||
if (channel != null) {
|
||||
return channel;
|
||||
}
|
||||
|
||||
channel = new CentrifugeLiveChannel(id, addr);
|
||||
channel.shutdownCallback = () => {
|
||||
this.open.delete(id); // remove it from the list of open channels
|
||||
};
|
||||
this.open.set(id, channel);
|
||||
|
||||
// Initialize the channel in the background
|
||||
this.initChannel(config, channel).catch((err) => {
|
||||
if (channel) {
|
||||
channel.currentStatus.state = LiveChannelConnectionState.Invalid;
|
||||
channel.shutdownWithError(err);
|
||||
}
|
||||
this.open.delete(id);
|
||||
});
|
||||
|
||||
// return the not-yet initalized channel
|
||||
return channel;
|
||||
}
|
||||
|
||||
private async initChannel(config: LiveChannelConfig, channel: CentrifugeLiveChannel): Promise<void> {
|
||||
const events = channel.initalize(config);
|
||||
if (!this.centrifuge.isConnected()) {
|
||||
await this.connectionBlocker;
|
||||
}
|
||||
channel.subscription = this.centrifuge.subscribe(channel.id, events);
|
||||
return;
|
||||
}
|
||||
|
||||
//----------------------------------------------------------
|
||||
// Exported functions
|
||||
//----------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Listen for changes to the connection state
|
||||
*/
|
||||
getConnectionState() {
|
||||
return this.connectionState.asObservable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Watch for messages in a channel
|
||||
*/
|
||||
getStream<T>(address: LiveChannelAddress, config: LiveChannelConfig): Observable<LiveChannelEvent<T>> {
|
||||
return this.getChannel<T>(address, config).getStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a channel and return results as DataFrames
|
||||
*/
|
||||
getDataStream(options: LiveDataStreamOptions, config: LiveChannelConfig): Observable<DataQueryResponse> {
|
||||
return new Observable<DataQueryResponse>((subscriber) => {
|
||||
const channel = this.getChannel(options.addr, config);
|
||||
const key = options.key ?? `xstr/${streamCounter++}`;
|
||||
let data: StreamingDataFrame | undefined = undefined;
|
||||
let filtered: DataFrame | undefined = undefined;
|
||||
let state = LoadingState.Streaming;
|
||||
let last = liveTimer.lastUpdate;
|
||||
let lastWidth = -1;
|
||||
|
||||
const process = (msg: DataFrameJSON) => {
|
||||
if (!data) {
|
||||
data = new StreamingDataFrame(msg, options.buffer);
|
||||
} else {
|
||||
data.push(msg);
|
||||
}
|
||||
state = LoadingState.Streaming;
|
||||
const sameWidth = lastWidth === data.fields.length;
|
||||
lastWidth = data.fields.length;
|
||||
|
||||
// Filter out fields
|
||||
if (!filtered || msg.schema || !sameWidth) {
|
||||
filtered = data;
|
||||
if (options.filter) {
|
||||
const { fields } = options.filter;
|
||||
if (fields?.length) {
|
||||
filtered = {
|
||||
...data,
|
||||
fields: data.fields.filter((f) => fields.includes(f.name)),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const elapsed = liveTimer.lastUpdate - last;
|
||||
if (elapsed > 1000 || liveTimer.ok) {
|
||||
filtered.length = data.length; // make sure they stay up-to-date
|
||||
subscriber.next({ state, data: [filtered], key });
|
||||
last = liveTimer.lastUpdate;
|
||||
}
|
||||
};
|
||||
|
||||
if (options.frame) {
|
||||
process(dataFrameToJSON(options.frame));
|
||||
} else if (channel.lastMessageWithSchema) {
|
||||
process(channel.lastMessageWithSchema);
|
||||
}
|
||||
|
||||
const sub = channel.getStream().subscribe({
|
||||
error: (err: any) => {
|
||||
console.log('LiveQuery [error]', { err }, options.addr);
|
||||
state = LoadingState.Error;
|
||||
subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
|
||||
sub.unsubscribe(); // close after error
|
||||
},
|
||||
complete: () => {
|
||||
console.log('LiveQuery [complete]', options.addr);
|
||||
if (state !== LoadingState.Error) {
|
||||
state = LoadingState.Done;
|
||||
}
|
||||
// or track errors? subscriber.next({ state, data: [data], key });
|
||||
subscriber.complete();
|
||||
sub.unsubscribe();
|
||||
},
|
||||
next: (evt: LiveChannelEvent) => {
|
||||
if (isLiveChannelMessageEvent(evt)) {
|
||||
process(evt.message);
|
||||
return;
|
||||
}
|
||||
if (isLiveChannelStatusEvent(evt)) {
|
||||
if (evt.error) {
|
||||
let error = toDataQueryError(evt.error);
|
||||
error.message = `Streaming channel error: ${error.message}`;
|
||||
state = LoadingState.Error;
|
||||
subscriber.next({ state, data: [data], key, error });
|
||||
return;
|
||||
} else if (
|
||||
evt.state === LiveChannelConnectionState.Connected ||
|
||||
evt.state === LiveChannelConnectionState.Pending
|
||||
) {
|
||||
if (evt.message) {
|
||||
process(evt.message);
|
||||
}
|
||||
return;
|
||||
}
|
||||
console.log('ignore state', evt);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return () => {
|
||||
sub.unsubscribe();
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* For channels that support presence, this will request the current state from the server.
|
||||
*
|
||||
* Join and leave messages will be sent to the open stream
|
||||
*/
|
||||
getPresence(address: LiveChannelAddress, config: LiveChannelConfig): Promise<LiveChannelPresenceStatus> {
|
||||
return this.getChannel(address, config).getPresence();
|
||||
}
|
||||
}
|
||||
|
||||
// This is used to give a unique key for each stream. The actual value does not matter
|
||||
let streamCounter = 0;
|
36
public/app/features/live/channel-config/index.ts
Normal file
36
public/app/features/live/channel-config/index.ts
Normal file
@ -0,0 +1,36 @@
|
||||
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
|
||||
import {
|
||||
grafanaLiveCoreFeatures,
|
||||
GrafanaLiveDataSourceScope,
|
||||
GrafanaLivePluginScope,
|
||||
GrafanaLiveScope,
|
||||
GrafanaLiveStreamScope,
|
||||
} from './scope';
|
||||
import { GrafanaLiveChannelConfigSrv, ExistingLiveChannelScope } from './types';
|
||||
|
||||
export class GrafanaLiveChannelConfigService implements GrafanaLiveChannelConfigSrv {
|
||||
private readonly scopes: Record<LiveChannelScope, GrafanaLiveScope>;
|
||||
|
||||
constructor() {
|
||||
this.scopes = Object.freeze({
|
||||
[LiveChannelScope.Grafana]: grafanaLiveCoreFeatures,
|
||||
[LiveChannelScope.DataSource]: new GrafanaLiveDataSourceScope(),
|
||||
[LiveChannelScope.Plugin]: new GrafanaLivePluginScope(),
|
||||
[LiveChannelScope.Stream]: new GrafanaLiveStreamScope(),
|
||||
});
|
||||
}
|
||||
|
||||
private getScope = (liveChannelScope: ExistingLiveChannelScope): GrafanaLiveScope =>
|
||||
this.scopes[liveChannelScope as LiveChannelScope];
|
||||
|
||||
doesScopeExist = (liveChannelScope: LiveChannelScope): liveChannelScope is ExistingLiveChannelScope =>
|
||||
Boolean(this.scopes[liveChannelScope]);
|
||||
|
||||
getChannelSupport = async (
|
||||
liveChannelScope: ExistingLiveChannelScope,
|
||||
namespace: string
|
||||
): Promise<LiveChannelSupport | undefined> => this.getScope(liveChannelScope).getChannelSupport(namespace);
|
||||
|
||||
getNamespaces = async (liveChannelScope: ExistingLiveChannelScope): Promise<Array<SelectableValue<string>>> =>
|
||||
this.getScope(liveChannelScope).listNamespaces();
|
||||
}
|
@ -1,8 +1,9 @@
|
||||
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
|
||||
import { getDataSourceSrv } from '@grafana/runtime';
|
||||
import { config } from 'app/core/config';
|
||||
import { loadPlugin } from '../plugins/PluginPage';
|
||||
import { LiveMeasurementsSupport } from './measurements/measurementsSupport';
|
||||
import { loadPlugin } from 'app/features/plugins/PluginPage';
|
||||
import { LiveMeasurementsSupport } from '../measurements/measurementsSupport';
|
||||
import { CoreGrafanaLiveFeature } from './types';
|
||||
|
||||
export abstract class GrafanaLiveScope {
|
||||
constructor(protected scope: LiveChannelScope) {}
|
||||
@ -18,12 +19,6 @@ export abstract class GrafanaLiveScope {
|
||||
abstract listNamespaces(): Promise<Array<SelectableValue<string>>>;
|
||||
}
|
||||
|
||||
export interface CoreGrafanaLiveFeature {
|
||||
name: string;
|
||||
support: LiveChannelSupport;
|
||||
description: string;
|
||||
}
|
||||
|
||||
class GrafanaLiveCoreScope extends GrafanaLiveScope {
|
||||
readonly features = new Map<string, LiveChannelSupport>();
|
||||
readonly namespaces: Array<SelectableValue<string>> = [];
|
18
public/app/features/live/channel-config/types.ts
Normal file
18
public/app/features/live/channel-config/types.ts
Normal file
@ -0,0 +1,18 @@
|
||||
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
|
||||
|
||||
export interface CoreGrafanaLiveFeature {
|
||||
name: string;
|
||||
support: LiveChannelSupport;
|
||||
description: string;
|
||||
}
|
||||
|
||||
export type ExistingLiveChannelScope = LiveChannelScope & { readonly discriminator: unique symbol };
|
||||
|
||||
export interface GrafanaLiveChannelConfigSrv {
|
||||
doesScopeExist: (liveChannelScope: LiveChannelScope) => liveChannelScope is ExistingLiveChannelScope;
|
||||
getChannelSupport: (
|
||||
liveChannelScope: ExistingLiveChannelScope,
|
||||
namespace: string
|
||||
) => Promise<LiveChannelSupport | undefined>;
|
||||
getNamespaces: (liveChannelScope: ExistingLiveChannelScope) => Promise<Array<SelectableValue<string>>>;
|
||||
}
|
@ -12,8 +12,8 @@ import {
|
||||
} from '@grafana/data';
|
||||
import { DashboardChangedModal } from './DashboardChangedModal';
|
||||
import { DashboardEvent, DashboardEventAction } from './types';
|
||||
import { CoreGrafanaLiveFeature } from '../scopes';
|
||||
import { sessionId } from '../live';
|
||||
import { CoreGrafanaLiveFeature } from '../channel-config/types';
|
||||
import { sessionId } from 'app/features/live';
|
||||
import { ShowModalReactEvent } from '../../../types/events';
|
||||
import { Unsubscribable } from 'rxjs';
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { LiveChannelType } from '@grafana/data';
|
||||
import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher';
|
||||
import { grafanaLiveCoreFeatures } from './scopes';
|
||||
import { grafanaLiveCoreFeatures } from './channel-config/scope';
|
||||
|
||||
export function registerLiveFeatures() {
|
||||
grafanaLiveCoreFeatures.register({
|
||||
|
40
public/app/features/live/index.ts
Normal file
40
public/app/features/live/index.ts
Normal file
@ -0,0 +1,40 @@
|
||||
import { config, getBackendSrv, getGrafanaLiveSrv, setGrafanaLiveSrv } from '@grafana/runtime';
|
||||
import { CentrifugeSrv } from './centrifuge/service';
|
||||
import { registerLiveFeatures } from './features';
|
||||
import { GrafanaLiveService } from './live';
|
||||
import { GrafanaLiveChannelConfigService } from './channel-config';
|
||||
import { GrafanaLiveChannelConfigSrv } from './channel-config/types';
|
||||
import { contextSrv } from '../../core/services/context_srv';
|
||||
|
||||
const grafanaLiveScopesSingleton = new GrafanaLiveChannelConfigService();
|
||||
|
||||
export const getGrafanaLiveScopes = (): GrafanaLiveChannelConfigSrv => grafanaLiveScopesSingleton;
|
||||
|
||||
export const sessionId =
|
||||
(window as any)?.grafanaBootData?.user?.id +
|
||||
'/' +
|
||||
Date.now().toString(16) +
|
||||
'/' +
|
||||
Math.random().toString(36).substring(2, 15);
|
||||
|
||||
export function initGrafanaLive() {
|
||||
const centrifugeSrv = new CentrifugeSrv({
|
||||
appUrl: `${window.location.origin}${config.appSubUrl}`,
|
||||
orgId: contextSrv.user.orgId,
|
||||
orgRole: contextSrv.user.orgRole,
|
||||
liveEnabled: config.liveEnabled,
|
||||
sessionId,
|
||||
});
|
||||
setGrafanaLiveSrv(
|
||||
new GrafanaLiveService({
|
||||
scopes: getGrafanaLiveScopes(),
|
||||
centrifugeSrv,
|
||||
backendSrv: getBackendSrv(),
|
||||
})
|
||||
);
|
||||
registerLiveFeatures();
|
||||
}
|
||||
|
||||
export function getGrafanaLiveCentrifugeSrv() {
|
||||
return getGrafanaLiveSrv() as GrafanaLiveService;
|
||||
}
|
@ -1,176 +1,81 @@
|
||||
import Centrifuge from 'centrifuge/dist/centrifuge';
|
||||
import { BackendSrv, GrafanaLiveSrv, LiveDataStreamOptions } from '@grafana/runtime';
|
||||
import { CentrifugeSrv } from './centrifuge/service';
|
||||
|
||||
import { mergeMap, from, of, Observable } from 'rxjs';
|
||||
import {
|
||||
GrafanaLiveSrv,
|
||||
setGrafanaLiveSrv,
|
||||
getGrafanaLiveSrv,
|
||||
config,
|
||||
LiveDataStreamOptions,
|
||||
toDataQueryError,
|
||||
getBackendSrv,
|
||||
} from '@grafana/runtime';
|
||||
import { BehaviorSubject, Observable, of } from 'rxjs';
|
||||
import {
|
||||
LiveChannelScope,
|
||||
LiveChannelAddress,
|
||||
LiveChannelConnectionState,
|
||||
LiveChannelConfig,
|
||||
LiveChannelEvent,
|
||||
DataQueryResponse,
|
||||
LiveChannelPresenceStatus,
|
||||
isValidLiveChannelAddress,
|
||||
LiveChannelAddress,
|
||||
LiveChannelConfig,
|
||||
LiveChannelConnectionState,
|
||||
LiveChannelEvent,
|
||||
LiveChannelEventType,
|
||||
LiveChannelPresenceStatus,
|
||||
LoadingState,
|
||||
DataFrameJSON,
|
||||
StreamingDataFrame,
|
||||
DataFrame,
|
||||
dataFrameToJSON,
|
||||
isLiveChannelMessageEvent,
|
||||
isLiveChannelStatusEvent,
|
||||
toLiveChannelId,
|
||||
} from '@grafana/data';
|
||||
import { CentrifugeLiveChannel, getErrorChannel } from './channel';
|
||||
import {
|
||||
GrafanaLiveScope,
|
||||
grafanaLiveCoreFeatures,
|
||||
GrafanaLiveDataSourceScope,
|
||||
GrafanaLivePluginScope,
|
||||
GrafanaLiveStreamScope,
|
||||
} from './scopes';
|
||||
import { registerLiveFeatures } from './features';
|
||||
import { contextSrv } from '../../core/services/context_srv';
|
||||
import { liveTimer } from '../dashboard/dashgrid/liveTimer';
|
||||
import { GrafanaLiveChannelConfigSrv } from './channel-config/types';
|
||||
import { catchError } from 'rxjs/operators';
|
||||
|
||||
export const sessionId =
|
||||
(window as any)?.grafanaBootData?.user?.id +
|
||||
'/' +
|
||||
Date.now().toString(16) +
|
||||
'/' +
|
||||
Math.random().toString(36).substring(2, 15);
|
||||
type GrafanaLiveServiceDeps = {
|
||||
scopes: GrafanaLiveChannelConfigSrv;
|
||||
centrifugeSrv: CentrifugeSrv;
|
||||
backendSrv: BackendSrv;
|
||||
};
|
||||
|
||||
export class CentrifugeSrv implements GrafanaLiveSrv {
|
||||
readonly open = new Map<string, CentrifugeLiveChannel>();
|
||||
readonly centrifuge: Centrifuge;
|
||||
readonly connectionState: BehaviorSubject<boolean>;
|
||||
readonly connectionBlocker: Promise<void>;
|
||||
readonly scopes: Record<LiveChannelScope, GrafanaLiveScope>;
|
||||
private readonly orgId: number;
|
||||
|
||||
constructor() {
|
||||
const baseURL = window.location.origin.replace('http', 'ws');
|
||||
const liveUrl = `${baseURL}${config.appSubUrl}/api/live/ws`;
|
||||
|
||||
this.orgId = contextSrv.user.orgId;
|
||||
this.centrifuge = new Centrifuge(liveUrl, {});
|
||||
this.centrifuge.setConnectData({
|
||||
sessionId,
|
||||
orgId: this.orgId,
|
||||
});
|
||||
// orgRole is set when logged in *or* anonomus users can use grafana
|
||||
if (config.liveEnabled && contextSrv.user.orgRole !== '') {
|
||||
this.centrifuge.connect(); // do connection
|
||||
}
|
||||
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
|
||||
this.connectionBlocker = new Promise<void>((resolve) => {
|
||||
if (this.centrifuge.isConnected()) {
|
||||
return resolve();
|
||||
}
|
||||
const connectListener = () => {
|
||||
resolve();
|
||||
this.centrifuge.removeListener('connect', connectListener);
|
||||
};
|
||||
this.centrifuge.addListener('connect', connectListener);
|
||||
});
|
||||
|
||||
this.scopes = {
|
||||
[LiveChannelScope.Grafana]: grafanaLiveCoreFeatures,
|
||||
[LiveChannelScope.DataSource]: new GrafanaLiveDataSourceScope(),
|
||||
[LiveChannelScope.Plugin]: new GrafanaLivePluginScope(),
|
||||
[LiveChannelScope.Stream]: new GrafanaLiveStreamScope(),
|
||||
};
|
||||
|
||||
// Register global listeners
|
||||
this.centrifuge.on('connect', this.onConnect);
|
||||
this.centrifuge.on('disconnect', this.onDisconnect);
|
||||
this.centrifuge.on('publish', this.onServerSideMessage);
|
||||
}
|
||||
|
||||
//----------------------------------------------------------
|
||||
// Internal functions
|
||||
//----------------------------------------------------------
|
||||
|
||||
onConnect = (context: any) => {
|
||||
this.connectionState.next(true);
|
||||
};
|
||||
|
||||
onDisconnect = (context: any) => {
|
||||
this.connectionState.next(false);
|
||||
};
|
||||
|
||||
onServerSideMessage = (context: any) => {
|
||||
console.log('Publication from server-side channel', context);
|
||||
};
|
||||
|
||||
/**
|
||||
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
|
||||
* channel will be returned with an error state indicated in its status
|
||||
*/
|
||||
getChannel<TMessage>(addr: LiveChannelAddress): CentrifugeLiveChannel<TMessage> {
|
||||
const id = `${this.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`;
|
||||
let channel = this.open.get(id);
|
||||
if (channel != null) {
|
||||
return channel;
|
||||
}
|
||||
|
||||
const scope = this.scopes[addr.scope];
|
||||
if (!scope) {
|
||||
return getErrorChannel<TMessage>('invalid scope', id, addr) as any;
|
||||
}
|
||||
|
||||
channel = new CentrifugeLiveChannel(id, addr);
|
||||
channel.shutdownCallback = () => {
|
||||
this.open.delete(id); // remove it from the list of open channels
|
||||
};
|
||||
this.open.set(id, channel);
|
||||
|
||||
// Initialize the channel in the background
|
||||
this.initChannel(scope, channel).catch((err) => {
|
||||
if (channel) {
|
||||
channel.currentStatus.state = LiveChannelConnectionState.Invalid;
|
||||
channel.shutdownWithError(err);
|
||||
}
|
||||
this.open.delete(id);
|
||||
});
|
||||
|
||||
// return the not-yet initalized channel
|
||||
return channel;
|
||||
}
|
||||
|
||||
private async initChannel(scope: GrafanaLiveScope, channel: CentrifugeLiveChannel): Promise<void> {
|
||||
const { addr } = channel;
|
||||
const support = await scope.getChannelSupport(addr.namespace);
|
||||
if (!support) {
|
||||
throw new Error(channel.addr.namespace + ' does not support streaming');
|
||||
}
|
||||
const config = support.getChannelConfig(addr.path);
|
||||
if (!config) {
|
||||
throw new Error('unknown path: ' + addr.path);
|
||||
}
|
||||
const events = channel.initalize(config);
|
||||
if (!this.centrifuge.isConnected()) {
|
||||
await this.connectionBlocker;
|
||||
}
|
||||
channel.subscription = this.centrifuge.subscribe(channel.id, events);
|
||||
return;
|
||||
}
|
||||
|
||||
//----------------------------------------------------------
|
||||
// Exported functions
|
||||
//----------------------------------------------------------
|
||||
export class GrafanaLiveService implements GrafanaLiveSrv {
|
||||
constructor(private deps: GrafanaLiveServiceDeps) {}
|
||||
|
||||
/**
|
||||
* Listen for changes to the connection state
|
||||
*/
|
||||
getConnectionState() {
|
||||
return this.connectionState.asObservable();
|
||||
getConnectionState(): Observable<boolean> {
|
||||
return this.deps.centrifugeSrv.getConnectionState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a channel and return results as DataFrames
|
||||
*/
|
||||
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
|
||||
const channelConfig = this.getChannelInfo(options.addr);
|
||||
|
||||
return from(channelConfig).pipe(
|
||||
mergeMap((config) => this.deps.centrifugeSrv.getDataStream(options, config)),
|
||||
catchError((error) => this.getInvalidDataStream(error, options))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Watch for messages in a channel
|
||||
*/
|
||||
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
|
||||
const channelConfig = this.getChannelInfo(address);
|
||||
return from(channelConfig).pipe(
|
||||
mergeMap((config) => this.deps.centrifugeSrv.getStream<T>(address, config)),
|
||||
catchError((error) => this.getInvalidChannelStream<T>(error, address))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish into a channel
|
||||
*
|
||||
* @alpha -- experimental
|
||||
*/
|
||||
async publish(address: LiveChannelAddress, data: any): Promise<any> {
|
||||
return this.deps.backendSrv.post(`api/live/publish`, {
|
||||
channel: toLiveChannelId(address), // orgId is from user
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* For channels that support presence, this will request the current state from the server.
|
||||
*
|
||||
* Join and leave messages will be sent to the open stream
|
||||
*/
|
||||
async getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
|
||||
const channelConfig = await this.getChannelInfo(address);
|
||||
return this.deps.centrifugeSrv.getPresence(address, channelConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -182,162 +87,42 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
|
||||
* the channel is shutdown
|
||||
*/
|
||||
async getChannelInfo(addr: LiveChannelAddress): Promise<LiveChannelConfig> {
|
||||
const scope = this.scopes[addr.scope];
|
||||
if (!scope) {
|
||||
if (!isValidLiveChannelAddress(addr)) {
|
||||
return Promise.reject('invalid live channel address');
|
||||
}
|
||||
|
||||
if (!this.deps.scopes.doesScopeExist(addr.scope)) {
|
||||
return Promise.reject('invalid scope');
|
||||
}
|
||||
|
||||
const support = await scope.getChannelSupport(addr.namespace);
|
||||
const support = await this.deps.scopes.getChannelSupport(addr.scope, addr.namespace);
|
||||
if (!support) {
|
||||
return Promise.reject(addr.namespace + ' does not support streaming');
|
||||
}
|
||||
return support.getChannelConfig(addr.path)!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Watch for messages in a channel
|
||||
*/
|
||||
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
|
||||
return this.getChannel<T>(address).getStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a channel and return results as DataFrames
|
||||
*/
|
||||
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
|
||||
if (!isValidLiveChannelAddress(options.addr)) {
|
||||
return of({
|
||||
error: toDataQueryError(`invalid channel address: ${JSON.stringify(options.addr)}`),
|
||||
state: LoadingState.Error,
|
||||
data: options.frame ? [options.frame] : [],
|
||||
});
|
||||
}
|
||||
|
||||
return new Observable<DataQueryResponse>((subscriber) => {
|
||||
const channel = this.getChannel(options.addr);
|
||||
const key = options.key ?? `xstr/${streamCounter++}`;
|
||||
let data: StreamingDataFrame | undefined = undefined;
|
||||
let filtered: DataFrame | undefined = undefined;
|
||||
let state = LoadingState.Streaming;
|
||||
let last = liveTimer.lastUpdate;
|
||||
let lastWidth = -1;
|
||||
|
||||
const process = (msg: DataFrameJSON) => {
|
||||
if (!data) {
|
||||
data = new StreamingDataFrame(msg, options.buffer);
|
||||
} else {
|
||||
data.push(msg);
|
||||
}
|
||||
state = LoadingState.Streaming;
|
||||
const sameWidth = lastWidth === data.fields.length;
|
||||
lastWidth = data.fields.length;
|
||||
|
||||
// Filter out fields
|
||||
if (!filtered || msg.schema || !sameWidth) {
|
||||
filtered = data;
|
||||
if (options.filter) {
|
||||
const { fields } = options.filter;
|
||||
if (fields?.length) {
|
||||
filtered = {
|
||||
...data,
|
||||
fields: data.fields.filter((f) => fields.includes(f.name)),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const elapsed = liveTimer.lastUpdate - last;
|
||||
if (elapsed > 1000 || liveTimer.ok) {
|
||||
filtered.length = data.length; // make sure they stay up-to-date
|
||||
subscriber.next({ state, data: [filtered], key });
|
||||
last = liveTimer.lastUpdate;
|
||||
}
|
||||
};
|
||||
|
||||
if (options.frame) {
|
||||
process(dataFrameToJSON(options.frame));
|
||||
} else if (channel.lastMessageWithSchema) {
|
||||
process(channel.lastMessageWithSchema);
|
||||
}
|
||||
|
||||
const sub = channel.getStream().subscribe({
|
||||
error: (err: any) => {
|
||||
console.log('LiveQuery [error]', { err }, options.addr);
|
||||
state = LoadingState.Error;
|
||||
subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
|
||||
sub.unsubscribe(); // close after error
|
||||
},
|
||||
complete: () => {
|
||||
console.log('LiveQuery [complete]', options.addr);
|
||||
if (state !== LoadingState.Error) {
|
||||
state = LoadingState.Done;
|
||||
}
|
||||
// or track errors? subscriber.next({ state, data: [data], key });
|
||||
subscriber.complete();
|
||||
sub.unsubscribe();
|
||||
},
|
||||
next: (evt: LiveChannelEvent) => {
|
||||
if (isLiveChannelMessageEvent(evt)) {
|
||||
process(evt.message);
|
||||
return;
|
||||
}
|
||||
if (isLiveChannelStatusEvent(evt)) {
|
||||
if (evt.error) {
|
||||
let error = toDataQueryError(evt.error);
|
||||
error.message = `Streaming channel error: ${error.message}`;
|
||||
state = LoadingState.Error;
|
||||
subscriber.next({ state, data: [data], key, error });
|
||||
return;
|
||||
} else if (
|
||||
evt.state === LiveChannelConnectionState.Connected ||
|
||||
evt.state === LiveChannelConnectionState.Pending
|
||||
) {
|
||||
if (evt.message) {
|
||||
process(evt.message);
|
||||
}
|
||||
return;
|
||||
}
|
||||
console.log('ignore state', evt);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return () => {
|
||||
sub.unsubscribe();
|
||||
};
|
||||
private getInvalidChannelStream = <T>(error: Error, address: LiveChannelAddress): Observable<LiveChannelEvent<T>> => {
|
||||
return of({
|
||||
type: LiveChannelEventType.Status,
|
||||
id: `${address.scope}/${address.namespace}/${address.path}`,
|
||||
timestamp: Date.now(),
|
||||
state: LiveChannelConnectionState.Invalid,
|
||||
error,
|
||||
message: error.message,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* For channels that support presence, this will request the current state from the server.
|
||||
*
|
||||
* Join and leave messages will be sent to the open stream
|
||||
*/
|
||||
getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
|
||||
return this.getChannel(address).getPresence();
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish into a channel
|
||||
*
|
||||
* @alpha -- experimental
|
||||
*/
|
||||
publish(address: LiveChannelAddress, data: any): Promise<any> {
|
||||
return getBackendSrv().post(`api/live/publish`, {
|
||||
channel: toLiveChannelId(address), // orgId is from user
|
||||
data,
|
||||
private getInvalidDataStream = (error: Error, options: LiveDataStreamOptions): Observable<DataQueryResponse> => {
|
||||
return of({
|
||||
error: {
|
||||
data: {
|
||||
error: error.stack,
|
||||
},
|
||||
message: error.message,
|
||||
},
|
||||
state: LoadingState.Error,
|
||||
data: options.frame ? [options.frame] : [],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// This is used to give a unique key for each stream. The actual value does not matter
|
||||
let streamCounter = 0;
|
||||
|
||||
export function getGrafanaLiveCentrifugeSrv() {
|
||||
return getGrafanaLiveSrv() as CentrifugeSrv;
|
||||
}
|
||||
|
||||
export function initGrafanaLive() {
|
||||
setGrafanaLiveSrv(new CentrifugeSrv());
|
||||
registerLiveFeatures();
|
||||
};
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ import {
|
||||
} from '@grafana/data';
|
||||
|
||||
import { LivePanelOptions } from './types';
|
||||
import { getGrafanaLiveCentrifugeSrv } from 'app/features/live/live';
|
||||
import { getGrafanaLiveScopes } from 'app/features/live';
|
||||
import { config } from 'app/core/config';
|
||||
|
||||
type Props = StandardEditorProps<LiveChannelAddress, any, LivePanelOptions>;
|
||||
@ -44,12 +44,27 @@ export class LiveChannelEditor extends PureComponent<Props, State> {
|
||||
}
|
||||
}
|
||||
|
||||
async getScopeDetails() {
|
||||
const { scope, namespace } = this.props.value;
|
||||
const srv = getGrafanaLiveScopes();
|
||||
|
||||
if (!srv.doesScopeExist(scope)) {
|
||||
return {
|
||||
namespaces: [],
|
||||
support: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
const namespaces = await srv.getNamespaces(scope);
|
||||
const support = namespace ? await srv.getChannelSupport(scope, namespace) : undefined;
|
||||
return {
|
||||
namespaces,
|
||||
support,
|
||||
};
|
||||
}
|
||||
|
||||
async updateSelectOptions() {
|
||||
const { value } = this.props;
|
||||
const { scope, namespace } = value;
|
||||
const srv = getGrafanaLiveCentrifugeSrv();
|
||||
const namespaces = await srv.scopes[scope].listNamespaces();
|
||||
const support = namespace ? await srv.scopes[scope].getChannelSupport(namespace) : undefined;
|
||||
const { namespaces, support } = await this.getScopeDetails();
|
||||
|
||||
this.setState({
|
||||
namespaces,
|
||||
|
@ -29,7 +29,7 @@ import { DashboardSrv, setDashboardSrv } from 'app/features/dashboard/services/D
|
||||
import { IRootScopeService, IAngularEvent, auto } from 'angular';
|
||||
import { AppEvent } from '@grafana/data';
|
||||
import { backendSrv } from 'app/core/services/backend_srv';
|
||||
import { initGrafanaLive } from 'app/features/live/live';
|
||||
import { initGrafanaLive } from 'app/features/live';
|
||||
|
||||
export type GrafanaRootScope = IRootScopeService & AppEventEmitter & AppEventConsumer & { colors: string[] };
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user