Skip to content

Commit

Permalink
Refactor BucketParameterQuerier implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Feb 5, 2025
1 parent dece5f7 commit b5d3a8b
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 112 deletions.
60 changes: 60 additions & 0 deletions packages/sync-rules/src/BucketParameterQuerier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { BucketDescription } from './BucketDescription.js';
import { RequestParameters, SqliteJsonRow, SqliteJsonValue } from './types.js';

/**
* Represents a set of parameter queries for a specific request.
*/
export interface BucketParameterQuerier {
/**
* These buckets do not change for the lifetime of the connection.
*
* This includes parameter queries such as:
*
* select request.user_id() as user_id()
* select value as project_id from json_each(request.jwt() -> 'project_ids')
*/
readonly staticBuckets: BucketDescription[];

/**
* True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used.
*
* If this is false, queryDynamicBucketDescriptions() will always return an empty array.
*/
readonly hasDynamicBuckets: boolean;

/**
* These buckets depend on parameter storage, and needs to be retrieved dynamically for each checkpoint.
*
* The ParameterLookupSource should perform the query for the current checkpoint - that is not passed
* as a parameter.
*
* This includes parameter queries such as:
*
* select id as user_id from users where users.id = request.user_id()
*/
queryDynamicBucketDescriptions(source: ParameterLookupSource): Promise<BucketDescription[]>;
}

export interface ParameterLookupSource {
getParameterSets: (lookups: SqliteJsonValue[][]) => Promise<SqliteJsonRow[]>;
}

export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
parameters: RequestParameters;
}

