Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of CitusHasBeenLoaded function for a database that does not do CREATE EXTENSION citus but load citus.so. #7123

Merged
merged 28 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"

Expand Down Expand Up @@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,

bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
parsetree);

if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
Expand All @@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PreprocessCreateExtensionStmtForCitusColumnar(parsetree);
}

if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree))
{
/*
* Citus maintains a higher level cache. We use the cache invalidation mechanism
* of Postgres to achieve cache coherency between backends. Any change to citus
* extension should be made known to other backends. We do this by invalidating the
* relcache and therefore invoking the citus registered callback that invalidates
* the citus cache in other backends.
*/
CacheInvalidateRelcacheAll();
}

/*
* Make sure that on DROP DATABASE we terminate the background daemon
* associated with it.
Expand Down Expand Up @@ -926,15 +940,6 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
}
}
}

if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt))
{
/*
* Ensure value is valid, we can't do some checks during CREATE
* EXTENSION. This is important to register some invalidation callbacks.
*/
CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */
Comment on lines -932 to -936
Copy link
Contributor

@JelteF JelteF Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this? The original comment makes it sound like it is important.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, hmm I guess this is related to moving InitializeDistCache to _PG_init. Because we register the callback there, we don't do that here anymore.

}
}


Expand Down
120 changes: 69 additions & 51 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,27 @@ typedef struct ShardIdCacheEntry
int shardIndex;
} ShardIdCacheEntry;

/*
 * ExtensionCreatedState tracks whether the citus extension has been created
 * using the CREATE EXTENSION command. The value is cached in MetadataCache.
 *
 * UNKNOWN    : MetadataCache is invalid, so the cached answer cannot be
 *              trusted and has to be determined again.
 * CREATED    : Citus is created.
 * NOTCREATED : Citus is not created.
 */
typedef enum ExtensionCreatedState
{
	/* zero value: a MetadataCacheData whose fields were set to 0 reads as UNKNOWN */
	UNKNOWN = 0,
	CREATED = 1,
	NOTCREATED = 2,
} ExtensionCreatedState;

/*
* State which should be cleared upon DROP EXTENSION. When the configuration
* changes, e.g. because extension is dropped, these summarily get set to 0.
*/
typedef struct MetadataCacheData
{
bool extensionLoaded;
ExtensionCreatedState extensionCreatedState;
Oid distShardRelationId;
Oid distPlacementRelationId;
Oid distBackgroundJobRelationId;
Expand Down Expand Up @@ -288,7 +301,6 @@ static void CreateDistTableCache(void);
static void CreateShardIdCache(void);
static void CreateDistObjectCache(void);
static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId);
Expand Down Expand Up @@ -2187,16 +2199,30 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
bool
CitusHasBeenLoaded(void)
{
if (!MetadataCache.extensionLoaded || creating_extension)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably call AcceptInvalidationMessages() here. To make sure we've processed any outstanding messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reorganized this part of the code such that when creating_extension == true for citus we do not change the cached value of extensionLoadedState but simply return false.

This change is to make the following scenario work:

BEGIN;

SET client_min_messages TO ERROR;
SET search_path TO public;
CREATE EXTENSION citus;
create table l1 (a int unique);
SELECT create_reference_table('l1');

When creating_extension is true, if we change extensionCreatedState to false during creating_extension, it stays false after this line, causing the subsequent create_reference_table to fail.

This might be because we invalidate the cache during the preprocessing phase of the Citus utility hook when
CREATE EXTENSION citus;
runs. And if we recreated the cache during creating_extension (we set extensionCreatedState to false), we do not get another cache invalidation message that would invalidate the cache.

AcceptInvalidationMessages();

did not eliminate the issue.

I opted for not changing the cache during creating_extension phase.

This change should not have any impact on performance since

if (creating_extension)

used to run previously every time even when the cache is valid.

/*
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
* since the objects used by the C code might be not be there yet.
*/
if (creating_extension)
{
/*
* Refresh if we have not determined whether the extension has been
* loaded yet, or in case of ALTER EXTENSION since we want to treat
* Citus as "not loaded" during ALTER EXTENSION citus.
*/
bool extensionLoaded = CitusHasBeenLoadedInternal();
Oid citusExtensionOid = get_extension_oid("citus", true);

if (CurrentExtensionObject == citusExtensionOid)
{
return false;
}
}

if (extensionLoaded && !MetadataCache.extensionLoaded)
/*
* If extensionCreatedState is UNKNOWN, query pg_extension for Citus
* and cache the result. Otherwise return the value extensionCreatedState
* indicates.
*/
if (MetadataCache.extensionCreatedState == UNKNOWN)
{
bool extensionCreated = CitusHasBeenLoadedInternal();

if (extensionCreated)
{
/*
* Loaded Citus for the first time in this session, or first time after
Expand All @@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void)
*/
StartupCitusBackend();

/*
* InvalidateDistRelationCacheCallback resets state such as extensionLoaded
* when it notices changes to pg_dist_partition (which usually indicate
* `DROP EXTENSION citus;` has been run)
*
* Ensure InvalidateDistRelationCacheCallback will notice those changes
* by caching pg_dist_partition's oid.
*
* We skip these checks during upgrade since pg_dist_partition is not
* present during early stages of upgrade operation.
*/
DistPartitionRelationId();

/*
* This needs to be initialized so we can receive foreign relation graph
* invalidation messages in InvalidateForeignRelationGraphCacheCallback().
* See the comments of InvalidateForeignKeyGraph for more context.
*/
DistColocationRelationId();
}

MetadataCache.extensionLoaded = extensionLoaded;
MetadataCache.extensionCreatedState = CREATED;
}
else
{
MetadataCache.extensionCreatedState = NOTCREATED;
}
}

