Skip to content

Commit

Permalink
Fix citus_shard_sizes bug with many shards
Browse files Browse the repository at this point in the history
  • Loading branch information
halilozanakgul committed Jul 12, 2023
1 parent c18586c commit a90a3a7
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/backend/distributed/citus.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '11.3-1'
default_version = '11.3-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
81 changes: 48 additions & 33 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);
static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames,
char *sizeFunction);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
Expand All @@ -101,10 +102,10 @@ static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds);
static void ErrorIfNotSuitableToGetSize(Oid relationId);
static List * OpenConnectionToNodes(List *workerNodeList);
static void ReceiveShardNameAndSizeResults(List *connectionList,
Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
static void ReceiveShardIdAndSizeResults(List *connectionList,
Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval);

static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes);
Expand Down Expand Up @@ -253,7 +254,7 @@ GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, uint64 *availabl


/*
* citus_shard_sizes returns all shard names and their sizes.
* citus_shard_sizes returns all shard ids and their sizes.
*/
Datum
citus_shard_sizes(PG_FUNCTION_ARGS)
Expand All @@ -271,7 +272,7 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);

ReceiveShardNameAndSizeResults(connectionList, tupleStore, tupleDescriptor);
ReceiveShardIdAndSizeResults(connectionList, tupleStore, tupleDescriptor);

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -446,12 +447,12 @@ GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds)


/*
* ReceiveShardNameAndSizeResults receives shard name and size results from the given
* ReceiveShardIdAndSizeResults receives shard id and size results from the given
* connection list.
*/
static void
ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor)
ReceiveShardIdAndSizeResults(List *connectionList, Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
Expand Down Expand Up @@ -488,13 +489,9 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));

/* format is [0] shard id, [1] shard name, [2] size */
char *tableName = PQgetvalue(result, rowIndex, 1);
Datum resultStringDatum = CStringGetDatum(tableName);
Datum textDatum = DirectFunctionCall1(textin, resultStringDatum);

values[0] = textDatum;
values[1] = ParseIntField(result, rowIndex, 2);
/* format is [0] shard id, [1] size */
values[0] = ParseIntField(result, rowIndex, 0);
values[1] = ParseIntField(result, rowIndex, 1);

tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
Expand Down Expand Up @@ -920,6 +917,12 @@ static char *
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
{
StringInfo allShardStatisticsQuery = makeStringInfo();
bool insertedValues = false;

appendStringInfoString(allShardStatisticsQuery, "SELECT shard_id, ");
appendStringInfo(allShardStatisticsQuery, PG_TOTAL_RELATION_SIZE_FUNCTION,
"table_name");
appendStringInfoString(allShardStatisticsQuery, " FROM (VALUES ");

Oid relationId = InvalidOid;
foreach_oid(relationId, citusTableIds)
Expand All @@ -934,46 +937,60 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
{
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
relationId);
char *shardStatisticsQuery =
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
if (list_length(shardIntervalsOnNode) == 0)
{
relation_close(relation, AccessShareLock);
continue;
}
char *shardIdNameValues =
GenerateShardIdNameValuesForShardList(shardIntervalsOnNode,
!insertedValues);
insertedValues = true;
appendStringInfoString(allShardStatisticsQuery, shardIdNameValues);
relation_close(relation, AccessShareLock);
}
}

/* Add a dummy entry so that UNION ALL doesn't complain */
appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, NULL::text, 0::bigint;");
if (!insertedValues)
{
return "SELECT 0 AS shard_id, '' AS table_name LIMIT 0";
}

appendStringInfoString(allShardStatisticsQuery, ") t(shard_id, table_name) "
"WHERE to_regclass(table_name) IS NOT NULL");
return allShardStatisticsQuery->data;
}


/*
* GenerateShardStatisticsQueryForShardList generates a query that returns:
* SELECT shard_id, shard_name, shard_size for all shards in the list
* GenerateShardIdNameValuesForShardList generates a list of (shard_id, shard_name) values
* for all shards in the list
*/
static char *
GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
GenerateShardIdNameValuesForShardList(List *shardIntervalList, bool firstValue)
{
StringInfo selectQuery = makeStringInfo();

ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
AppendShardSizeQuery(selectQuery, shardInterval);
appendStringInfo(selectQuery, " UNION ALL ");
if (!firstValue)
{
appendStringInfoString(selectQuery, ", ");
}
firstValue = false;
AppendShardIdNameValues(selectQuery, shardInterval);
}

return selectQuery->data;
}


/*
* AppendShardSizeQuery appends a query in the following form to selectQuery
* SELECT shard_id, shard_name, shard_size
* AppendShardIdNameValues appends (shard_id, shard_name) for shard
*/
static void
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)
{
uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
Expand All @@ -985,9 +1002,7 @@ AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);

appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName);
appendStringInfo(selectQuery, PG_TOTAL_RELATION_SIZE_FUNCTION, quotedShardName);
appendStringInfo(selectQuery, "(" UINT64_FORMAT ", %s)", shardId, quotedShardName);
}


Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/operations/stage_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId,
return false;
}

