From d95484fba6aab5c26a9fb7e13bca3a62b86dc8a4 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Thu, 16 Jan 2025 17:41:52 +0300 Subject: [PATCH] K8s/Client: Add watch support (#99060) * add watch client * add watch client * add support for selectors * parse labels * always send watch * reuse decoder * Update public/app/features/apiserver/client.ts Co-authored-by: Alex Khomenko * Update public/app/features/apiserver/client.ts Co-authored-by: Alex Khomenko * Update public/app/features/apiserver/client.ts Co-authored-by: Alex Khomenko --------- Co-authored-by: Alex Khomenko --- public/app/core/services/backend_srv.ts | 9 +++--- public/app/features/apiserver/client.ts | 42 +++++++++++++++++++++++++ public/app/features/apiserver/types.ts | 27 ++++++++++++++++ 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/public/app/core/services/backend_srv.ts b/public/app/core/services/backend_srv.ts index ce7dab8c838..cc667ac6c56 100644 --- a/public/app/core/services/backend_srv.ts +++ b/public/app/core/services/backend_srv.ts @@ -198,13 +198,14 @@ export class BackendSrv implements BackendService { } } process() - .catch((e) => { - console.log(requestId, 'catch', e); - }) // from abort .then(() => { console.log(requestId, 'complete'); observer.complete(); - }); // runs in background + }) // runs in background + .catch((e) => { + console.log(requestId, 'catch', e); + observer.error(e); + }); // from abort }, error: (e) => { observer.error(e); diff --git a/public/app/features/apiserver/client.ts b/public/app/features/apiserver/client.ts index 8ac5b531095..ba647d4cf6c 100644 --- a/public/app/features/apiserver/client.ts +++ b/public/app/features/apiserver/client.ts @@ -1,3 +1,5 @@ +import { Observable, from, retry, catchError, filter, map, mergeMap } from 'rxjs'; + import { config, getBackendSrv } from '@grafana/runtime'; import { contextSrv } from 'app/core/core'; @@ -11,8 +13,10 @@ import { ResourceList, ResourceClient, ObjectMeta, + WatchOptions, K8sAPIGroupList, AnnoKeySavedFromUI, + ResourceEvent, } from './types'; export interface GroupVersionResource { @@ -34,6 +38,44 @@ export class ScopedResourceClient implements return getBackendSrv().get>(`${this.url}/${name}`); } + public watch(opts?: WatchOptions): Observable> { + const decoder = new TextDecoder(); + const params = { + ...opts, + watch: true, + labelSelector: this.parseListOptionsSelector(opts?.labelSelector), + fieldSelector: this.parseListOptionsSelector(opts?.fieldSelector), + }; + return getBackendSrv() + .chunked({ + url: params.name ? `${this.url}/${params.name}` : this.url, + params, + }) + .pipe( + filter((response) => response.ok && response.data instanceof Uint8Array), + map((response) => { + const text = decoder.decode(response.data); + return text.split('\n'); + }), + mergeMap((text) => from(text)), + filter((line) => line.length > 0), + map((line) => { + try { + return JSON.parse(line); + } catch (e) { + console.warn('Invalid JSON in watch stream:', e); + return null; + } + }), + filter((event): event is ResourceEvent => event !== null), + retry({ count: 3, delay: 1000 }), + catchError((error) => { + console.error('Watch stream error:', error); + throw error; + }) + ); + } + public async subresource(name: string, path: string): Promise { return getBackendSrv().get(`${this.url}/${name}/${path}`); } diff --git a/public/app/features/apiserver/types.ts b/public/app/features/apiserver/types.ts index 054aa62d810..c6d142f4019 100644 --- a/public/app/features/apiserver/types.ts +++ b/public/app/features/apiserver/types.ts @@ -8,6 +8,8 @@ * */ +import { Observable } from 'rxjs'; + /** The object type and version */ export interface TypeMeta { apiVersion: string; @@ -152,6 +154,25 @@ export interface ListOptions { // Limit the response count limit?: number; + + // Watch for changes + watch?: boolean; +} + +export interface WatchOptions { + // A specific resource + name?: string; + + // Start watching from a given resource version + resourceVersion?: string; + + // Query by labels + // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors + labelSelector?: ListOptionsLabelSelector; + + // Query by fields + // https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/ + fieldSelector?: ListOptionsFieldSelector; } export interface MetaStatus { @@ -171,9 +192,15 @@ export interface MetaStatus { details?: object; } +export interface ResourceEvent { + type: 'ADDED' | 'DELETED' | 'MODIFIED'; + object: Resource; +} + export interface ResourceClient { create(obj: ResourceForCreate): Promise>; get(name: string): Promise>; + watch(opts?: WatchOptions): Observable>; subresource(name: string, path: string): Promise; list(opts?: ListOptions): Promise>; update(obj: ResourceForCreate): Promise>;