Skip to content

Commit

Permalink
Merge branch 'main' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
fossygirl authored Jul 5, 2023
2 parents e3bd404 + 613cced commit 3a1c932
Show file tree
Hide file tree
Showing 19 changed files with 353 additions and 61 deletions.
35 changes: 22 additions & 13 deletions src/backend/distributed/commands/schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,29 @@ PostprocessCreateSchemaStmt(Node *node, const char *queryString)
}

/*
* Register the tenant schema on the coordinator and save the command
* to register it on the workers.
* Skip if the schema is already inserted into pg_dist_schema.
* This could occur when trying to create an already existing schema,
* with IF NOT EXISTS clause.
*/
int shardCount = 1;
int replicationFactor = 1;
Oid distributionColumnType = InvalidOid;
Oid distributionColumnCollation = InvalidOid;
uint32 colocationId = CreateColocationGroup(
shardCount, replicationFactor, distributionColumnType,
distributionColumnCollation);

InsertTenantSchemaLocally(schemaId, colocationId);

commands = lappend(commands, TenantSchemaInsertCommand(schemaId, colocationId));
if (!IsTenantSchema(schemaId))
{
/*
* Register the tenant schema on the coordinator and save the command
* to register it on the workers.
*/
int shardCount = 1;
int replicationFactor = 1;
Oid distributionColumnType = InvalidOid;
Oid distributionColumnCollation = InvalidOid;
uint32 colocationId = CreateColocationGroup(
shardCount, replicationFactor, distributionColumnType,
distributionColumnCollation);

InsertTenantSchemaLocally(schemaId, colocationId);

commands = lappend(commands, TenantSchemaInsertCommand(schemaId,
colocationId));
}
}

commands = lappend(commands, ENABLE_DDL_PROPAGATION);
Expand Down
9 changes: 9 additions & 0 deletions src/backend/distributed/commands/table.c
Original file line number Diff line number Diff line change
Expand Up @@ -4154,6 +4154,15 @@ ConvertNewTableIfNecessary(Node *createStmt)
return;
}

/*
* We allow mat views in a distributed schema but do not make them a tenant
* table. We should skip converting them.
*/
if (get_rel_relkind(createdRelationId) == RELKIND_MATVIEW)
{
return;
}

CreateTenantSchemaTable(createdRelationId);
}

Expand Down
54 changes: 37 additions & 17 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);
static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames,
char *sizeFunction);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
Expand All @@ -104,7 +105,7 @@ static List * OpenConnectionToNodes(List *workerNodeList);
static void ReceiveShardIdAndSizeResults(List *connectionList,
Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval);

static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes);
Expand Down Expand Up @@ -916,6 +917,12 @@ static char *
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
{
StringInfo allShardStatisticsQuery = makeStringInfo();
bool insertedValues = false;

appendStringInfoString(allShardStatisticsQuery, "SELECT shard_id, ");
appendStringInfo(allShardStatisticsQuery, PG_TOTAL_RELATION_SIZE_FUNCTION,
"table_name");
appendStringInfoString(allShardStatisticsQuery, " FROM (VALUES ");

Oid relationId = InvalidOid;
foreach_oid(relationId, citusTableIds)
Expand All @@ -930,46 +937,60 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
{
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
relationId);
char *shardStatisticsQuery =
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
if (list_length(shardIntervalsOnNode) == 0)
{
relation_close(relation, AccessShareLock);
continue;
}
char *shardIdNameValues =
GenerateShardIdNameValuesForShardList(shardIntervalsOnNode,
!insertedValues);
insertedValues = true;
appendStringInfoString(allShardStatisticsQuery, shardIdNameValues);
relation_close(relation, AccessShareLock);
}
}

/* Add a dummy entry so that UNION ALL doesn't complain */
appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, 0::bigint;");
if (!insertedValues)
{
return "SELECT 0 AS shard_id, '' AS table_name LIMIT 0";
}