return MetadataCache.extensionLoaded;
return (MetadataCache.extensionCreatedState == CREATED) ? true : false;
}


Expand All @@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void)
return false;
}

if (creating_extension && CurrentExtensionObject == citusExtensionOid)
{
/*
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
* since the objects used by the C code might be not be there yet.
*/
return false;
}

/* citus extension exists and has been created */
return true;
}
Expand Down Expand Up @@ -4201,10 +4209,6 @@ InitializeDistCache(void)
CreateShardIdCache();

InitializeDistObjectCache();

/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);
}


Expand Down Expand Up @@ -4754,20 +4758,26 @@ InvalidateForeignKeyGraph(void)
* InvalidateDistRelationCacheCallback flushes cache entries when a relation
* is updated (or flushes the entire cache).
*/
static void
void
InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
{
/* invalidate either entire cache or a specific entry */
if (relationId == InvalidOid)
{
InvalidateDistTableCache();
InvalidateDistObjectCache();
InvalidateMetadataSystemCache();
}
else
{
void *hashKey = (void *) &relationId;
bool foundInCache = false;

if (DistTableCacheHash == NULL)
{
return;
}

CitusTableCacheEntrySlot *cacheSlot =
hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
if (foundInCache)
Copy link
Contributor

@JelteF JelteF Aug 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can now remove the InvalidateMetadataSystemCache call below here.

Copy link
Contributor Author

@emelsimsek emelsimsek Aug 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the InvalidateMetadataSystemCache call here causes the check-vanilla test suite to fail. Specifically, check_index fails consistently in the CI if I remove this code. Let me investigate why a bit further.

Copy link
Contributor Author

@emelsimsek emelsimsek Aug 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that

REINDEX SCHEMA CONCURRENTLY pg_catalog;

command causes relcache invalidation of the pg_dist tables, which are under pg_catalog. If we do not invalidate MetadataCache, it will end up with stale OIDs, causing the subsequent commands to fail.

For instance, the following command
\d

will run a SELECT command that invokes citus hooks and will cause failed cached lookups of pg_dist* tables.

So we need to invalidate the cache when one of the pg_dist* tables got invalidated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have restored the previous code so as not to regress check-vanilla.
We will track the issue of pg_dist_ tables getting invalidated in a separate PR.

Note that, even if I restore the original code there are likely two issues with the original code

  1. When a pg_dist table other than pg_dist_partition gets invalidated for example using
    REINDEX TABLE pg_dist_background_task;
    we currently do not refresh its oid in the Metadata cache.

  2. When pg_dist_partition gets invalidated we unnecessarily nuke the entire MetadataCache.

Expand All @@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
}

/*
* If pg_dist_partition is being invalidated drop all state
* This happens pretty rarely, but most importantly happens during
* DROP EXTENSION citus; This isn't the only time when this happens
* though, it can happen for multiple other reasons, such as an
* autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE
* wouldn't really need a full Metadata cache invalidation, but we
* don't know how to differentiate between DROP EXTENSION and ANALYZE.
* So for now we simply drop it in both cases and take the slight
* temporary performance hit.
* if pg_dist_partition relcache is invalidated for some reason,
* invalidate the MetadataCache. It is likely an overkill to invalidate
* the entire cache here. But until a better fix, we keep it this way
* for postgres regression tests that includes
* REINDEX SCHEMA CONCURRENTLY pg_catalog
* command.
*/
if (relationId == MetadataCache.distPartitionRelationId)
{
InvalidateMetadataSystemCache();
}


if (relationId == MetadataCache.distObjectRelationId)
{
InvalidateDistObjectCache();
Expand Down Expand Up @@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void)
CitusTableCacheEntrySlot *cacheSlot = NULL;
HASH_SEQ_STATUS status;

if (DistTableCacheHash == NULL)
{
return;
}

hash_seq_init(&status, DistTableCacheHash);

while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
Expand All @@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void)
DistObjectCacheEntry *cacheEntry = NULL;
HASH_SEQ_STATUS status;

