EventBus: add origin to all events and support nested EventBus (#33548)

This commit is contained in:
Ryan McKinley 2021-04-30 13:33:29 -07:00 committed by GitHub
parent cedac5f4d4
commit 69f2a43063
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 136 additions and 88 deletions

View File

@ -1,4 +1,4 @@
import { EventBusSrv, EventBusWithSource } from './EventBus';
import { EventBusSrv } from './EventBus';
import { BusEvent, BusEventWithPayload } from './types';
import { eventFactory } from './eventFactory';
import { DataHoverEvent } from './common';
@ -50,8 +50,8 @@ describe('EventBus', () => {
describe('EventBusWithSource', () => {
it('can add sources to the source path', () => {
const bus = new EventBusSrv();
const busWithSource = new EventBusWithSource(bus, 'foo');
expect(busWithSource.source).toEqual('foo');
const busWithSource = bus.newScopedBus('foo');
expect((busWithSource as any).path).toEqual(['foo']);
});
it('adds the source to the event payload', () => {
@ -60,11 +60,11 @@ describe('EventBus', () => {
bus.subscribe(DataHoverEvent, (event) => events.push(event));
const busWithSource = new EventBusWithSource(bus, 'foo');
const busWithSource = bus.newScopedBus('foo');
busWithSource.publish({ type: DataHoverEvent.type });
expect(events.length).toEqual(1);
expect(events[0].payload.source).toEqual('foo');
expect(events[0].origin).toEqual(busWithSource);
});
});

View File

@ -1,6 +1,6 @@
import EventEmitter from 'eventemitter3';
import { Unsubscribable, Observable } from 'rxjs';
import { PayloadWithSource } from './common';
import { filter } from 'rxjs/operators';
import {
EventBus,
LegacyEmitter,
@ -9,7 +9,7 @@ import {
LegacyEventHandler,
BusEvent,
AppEvent,
BusEventWithPayload,
EventFilterOptions,
} from './types';
/**
@ -44,6 +44,10 @@ export class EventBusSrv implements EventBus, LegacyEmitter {
});
}
newScopedBus(key: string, filter?: EventFilterOptions): EventBus {
return new ScopedEventBus([key], this, filter);
}
/**
* Legacy functions
*/
@ -94,40 +98,48 @@ export class EventBusSrv implements EventBus, LegacyEmitter {
}
/**
* @alpha
*
* Wraps EventBus and adds a source to help with identifying if a subscriber should react to the event or not.
*/
export class EventBusWithSource implements EventBus {
source: string;
eventBus: EventBus;
class ScopedEventBus implements EventBus {
// will be mutated by panel runners
filterConfig: EventFilterOptions;
constructor(eventBus: EventBus, source: string) {
this.eventBus = eventBus;
this.source = source;
// The path is not yet exposed, but can be used to indicate nested groups and support faster filtering
constructor(public path: string[], private eventBus: EventBus, filter?: EventFilterOptions) {
this.filterConfig = filter ?? { onlyLocal: false };
}
publish<T extends BusEvent>(event: T): void {
const decoratedEvent = {
...event,
...{ payload: { ...event.payload, ...{ source: this.source } } },
};
this.eventBus.publish(decoratedEvent);
if (!event.origin) {
(event as any).origin = this;
}
this.eventBus.publish(event);
}
subscribe<T extends BusEvent>(eventType: BusEventType<T>, handler: BusEventHandler<T>): Unsubscribable {
return this.eventBus.subscribe(eventType, handler);
}
filter = (event: BusEvent) => {
if (this.filterConfig.onlyLocal) {
return event.origin === this;
}
return true;
};
getStream<T extends BusEvent>(eventType: BusEventType<T>): Observable<T> {
return this.eventBus.getStream(eventType);
return this.eventBus.getStream(eventType).pipe(filter(this.filter)) as Observable<T>;
}
// syntax sugar
subscribe<T extends BusEvent>(typeFilter: BusEventType<T>, handler: BusEventHandler<T>): Unsubscribable {
return this.getStream(typeFilter).subscribe({ next: handler });
}
removeAllListeners(): void {
this.eventBus.removeAllListeners();
}
isOwnEvent(event: BusEventWithPayload<PayloadWithSource>): boolean {
return event.payload.source === this.source;
/**
* Creates a nested event bus structure
*/
newScopedBus(key: string, filter: EventFilterOptions): EventBus {
return new ScopedEventBus([...this.path, key], this, filter);
}
}

View File

@ -2,12 +2,7 @@ import { DataFrame } from '../types';
import { BusEventWithPayload } from './types';
/** @alpha */
export interface PayloadWithSource {
source?: string; // source from where the event originates
}
/** @alpha */
export interface DataHoverPayload extends PayloadWithSource {
export interface DataHoverPayload {
raw: any; // Original mouse event (includes pageX etc)
x: Record<string, any>; // { time: 5678 },

View File

@ -7,6 +7,7 @@ import { Unsubscribable, Observable } from 'rxjs';
export interface BusEvent {
readonly type: string;
readonly payload?: any;
readonly origin?: EventBus;
}
/**
@ -52,6 +53,14 @@ export interface BusEventHandler<T extends BusEvent> {
(event: T): void;
}
/**
* @alpha
* Main minimal interface
*/
export interface EventFilterOptions {
onlyLocal: boolean;
}
/**
* @alpha
* Main minimal interface
@ -62,20 +71,29 @@ export interface EventBus {
*/
publish<T extends BusEvent>(event: T): void;
/**
* Subscribe to single event
*/
subscribe<T extends BusEvent>(eventType: BusEventType<T>, handler: BusEventHandler<T>): Unsubscribable;
/**
* Get observable of events
*/
getStream<T extends BusEvent>(eventType: BusEventType<T>): Observable<T>;
/**
* Subscribe to an event stream
*
* This function is a wrapper around the `getStream(...)` function
*/
subscribe<T extends BusEvent>(eventType: BusEventType<T>, handler: BusEventHandler<T>): Unsubscribable;
/**
* Remove all event subscriptions
*/
removeAllListeners(): void;
/**
* Returns a new bus scoped that knows where it exists in a heiarchy
*
* @internal -- This is included for internal use only should not be used directly
*/
newScopedBus(key: string, filter: EventFilterOptions): EventBus;
}
/**

View File

@ -1,11 +1,14 @@
import { EventBusWithSource } from '@grafana/data';
import { EventBusSrv, EventBus } from '@grafana/data';
import React from 'react';
interface PanelContext {
eventBus?: EventBusWithSource;
/** @alpha */
export interface PanelContext {
eventBus: EventBus;
}
const PanelContextRoot = React.createContext<PanelContext>({});
const PanelContextRoot = React.createContext<PanelContext>({
eventBus: new EventBusSrv(),
});
/**
* @alpha

View File

@ -37,4 +37,4 @@ export {
ErrorIndicatorProps as PanelChromeErrorIndicatorProps,
} from './ErrorIndicator';
export { usePanelContext, PanelContextProvider } from './PanelContext';
export { usePanelContext, PanelContextProvider, PanelContext } from './PanelContext';

View File

@ -143,25 +143,17 @@ function useSliceHighlightState() {
const { eventBus } = usePanelContext();
useEffect(() => {
if (!eventBus) {
return;
}
const setHighlightedSlice = (event: DataHoverEvent) => {
if (eventBus.isOwnEvent(event)) {
setHighlightedTitle(event.payload.dataId);
}
setHighlightedTitle(event.payload.dataId);
};
const resetHighlightedSlice = (event: DataHoverClearEvent) => {
if (eventBus.isOwnEvent(event)) {
setHighlightedTitle(undefined);
}
setHighlightedTitle(undefined);
};
const subs = new Subscription()
.add(eventBus.subscribe(DataHoverEvent, setHighlightedSlice))
.add(eventBus.subscribe(DataHoverClearEvent, resetHighlightedSlice));
.add(eventBus.getStream(DataHoverEvent).subscribe({ next: setHighlightedSlice }))
.add(eventBus.getStream(DataHoverClearEvent).subscribe({ next: resetHighlightedSlice }));
return () => {
subs.unsubscribe();

View File

@ -95,6 +95,7 @@ export {
PanelChromeErrorIndicator,
PanelChromeErrorIndicatorProps,
PanelContextProvider,
PanelContext,
usePanelContext,
} from './PanelChrome';
export { VizLayout, VizLayoutComponentType, VizLayoutLegendProps, VizLayoutProps } from './VizLayout/VizLayout';

View File

@ -18,15 +18,19 @@ export const AnnotationPicker = ({ annotation, events, onEnabledChanged }: Annot
const onCancel = () => getDashboardQueryRunner().cancel(annotation);
useEffect(() => {
const started = events.subscribe(AnnotationQueryStarted, (event) => {
if (event.payload === annotation) {
setLoading(true);
}
const started = events.getStream(AnnotationQueryStarted).subscribe({
next: (event) => {
if (event.payload === annotation) {
setLoading(true);
}
},
});
const stopped = events.subscribe(AnnotationQueryFinished, (event) => {
if (event.payload === annotation) {
setLoading(false);
}
const stopped = events.getStream(AnnotationQueryFinished).subscribe({
next: (event) => {
if (event.payload === annotation) {
setLoading(false);
}
},
});
return () => {

View File

@ -4,7 +4,7 @@ import classNames from 'classnames';
import { Subscription } from 'rxjs';
// Components
import { PanelHeader } from './PanelHeader/PanelHeader';
import { ErrorBoundary, PanelContextProvider } from '@grafana/ui';
import { ErrorBoundary, PanelContextProvider, PanelContext } from '@grafana/ui';
// Utils & Services
import { getTimeSrv, TimeSrv } from '../services/TimeSrv';
import { applyPanelTimeOverrides } from 'app/features/dashboard/utils/panel';
@ -15,7 +15,8 @@ import { DashboardModel, PanelModel } from '../state';
import { PANEL_BORDER } from 'app/core/constants';
import {
AbsoluteTimeRange,
EventBusWithSource,
EventBusSrv,
EventFilterOptions,
FieldConfigSource,
getDefaultTimeRange,
LoadingState,
@ -47,22 +48,34 @@ export interface State {
renderCounter: number;
errorMessage?: string;
refreshWhenInView: boolean;
eventBus: EventBusWithSource;
context: PanelContext;
data: PanelData;
}
export class PanelChrome extends Component<Props, State> {
private readonly timeSrv: TimeSrv = getTimeSrv();
private subs = new Subscription();
private eventFilter: EventFilterOptions = { onlyLocal: true };
constructor(props: Props) {
super(props);
// Can this eventBus be on PanelModel?
// when we have more complex event filtering, that may be a better option
const eventBus = props.dashboard.events
? props.dashboard.events.newScopedBus(
`panel:${props.panel.id}`, // panelID
this.eventFilter
)
: new EventBusSrv();
this.state = {
isFirstLoad: true,
renderCounter: 0,
refreshWhenInView: false,
eventBus: new EventBusWithSource(props.dashboard.events, `panel-${props.panel.id}`),
context: {
eventBus,
},
data: this.getInitialPanelDataState(),
};
}
@ -297,10 +310,14 @@ export class PanelChrome extends Component<Props, State> {
});
const panelOptions = panel.getOptions();
// Update the event filter (dashboard settings may have changed)
// Yes this is called ever render for a function that is triggered on every mouse move
this.eventFilter.onlyLocal = dashboard.graphTooltip === 0;
return (
<>
<div className={panelContentClassNames}>
<PanelContextProvider value={{ eventBus: this.state.eventBus }}>
<PanelContextProvider value={this.state.context}>
<PanelComponent
id={panel.id}
data={data}

View File

@ -11,7 +11,6 @@ import {
DataLinkBuiltInVars,
DataQuery,
DataTransformerConfig,
EventBus,
EventBusSrv,
FieldConfigSource,
PanelPlugin,
@ -170,13 +169,18 @@ export class PanelModel implements DataConfigSource {
isInView = false;
configRev = 0; // increments when configs change
hasRefreshed?: boolean;
events: EventBus;
cacheTimeout?: any;
cachedPluginOptions: Record<string, PanelOptionsCache> = {};
legend?: { show: boolean; sort?: string; sortDesc?: boolean };
plugin?: PanelPlugin;
dataSupport?: PanelPluginDataSupport;
/**
* The PanelModel event bus only used for internal and legacy angular support.
* The EventBus passed to panels is based on the dashboard event model.
*/
events: EventBusSrv;
private queryRunner?: PanelQueryRunner;
constructor(model: any) {

View File

@ -55,10 +55,11 @@ async function setupTestContext({
const props: Props = {
data: { state: LoadingState.Done, timeRange: getDefaultTimeRange(), series: [] },
eventBus: {
subscribe: jest.fn(),
getStream: jest.fn(),
publish: jest.fn(),
removeAllListeners: jest.fn(),
subscribe: jest.fn(),
newScopedBus: jest.fn(),
},
fieldConfig: ({} as unknown) as FieldConfigSource,
height: 400,

View File

@ -3,12 +3,10 @@ import { CustomScrollbar } from '@grafana/ui';
import {
BusEvent,
CircularVector,
DataHoverPayload,
DataHoverEvent,
DataHoverClearEvent,
DataSelectEvent,
EventBus,
BusEventHandler,
} from '@grafana/data';
import { PartialObserver, Unsubscribable } from 'rxjs';
@ -24,22 +22,25 @@ interface State {
interface BusEventEx {
key: number;
type: string;
payload: DataHoverPayload;
path: string;
payload: any;
}
let counter = 100;
export class EventBusLoggerPanel extends PureComponent<Props, State> {
history = new CircularVector<BusEventEx>({ capacity: 40, append: 'head' });
subs: Unsubscribable[] = [];
subs: Unsubscribable[];
constructor(props: Props) {
super(props);
this.state = { counter };
this.subs.push(props.eventBus.subscribe(DataHoverEvent, this.hoverHandler));
props.eventBus.getStream(DataHoverClearEvent).subscribe(this.eventObserver);
props.eventBus.getStream(DataSelectEvent).subscribe(this.eventObserver);
const subs: Unsubscribable[] = [];
subs.push(props.eventBus.getStream(DataHoverEvent).subscribe(this.eventObserver));
subs.push(props.eventBus.getStream(DataHoverClearEvent).subscribe(this.eventObserver));
subs.push(props.eventBus.getStream(DataSelectEvent).subscribe(this.eventObserver));
this.subs = subs;
}
componentWillUnmount() {
@ -48,17 +49,17 @@ export class EventBusLoggerPanel extends PureComponent<Props, State> {
}
}
hoverHandler: BusEventHandler<DataHoverEvent> = (event: DataHoverEvent) => {
this.history.add({
key: counter++,
type: event.type,
payload: event.payload,
});
this.setState({ counter });
};
eventObserver: PartialObserver<BusEvent> = {
next: (v: BusEvent) => {},
next: (event: BusEvent) => {
const origin = event.origin as any;
this.history.add({
key: counter++,
type: event.type,
path: origin?.path,
payload: event.payload,
});
this.setState({ counter });
},
};
render() {
@ -66,7 +67,7 @@ export class EventBusLoggerPanel extends PureComponent<Props, State> {
<CustomScrollbar autoHeightMin="100%" autoHeightMax="100%">
{this.history.map((v, idx) => (
<div key={v.key}>
{v.key} {v.type} / X:{JSON.stringify(v.payload.x)} / Y:{JSON.stringify(v.payload.y)}
{JSON.stringify(v.path)} {v.type} / X:{JSON.stringify(v.payload.x)} / Y:{JSON.stringify(v.payload.y)}
</div>
))}
</CustomScrollbar>