appendStringInfoString(allShardStatisticsQuery, ") t(shard_id, table_name) "
"WHERE to_regclass(table_name) IS NOT NULL");
return allShardStatisticsQuery->data;
}


/*
* GenerateShardStatisticsQueryForShardList generates a query that returns:
* SELECT shard_id, shard_name, shard_size for all shards in the list
* GenerateShardIdNameValuesForShardList generates a list of (shard_id, shard_name) values
* for all shards in the list
*/
static char *
GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
GenerateShardIdNameValuesForShardList(List *shardIntervalList, bool firstValue)
{
StringInfo selectQuery = makeStringInfo();

ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
AppendShardSizeQuery(selectQuery, shardInterval);
appendStringInfo(selectQuery, " UNION ALL ");
if (!firstValue)
{
appendStringInfoString(selectQuery, ", ");
}
firstValue = false;
AppendShardIdNameValues(selectQuery, shardInterval);
}

return selectQuery->data;
}


/*
* AppendShardSizeQuery appends a query in the following form to selectQuery
* SELECT shard_id, shard_name, shard_size
* AppendShardIdNameValues appends (shard_id, shard_name) for shard
*/
static void
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)
{
uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
Expand All @@ -981,8 +1002,7 @@ AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);

appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
appendStringInfo(selectQuery, PG_TOTAL_RELATION_SIZE_FUNCTION, quotedShardName);
appendStringInfo(selectQuery, "(" UINT64_FORMAT ", %s)", shardId, quotedShardName);
}


Expand Down
21 changes: 10 additions & 11 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -2489,17 +2489,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);