if (DistObjectCacheHash == NULL)
{
return;
}

hash_seq_init(&status, DistObjectCacheHash);

while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL)
Expand Down Expand Up @@ -4930,8 +4948,8 @@ CreateDistObjectCache(void)


/*
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
* and invalidates the worker node, ConnParams, and local group ID caches.
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionCreatedState
* flag and invalidates the worker node, ConnParams, and local group ID caches.
*/
void
InvalidateMetadataSystemCache(void)
Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/varlena.h"

Expand Down Expand Up @@ -554,6 +556,9 @@ _PG_init(void)
"ColumnarSupportsIndexAM",
true, &handle);

CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);

Comment on lines +559 to +561
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this call moved here instead of staying in InitializeDistCache?

INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr);
INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable);
INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions);
Expand Down
2 changes: 2 additions & 0 deletions src/include/distributed/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ typedef enum
ANY_CITUS_TABLE_TYPE
} CitusTableType;

void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);

extern List * AllCitusTableIds(void);
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);
Expand Down
44 changes: 44 additions & 0 deletions src/test/regress/citus_tests/test/test_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import psycopg
import pytest


def test_create_drop_citus(coord):
    """Check that DROP/CREATE EXTENSION citus on one connection is seen by another.

    Two cursors obtained from ``coord.cur()`` are used side by side. The first
    part drops and re-creates the extension outside of an explicit transaction
    block; the second part creates and uses the extension inside BEGIN ... ABORT
    and BEGIN ... COMMIT blocks.
    """
    with coord.cur() as cur1:
        with coord.cur() as cur2:
            # Conn1 drops the extension
            # and Conn2 cannot use it.
            cur1.execute("DROP EXTENSION citus")

            with pytest.raises(psycopg.errors.UndefinedFunction):
                # Conn1 dropped the extension, so the citus_version UDF
                # cannot be found. psycopg.errors.UndefinedFunction
                # is expected here.
                cur2.execute("SELECT citus_version();")

            # Conn2 creates the extension,
            # Conn1 is able to use it immediately.
            cur2.execute("CREATE EXTENSION citus")
            cur1.execute("SELECT citus_version();")
            cur1.execute("DROP EXTENSION citus;")

    with coord.cur() as cur1:
        with coord.cur() as cur2:
            # A connection is able to create and use the extension
            # within a transaction block.
            cur1.execute("BEGIN;")
            cur1.execute("CREATE TABLE t1(id int);")
            cur1.execute("CREATE EXTENSION citus;")
            cur1.execute("SELECT create_reference_table('t1')")
            cur1.execute("ABORT;")

            # Conn1 aborted, so Conn2 is able to create and
            # use the extension within a transaction block.
            cur2.execute("BEGIN;")
            cur2.execute("CREATE TABLE t1(id int);")
            cur2.execute("CREATE EXTENSION citus;")
            cur2.execute("SELECT create_reference_table('t1')")
            cur2.execute("COMMIT;")

            # Conn2 committed, so Conn1 is able to use the
            # extension immediately.
            cur1.execute("SELECT citus_version();")