import { BackendSrv, GrafanaLiveSrv, LiveDataStreamOptions } from '@grafana/runtime';
import { CentrifugeSrv } from './centrifuge/service';
import { Observable } from 'rxjs';
import {
  DataQueryResponse,
  LiveChannelAddress,
  LiveChannelEvent,
  LiveChannelPresenceStatus,
  toLiveChannelId,
} from '@grafana/data';

/**
 * Collaborators injected into {@link GrafanaLiveService}.
 */
type GrafanaLiveServiceDeps = {
  /** Handles connection state, data streams, raw streams and presence. */
  centrifugeSrv: CentrifugeSrv;
  /** Used for the HTTP publish endpoint. */
  backendSrv: BackendSrv;
};

/**
 * Implementation of `GrafanaLiveSrv` that forwards every call to the injected
 * services: streaming and presence go to `centrifugeSrv`, publishing goes
 * through `backendSrv`.
 */
export class GrafanaLiveService implements GrafanaLiveSrv {
  constructor(private deps: GrafanaLiveServiceDeps) {}

  /**
   * Listen for changes to the connection state
   */
  getConnectionState(): Observable<boolean> {
    return this.deps.centrifugeSrv.getConnectionState();
  }

  /**
   * Connect to a channel and return results as DataFrames
   *
   * @param options - stream options, passed unchanged to `centrifugeSrv`
   */
  getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
    return this.deps.centrifugeSrv.getDataStream(options);
  }

  /**
   * Watch for messages in a channel
   *
   * @typeParam T - type argument applied to the emitted `LiveChannelEvent`
   * @param address - channel to watch, passed unchanged to `centrifugeSrv`
   */
  getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
    return this.deps.centrifugeSrv.getStream(address);
  }

  /**
   * Publish into a channel
   *
   * Sends a POST to `api/live/publish` whose body carries the channel id
   * derived from `address` together with `data`.
   *
   * @param address - channel to publish into
   * @param data - payload forwarded as-is in the request body
   * @returns whatever `backendSrv.post` resolves with
   *
   * @alpha -- experimental
   */
  async publish(address: LiveChannelAddress, data: any): Promise<any> {
    return this.deps.backendSrv.post(`api/live/publish`, {
      channel: toLiveChannelId(address), // orgId is from user
      data,
    });
  }

  /**
   * For channels that support presence, this will request the current state from the server.
   *
   * Join and leave messages will be sent to the open stream
   *
   * @param address - channel whose presence is requested
   */
  async getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
    return this.deps.centrifugeSrv.getPresence(address);
  }
}