K8s/Client: Add watch support (#99060)

* add watch client

* add watch client

* add support for selectors

* parse labels

* always send watch

* reuse decoder

* Update public/app/features/apiserver/client.ts

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/apiserver/client.ts

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/apiserver/client.ts

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

---------

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>
This commit is contained in:
Ryan McKinley 2025-01-16 17:41:52 +03:00 committed by GitHub
parent 31deddafb6
commit d95484fba6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 74 additions and 4 deletions

View File

@ -198,13 +198,14 @@ export class BackendSrv implements BackendService {
} }
} }
process() process()
.catch((e) => {
console.log(requestId, 'catch', e);
}) // from abort
.then(() => { .then(() => {
console.log(requestId, 'complete'); console.log(requestId, 'complete');
observer.complete(); observer.complete();
}); // runs in background }) // runs in background
.catch((e) => {
console.log(requestId, 'catch', e);
observer.error(e);
}); // from abort
}, },
error: (e) => { error: (e) => {
observer.error(e); observer.error(e);

View File

@ -1,3 +1,5 @@
import { Observable, from, retry, catchError, filter, map, mergeMap } from 'rxjs';
import { config, getBackendSrv } from '@grafana/runtime'; import { config, getBackendSrv } from '@grafana/runtime';
import { contextSrv } from 'app/core/core'; import { contextSrv } from 'app/core/core';
@ -11,8 +13,10 @@ import {
ResourceList, ResourceList,
ResourceClient, ResourceClient,
ObjectMeta, ObjectMeta,
WatchOptions,
K8sAPIGroupList, K8sAPIGroupList,
AnnoKeySavedFromUI, AnnoKeySavedFromUI,
ResourceEvent,
} from './types'; } from './types';
export interface GroupVersionResource { export interface GroupVersionResource {
@ -34,6 +38,44 @@ export class ScopedResourceClient<T = object, S = object, K = string> implements
return getBackendSrv().get<Resource<T, S, K>>(`${this.url}/${name}`); return getBackendSrv().get<Resource<T, S, K>>(`${this.url}/${name}`);
} }
public watch(opts?: WatchOptions): Observable<ResourceEvent<T, S, K>> {
const decoder = new TextDecoder();
const params = {
...opts,
watch: true,
labelSelector: this.parseListOptionsSelector(opts?.labelSelector),
fieldSelector: this.parseListOptionsSelector(opts?.fieldSelector),
};
return getBackendSrv()
.chunked({
url: params.name ? `${this.url}/${params.name}` : this.url,
params,
})
.pipe(
filter((response) => response.ok && response.data instanceof Uint8Array),
map((response) => {
const text = decoder.decode(response.data);
return text.split('\n');
}),
mergeMap((text) => from(text)),
filter((line) => line.length > 0),
map((line) => {
try {
return JSON.parse(line);
} catch (e) {
console.warn('Invalid JSON in watch stream:', e);
return null;
}
}),
filter((event): event is ResourceEvent<T, S, K> => event !== null),
retry({ count: 3, delay: 1000 }),
catchError((error) => {
console.error('Watch stream error:', error);
throw error;
})
);
}
public async subresource<S>(name: string, path: string): Promise<S> { public async subresource<S>(name: string, path: string): Promise<S> {
return getBackendSrv().get<S>(`${this.url}/${name}/${path}`); return getBackendSrv().get<S>(`${this.url}/${name}/${path}`);
} }

View File

@ -8,6 +8,8 @@
* *
*/ */
import { Observable } from 'rxjs';
/** The object type and version */ /** The object type and version */
export interface TypeMeta<K = string> { export interface TypeMeta<K = string> {
apiVersion: string; apiVersion: string;
@ -152,6 +154,25 @@ export interface ListOptions {
// Limit the response count // Limit the response count
limit?: number; limit?: number;
// Watch for changes
watch?: boolean;
}
export interface WatchOptions {
// A specific resource
name?: string;
// Start watching from a given resource version
resourceVersion?: string;
// Query by labels
// https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
labelSelector?: ListOptionsLabelSelector;
// Query by fields
// https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
fieldSelector?: ListOptionsFieldSelector;
} }
export interface MetaStatus { export interface MetaStatus {
@ -171,9 +192,15 @@ export interface MetaStatus {
details?: object; details?: object;
} }
export interface ResourceEvent<T = object, S = object, K = string> {
type: 'ADDED' | 'DELETED' | 'MODIFIED';
object: Resource<T, S, K>;
}
export interface ResourceClient<T = object, S = object, K = string> { export interface ResourceClient<T = object, S = object, K = string> {
create(obj: ResourceForCreate<T, K>): Promise<Resource<T, S, K>>; create(obj: ResourceForCreate<T, K>): Promise<Resource<T, S, K>>;
get(name: string): Promise<Resource<T, S, K>>; get(name: string): Promise<Resource<T, S, K>>;
watch(opts?: WatchOptions): Observable<ResourceEvent<T, S, K>>;
subresource<S>(name: string, path: string): Promise<S>; subresource<S>(name: string, path: string): Promise<S>;
list(opts?: ListOptions): Promise<ResourceList<T, S, K>>; list(opts?: ListOptions): Promise<ResourceList<T, S, K>>;
update(obj: ResourceForCreate<T, K>): Promise<Resource<T, S, K>>; update(obj: ResourceForCreate<T, K>): Promise<Resource<T, S, K>>;