Skip to content

Commit

Permalink
Rework pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
m-mohr committed Jan 4, 2025
1 parent 44cd0e3 commit 57ebbe8
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 234 deletions.
262 changes: 37 additions & 225 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const Service = require('./service');

const Builder = require('./builder/builder');
const BuilderNode = require('./builder/node');
const { CollectionPages, ItemPages, JobPages, ProcessPages, ServicePages, UserFilePages } = require('./pages');

const CONFORMANCE_RELS = [
'conformance',
Expand Down Expand Up @@ -228,39 +229,20 @@ class Connection {
* @throws {Error}
*/
async listCollections() {
const firstPage = await this.paginateCollections(null).next();
return firstPage.value;
const pages = this.paginateCollections(null);
return await pages.nextPage([], false);
}

/**
* Paginate through the collections available on the back-end.
*
* The collections returned always complies to the latest STAC version (currently 1.0.0).
* This function adds a self link to the response if not present.
* The collections returned always comply to the latest STAC version (currently 1.0.0).
*
* @async
* @param {?number} [limit=50] - The number of collections per request/page as integer. If `null`, requests all collections.
* @yields {Promise<Collections>} A response compatible to the API specification.
* @throws {Error}
* @returns {CollectionPages} A paged list of collections.
*/
async * paginateCollections(limit = 50) {
let nextUrl = '/collections';
if (limit > 0) {
nextUrl = nextUrl + '?limit=' + limit;
}
do {
const response = await this._get(nextUrl);
if (Utils.isObject(response.data) && Array.isArray(response.data.collections)) {
response.data.collections = response.data.collections.map(collection => {
if (collection.stac_version) {
return StacMigrate.collection(collection);
}
return collection;
});
}
yield this._addSelfLink(response.data, nextUrl);
nextUrl = this._getNextLink(response);
} while (nextUrl);
paginateCollections(limit = 50) {
return new CollectionPages(this, limit);
}

/**
Expand All @@ -284,7 +266,7 @@ class Connection {
}

/**
* Loads items for a specific image collection.
* Paginate through items for a specific collection.
*
* May not be available for all collections.
*
Expand All @@ -302,11 +284,10 @@ class Connection {
* each must be either an RFC 3339 compatible string or a Date object.
* Also supports open intervals by setting one of the boundaries to `null`, but never both.
* @param {?number} [limit=null] - The amount of items per request/page as integer. If `null` (default), the back-end decides.
* @yields {Promise<ItemCollection>} A response compatible to the API specification.
* @returns {Promise<ItemPages>} A response compatible to the API specification.
* @throws {Error}
*/
async * listCollectionItems(collectionId, spatialExtent = null, temporalExtent = null, limit = null) {
let nextUrl = '/collections/' + collectionId + '/items';
listCollectionItems(collectionId, spatialExtent = null, temporalExtent = null, limit = null) {
let params = {};
if (Array.isArray(spatialExtent)) {
params.bbox = spatialExtent.join(',');
Expand All @@ -327,20 +308,7 @@ class Connection {
if (limit > 0) {
params.limit = limit;
}
while(nextUrl) {
const response = await this._get(nextUrl, params);
if (Utils.isObject(response.data) && Array.isArray(response.data.features)) {
response.data.features = response.data.features.map(item => {
if (item.stac_version) {
return StacMigrate.item(item);
}
return item;
});
}
yield response.data;
nextUrl = this._getNextLink(response);
params = null;
}
return new ItemPages(this, collectionId, params, limit);
}

/**
Expand Down Expand Up @@ -379,8 +347,8 @@ class Connection {
* @throws {Error}
*/
async listProcesses(namespace = null) {
const firstPage = await this.paginateProcesses(namespace, null).next();
return firstPage.value;
const pages = this.paginateProcesses(namespace);
return await pages.nextPage([], false);
}

/**
Expand All @@ -392,39 +360,12 @@ class Connection {
* Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given.
* The namespaces are then listed in the property `namespaces`.
*
* This function adds a self link to the response if not present.
*
* @async
* @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL!
* @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes.
* @yields {Promise<Processes>} - A response compatible to the API specification.
* @throws {Error}
* @returns {ProcessPages} A paged list of processes.
*/
async * paginateProcesses(namespace = null, limit = 50) {
if (!namespace) {
namespace = 'backend';
}

let nextUrl = (namespace === 'backend') ? '/processes' : `/processes/${this.normalizeNamespace(namespace)}`;
if (limit > 0) {
nextUrl = nextUrl + '?limit=' + limit;
}
do {
const response = await this._get(nextUrl);

if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) {
throw new Error('Invalid response received for processes');
}

// Store processes in cache
this.processes.addAll(response.data.processes, namespace);
for (let i in response.data.processes) {
response.data.processes[i] = this.processes.get(response.data.processes[i].id, namespace);
}

yield this._addSelfLink(response.data, nextUrl);
nextUrl = this._getNextLink(response);
} while (nextUrl);
paginateProcesses(namespace = null, limit = 50) {
return new ProcessPages(this, limit, namespace);
}

/**
Expand Down Expand Up @@ -697,31 +638,18 @@ class Connection {
* @throws {Error}
*/
async listFiles() {
const firstPage = await this.paginateFiles(null).next();
return firstPage.value;
const pages = this.paginateFiles(null);
return await pages.nextPage();
}

/**
* Paginate through the files from the user workspace.
*
* @async
* @param {?number} [limit=50] - The number of files per request/page as integer. If `null`, requests all files.
* @yields {Promise<ResponseArray.<UserFile>>} A list of files.
* @throws {Error}
* @returns {ServicePages} A paged list of files.
*/
async * paginateFiles(limit = 50) {
let nextUrl = '/files';
if (limit > 0) {
nextUrl = nextUrl + '?limit=' + limit;
}
do {
const response = await this._get(nextUrl);
const files = response.data.files.map(
f => new UserFile(this, f.path).setAll(f)
);
yield this._toResponseArray(files, response.data, nextUrl);
nextUrl = this._getNextLink(response);
} while (nextUrl);
paginateFiles(limit = 50) {
return new UserFilePages(this, limit);
}

/**
Expand Down Expand Up @@ -822,47 +750,18 @@ class Connection {
* @throws {Error}
*/
async listUserProcesses(oldProcesses = []) {
const firstPage = await this.paginateUserProcesses(null, oldProcesses).next();
return firstPage.value;
const pages = this.paginateUserProcesses(null);
return await pages.nextPage(oldProcesses);
}

/**
* Paginates through the user-defined processes of the authenticated user.
*
* @async
* @param {?number} [limit=50] - The number of processes per request/page as integer. If `null`, requests all processes.
* @param {Array.<UserProcess>} [oldProcesses=[]] - A list of existing user-defined processes to update.
* @yields {Promise<ResponseArray.<UserProcess>>} A list of user-defined processes.
* @throws {Error}
* @returns {ProcessPages} A paged list of user-defined processes.
*/
async * paginateUserProcesses(limit = 50, oldProcesses = []) {
let nextUrl = '/process_graphs';
if (limit > 0) {
nextUrl = nextUrl + '?limit=' + limit;
}
do {
const response = await this._get(nextUrl);

if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) {
throw new Error('Invalid response received for processes');
}

// Update existing processes if needed
const newProcesses = response.data.processes.map(newProcess => {
let process = oldProcesses.find(oldProcess => oldProcess.id === newProcess.id);
if (!process) {
process = new UserProcess(this, newProcess.id);
}
return process.setAll(newProcess);
});

// Store plain JS variant (i.e. no UserProcess objects involved) of processes in cache
const jsonProcesses = oldProcesses.length > 0 ? newProcesses.map(p => p.toJSON()) : response.data.processes;
this.processes.addAll(jsonProcesses, 'user');

yield this._toResponseArray(newProcesses, response.data, nextUrl);
nextUrl = this._getNextLink(response);
} while (nextUrl);
paginateUserProcesses(limit = 50) {
return this.paginateProcesses('user', limit);
}

/**
Expand Down Expand Up @@ -982,37 +881,19 @@ class Connection {
* @throws {Error}
*/
async listJobs(oldJobs = []) {
const firstPage = await this.paginateJobs(null, oldJobs).next();
return firstPage.value;

const pages = this.paginateJobs(null);
const firstPage = await pages.nextPage(oldJobs);
return firstPage;
}

/**
* Paginate through the batch jobs of the authenticated user.
*
* @async
* @param {?number} [limit=50] - The number of jobs per request/page as integer. If `null`, requests all jobs.
* @param {Array.<Job>} [oldJobs=[]] - A list of existing jobs to update.
* @yields {Promise<ResponseArray.<Job>>} A list of jobs.
* @throws {Error}
* @returns {JobPages} A paged list of jobs.
*/
async * paginateJobs(limit = 50, oldJobs = []) {
let nextUrl = '/jobs';
if (limit > 0) {
nextUrl = nextUrl + '?limit=' + limit;
}
do {
const response = await this._get(nextUrl);
const newJobs = response.data.jobs.map(newJob => {
let job = oldJobs.find(oldJob => oldJob.id === newJob.id);
if (!job) {
job = new Job(this, newJob.id);
}
return job.setAll(newJob);
});
yield this._toResponseArray(newJobs, response.data, nextUrl);
nextUrl = this._getNextLink(response);
} while (nextUrl);
paginateJobs(limit = 50) {
return new JobPages(this, limit);
}

/**
Expand Down Expand Up @@ -1071,36 +952,18 @@ class Connection {
* @throws {Error}
*/
async listServices(oldServices = []) {
const firstPage = await this.paginateServices(null, oldServices).next();
return firstPage.value;
const pages = this.paginateServices(null);
return await pages.nextPage(oldServices);
}

/**
* Paginate through the secondary web services of the authenticated user.
*
* @async
* @param {?number} [limit=50] - The number of services per request/page as integer. If `null` (default), requests all services.
* @param {Array.<Service>} [oldServices=[]] - A list of existing services to update.
* @yields {Promise<ResponseArray.<Job>>} A list of services.
* @throws {Error}
* @returns {ServicePages} A paged list of services.
*/
async * paginateServices(limit = 50, oldServices = []) {
let nextUrl = '/services';
if (limit > 0) {
nextUrl = nextUrl + '?limit=' + limit;
}
do {
const response = await this._get(nextUrl);
const newServices = response.data.services.map(newService => {
let service = oldServices.find(oldService => oldService.id === newService.id);
if (!service) {
service = new Service(this, newService.id);
}
return service.setAll(newService);
});
yield this._toResponseArray(newServices, response.data, nextUrl);
nextUrl = this._getNextLink(response);
} while (nextUrl);
paginateServices(limit = 50) {
return new ServicePages(this, limit);
}

/**
Expand Down Expand Up @@ -1155,24 +1018,6 @@ class Connection {
return await service.describeService();
}

/**
* Adds additional response details to the array.
*
* Adds links and federation:missing.
*
* @protected
* @param {Array.<*>} arr
* @param {object.<string, *>} response
* @param {string} selfUrl
* @returns {ResponseArray}
*/
_toResponseArray(arr, response, selfUrl) {
arr.url = selfUrl;
arr.links = Array.isArray(response.links) ? response.links : [];
arr['federation:missing'] = Array.isArray(response['federation:missing']) ? response['federation:missing'] : [];
return arr;
}

/**
* Get the a link with the given rel type.
*
Expand All @@ -1195,39 +1040,6 @@ class Connection {
return null;
}

/**
* Get the URL of the next page from a response.
*
* @protected
* @param {AxiosResponse} response
* @returns {string | null}
*/
_getNextLink(response) {
const links = this.makeLinksAbsolute(response.data.links, response);
return this._getLinkHref(links, 'next');
}

/**
* Add a self link to the response if not present.
*
* @param {object} data - The body of the response as an object.
* @param {string} selfUrl - The URL of the current request.
* @returns {object} The modified object.
*/
_addSelfLink(data, selfUrl) {
if (!Utils.isObject(data)) {
return data;
}
if (!Array.isArray(data.links)) {
data.links = [];
}
const selfLink = data.links.find(l => Utils.isObject(l) && l.rel === 'self');
if (!selfLink) {
data.links.push({rel: 'self', href: selfUrl});
}
return data;
}

/**
* Makes all links in the list absolute.
*
Expand Down
Loading

0 comments on commit 57ebbe8

Please sign in to comment.