DefineCustomIntVariable(
"citus.stat_tenants_sample_rate_for_new_tenants",
gettext_noop("Sampling rate for new tenants in citus_stat_tenants."),
NULL,
&StatTenantsSampleRateForNewTenants,
100, 1, 100,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomEnumVariable(
"citus.stat_tenants_track",
gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."),
Expand All @@ -2513,6 +2502,16 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomRealVariable(
"citus.stat_tenants_untracked_sample_rate",
gettext_noop("Sampling rate for new tenants in citus_stat_tenants."),
NULL,
&StatTenantsSampleRateForNewTenants,
1, 0, 1,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Usage of this GUC is highly discouraged, please read the long "
Expand Down
9 changes: 9 additions & 0 deletions src/backend/distributed/sql/citus--11.3-1--12.0-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ GRANT SELECT ON pg_catalog.pg_dist_schema TO public;
#include "udfs/citus_drop_trigger/12.0-1.sql"

DROP VIEW citus_shards;
DROP VIEW IF EXISTS pg_catalog.citus_tables;
DROP VIEW IF EXISTS public.citus_tables;
DROP FUNCTION citus_shard_sizes;
#include "udfs/citus_shard_sizes/12.0-1.sql"

Expand All @@ -42,3 +44,10 @@ DROP FUNCTION citus_shard_sizes;

#include "udfs/drop_old_time_partitions/12.0-1.sql"
#include "udfs/get_missing_time_partition_ranges/12.0-1.sql"

-- Update the default rebalance strategy to 'by_disk_size', but only if the
-- default is currently 'by_shard_count'
SELECT citus_set_default_rebalance_strategy(name)
FROM pg_dist_rebalance_strategy
WHERE name = 'by_disk_size'
AND (SELECT default_strategy FROM pg_dist_rebalance_strategy WHERE name = 'by_shard_count');
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ DROP FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(Oid, t
DROP VIEW IF EXISTS public.citus_schemas;
DROP VIEW IF EXISTS pg_catalog.citus_schemas;

DROP VIEW IF EXISTS public.citus_tables;
DROP VIEW IF EXISTS pg_catalog.citus_tables;

DROP VIEW pg_catalog.citus_shards;
DROP FUNCTION pg_catalog.citus_shard_sizes;
#include "../udfs/citus_shard_sizes/10.0-1.sql"
Expand Down Expand Up @@ -76,3 +79,8 @@ DROP FUNCTION pg_catalog.citus_stat_tenants_local_internal(

#include "../udfs/drop_old_time_partitions/10.2-1.sql"
#include "../udfs/get_missing_time_partition_ranges/10.2-1.sql"

-- This explicitly does not reset the rebalance strategy to by_shard_count,
-- because there's no way of knowing if the rebalance strategy before the
-- upgrade was by_disk_size or by_shard_count. And even in previous versions
-- by_disk_size is considered superior for quite some time.
9 changes: 8 additions & 1 deletion src/backend/distributed/sql/udfs/citus_tables/12.0-1.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/backend/distributed/sql/udfs/citus_tables/latest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ citus_tables_create_query=$CTCQ$
END AS citus_table_type,
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS distribution_column,
colocationid AS colocation_id,
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS table_size,
pg_size_pretty(table_sizes.table_size) AS table_size,
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS shard_count,
pg_get_userbyid(relowner) AS table_owner,
amname AS access_method
Expand All @@ -24,6 +24,13 @@ citus_tables_create_query=$CTCQ$
pg_class c ON (p.logicalrelid = c.oid)
LEFT JOIN
pg_am a ON (a.oid = c.relam)
JOIN
(
SELECT ds.logicalrelid AS table_id, SUM(css.size) AS table_size
FROM citus_shard_sizes() css, pg_dist_shard ds
WHERE css.shard_id = ds.shardid
GROUP BY ds.logicalrelid
) table_sizes ON (table_sizes.table_id = p.logicalrelid)
WHERE
-- filter out tables owned by extensions
logicalrelid NOT IN (
Expand Down
22 changes: 19 additions & 3 deletions src/backend/distributed/utils/citus_stat_tenants.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@

#include <time.h>

#if (PG_VERSION_NUM >= PG_VERSION_15)
#include "common/pg_prng.h"
#endif

static void AttributeMetricsIfApplicable(void);

ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
Expand Down Expand Up @@ -80,7 +84,7 @@ int StatTenantsLogLevel = CITUS_LOG_LEVEL_OFF;
int StatTenantsPeriod = (time_t) 60;
int StatTenantsLimit = 100;
int StatTenantsTrack = STAT_TENANTS_TRACK_NONE;
int StatTenantsSampleRateForNewTenants = 100;
double StatTenantsSampleRateForNewTenants = 1;

PG_FUNCTION_INFO_V1(citus_stat_tenants_local);
PG_FUNCTION_INFO_V1(citus_stat_tenants_local_reset);
Expand Down Expand Up @@ -281,13 +285,25 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)

MultiTenantMonitor *monitor = GetMultiTenantMonitor();
bool found = false;

/* Acquire the lock in shared mode to check if the tenant is already in the hash table. */
LWLockAcquire(&monitor->lock, LW_SHARED);

hash_search(monitor->tenants, &key, HASH_FIND, &found);

LWLockRelease(&monitor->lock);

/* If the tenant is not found in the hash table, we will track the query with a probability of StatTenantsSampleRateForNewTenants. */
if (!found)
{
int randomValue = rand() % 100;
bool shouldTrackQuery = randomValue < StatTenantsSampleRateForNewTenants;
#if (PG_VERSION_NUM >= PG_VERSION_15)
double randomValue = pg_prng_double(&pg_global_prng_state);
#else

/* Generate a random double between 0 and 1 */
double randomValue = (double) random() / MAX_RANDOM_VALUE;
#endif
bool shouldTrackQuery = randomValue <= StatTenantsSampleRateForNewTenants;
if (!shouldTrackQuery)
{
return;
Expand Down
2 changes: 1 addition & 1 deletion src/include/distributed/utils/citus_stat_tenants.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ extern int StatTenantsLogLevel;
extern int StatTenantsPeriod;
extern int StatTenantsLimit;
extern int StatTenantsTrack;
extern int StatTenantsSampleRateForNewTenants;
extern double StatTenantsSampleRateForNewTenants;

#endif /*CITUS_ATTRIBUTE_H */
Loading

0 comments on commit 3a1c932

Please sign in to comment.