From 1be12be6d984c659f47a504b43d90bfdb13fb6c9 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Thu, 5 Dec 2024 01:52:45 +0100 Subject: [PATCH 1/3] Support for pagination #64 --- CHANGELOG.md | 7 + openeo.d.ts | 87 +++++++++- src/connection.js | 413 ++++++++++++++++++++++++++++++---------------- 3 files changed, 362 insertions(+), 145 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15523f2..9ba7fe9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New function `getMissingBackends` for `Logs` - New property `federation:backends` added to the array returned by `validateProcess` +- New functions in `Connection` for paginating through lists: + - `paginateProcesses` + - `paginateCollections` + - `paginateJobs` + - `paginateFiles` + - `paginateUserProcesses` + - `paginateServices` ## [2.6.0] - 2024-07-11 diff --git a/openeo.d.ts b/openeo.d.ts index d4db7fa..f207010 100644 --- a/openeo.d.ts +++ b/openeo.d.ts @@ -2026,6 +2026,17 @@ declare module OpenEO { * @throws {Error} */ listCollections(): Promise; + /** + * Paginate through the collections available on the back-end. + * + * The collections returned always comply to the latest STAC version (currently 1.0.0). + * + * @async + * @param {?number} [limit=50] - The number of collections per request/page as integer. If `null`, requests all collections. + * @yields {Promise} A response compatible to the API specification. + * @throws {Error} + */ + paginateCollections(limit?: number | null): AsyncGenerator; /** * Get further information about a single collection. * @@ -2039,12 +2050,11 @@ declare module OpenEO { describeCollection(collectionId: string): Promise; /** * Loads items for a specific image collection. + * * May not be available for all collections. * * The items returned always comply to the latest STAC version (currently 1.0.0). * - * This is an experimental API and is subject to change. - * * @async * @param {string} collectionId - Collection ID to request items for. * @param {?Array.} [spatialExtent=null] - Limits the items to the given bounding box in WGS84: @@ -2076,7 +2086,7 @@ declare module OpenEO { */ protected normalizeNamespace(namespace: string | null): string | null; /** - * List processes available on the back-end. + * List all processes available on the back-end. * * Requests pre-defined processes by default. * Set the namespace parameter to request processes from a specific namespace. @@ -2090,6 +2100,22 @@ declare module OpenEO { * @throws {Error} */ listProcesses(namespace?: string | null): Promise; + /** + * Paginate through the processes available on the back-end. + * + * Requests pre-defined processes by default. + * Set the namespace parameter to request processes from a specific namespace. + * + * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. + * The namespaces are then listed in the property `namespaces`. + * + * @async + * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! + * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. + * @yields {Promise} - A response compatible to the API specification. + * @throws {Error} + */ + paginateProcesses(namespace?: string | null, limit?: number | null): AsyncGenerator; /** * Get information about a single process. * @@ -2239,13 +2265,22 @@ declare module OpenEO { */ describeAccount(): Promise; /** - * Lists all files from the user workspace. + * List all files from the user workspace. * * @async * @returns {Promise>} A list of files. * @throws {Error} */ listFiles(): Promise>; + /** + * Paginate through the files from the user workspace. + * + * @async + * @param {?number} [limit=50] - The number of files per request/page as integer. If `null`, requests all files. + * @yields {Promise>} A list of files. + * @throws {Error} + */ + paginateFiles(limit?: number | null): AsyncGenerator; /** * A callback that is executed on upload progress updates. * @@ -2299,7 +2334,7 @@ declare module OpenEO { */ validateProcess(process: Process): Promise; /** - * Lists all user-defined processes of the authenticated user. + * List all user-defined processes of the authenticated user. * * @async * @param {Array.} [oldProcesses=[]] - A list of existing user-defined processes to update. @@ -2307,6 +2342,16 @@ declare module OpenEO { * @throws {Error} */ listUserProcesses(oldProcesses?: Array): Promise>; + /** + * Paginates through the user-defined processes of the authenticated user. + * + * @async + * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. + * @param {Array.} [oldProcesses=[]] - A list of existing user-defined processes to update. + * @yields {Promise>} A list of user-defined processes. + * @throws {Error} + */ + paginateUserProcesses(limit?: number | null, oldProcesses?: Array): AsyncGenerator; /** * Creates a new stored user-defined process at the back-end. * @@ -2359,7 +2404,7 @@ declare module OpenEO { */ downloadResult(process: Process, targetPath: string, plan?: string | null, budget?: number | null, abortController?: AbortController | null): Promise; /** - * Lists all batch jobs of the authenticated user. + * List all batch jobs of the authenticated user. * * @async * @param {Array.} [oldJobs=[]] - A list of existing jobs to update. @@ -2367,6 +2412,16 @@ declare module OpenEO { * @throws {Error} */ listJobs(oldJobs?: Array): Promise>; + /** + * Paginate through the batch jobs of the authenticated user. + * + * @async + * @param {?number} [limit=50] - The number of jobs per request/page as integer. If `null`, requests all jobs. + * @param {Array.} [oldJobs=[]] - A list of existing jobs to update. + * @yields {Promise>} A list of jobs. + * @throws {Error} + */ + paginateJobs(limit?: number | null, oldJobs?: Array): AsyncGenerator; /** * Creates a new batch job at the back-end. * @@ -2391,7 +2446,7 @@ declare module OpenEO { */ getJob(id: string): Promise; /** - * Lists all secondary web services of the authenticated user. + * List all secondary web services of the authenticated user. * * @async * @param {Array.} [oldServices=[]] - A list of existing services to update. @@ -2399,6 +2454,16 @@ declare module OpenEO { * @throws {Error} */ listServices(oldServices?: Array): Promise>; + /** + * Paginate through the secondary web services of the authenticated user. + * + * @async + * @param {?number} [limit=50] - The number of services per request/page as integer. If `null` (default), requests all services. + * @param {Array.} [oldServices=[]] - A list of existing services to update. + * @yields {Promise>} A list of services. + * @throws {Error} + */ + paginateServices(limit?: number | null, oldServices?: Array): AsyncGenerator; /** * Creates a new secondary web service at the back-end. * @@ -2446,6 +2511,14 @@ declare module OpenEO { * @throws {Error} */ protected _getLinkHref(links: Array, rel: string | Array): string | null; + /** + * Get the URL of the next page from a response. + * + * @protected + * @param {AxiosResponse} response + * @returns {string | null} + */ + protected _getNextLink(response: AxiosResponse): string | null; /** * Makes all links in the list absolute. * diff --git a/src/connection.js b/src/connection.js index c9cba0d..87fb34b 100644 --- a/src/connection.js +++ b/src/connection.js @@ -106,14 +106,14 @@ class Connection { * @throws {Error} */ async init() { - let response = await this._get('/'); - let data = Object.assign({}, response.data); + const response = await this._get('/'); + const data = Object.assign({}, response.data); data.links = this.makeLinksAbsolute(data.links, response); if (!Array.isArray(data.conformsTo) && Array.isArray(data.links)) { - let conformanceLink = this._getLinkHref(data.links, CONFORMANCE_RELS); + const conformanceLink = this._getLinkHref(data.links, CONFORMANCE_RELS); if (conformanceLink) { - let response2 = await this._get(conformanceLink); + const response2 = await this._get(conformanceLink); if (Utils.isObject(response2.data) && Array.isArray(response2.data.conformsTo)) { data.conformsTo = response2.data.conformsTo; } @@ -135,10 +135,10 @@ class Connection { if (this.processes.count() === 0) { return; } - let promises = this.processes.namespaces().map(namespace => { + const promises = this.processes.namespaces().map(namespace => { let fn = () => Promise.resolve(); if (namespace === 'user') { - let userProcesses = this.processes.namespace('user'); + const userProcesses = this.processes.namespace('user'); if (!this.isAuthenticated()) { fn = () => (this.processes.remove(null, 'user') ? Promise.resolve() : Promise.reject(new Error("Can't clear user processes"))); } @@ -189,7 +189,7 @@ class Connection { * @throws {Error} */ async listFileTypes() { - let response = await this._get('/file_formats'); + const response = await this._get('/file_formats'); return new FileTypes(response.data); } @@ -201,7 +201,7 @@ class Connection { * @throws {Error} */ async listServiceTypes() { - let response = await this._get('/service_types'); + const response = await this._get('/service_types'); return response.data; } @@ -213,7 +213,7 @@ class Connection { * @throws {Error} */ async listUdfRuntimes() { - let response = await this._get('/udf_runtimes'); + const response = await this._get('/udf_runtimes'); return response.data; } @@ -227,16 +227,38 @@ class Connection { * @throws {Error} */ async listCollections() { - let response = await this._get('/collections'); - if (Utils.isObject(response.data) && Array.isArray(response.data.collections)) { - response.data.collections = response.data.collections.map(collection => { - if (collection.stac_version) { - return StacMigrate.collection(collection); - } - return collection; - }); + const firstPage = await this.paginateCollections(null).next(); + return firstPage.value; + } + + /** + * Paginate through the collections available on the back-end. + * + * The collections returned always comply to the latest STAC version (currently 1.0.0). + * + * @async + * @param {?number} [limit=50] - The number of collections per request/page as integer. If `null`, requests all collections. + * @yields {Promise} A response compatible to the API specification. + * @throws {Error} + */ + async * paginateCollections(limit = 50) { + let nextUrl = '/collections'; + if (limit > 0) { + nextUrl = nextUrl + '?limit=' + limit; } - return response.data; + do { + const response = await this._get(nextUrl); + if (Utils.isObject(response.data) && Array.isArray(response.data.collections)) { + response.data.collections = response.data.collections.map(collection => { + if (collection.stac_version) { + return StacMigrate.collection(collection); + } + return collection; + }); + } + yield response.data; + nextUrl = this._getNextLink(response); + } while (nextUrl); } /** @@ -250,7 +272,7 @@ class Connection { * @throws {Error} */ async describeCollection(collectionId) { - let response = await this._get('/collections/' + collectionId); + const response = await this._get('/collections/' + collectionId); if (response.data.stac_version) { return StacMigrate.collection(response.data); } @@ -261,12 +283,11 @@ class Connection { /** * Loads items for a specific image collection. + * * May not be available for all collections. * * The items returned always comply to the latest STAC version (currently 1.0.0). * - * This is an experimental API and is subject to change. - * * @async * @param {string} collectionId - Collection ID to request items for. * @param {?Array.} [spatialExtent=null] - Limits the items to the given bounding box in WGS84: @@ -283,33 +304,29 @@ class Connection { * @throws {Error} */ async * listCollectionItems(collectionId, spatialExtent = null, temporalExtent = null, limit = null) { - let page = 1; let nextUrl = '/collections/' + collectionId + '/items'; + let params = {}; + if (Array.isArray(spatialExtent)) { + params.bbox = spatialExtent.join(','); + } + if (Array.isArray(temporalExtent)) { + params.datetime = temporalExtent + .map(e => { + if (e instanceof Date) { + return e.toISOString(); + } + else if (typeof e === 'string') { + return e; + } + return '..'; // Open date range + }) + .join('/'); + } + if (limit > 0) { + params.limit = limit; + } while(nextUrl) { - let params = {}; - if (page === 1) { - if (Array.isArray(spatialExtent)) { - params.bbox = spatialExtent.join(','); - } - if (Array.isArray(temporalExtent)) { - params.datetime = temporalExtent - .map(e => { - if (e instanceof Date) { - return e.toISOString(); - } - else if (typeof e === 'string') { - return e; - } - return '..'; // Open date range - }) - .join('/'); - } - if (limit > 0) { - params.limit = limit; - } - } - - let response = await this._get(nextUrl, params); + const response = await this._get(nextUrl, params); if (Utils.isObject(response.data) && Array.isArray(response.data.features)) { response.data.features = response.data.features.map(item => { if (item.stac_version) { @@ -319,10 +336,8 @@ class Connection { }); } yield response.data; - - page++; - let links = this.makeLinksAbsolute(response.data.links); - nextUrl = this._getLinkHref(links, 'next'); + nextUrl = this._getNextLink(response); + params = null; } } @@ -346,7 +361,7 @@ class Connection { } /** - * List processes available on the back-end. + * List all processes available on the back-end. * * Requests pre-defined processes by default. * Set the namespace parameter to request processes from a specific namespace. @@ -360,21 +375,50 @@ class Connection { * @throws {Error} */ async listProcesses(namespace = null) { + const firstPage = await this.paginateProcesses(namespace, null).next(); + return firstPage.value; + } + + /** + * Paginate through the processes available on the back-end. + * + * Requests pre-defined processes by default. + * Set the namespace parameter to request processes from a specific namespace. + * + * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. + * The namespaces are then listed in the property `namespaces`. + * + * @async + * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! + * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. + * @yields {Promise} - A response compatible to the API specification. + * @throws {Error} + */ + async * paginateProcesses(namespace = null, limit = 50) { if (!namespace) { namespace = 'backend'; } - let path = (namespace === 'backend') ? '/processes' : `/processes/${this.normalizeNamespace(namespace)}`; - let response = await this._get(path); - if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) { - throw new Error('Invalid response received for processes'); + let nextUrl = (namespace === 'backend') ? '/processes' : `/processes/${this.normalizeNamespace(namespace)}`; + if (limit > 0) { + nextUrl = nextUrl + '?limit=' + limit; } + do { + const response = await this._get(nextUrl); - // Store processes in cache - this.processes.remove(null, namespace); - this.processes.addAll(response.data.processes, namespace); - - return Object.assign(response.data, {processes: this.processes.namespace(namespace)}); + if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) { + throw new Error('Invalid response received for processes'); + } + + // Store processes in cache + this.processes.addAll(response.data.processes, namespace); + for (let i in response.data.processes) { + response.data.processes[i] = this.processes.get(response.data.processes[i].id, namespace); + } + + yield response.data; + nextUrl = this._getNextLink(response); + } while (nextUrl); } /** @@ -395,7 +439,7 @@ class Connection { await this.listProcesses(); } else { - let response = await this._get(`/processes/${this.normalizeNamespace(namespace)}/${processId}`); + const response = await this._get(`/processes/${this.normalizeNamespace(namespace)}/${processId}`); if (!Utils.isObject(response.data) || typeof response.data.id !== 'string') { throw new Error('Invalid response received for process'); } @@ -432,15 +476,15 @@ class Connection { } this.authProviderList = []; - let cap = this.capabilities(); + const cap = this.capabilities(); // Add OIDC providers if (cap.hasFeature('authenticateOIDC')) { - let res = await this._get('/credentials/oidc'); - let oidcFactory = this.getOidcProviderFactory(); + const res = await this._get('/credentials/oidc'); + const oidcFactory = this.getOidcProviderFactory(); if (Utils.isObject(res.data) && Array.isArray(res.data.providers) && typeof oidcFactory === 'function') { for(let i in res.data.providers) { - let obj = oidcFactory(res.data.providers[i]); + const obj = oidcFactory(res.data.providers[i]); if (obj instanceof AuthProvider) { this.authProviderList.push(obj); } @@ -521,7 +565,7 @@ class Connection { * @see Connection#listAuthProviders */ async authenticateBasic(username, password) { - let basic = new BasicProvider(this); + const basic = new BasicProvider(this); await basic.login(username, password); } @@ -615,7 +659,7 @@ class Connection { * @returns {AuthProvider} */ setAuthToken(type, providerId, token) { - let provider = new AuthProvider(type, this, { + const provider = new AuthProvider(type, this, { id: providerId, title: "Custom", description: "" @@ -635,23 +679,43 @@ class Connection { * @throws {Error} */ async describeAccount() { - let response = await this._get('/me'); + const response = await this._get('/me'); return response.data; } /** - * Lists all files from the user workspace. + * List all files from the user workspace. * * @async * @returns {Promise>} A list of files. * @throws {Error} */ async listFiles() { - let response = await this._get('/files'); - let files = response.data.files.map( - f => new UserFile(this, f.path).setAll(f) - ); - return this._toResponseArray(files, response.data); + const firstPage = await this.paginateFiles(null).next(); + return firstPage.value; + } + + /** + * Paginate through the files from the user workspace. + * + * @async + * @param {?number} [limit=50] - The number of files per request/page as integer. If `null`, requests all files. + * @yields {Promise>} A list of files. + * @throws {Error} + */ + async * paginateFiles(limit = 50) { + let nextUrl = '/files'; + if (limit > 0) { + nextUrl = nextUrl + '?limit=' + limit; + } + do { + const response = await this._get(nextUrl); + const files = response.data.files.map( + f => new UserFile(this, f.path).setAll(f) + ); + yield this._toResponseArray(files, response.data); + nextUrl = this._getNextLink(response); + } while (nextUrl); } /** @@ -682,7 +746,7 @@ class Connection { if (targetPath === null) { targetPath = Environment.fileNameForUpload(source); } - let file = await this.getFile(targetPath); + const file = await this.getFile(targetPath); return await file.uploadFile(source, statusCallback, abortController); } @@ -732,7 +796,7 @@ class Connection { * @throws {Error} */ async validateProcess(process) { - let response = await this._post('/validation', this._normalizeUserProcess(process).process); + const response = await this._post('/validation', this._normalizeUserProcess(process).process); if (Array.isArray(response.data.errors)) { const errors = response.data.errors; errors['federation:backends'] = Array.isArray(response.data['federation:missing']) ? response.data['federation:missing'] : []; @@ -744,7 +808,7 @@ class Connection { } /** - * Lists all user-defined processes of the authenticated user. + * List all user-defined processes of the authenticated user. * * @async * @param {Array.} [oldProcesses=[]] - A list of existing user-defined processes to update. @@ -752,29 +816,47 @@ class Connection { * @throws {Error} */ async listUserProcesses(oldProcesses = []) { - let response = await this._get('/process_graphs'); + const firstPage = await this.paginateUserProcesses(null, oldProcesses).next(); + return firstPage.value; + } - if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) { - throw new Error('Invalid response received for processes'); + /** + * Paginates through the user-defined processes of the authenticated user. + * + * @async + * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. + * @param {Array.} [oldProcesses=[]] - A list of existing user-defined processes to update. + * @yields {Promise>} A list of user-defined processes. + * @throws {Error} + */ + async * paginateUserProcesses(limit = 50, oldProcesses = []) { + let nextUrl = '/process_graphs'; + if (limit > 0) { + nextUrl = nextUrl + '?limit=' + limit; } + do { + const response = await this._get(nextUrl); - // Remove existing processes from cache - this.processes.remove(null, 'user'); - - // Update existing processes if needed - let newProcesses = response.data.processes.map(newProcess => { - let process = oldProcesses.find(oldProcess => oldProcess.id === newProcess.id); - if (!process) { - process = new UserProcess(this, newProcess.id); + if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) { + throw new Error('Invalid response received for processes'); } - return process.setAll(newProcess); - }); - - // Store plain JS variant (i.e. no Job objects involved) of processes in cache - let jsonProcesses = oldProcesses.length > 0 ? newProcesses.map(p => p.toJSON()) : response.data.processes; - this.processes.addAll(jsonProcesses, 'user'); - return this._toResponseArray(newProcesses, response.data); + // Update existing processes if needed + const newProcesses = response.data.processes.map(newProcess => { + let process = oldProcesses.find(oldProcess => oldProcess.id === newProcess.id); + if (!process) { + process = new UserProcess(this, newProcess.id); + } + return process.setAll(newProcess); + }); + + // Store plain JS variant (i.e. no UserProcess objects involved) of processes in cache + const jsonProcesses = oldProcesses.length > 0 ? newProcesses.map(p => p.toJSON()) : response.data.processes; + this.processes.addAll(jsonProcesses, 'user'); + + yield this._toResponseArray(newProcesses, response.data); + nextUrl = this._getNextLink(response); + } while (nextUrl); } /** @@ -787,7 +869,7 @@ class Connection { * @throws {Error} */ async setUserProcess(id, process) { - let pg = new UserProcess(this, id); + const pg = new UserProcess(this, id); return await pg.replaceUserProcess(process); } @@ -800,7 +882,7 @@ class Connection { * @throws {Error} */ async getUserProcess(id) { - let pg = new UserProcess(this, id); + const pg = new UserProcess(this, id); return await pg.describeUserProcess(); } @@ -818,15 +900,15 @@ class Connection { * @returns {Promise} - An object with the data and some metadata. */ async computeResult(process, plan = null, budget = null, abortController = null, additional = {}) { - let requestBody = this._normalizeUserProcess( + const requestBody = this._normalizeUserProcess( process, Object.assign({}, additional, { plan: plan, budget: budget }) ); - let response = await this._post('/result', requestBody, Environment.getResponseType(), abortController); - let syncResult = { + const response = await this._post('/result', requestBody, Environment.getResponseType(), abortController); + const syncResult = { data: response.data, costs: null, type: null, @@ -841,15 +923,15 @@ class Connection { syncResult.type = response.headers['content-type']; } - let links = Array.isArray(response.headers.link) ? response.headers.link : [response.headers.link]; + const links = Array.isArray(response.headers.link) ? response.headers.link : [response.headers.link]; for(let link of links) { if (typeof link !== 'string') { continue; } - let logs = link.match(/^<([^>]+)>;\s?rel="monitor"/i); + const logs = link.match(/^<([^>]+)>;\s?rel="monitor"/i); if (Array.isArray(logs) && logs.length > 1) { try { - let logsResponse = await this._get(logs[1]); + const logsResponse = await this._get(logs[1]); if (Utils.isObject(logsResponse.data) && Array.isArray(logsResponse.data.logs)) { syncResult.logs = logsResponse.data.logs; } @@ -880,13 +962,13 @@ class Connection { * @throws {Error} */ async downloadResult(process, targetPath, plan = null, budget = null, abortController = null) { - let response = await this.computeResult(process, plan, budget, abortController); + const response = await this.computeResult(process, plan, budget, abortController); // @ts-ignore await Environment.saveToFile(response.data, targetPath); } /** - * Lists all batch jobs of the authenticated user. + * List all batch jobs of the authenticated user. * * @async * @param {Array.} [oldJobs=[]] - A list of existing jobs to update. @@ -894,15 +976,37 @@ class Connection { * @throws {Error} */ async listJobs(oldJobs = []) { - let response = await this._get('/jobs'); - let newJobs = response.data.jobs.map(newJob => { - let job = oldJobs.find(oldJob => oldJob.id === newJob.id); - if (!job) { - job = new Job(this, newJob.id); - } - return job.setAll(newJob); - }); - return this._toResponseArray(newJobs, response.data); + const firstPage = await this.paginateJobs(null, oldJobs).next(); + return firstPage.value; + + } + + /** + * Paginate through the batch jobs of the authenticated user. + * + * @async + * @param {?number} [limit=50] - The number of jobs per request/page as integer. If `null`, requests all jobs. + * @param {Array.} [oldJobs=[]] - A list of existing jobs to update. + * @yields {Promise>} A list of jobs. + * @throws {Error} + */ + async * paginateJobs(limit = 50, oldJobs = []) { + let nextUrl = '/jobs'; + if (limit > 0) { + nextUrl = nextUrl + '?limit=' + limit; + } + do { + const response = await this._get(nextUrl); + const newJobs = response.data.jobs.map(newJob => { + let job = oldJobs.find(oldJob => oldJob.id === newJob.id); + if (!job) { + job = new Job(this, newJob.id); + } + return job.setAll(newJob); + }); + yield this._toResponseArray(newJobs, response.data); + nextUrl = this._getNextLink(response); + } while (nextUrl); } /** @@ -925,12 +1029,12 @@ class Connection { plan: plan, budget: budget }); - let requestBody = this._normalizeUserProcess(process, additional); - let response = await this._post('/jobs', requestBody); + const requestBody = this._normalizeUserProcess(process, additional); + const response = await this._post('/jobs', requestBody); if (typeof response.headers['openeo-identifier'] !== 'string') { throw new Error("Response did not contain a Job ID. Job has likely been created, but may not show up yet."); } - let job = new Job(this, response.headers['openeo-identifier']).setAll(requestBody); + const job = new Job(this, response.headers['openeo-identifier']).setAll(requestBody); if (this.capabilities().hasFeature('describeJob')) { return await job.describeJob(); } @@ -948,12 +1052,12 @@ class Connection { * @throws {Error} */ async getJob(id) { - let job = new Job(this, id); + const job = new Job(this, id); return await job.describeJob(); } /** - * Lists all secondary web services of the authenticated user. + * List all secondary web services of the authenticated user. * * @async * @param {Array.} [oldServices=[]] - A list of existing services to update. @@ -961,15 +1065,36 @@ class Connection { * @throws {Error} */ async listServices(oldServices = []) { - let response = await this._get('/services'); - let newServices = response.data.services.map(newService => { - let service = oldServices.find(oldService => oldService.id === newService.id); - if (!service) { - service = new Service(this, newService.id); - } - return service.setAll(newService); - }); - return this._toResponseArray(newServices, response.data); + const firstPage = await this.paginateServices(null, oldServices).next(); + return firstPage.value; + } + + /** + * Paginate through the secondary web services of the authenticated user. + * + * @async + * @param {?number} [limit=50] - The number of services per request/page as integer. If `null` (default), requests all services. + * @param {Array.} [oldServices=[]] - A list of existing services to update. + * @yields {Promise>} A list of services. + * @throws {Error} + */ + async * paginateServices(limit = 50, oldServices = []) { + let nextUrl = '/services'; + if (limit > 0) { + nextUrl = nextUrl + '?limit=' + limit; + } + do { + const response = await this._get(nextUrl); + const newServices = response.data.services.map(newService => { + let service = oldServices.find(oldService => oldService.id === newService.id); + if (!service) { + service = new Service(this, newService.id); + } + return service.setAll(newService); + }); + yield this._toResponseArray(newServices, response.data); + nextUrl = this._getNextLink(response); + } while (nextUrl); } /** @@ -989,7 +1114,7 @@ class Connection { * @throws {Error} */ async createService(process, type, title = null, description = null, enabled = true, configuration = {}, plan = null, budget = null, additional = {}) { - let requestBody = this._normalizeUserProcess(process, Object.assign({ + const requestBody = this._normalizeUserProcess(process, Object.assign({ title: title, description: description, type: type, @@ -998,11 +1123,11 @@ class Connection { plan: plan, budget: budget }, additional)); - let response = await this._post('/services', requestBody); + const response = await this._post('/services', requestBody); if (typeof response.headers['openeo-identifier'] !== 'string') { throw new Error("Response did not contain a Service ID. Service has likely been created, but may not show up yet."); } - let service = new Service(this, response.headers['openeo-identifier']).setAll(requestBody); + const service = new Service(this, response.headers['openeo-identifier']).setAll(requestBody); if (this.capabilities().hasFeature('describeService')) { return service.describeService(); } @@ -1020,7 +1145,7 @@ class Connection { * @throws {Error} */ async getService(id) { - let service = new Service(this, id); + const service = new Service(this, id); return await service.describeService(); } @@ -1054,7 +1179,7 @@ class Connection { rel = [rel]; } if (Array.isArray(links)) { - let link = links.find(l => Utils.isObject(l) && rel.includes(l.rel) && typeof l.href === 'string'); + const link = links.find(l => Utils.isObject(l) && rel.includes(l.rel) && typeof l.href === 'string'); if (link) { return link.href; } @@ -1062,6 +1187,18 @@ class Connection { return null; } + /** + * Get the URL of the next page from a response. + * + * @protected + * @param {AxiosResponse} response + * @returns {string | null} + */ + _getNextLink(response) { + const links = this.makeLinksAbsolute(response.data.links, response); + return this._getLinkHref(links, 'next'); + } + /** * Makes all links in the list absolute. * @@ -1091,7 +1228,7 @@ class Connection { return link; } try { - let url = new URL(link.href, baseUrl); + const url = new URL(link.href, baseUrl); return Object.assign({}, link, {href: url.toString()}); } catch(error) { return link; @@ -1138,7 +1275,7 @@ class Connection { * @see https://github.com/axios/axios#request-config */ async _post(path, body, responseType, abortController = null) { - let options = { + const options = { method: 'post', responseType: responseType, url: path, @@ -1210,7 +1347,7 @@ class Connection { * @throws {Error} */ async download(url, authorize) { - let result = await this._send({ + const result = await this._send({ method: 'get', responseType: Environment.getResponseType(), url: url, @@ -1270,7 +1407,7 @@ class Connection { try { let response = await axios(options); - let capabilities = this.capabilities(); + const capabilities = this.capabilities(); if (capabilities) { response = capabilities.migrate(response); } @@ -1294,7 +1431,7 @@ class Connection { // See: https://github.com/axios/axios/issues/815 if (options.responseType === Environment.getResponseType()) { try { - let errorResponse = await Environment.handleErrorResponse(error); + const errorResponse = await Environment.handleErrorResponse(error); throw enrichError(error, errorResponse); } catch (error2) { console.error(error2); From 2ae4fd94ab2cf826c28032cfb3c089c1ab47fa33 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Fri, 3 Jan 2025 20:58:10 +0100 Subject: [PATCH 2/3] Add self URLs for pagination --- openeo.d.ts | 21 ++++++++++++++++++--- src/connection.js | 45 +++++++++++++++++++++++++++++++++++++-------- src/typedefs.js | 3 ++- 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/openeo.d.ts b/openeo.d.ts index f207010..4466b21 100644 --- a/openeo.d.ts +++ b/openeo.d.ts @@ -2020,6 +2020,7 @@ declare module OpenEO { * List all collections available on the back-end. * * The collections returned always comply to the latest STAC version (currently 1.0.0). + * This function adds a self link to the response if not present. * * @async * @returns {Promise} A response compatible to the API specification. @@ -2029,7 +2030,8 @@ declare module OpenEO { /** * Paginate through the collections available on the back-end. * - * The collections returned always comply to the latest STAC version (currently 1.0.0). + * The collections returned always complies to the latest STAC version (currently 1.0.0). + * This function adds a self link to the response if not present. * * @async * @param {?number} [limit=50] - The number of collections per request/page as integer. If `null`, requests all collections. @@ -2094,6 +2096,8 @@ declare module OpenEO { * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. * The namespaces are then listed in the property `namespaces`. * + * This function adds a self link to the response if not present. + * * @async * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! * @returns {Promise} - A response compatible to the API specification. @@ -2109,6 +2113,8 @@ declare module OpenEO { * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. * The namespaces are then listed in the property `namespaces`. * + * This function adds a self link to the response if not present. + * * @async * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. @@ -2498,9 +2504,10 @@ declare module OpenEO { * @protected * @param {Array.<*>} arr * @param {object.} response + * @param {string} selfUrl * @returns {ResponseArray} */ - protected _toResponseArray(arr: Array, response: object): ResponseArray; + protected _toResponseArray(arr: Array, response: object, selfUrl: string): ResponseArray; /** * Get the a link with the given rel type. * @@ -2519,6 +2526,14 @@ declare module OpenEO { * @returns {string | null} */ protected _getNextLink(response: AxiosResponse): string | null; + /** + * Add a self link to the response if not present. + * + * @param {object} data - The body of the response as an object. + * @param {string} selfUrl - The URL of the current request. + * @returns {object} The modified object. + */ + _addSelfLink(data: object, selfUrl: string): object; /** * Makes all links in the list absolute. * @@ -2982,7 +2997,7 @@ declare module OpenEO { /** * An array, but enriched with additional details from an openEO API response. * - * Adds two properties: `links` and `federation:missing`. + * Adds three properties: `url`, `links` and `federation:missing`. */ export type ResponseArray = any; export type ServiceType = object; diff --git a/src/connection.js b/src/connection.js index 87fb34b..3a21888 100644 --- a/src/connection.js +++ b/src/connection.js @@ -221,6 +221,7 @@ class Connection { * List all collections available on the back-end. * * The collections returned always comply to the latest STAC version (currently 1.0.0). + * This function adds a self link to the response if not present. * * @async * @returns {Promise} A response compatible to the API specification. @@ -234,7 +235,8 @@ class Connection { /** * Paginate through the collections available on the back-end. * - * The collections returned always comply to the latest STAC version (currently 1.0.0). + * The collections returned always complies to the latest STAC version (currently 1.0.0). + * This function adds a self link to the response if not present. * * @async * @param {?number} [limit=50] - The number of collections per request/page as integer. If `null`, requests all collections. @@ -256,7 +258,7 @@ class Connection { return collection; }); } - yield response.data; + yield this._addSelfLink(response.data, nextUrl); nextUrl = this._getNextLink(response); } while (nextUrl); } @@ -369,6 +371,8 @@ class Connection { * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. * The namespaces are then listed in the property `namespaces`. * + * This function adds a self link to the response if not present. + * * @async * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! * @returns {Promise} - A response compatible to the API specification. @@ -388,6 +392,8 @@ class Connection { * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. * The namespaces are then listed in the property `namespaces`. * + * This function adds a self link to the response if not present. + * * @async * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. @@ -416,7 +422,7 @@ class Connection { response.data.processes[i] = this.processes.get(response.data.processes[i].id, namespace); } - yield response.data; + yield this._addSelfLink(response.data, nextUrl); nextUrl = this._getNextLink(response); } while (nextUrl); } @@ -713,7 +719,7 @@ class Connection { const files = response.data.files.map( f => new UserFile(this, f.path).setAll(f) ); - yield this._toResponseArray(files, response.data); + yield this._toResponseArray(files, response.data, nextUrl); nextUrl = this._getNextLink(response); } while (nextUrl); } @@ -854,7 +860,7 @@ class Connection { const jsonProcesses = oldProcesses.length > 0 ? newProcesses.map(p => p.toJSON()) : response.data.processes; this.processes.addAll(jsonProcesses, 'user'); - yield this._toResponseArray(newProcesses, response.data); + yield this._toResponseArray(newProcesses, response.data, nextUrl); nextUrl = this._getNextLink(response); } while (nextUrl); } @@ -1004,7 +1010,7 @@ class Connection { } return job.setAll(newJob); }); - yield this._toResponseArray(newJobs, response.data); + yield this._toResponseArray(newJobs, response.data, nextUrl); nextUrl = this._getNextLink(response); } while (nextUrl); } @@ -1092,7 +1098,7 @@ class Connection { } return service.setAll(newService); }); - yield this._toResponseArray(newServices, response.data); + yield this._toResponseArray(newServices, response.data, nextUrl); nextUrl = this._getNextLink(response); } while (nextUrl); } @@ -1157,9 +1163,11 @@ class Connection { * @protected * @param {Array.<*>} arr * @param {object.} response + * @param {string} selfUrl * @returns {ResponseArray} */ - _toResponseArray(arr, response) { + _toResponseArray(arr, response, selfUrl) { + arr.url = selfUrl; arr.links = Array.isArray(response.links) ? response.links : []; arr['federation:missing'] = Array.isArray(response['federation:missing']) ? response['federation:missing'] : []; return arr; @@ -1199,6 +1207,27 @@ class Connection { return this._getLinkHref(links, 'next'); } + /** + * Add a self link to the response if not present. + * + * @param {object} data - The body of the response as an object. + * @param {string} selfUrl - The URL of the current request. + * @returns {object} The modified object. + */ + _addSelfLink(data, selfUrl) { + if (!Utils.isObject(data)) { + return data; + } + if (!Array.isArray(data.links)) { + data.links = []; + } + const selfLink = data.links.find(l => Utils.isObject(l) && l.rel === 'self'); + if (!selfLink) { + data.links.push({rel: 'self', href: selfUrl}); + } + return data; + } + /** * Makes all links in the list absolute. * diff --git a/src/typedefs.js b/src/typedefs.js index 0fd0a10..ce3249f 100644 --- a/src/typedefs.js +++ b/src/typedefs.js @@ -213,11 +213,12 @@ /** * An array, but enriched with additional details from an openEO API response. * - * Adds two properties: `links` and `federation:missing`. + * Adds three properties: `url`, `links` and `federation:missing`. * * @typedef ResponseArray * @augments Array * @type {Array.<*>} + * @property {string} url The URL from which the data was requested. * @property {Array.} links A list of related links. * @property {Array.} ["federation:missing"] A list of backends from the federation that are missing in the response data. */ From 7665b6005a494640e831afef3ac90080c30146c4 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Sat, 4 Jan 2025 02:19:13 +0100 Subject: [PATCH 3/3] Rework pagination --- CHANGELOG.md | 5 + src/connection.js | 262 ++++------------------------ src/pages.js | 349 ++++++++++++++++++++++++++++++++++++++ src/typedefs.js | 3 +- tests/earthengine.test.js | 8 +- tests/eodc.test.js | 7 +- 6 files changed, 401 insertions(+), 233 deletions(-) create mode 100644 src/pages.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ba7fe9..3d939c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `paginateFiles` - `paginateUserProcesses` - `paginateServices` +- The client may add a self link to the links in responses + +### Changed + +- The `listCollectionItems` function in `Connection` was completely rewritten. ## [2.6.0] - 2024-07-11 diff --git a/src/connection.js b/src/connection.js index 3a21888..172601e 100644 --- a/src/connection.js +++ b/src/connection.js @@ -17,6 +17,7 @@ const Service = require('./service'); const Builder = require('./builder/builder'); const BuilderNode = require('./builder/node'); +const { CollectionPages, ItemPages, JobPages, ProcessPages, ServicePages, UserFilePages } = require('./pages'); const CONFORMANCE_RELS = [ 'conformance', @@ -228,39 +229,20 @@ class Connection { * @throws {Error} */ async listCollections() { - const firstPage = await this.paginateCollections(null).next(); - return firstPage.value; + const pages = this.paginateCollections(null); + return await pages.nextPage([], false); } /** * Paginate through the collections available on the back-end. * - * The collections returned always complies to the latest STAC version (currently 1.0.0). - * This function adds a self link to the response if not present. + * The collections returned always comply to the latest STAC version (currently 1.0.0). * - * @async * @param {?number} [limit=50] - The number of collections per request/page as integer. If `null`, requests all collections. - * @yields {Promise} A response compatible to the API specification. - * @throws {Error} + * @returns {CollectionPages} A paged list of collections. */ - async * paginateCollections(limit = 50) { - let nextUrl = '/collections'; - if (limit > 0) { - nextUrl = nextUrl + '?limit=' + limit; - } - do { - const response = await this._get(nextUrl); - if (Utils.isObject(response.data) && Array.isArray(response.data.collections)) { - response.data.collections = response.data.collections.map(collection => { - if (collection.stac_version) { - return StacMigrate.collection(collection); - } - return collection; - }); - } - yield this._addSelfLink(response.data, nextUrl); - nextUrl = this._getNextLink(response); - } while (nextUrl); + paginateCollections(limit = 50) { + return new CollectionPages(this, limit); } /** @@ -284,7 +266,7 @@ class Connection { } /** - * Loads items for a specific image collection. + * Paginate through items for a specific collection. * * May not be available for all collections. * @@ -302,11 +284,10 @@ class Connection { * each must be either an RFC 3339 compatible string or a Date object. * Also supports open intervals by setting one of the boundaries to `null`, but never both. * @param {?number} [limit=null] - The amount of items per request/page as integer. If `null` (default), the back-end decides. - * @yields {Promise} A response compatible to the API specification. + * @returns {Promise} A response compatible to the API specification. * @throws {Error} */ - async * listCollectionItems(collectionId, spatialExtent = null, temporalExtent = null, limit = null) { - let nextUrl = '/collections/' + collectionId + '/items'; + listCollectionItems(collectionId, spatialExtent = null, temporalExtent = null, limit = null) { let params = {}; if (Array.isArray(spatialExtent)) { params.bbox = spatialExtent.join(','); @@ -327,20 +308,7 @@ class Connection { if (limit > 0) { params.limit = limit; } - while(nextUrl) { - const response = await this._get(nextUrl, params); - if (Utils.isObject(response.data) && Array.isArray(response.data.features)) { - response.data.features = response.data.features.map(item => { - if (item.stac_version) { - return StacMigrate.item(item); - } - return item; - }); - } - yield response.data; - nextUrl = this._getNextLink(response); - params = null; - } + return new ItemPages(this, collectionId, params, limit); } /** @@ -379,8 +347,8 @@ class Connection { * @throws {Error} */ async listProcesses(namespace = null) { - const firstPage = await this.paginateProcesses(namespace, null).next(); - return firstPage.value; + const pages = this.paginateProcesses(namespace); + return await pages.nextPage([], false); } /** @@ -392,39 +360,12 @@ class Connection { * Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given. * The namespaces are then listed in the property `namespaces`. * - * This function adds a self link to the response if not present. - * - * @async * @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL! * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. - * @yields {Promise} - A response compatible to the API specification. - * @throws {Error} + * @returns {ProcessPages} A paged list of processes. */ - async * paginateProcesses(namespace = null, limit = 50) { - if (!namespace) { - namespace = 'backend'; - } - - let nextUrl = (namespace === 'backend') ? '/processes' : `/processes/${this.normalizeNamespace(namespace)}`; - if (limit > 0) { - nextUrl = nextUrl + '?limit=' + limit; - } - do { - const response = await this._get(nextUrl); - - if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) { - throw new Error('Invalid response received for processes'); - } - - // Store processes in cache - this.processes.addAll(response.data.processes, namespace); - for (let i in response.data.processes) { - response.data.processes[i] = this.processes.get(response.data.processes[i].id, namespace); - } - - yield this._addSelfLink(response.data, nextUrl); - nextUrl = this._getNextLink(response); - } while (nextUrl); + paginateProcesses(namespace = null, limit = 50) { + return new ProcessPages(this, limit, namespace); } /** @@ -697,31 +638,18 @@ class Connection { * @throws {Error} */ async listFiles() { - const firstPage = await this.paginateFiles(null).next(); - return firstPage.value; + const pages = this.paginateFiles(null); + return await pages.nextPage(); } /** * Paginate through the files from the user workspace. * - * @async * @param {?number} [limit=50] - The number of files per request/page as integer. If `null`, requests all files. - * @yields {Promise>} A list of files. - * @throws {Error} + * @returns {ServicePages} A paged list of files. */ - async * paginateFiles(limit = 50) { - let nextUrl = '/files'; - if (limit > 0) { - nextUrl = nextUrl + '?limit=' + limit; - } - do { - const response = await this._get(nextUrl); - const files = response.data.files.map( - f => new UserFile(this, f.path).setAll(f) - ); - yield this._toResponseArray(files, response.data, nextUrl); - nextUrl = this._getNextLink(response); - } while (nextUrl); + paginateFiles(limit = 50) { + return new UserFilePages(this, limit); } /** @@ -822,47 +750,18 @@ class Connection { * @throws {Error} */ async listUserProcesses(oldProcesses = []) { - const firstPage = await this.paginateUserProcesses(null, oldProcesses).next(); - return firstPage.value; + const pages = this.paginateUserProcesses(null); + return await pages.nextPage(oldProcesses); } /** * Paginates through the user-defined processes of the authenticated user. * - * @async * @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes. - * @param {Array.} [oldProcesses=[]] - A list of existing user-defined processes to update. - * @yields {Promise>} A list of user-defined processes. - * @throws {Error} + * @returns {ProcessPages} A paged list of user-defined processes. */ - async * paginateUserProcesses(limit = 50, oldProcesses = []) { - let nextUrl = '/process_graphs'; - if (limit > 0) { - nextUrl = nextUrl + '?limit=' + limit; - } - do { - const response = await this._get(nextUrl); - - if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) { - throw new Error('Invalid response received for processes'); - } - - // Update existing processes if needed - const newProcesses = response.data.processes.map(newProcess => { - let process = oldProcesses.find(oldProcess => oldProcess.id === newProcess.id); - if (!process) { - process = new UserProcess(this, newProcess.id); - } - return process.setAll(newProcess); - }); - - // Store plain JS variant (i.e. no UserProcess objects involved) of processes in cache - const jsonProcesses = oldProcesses.length > 0 ? newProcesses.map(p => p.toJSON()) : response.data.processes; - this.processes.addAll(jsonProcesses, 'user'); - - yield this._toResponseArray(newProcesses, response.data, nextUrl); - nextUrl = this._getNextLink(response); - } while (nextUrl); + paginateUserProcesses(limit = 50) { + return this.paginateProcesses('user', limit); } /** @@ -982,37 +881,19 @@ class Connection { * @throws {Error} */ async listJobs(oldJobs = []) { - const firstPage = await this.paginateJobs(null, oldJobs).next(); - return firstPage.value; - + const pages = this.paginateJobs(null); + const firstPage = await pages.nextPage(oldJobs); + return firstPage; } /** * Paginate through the batch jobs of the authenticated user. * - * @async * @param {?number} [limit=50] - The number of jobs per request/page as integer. If `null`, requests all jobs. - * @param {Array.} [oldJobs=[]] - A list of existing jobs to update. - * @yields {Promise>} A list of jobs. - * @throws {Error} + * @returns {JobPages} A paged list of jobs. */ - async * paginateJobs(limit = 50, oldJobs = []) { - let nextUrl = '/jobs'; - if (limit > 0) { - nextUrl = nextUrl + '?limit=' + limit; - } - do { - const response = await this._get(nextUrl); - const newJobs = response.data.jobs.map(newJob => { - let job = oldJobs.find(oldJob => oldJob.id === newJob.id); - if (!job) { - job = new Job(this, newJob.id); - } - return job.setAll(newJob); - }); - yield this._toResponseArray(newJobs, response.data, nextUrl); - nextUrl = this._getNextLink(response); - } while (nextUrl); + paginateJobs(limit = 50) { + return new JobPages(this, limit); } /** @@ -1071,36 +952,18 @@ class Connection { * @throws {Error} */ async listServices(oldServices = []) { - const firstPage = await this.paginateServices(null, oldServices).next(); - return firstPage.value; + const pages = this.paginateServices(null); + return await pages.nextPage(oldServices); } /** * Paginate through the secondary web services of the authenticated user. * - * @async * @param {?number} [limit=50] - The number of services per request/page as integer. If `null` (default), requests all services. - * @param {Array.} [oldServices=[]] - A list of existing services to update. - * @yields {Promise>} A list of services. - * @throws {Error} + * @returns {ServicePages} A paged list of services. */ - async * paginateServices(limit = 50, oldServices = []) { - let nextUrl = '/services'; - if (limit > 0) { - nextUrl = nextUrl + '?limit=' + limit; - } - do { - const response = await this._get(nextUrl); - const newServices = response.data.services.map(newService => { - let service = oldServices.find(oldService => oldService.id === newService.id); - if (!service) { - service = new Service(this, newService.id); - } - return service.setAll(newService); - }); - yield this._toResponseArray(newServices, response.data, nextUrl); - nextUrl = this._getNextLink(response); - } while (nextUrl); + paginateServices(limit = 50) { + return new ServicePages(this, limit); } /** @@ -1155,24 +1018,6 @@ class Connection { return await service.describeService(); } - /** - * Adds additional response details to the array. - * - * Adds links and federation:missing. - * - * @protected - * @param {Array.<*>} arr - * @param {object.} response - * @param {string} selfUrl - * @returns {ResponseArray} - */ - _toResponseArray(arr, response, selfUrl) { - arr.url = selfUrl; - arr.links = Array.isArray(response.links) ? response.links : []; - arr['federation:missing'] = Array.isArray(response['federation:missing']) ? response['federation:missing'] : []; - return arr; - } - /** * Get the a link with the given rel type. * @@ -1195,39 +1040,6 @@ class Connection { return null; } - /** - * Get the URL of the next page from a response. - * - * @protected - * @param {AxiosResponse} response - * @returns {string | null} - */ - _getNextLink(response) { - const links = this.makeLinksAbsolute(response.data.links, response); - return this._getLinkHref(links, 'next'); - } - - /** - * Add a self link to the response if not present. - * - * @param {object} data - The body of the response as an object. - * @param {string} selfUrl - The URL of the current request. - * @returns {object} The modified object. - */ - _addSelfLink(data, selfUrl) { - if (!Utils.isObject(data)) { - return data; - } - if (!Array.isArray(data.links)) { - data.links = []; - } - const selfLink = data.links.find(l => Utils.isObject(l) && l.rel === 'self'); - if (!selfLink) { - data.links.push({rel: 'self', href: selfUrl}); - } - return data; - } - /** * Makes all links in the list absolute. * diff --git a/src/pages.js b/src/pages.js new file mode 100644 index 0000000..29600d5 --- /dev/null +++ b/src/pages.js @@ -0,0 +1,349 @@ +/* eslint-disable max-classes-per-file */ + +const Job = require('./job.js'); +const Service = require('./service.js'); +const UserFile = require('./userfile.js'); +const UserProcess = require('./userprocess.js'); +const Utils = require('@openeo/js-commons/src/utils'); +const StacMigrate = require('@radiantearth/stac-migrate'); + +const FED_MISSING = 'federation:missing'; + +/** + * A class to handle pagination of resources. + * + * @abstract + */ +class Pages { + /** + * Creates an instance of Pages. + * + * @param {Connection} connection + * @param {string} endpoint + * @param {string} key + * @param {Constructor} cls + * @param {object} [params={}] + * @param {string} primaryKey + */ + constructor(connection, endpoint, key, cls, params = {}, primaryKey = "id") { + this.connection = connection; + this.nextUrl = endpoint; + this.key = key; + this.primaryKey = primaryKey; + this.cls = cls; + if (!(params.limit > 0)) { + delete params.limit; + } + this.params = params; + } + + /** + * Returns true if there are more pages to fetch. + * + * @returns {boolean} + */ + hasNextPage() { + return this.nextUrl !== null; + } + + /** + * Returns the next page of resources. + * + * @async + * @param {Array.} oldObjects - Existing objects to update, if any. + * @param {boolean} [toArray=true] - Whether to return the objects as a simplified array or as an object with all information. + * @returns {Array.} + * @throws {Error} + */ + async nextPage(oldObjects = [], toArray = true) { + // Request data from server + const response = await this.connection._get(this.nextUrl, this.params); + + let data = response.data; + // Check response + if (!Utils.isObject(data)) { + throw new Error(`Response is invalid, is not an object`); + } + if (!Array.isArray(data[this.key])) { + throw new Error(`Response is invalid, '${this.key}' property is not an array`); + } + + // Update existing objects if needed + let newObjects = data[this.key].map(updated => { + let resource = oldObjects.find(old => old[this.primaryKey] === updated[this.primaryKey]); + if (resource) { + resource.setAll(updated); + } + else { + resource = this._createObject(updated); + } + return resource; + }); + + // Store objects in cache if needed + newObjects = this._cache(newObjects); + + // Add self link if missing + data.links = this._ensureArray(data.links); + const selfLink = this.connection._getLinkHref(data.links, 'self'); + if (!selfLink) { + data.links.push({rel: 'self', href: this.nextUrl}); + } + + // Check whether a next page is available + this.nextUrl = this._getNextLink(response); + // Don't append initial params to the next URL + this.params = null; + + // Either return as ResponseArray or full API response body + if (toArray) { + newObjects.links = data.links; + newObjects[FED_MISSING] = this._ensureArray(data[FED_MISSING]); + return newObjects; + } + else { + data[this.key] = newObjects; + return data; + } + } + + /** + * Ensures a variable is an array. + * + * @protected + * @param {*} x + * @returns {Array} + */ + _ensureArray(x) { + return Array.isArray(x) ? x : []; + } + + /** + * Creates a facade for the object, if needed. + * + * @protected + * @param {object} obj + * @returns {object} + */ + _createObject(obj) { + if (this.cls) { + const cls = this.cls; + const newObj = new cls(this.connection, obj[this.primaryKey]); + newObj.setAll(obj); + return newObj; + } + else { + return obj; + } + } + + /** + * Caches the plain objects if needed. + * + * @param {Array.} objects + * @returns {Array.} + */ + _cache(objects) { + return objects; + } + + /** + * Get the URL of the next page from a response. + * + * @protected + * @param {AxiosResponse} response + * @returns {string | null} + */ + _getNextLink(response) { + const links = this.connection.makeLinksAbsolute(response.data.links, response); + return this.connection._getLinkHref(links, 'next'); + } + + /** + * Makes this class asynchronously iterable. + * + * @returns {AsyncIterator} + */ + [Symbol.asyncIterator]() { + return { + self: this, + /** + * Get the next page of resources. + * + * @async + * @returns {{done: boolean, value: Array.}} + */ + async next() { + const done = !this.self.hasNextPage(); + let value; + if (!done) { + value = await this.self.nextPage(); + } + return { done, value }; + } + } + } + +} + +/** + * Paginate through jobs. + */ +class JobPages extends Pages { + /** + * Paginate through jobs. + * + * @param {Connection} connection + * @param {?number} limit + */ + constructor(connection, limit = null) { + super(connection, "/jobs", "jobs", Job, {limit}); + } +} + +/** + * Paginate through services. + */ +class ServicePages extends Pages { + /** + * Paginate through services. + * + * @param {Connection} connection + * @param {?number} limit + */ + constructor(connection, limit = null) { + super(connection, "/services", "services", Service, {limit}); + } +} + +/** + * Paginate through user files. + */ +class UserFilePages extends Pages { + /** + * Paginate through user files. + * + * @param {Connection} connection + * @param {?number} limit + */ + constructor(connection, limit = null) { + super(connection, "/files", "files", UserFile, {limit}, "path"); + } +} + +/** + * Paginate through processes. + */ +class ProcessPages extends Pages { + /** + * Paginate through processes. + * + * @param {Connection} connection + * @param {?number} limit + * @param {?string} namespace + */ + constructor(connection, limit = null, namespace = null) { + if (!namespace) { + namespace = 'backend'; + } + let endpoint; + let cls = null + if (namespace === 'user') { + endpoint = '/process_graphs'; + cls = UserProcess; + } + else { + endpoint = '/processes'; + if (namespace !== 'backend') { + const normalized = connection.normalizeNamespace(namespace); + endpoint += `/${normalized}`; + } + } + super(connection, endpoint, "processes", cls, {limit}); + this.namespace = namespace; + } + + /** + * Caches the objects to the ProcessRegistry. + * + * @param {Array.} objects + * @returns {Array.} + */ + _cache(objects) { + const plainObjects = objects.map(p => (typeof p.toJSON === 'function' ? p.toJSON() : p)); + this.connection.processes.addAll(plainObjects, this.namespace); + if (!this.cls) { + for (let i in objects) { + objects[i] = this.connection.processes.get(objects[i].id, this.namespace); + } + } + return objects; + } +} + +/** + * Paginate through collections. + */ +class CollectionPages extends Pages { + /** + * Paginate through collections. + * + * @param {Connection} connection + * @param {?number} limit + */ + constructor(connection, limit = null) { + super(connection, "/collections", "collections", null, {limit}); + } + + /** + * Migrates the STAC collection to the latest version. + * + * @param {object} obj + * @returns {Collection} + */ + _createObject(obj) { + if (obj.stac_version) { + return StacMigrate.collection(obj); + } + return obj; + } +} + +/** + * Paginate through collection items. + */ +class ItemPages extends Pages { + /** + * Paginate through collection items. + * + * @param {Connection} connection + * @param {string} collectionId + * @param {object} params + */ + constructor(connection, collectionId, params) { + super(connection, `/collections/${collectionId}/items`, "features", null, params); + } + + /** + * Migrates the STAC item to the latest version. + * + * @param {object} obj + * @returns {Item} + */ + _createObject(obj) { + if (obj.stac_version) { + return StacMigrate.item(obj); + } + return obj; + } +} + +module.exports = { + Pages, + CollectionPages, + ItemPages, + JobPages, + ProcessPages, + ServicePages, + UserFilePages +} diff --git a/src/typedefs.js b/src/typedefs.js index ce3249f..f969e85 100644 --- a/src/typedefs.js +++ b/src/typedefs.js @@ -213,12 +213,11 @@ /** * An array, but enriched with additional details from an openEO API response. * - * Adds three properties: `url`, `links` and `federation:missing`. + * Adds three properties: `links` and `federation:missing`. * * @typedef ResponseArray * @augments Array * @type {Array.<*>} - * @property {string} url The URL from which the data was requested. * @property {Array.} links A list of related links. * @property {Array.} ["federation:missing"] A list of backends from the federation that are missing in the response data. */ diff --git a/tests/earthengine.test.js b/tests/earthengine.test.js index 349057c..7de71d0 100644 --- a/tests/earthengine.test.js +++ b/tests/earthengine.test.js @@ -292,7 +292,7 @@ describe('GEE back-end', () => { expect(pgs).not.toBeNull(); expect(pgs).toHaveLength(0); expect(pgs instanceof Array).toBeTruthy(); - expect(pgs.links).toEqual([]); + expect(pgs.links).toHaveLength(1); expect(pgs['federation:missing']).toEqual([]); }); @@ -427,7 +427,7 @@ describe('GEE back-end', () => { expect(jobs).not.toBeNull(); expect(jobs).toHaveLength(0); expect(jobs instanceof Array).toBeTruthy(); - expect(jobs.links).toEqual([]); + expect(jobs.links).toHaveLength(1); expect(jobs['federation:missing']).toEqual([]); }); @@ -587,7 +587,7 @@ describe('GEE back-end', () => { expect(svcs).not.toBeNull(); expect(svcs).toHaveLength(0); expect(svcs instanceof Array).toBeTruthy(); - expect(svcs.links).toEqual([]); + expect(svcs.links).toHaveLength(1); expect(svcs['federation:missing']).toEqual([]); }); @@ -670,7 +670,7 @@ describe('GEE back-end', () => { expect(files).not.toBeNull(); expect(files).toHaveLength(0); expect(files instanceof Array).toBeTruthy(); - expect(files.links).toEqual([]); + expect(files.links).toHaveLength(1); expect(files['federation:missing']).toEqual([]); }); diff --git a/tests/eodc.test.js b/tests/eodc.test.js index 84e3a63..b5d847e 100644 --- a/tests/eodc.test.js +++ b/tests/eodc.test.js @@ -7,11 +7,12 @@ jest.setTimeout(30*1000); describe('EODC back-end', () => { const TESTBACKEND = 'https://openeo.eodc.eu'; - const TESTCOLLECTION = 'boa_landsat_8'; + const TESTCOLLECTION = 'SENTINEL2_L2A'; describe('Request Collection Items', () => { let con; + // Skip this test for now, EODC back-end has no CORS headers test.skip('Connect', async () => { con = await OpenEO.connect(TESTBACKEND); expect(con instanceof Connection).toBeTruthy(); @@ -19,14 +20,16 @@ describe('EODC back-end', () => { expect(cap instanceof Capabilities).toBeTruthy(); }); + // Skip this test for now, EODC back-end has no CORS headers test.skip('Check collection', async () => { let col = await con.describeCollection(TESTCOLLECTION); + console.log(col.id); expect(col.id).toBe(TESTCOLLECTION); expect(col).toHaveProperty("links"); expect(typeof con._getLinkHref(col.links, 'items')).toBe("string"); }); - // Skip this test for now, EODC back-end is not responding + // Skip this test for now, EODC back-end requires Auth test.skip('Request three pages of items', async () => { let page = 1; let spatialExtent = [5.0,45.0,20.0,50.0];