Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support huge amount of datacubes #1004

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions src/client/utils/ajax/ajax.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import axios from "axios";
import { Dataset, DatasetJS, Environment, Executor, Expression } from "plywood";
import { ClientAppSettings } from "../../../common/models/app-settings/app-settings";
import { SerializedDataCube } from "../../../common/models/data-cube/data-cube";
import { isEnabled, Oauth } from "../../../common/models/oauth/oauth";
import { ClientSources, SerializedSources } from "../../../common/models/sources/sources";
import { ClientSources } from "../../../common/models/sources/sources";
import { deserialize } from "../../deserializers/sources";
import { getToken, mapOauthError } from "../../oauth/oauth";

Expand Down Expand Up @@ -54,12 +55,12 @@ export class Ajax {

const headers = Ajax.headers(oauth);
return axios({ method, url, data, timeout, validateStatus, headers })
.then(res => {
return res.data;
})
.catch(error => {
throw mapOauthError(oauth, error);
});
.then(res => {
return res.data;
})
.catch(error => {
throw mapOauthError(oauth, error);
});
}

static queryUrlExecutorFactory(dataCubeName: string, { oauth, clientTimeout: timeout }: ClientAppSettings): Executor {
Expand All @@ -69,17 +70,26 @@ export class Ajax {
const timezone = env ? env.timezone : null;
const data = { dataCube: dataCubeName, expression: ex.toJS(), timezone };
return Ajax.query<{ result: DatasetJS }>({ method, url, timeout, data }, oauth)
.then(res => Dataset.fromJS(res.result));
.then(res => Dataset.fromJS(res.result));
};
}

static sources(appSettings: ClientAppSettings): Promise<ClientSources> {
const headers = Ajax.headers(appSettings.oauth);
return axios.get<SerializedSources>("sources", { headers })
.then(resp => resp.data)
.catch(error => {
throw mapOauthError(appSettings.oauth, error);
})
.then(sourcesJS => deserialize(sourcesJS, appSettings));
static async sources(appSettings: ClientAppSettings): Promise<ClientSources> {
try {
const headers = Ajax.headers(appSettings.oauth);
const clusters = (await axios.get("sources/clusters", { headers })).data;
bikov marked this conversation as resolved.
Show resolved Hide resolved
let dataCubesResult: SerializedDataCube[] = [];
let isDone = false;
let batch = 0;
bikov marked this conversation as resolved.
Show resolved Hide resolved
while (!isDone) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we could extract this to async iterator and with for await ... of. That's very loose idea though!

const { dataCubes, isDone: isDoneResult } = (await axios.get(`sources/dataCubes?batch=${batch}`, { headers })).data;
dataCubesResult = [...dataCubesResult, ...dataCubes];
isDone = isDoneResult;
batch += 1;
}
return deserialize({ clusters, dataCubes: dataCubesResult }, appSettings);
} catch (e) {
throw mapOauthError(appSettings.oauth, e);
}
}
}
20 changes: 1 addition & 19 deletions src/common/models/sources/sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
Cluster,
ClusterJS,
fromConfig as clusterFromConfig,
serialize as serializeCluster,
SerializedCluster
} from "../cluster/cluster";
import { findCluster } from "../cluster/find-cluster";
Expand All @@ -31,10 +30,9 @@ import {
DataCube,
DataCubeJS,
fromConfig as dataCubeFromConfig,
serialize as serializeDataCube,
SerializedDataCube
} from "../data-cube/data-cube";
import { isQueryable, QueryableDataCube } from "../data-cube/queryable-data-cube";
import { QueryableDataCube } from "../data-cube/queryable-data-cube";

export interface SourcesJS {
clusters?: ClusterJS[];
Expand Down Expand Up @@ -96,22 +94,6 @@ export function fromConfig(config: SourcesJS, logger: Logger): Sources {
};
}

export function serialize({
clusters: serverClusters,
dataCubes: serverDataCubes
}: Sources): SerializedSources {
const clusters = serverClusters.map(serializeCluster);

const dataCubes = serverDataCubes
.filter(dc => isQueryable(dc))
.map(serializeDataCube);

return {
clusters,
dataCubes
};
}

/**
 * Returns every data cube belonging to the named cluster.
 *
 * @param sources sources holding the full data-cube list
 * @param clusterName cluster to select by
 */
export function getDataCubesForCluster(sources: Sources, clusterName: string): DataCube[] {
  const belongsToCluster = (cube: DataCube) => cube.clusterName === clusterName;
  return sources.dataCubes.filter(belongsToCluster);
}
Expand Down
66 changes: 45 additions & 21 deletions src/server/routes/sources/sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,57 @@

import { Request, Response, Router } from "express";
import { errorToMessage } from "../../../common/logger/logger";
import { serialize } from "../../../common/models/sources/sources";
import { serialize as serializeCluster } from "../../../common/models/cluster/cluster";
import { serialize as serializeDataCube } from "../../../common/models/data-cube/data-cube";
import { isQueryable } from "../../../common/models/data-cube/queryable-data-cube";
import { checkAccess } from "../../utils/datacube-guard/datacube-guard";
import { SettingsManager } from "../../utils/settings-manager/settings-manager";

export function sourcesRouter(settings: Pick<SettingsManager, "getSources" | "logger">) {

const logger = settings.logger.setLoggerId("Sources");
const logger = settings.logger.setLoggerId("Sources");

const router = Router();
const router = Router();
const MAX_DATA_CUBES_IN_REQUEST = 1000;
router.get("/clusters", async (req: Request, res: Response) => {
try {
const { clusters } = await settings.getSources();
res.json(clusters.map(serializeCluster));
} catch (error) {
logger.error(errorToMessage(error));

router.get("/", async (req: Request, res: Response) => {
res.status(500).send({
error: "Can't fetch settings",
message: error.message
});
}
});
router.get("/dataCubes", async (req: Request, res: Response) => {
try {
const { dataCubes } = await settings.getSources();
const relevantDatasources = dataCubes.filter(dataCube => checkAccess(dataCube, req.headers))
bikov marked this conversation as resolved.
Show resolved Hide resolved
.filter(dc => isQueryable(dc))
bikov marked this conversation as resolved.
Show resolved Hide resolved
.map(serializeDataCube);
if (relevantDatasources.length < MAX_DATA_CUBES_IN_REQUEST) {
res.json({ dataCubes: relevantDatasources, isDone: true });
} else {
const currentBatchNumber = (req.query["batch"] && parseInt(req.query["batch"] as string, 10)) || 0;
const dataSourcesStart = currentBatchNumber * MAX_DATA_CUBES_IN_REQUEST;
const dataSourcesEnd = dataSourcesStart + MAX_DATA_CUBES_IN_REQUEST;
const isDone = dataSourcesEnd >= dataCubes.length;
res.json({
dataCubes: relevantDatasources.slice(dataSourcesStart, dataSourcesEnd),
isDone
});
}
} catch (error) {
logger.error(errorToMessage(error));
res.status(500).send({
error: "Can't fetch settings",
message: error.message
});
}
});

try {
const { clusters, dataCubes } = await settings.getSources();
res.json(serialize({
clusters,
dataCubes: dataCubes.filter( dataCube => checkAccess(dataCube, req.headers) )
}));
} catch (error) {
logger.error(errorToMessage(error));

res.status(500).send({
error: "Can't fetch settings",
message: error.message
});
}
});

return router;
return router;
}