export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[]): BucketParameterQuerier {
return {
staticBuckets: queriers.flatMap((q) => q.staticBuckets),
hasDynamicBuckets: queriers.some((q) => q.hasDynamicBuckets),
async queryDynamicBucketDescriptions(source: ParameterLookupSource) {
let results: BucketDescription[] = [];
for (let q of queriers) {
if (q.hasDynamicBuckets) {
results.push(...(await q.queryDynamicBucketDescriptions(source)));
}
}
return results;
}
};
}
47 changes: 16 additions & 31 deletions packages/sync-rules/src/SqlBucketDescriptor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { BucketDescription } from './BucketDescription.js';
import {
BucketParameterQuerier,
mergeBucketParameterQueriers,
QueryBucketDescriptorOptions
} from './BucketParameterQuerier.js';
import { IdSequence } from './IdSequence.js';
import { SourceTableInterface } from './SourceTableInterface.js';
import { SqlDataQuery } from './SqlDataQuery.js';
Expand All @@ -8,12 +13,9 @@ import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
import { TablePattern } from './TablePattern.js';
import { SqlRuleError } from './errors.js';
import {
BucketParameterQuerier,
EvaluateRowOptions,
EvaluatedParametersResult,
EvaluationResult,
ParameterLookupSource,
QueryBucketIdOptions,
QueryParseOptions,
RequestParameters,
SqliteRow
Expand Down Expand Up @@ -112,24 +114,18 @@ export class SqlBucketDescriptor {

getBucketParameterQuerier(parameters: RequestParameters): BucketParameterQuerier {
const staticBuckets = this.getStaticBucketDescriptions(parameters);
const hasDynamicBuckets = this.parameter_queries.length > 0;

return {
const staticQuerier = {
staticBuckets,
hasDynamicBuckets,
queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
let results: BucketDescription[] = [];
for (let query of this.parameter_queries) {
results.push(
...(await query.queryBucketDescriptions({
getParameterSets: source.getParameterSets.bind(source),
parameters
}))
);
}
return results;
}
};
hasDynamicBuckets: false,
queryDynamicBucketDescriptions: async () => []
} satisfies BucketParameterQuerier;

if (this.parameter_queries.length == 0) {
return staticQuerier;
}

const dynamicQueriers = this.parameter_queries.map((query) => query.getBucketParameterQuerier(parameters));
return mergeBucketParameterQueriers([staticQuerier, ...dynamicQueriers]);
}

getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] {
Expand All @@ -140,17 +136,6 @@ export class SqlBucketDescriptor {
return results;
}

/**
* @deprecated Use getBucketParameterQuerier() instead.
*/
async queryBucketDescriptions(options: QueryBucketIdOptions): Promise<BucketDescription[]> {
let result = this.getStaticBucketDescriptions(options.parameters);
for (let query of this.parameter_queries) {
result.push(...(await query.queryBucketDescriptions(options)));
}
return result;
}

getSourceTables(): Set<TablePattern> {
let result = new Set<TablePattern>();
for (let query of this.parameter_queries) {
Expand Down
36 changes: 22 additions & 14 deletions packages/sync-rules/src/SqlParameterQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
InputParameter,
ParameterMatchClause,
ParameterValueClause,
QueryBucketIdOptions,
QueryParseOptions,
QuerySchema,
RequestParameters,
Expand All @@ -24,6 +23,11 @@ import {
import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement } from './utils.js';
import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js';
import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js';
import {
BucketParameterQuerier,
ParameterLookupSource,
QueryBucketDescriptorOptions
} from './BucketParameterQuerier.js';

/**
* Represents a parameter query, such as:
Expand Down Expand Up @@ -362,22 +366,26 @@ export class SqlParameterQuery {
}
}

/**
* Given sync parameters (token and user parameters), return bucket ids and priorities.
*
* This is done in three steps:
* 1. Given the parameters, get lookups we need to perform on the database.
* 2. Perform the lookups, returning parameter sets (partial rows).
* 3. Given the parameter sets, resolve bucket ids.
*/
async queryBucketDescriptions(options: QueryBucketIdOptions): Promise<BucketDescription[]> {
let lookups = this.getLookups(options.parameters);
getBucketParameterQuerier(requestParameters: RequestParameters): BucketParameterQuerier {
const lookups = this.getLookups(requestParameters);
if (lookups.length == 0) {
return [];
// This typically happens when the query is pre-filtered using a where clause
// on the parameters, and does not depend on the database state.
return {
staticBuckets: [],
hasDynamicBuckets: false,
queryDynamicBucketDescriptions: async () => []
};
}

const parameters = await options.getParameterSets(lookups);
return this.resolveBucketDescriptions(parameters, options.parameters);
return {
staticBuckets: [],
hasDynamicBuckets: true,
queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
const bucketParameters = await source.getParameterSets(lookups);
return this.resolveBucketDescriptions(bucketParameters, requestParameters);
}
};
}

get hasAuthenticatedBucketParameters(): boolean {
Expand Down
33 changes: 7 additions & 26 deletions packages/sync-rules/src/SqlSyncRules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { SourceTableInterface } from './SourceTableInterface.js';
import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js';
import { TablePattern } from './TablePattern.js';
import {
BucketParameterQuerier,
EvaluatedParameters,
EvaluatedParametersResult,
EvaluatedRow,
Expand All @@ -17,15 +16,18 @@ import {
isEvaluatedParameters,
isEvaluatedRow,
isEvaluationError,
ParameterLookupSource,
QueryBucketIdOptions,
QueryParseOptions,
RequestParameters,
SourceSchema,
SqliteRow,
SyncRules
} from './types.js';
import { BucketDescription } from './BucketDescription.js';
import {
BucketParameterQuerier,
mergeBucketParameterQueriers,
ParameterLookupSource
} from './BucketParameterQuerier.js';

const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES');

Expand Down Expand Up @@ -307,29 +309,8 @@ export class SqlSyncRules implements SyncRules {
}

getBucketParameterQuerier(parameters: RequestParameters): BucketParameterQuerier {
let staticBuckets: BucketDescription[] = [];

let dynamicQueriers: BucketParameterQuerier[] = [];
for (let bucket of this.bucket_descriptors) {
const querier = bucket.getBucketParameterQuerier(parameters);
staticBuckets.push(...querier.staticBuckets);
if (querier.hasDynamicBuckets) {
dynamicQueriers.push(querier);
}
}
const hasDynamicBuckets = dynamicQueriers.length > 0;

return {
staticBuckets,
hasDynamicBuckets,
queryDynamicBucketDescriptions: async (options: ParameterLookupSource) => {
let results: BucketDescription[] = [];
for (let querier of dynamicQueriers) {
results.push(...(await querier.queryDynamicBucketDescriptions(options)));
}
return results;
}
};
const queriers = this.bucket_descriptors.map((query) => query.getBucketParameterQuerier(parameters));
return mergeBucketParameterQueriers(queriers);
}

getSourceTables(): TablePattern[] {
Expand Down
41 changes: 0 additions & 41 deletions packages/sync-rules/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,6 @@ export type CompiledClause = RowValueClause | ParameterMatchClause | ParameterVa
*/
export type TrueIfParametersMatch = FilterParameters[];

export interface ParameterLookupSource {
getParameterSets: (lookups: SqliteJsonValue[][]) => Promise<SqliteJsonRow[]>;
}
export interface QueryBucketIdOptions extends ParameterLookupSource {
parameters: RequestParameters;
}

export interface SourceSchemaTable {
table: string;
getColumn(column: string): ColumnDefinition | undefined;
Expand All @@ -356,37 +349,3 @@ export interface SourceSchemaTable {
export interface SourceSchema {
getTables(sourceTable: TablePattern): SourceSchemaTable[];
}

/**
* Represents a set of parameter queries for a specific request.
*/
export interface BucketParameterQuerier {
/**
* These buckets do not change for the lifetime of the connection.
*
* This includes parameter queries such as:
*
* select request.user_id() as user_id()
* select value as project_id from json_each(request.jwt() -> 'project_ids')
*/
readonly staticBuckets: BucketDescription[];

/**
* True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used.
*
* If this is false, queryDynamicBucketDescriptions() will always return an empty array.
*/
readonly hasDynamicBuckets: boolean;

/**
* These buckets depend on parameter storage, and needs to be retrieved dynamically for each checkpoint.
*
* The ParameterLookupSource should perform the query for the current checkpoint - that is not passed
* as a parameter.
*
* This includes parameter queries such as:
*
* select id as user_id from users where users.id = request.user_id()
*/
queryDynamicBucketDescriptions(source: ParameterLookupSource): Promise<BucketDescription[]>;
}

0 comments on commit b5d3a8b

Please sign in to comment.