*shardSize = ParseIntField(result, rowIndex, 2);
*shardSize = ParseIntField(result, rowIndex, 1);

Check warning on line 858 in src/backend/distributed/operations/stage_protocol.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/stage_protocol.c#L858

Added line #L858 was not covered by tests
return true;
}

Expand Down
9 changes: 9 additions & 0 deletions src/backend/distributed/sql/citus--11.3-1--11.3-2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
DROP VIEW citus_shards;
DROP VIEW IF EXISTS pg_catalog.citus_tables;
DROP VIEW IF EXISTS public.citus_tables;
DROP FUNCTION citus_shard_sizes;

#include "udfs/citus_shard_sizes/11.3-2.sql"

#include "udfs/citus_shards/11.3-2.sql"
#include "udfs/citus_tables/11.3-2.sql"
13 changes: 13 additions & 0 deletions src/backend/distributed/sql/downgrades/citus--11.3-2--11.3-1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DROP VIEW IF EXISTS public.citus_tables;
DROP VIEW IF EXISTS pg_catalog.citus_tables;

DROP VIEW pg_catalog.citus_shards;
DROP FUNCTION pg_catalog.citus_shard_sizes;
#include "../udfs/citus_shard_sizes/10.0-1.sql"
-- citus_shards/11.1-1.sql tries to create citus_shards in pg_catalog but it is not allowed.
-- Here we use citus_shards/10.0-1.sql to properly create the view in citus schema and
-- then alter it to pg_catalog, so citus_shards/11.1-1.sql can REPLACE it without any errors.
#include "../udfs/citus_shards/10.0-1.sql"

#include "../udfs/citus_tables/11.1-1.sql"
#include "../udfs/citus_shards/11.1-1.sql"
6 changes: 6 additions & 0 deletions src/backend/distributed/sql/udfs/citus_shard_sizes/11.3-2.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/backend/distributed/sql/udfs/citus_shard_sizes/latest.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE FUNCTION pg_catalog.citus_shard_sizes(OUT table_name text, OUT size bigint)
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_sizes(OUT shard_id int, OUT size bigint)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_shard_sizes$$;
COMMENT ON FUNCTION pg_catalog.citus_shard_sizes(OUT table_name text, OUT size bigint)
COMMENT ON FUNCTION pg_catalog.citus_shard_sizes(OUT shard_id int, OUT size bigint)
IS 'returns shards sizes across citus cluster';
46 changes: 46 additions & 0 deletions src/backend/distributed/sql/udfs/citus_shards/11.3-2.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions src/backend/distributed/sql/udfs/citus_shards/latest.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE VIEW pg_catalog.citus_shards AS
CREATE OR REPLACE VIEW citus.citus_shards AS
SELECT
pg_dist_shard.logicalrelid AS table_name,
pg_dist_shard.shardid,
Expand All @@ -23,7 +23,7 @@ JOIN
ON
pg_dist_partition.logicalrelid = pg_dist_shard.logicalrelid
LEFT JOIN
(SELECT (regexp_matches(table_name,'_(\d+)$'))[1]::int as shard_id, max(size) as size from citus_shard_sizes() GROUP BY shard_id) as shard_sizes
(SELECT shard_id, max(size) as size from citus_shard_sizes() GROUP BY shard_id) as shard_sizes
ON
pg_dist_shard.shardid = shard_sizes.shard_id
WHERE
Expand All @@ -42,4 +42,5 @@ ORDER BY
pg_dist_shard.logicalrelid::text, shardid
;

ALTER VIEW citus.citus_shards SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_shards TO public;
55 changes: 55 additions & 0 deletions src/backend/distributed/sql/udfs/citus_tables/11.3-2.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/backend/distributed/sql/udfs/citus_tables/latest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ citus_tables_create_query=$CTCQ$
END AS citus_table_type,
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS distribution_column,
colocationid AS colocation_id,
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS table_size,
pg_size_pretty(table_sizes.table_size) AS table_size,
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS shard_count,
pg_get_userbyid(relowner) AS table_owner,
amname AS access_method
Expand All @@ -21,6 +21,13 @@ citus_tables_create_query=$CTCQ$
pg_class c ON (p.logicalrelid = c.oid)
LEFT JOIN
pg_am a ON (a.oid = c.relam)
JOIN
(
SELECT ds.logicalrelid AS table_id, SUM(css.size) AS table_size
FROM citus_shard_sizes() css, pg_dist_shard ds
WHERE css.shard_id = ds.shardid
GROUP BY ds.logicalrelid
) table_sizes ON (table_sizes.table_id = p.logicalrelid)
WHERE
-- filter out tables owned by extensions
logicalrelid NOT IN (
Expand Down
2 changes: 1 addition & 1 deletion src/include/distributed/metadata_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
"worker_partitioned_relation_total_size(%s)"

#define SHARD_SIZES_COLUMN_COUNT (3)
#define SHARD_SIZES_COLUMN_COUNT (2)

/*
* Flag to keep track of whether the process is currently in a function converting the
Expand Down
Loading

0 comments on commit a90a3a7

Please sign in to comment.