From ac6008bdf7d25692b796a371f87e67ef3aac5b93 Mon Sep 17 00:00:00 2001 From: Jochen Topf Date: Sat, 27 Jan 2024 15:40:11 +0100 Subject: [PATCH 1/3] Give flex tables a number and use it for prepared statements This way prepared statements are different and we can have several of them in the same connection. We'll use that in the future. --- src/flex-lua-table.cpp | 3 ++- src/flex-table.cpp | 15 +++++++++------ src/flex-table.hpp | 9 +++++++-- tests/test-flex-indexes.cpp | 22 +++++++++++----------- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/flex-lua-table.cpp b/src/flex-lua-table.cpp index d459e170d..a37d1d84e 100644 --- a/src/flex-lua-table.cpp +++ b/src/flex-lua-table.cpp @@ -41,7 +41,8 @@ static flex_table_t &create_flex_table(lua_State *lua_state, throw fmt_error("Table with name '{}' already exists.", table_name); } - auto &new_table = tables->emplace_back(default_schema, table_name); + auto &new_table = + tables->emplace_back(default_schema, table_name, tables->size()); lua_pop(lua_state, 1); // "name" diff --git a/src/flex-table.cpp b/src/flex-table.cpp index 36016d13f..b0f31c2ed 100644 --- a/src/flex-table.cpp +++ b/src/flex-table.cpp @@ -167,14 +167,15 @@ std::string flex_table_t::build_sql_prepare_get_wkb() const if (has_multicolumn_id_index()) { return fmt::format( - R"(PREPARE get_wkb(char(1), bigint) AS)" + R"(PREPARE get_wkb_{}(char(1), bigint) AS)" R"( SELECT {} FROM {} WHERE "{}" = $1 AND "{}" = $2)", - columns, full_name(), m_columns[0].name(), m_columns[1].name()); + m_table_num, columns, full_name(), m_columns[0].name(), + m_columns[1].name()); } - return fmt::format(R"(PREPARE get_wkb(bigint) AS)" + return fmt::format(R"(PREPARE get_wkb_{}(bigint) AS)" R"( SELECT {} FROM {} WHERE "{}" = $1)", - columns, full_name(), id_column_names()); + m_table_num, columns, full_name(), id_column_names()); } std::string @@ -413,11 +414,13 @@ pg_result_t table_connection_t::get_geoms_by_id(osmium::item_type type, { 
assert(table().has_geom_column()); assert(m_db_connection); + + std::string const stmt = fmt::format("get_wkb_{}", table().num()); if (table().has_multicolumn_id_index()) { - return m_db_connection->exec_prepared_as_binary("get_wkb", + return m_db_connection->exec_prepared_as_binary(stmt.c_str(), type_to_char(type), id); } - return m_db_connection->exec_prepared_as_binary("get_wkb", id); + return m_db_connection->exec_prepared_as_binary(stmt.c_str(), id); } void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id) diff --git a/src/flex-table.hpp b/src/flex-table.hpp index efbd71122..bfcfd3ccc 100644 --- a/src/flex-table.hpp +++ b/src/flex-table.hpp @@ -59,8 +59,8 @@ class flex_table_t permanent }; - flex_table_t(std::string schema, std::string name) - : m_schema(std::move(schema)), m_name(std::move(name)) + flex_table_t(std::string schema, std::string name, std::size_t num) + : m_schema(std::move(schema)), m_name(std::move(name)), m_table_num(num) { } @@ -197,6 +197,8 @@ class flex_table_t bool has_columns_with_expire() const noexcept; + std::size_t num() const noexcept { return m_table_num; } + private: /// The schema this table is in std::string m_schema; @@ -230,6 +232,9 @@ class flex_table_t */ std::size_t m_geom_column = std::numeric_limits::max(); + /// Unique number for each table. + std::size_t m_table_num; + /** * Type of id stored in this table. 
*/ diff --git a/tests/test-flex-indexes.cpp b/tests/test-flex-indexes.cpp index 609452af7..3bd401b09 100644 --- a/tests/test-flex-indexes.cpp +++ b/tests/test-flex-indexes.cpp @@ -48,7 +48,7 @@ TEST_CASE("check index with single column", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("geom", "geometry", ""); REQUIRE(table.indexes().empty()); @@ -71,7 +71,7 @@ TEST_CASE("check index with multiple columns", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("a", "int", ""); table.add_column("b", "int", ""); @@ -93,7 +93,7 @@ TEST_CASE("check unique index", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); REQUIRE(tf.run_lua( @@ -115,7 +115,7 @@ TEST_CASE("check index with tablespace from table", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.set_index_tablespace("foo"); table.add_column("col", "int", ""); @@ -137,7 +137,7 @@ TEST_CASE("check index with tablespace", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); REQUIRE(tf.run_lua("return { method = 'btree', column = 'col', tablespace " @@ -159,7 +159,7 @@ TEST_CASE("check index with expression and where clause", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "text", ""); REQUIRE(tf.run_lua("return { method = 'btree', expression = 'lower(col)'," @@ -181,7 +181,7 @@ TEST_CASE("check index with include", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + 
flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); table.add_column("extra", "int", ""); @@ -204,7 +204,7 @@ TEST_CASE("check index with include as array", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); table.add_column("extra", "int", ""); @@ -227,7 +227,7 @@ TEST_CASE("check index with empty include array", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); table.add_column("extra", "int", ""); @@ -250,7 +250,7 @@ TEST_CASE("check multiple indexes", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("a", "int", ""); table.add_column("b", "int", ""); @@ -273,7 +273,7 @@ TEST_CASE("check various broken index configs", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "text", ""); SECTION("empty index description") { REQUIRE(tf.run_lua("return {}")); } From 974447fb4e417314177f6f7d3e58cb9833e1204c Mon Sep 17 00:00:00 2001 From: Jochen Topf Date: Sat, 27 Jan 2024 11:59:25 +0100 Subject: [PATCH 2/3] Refactor: Move storage of db connections from flex tables to output Each table of the flex output kept its own database connection in the table_connection_t class. With this change, the connection is kept in a vector in the flex output instead. This is a refactoring step towards using fewer database connections. As long as each table had its own connection, we could not reduce their number, but if we keep the connections outside the tables, we will be able to re-use the same connection for different tables. 
--- src/flex-table.cpp | 76 ++++++++++++++++++--------------------------- src/flex-table.hpp | 22 +++++-------- src/output-flex.cpp | 34 ++++++++++++++------ src/output-flex.hpp | 5 +++ 4 files changed, 67 insertions(+), 70 deletions(-) diff --git a/src/flex-table.cpp b/src/flex-table.cpp index b0f31c2ed..9e9676c53 100644 --- a/src/flex-table.cpp +++ b/src/flex-table.cpp @@ -242,14 +242,6 @@ bool flex_table_t::has_columns_with_expire() const noexcept [](auto const &column) { return column.has_expire(); }); } -void table_connection_t::connect(connection_params_t const &connection_params) -{ - assert(!m_db_connection); - - m_db_connection = - std::make_unique(connection_params, "out.flex.table"); -} - static void enable_check_trigger(pg_conn_t const &db_connection, flex_table_t const &table) { @@ -274,50 +266,46 @@ static void enable_check_trigger(pg_conn_t const &db_connection, checks); } -void table_connection_t::start(bool append) +void table_connection_t::start(pg_conn_t const &db_connection, bool append) { - assert(m_db_connection); - if (!append) { - m_db_connection->exec("DROP TABLE IF EXISTS {} CASCADE", - table().full_name()); + db_connection.exec("DROP TABLE IF EXISTS {} CASCADE", + table().full_name()); } // These _tmp tables can be left behind if we run out of disk space. - m_db_connection->exec("DROP TABLE IF EXISTS {}", table().full_tmp_name()); + db_connection.exec("DROP TABLE IF EXISTS {}", table().full_tmp_name()); if (!append) { - m_db_connection->exec(table().build_sql_create_table( + db_connection.exec(table().build_sql_create_table( table().cluster_by_geom() ? 
flex_table_t::table_type::interim : flex_table_t::table_type::permanent, table().full_name())); - enable_check_trigger(*m_db_connection, table()); + enable_check_trigger(db_connection, table()); } - prepare(); + prepare(db_connection); } -void table_connection_t::stop(bool updateable, bool append) +void table_connection_t::stop(pg_conn_t const &db_connection, bool updateable, + bool append) { - assert(m_db_connection); - m_copy_mgr.sync(); if (append) { - teardown(); return; } if (table().cluster_by_geom()) { if (table().geom_column().needs_isvalid()) { - drop_geom_check_trigger(*m_db_connection, table().schema(), + drop_geom_check_trigger(db_connection, table().schema(), table().name()); } log_info("Clustering table '{}' by geometry...", table().name()); - m_db_connection->exec(table().build_sql_create_table( + db_connection.exec(table().build_sql_create_table( flex_table_t::table_type::permanent, table().full_tmp_name())); std::string const columns = table().build_sql_column_list(); @@ -350,15 +338,15 @@ void table_connection_t::stop(bool updateable, bool append) sql += geom_column_name; } - m_db_connection->exec(sql); + db_connection.exec(sql); - m_db_connection->exec("DROP TABLE {}", table().full_name()); - m_db_connection->exec(R"(ALTER TABLE {} RENAME TO "{}")", - table().full_tmp_name(), table().name()); + db_connection.exec("DROP TABLE {}", table().full_name()); + db_connection.exec(R"(ALTER TABLE {} RENAME TO "{}")", + table().full_tmp_name(), table().name()); m_id_index_created = false; if (updateable) { - enable_check_trigger(*m_db_connection, table()); + enable_check_trigger(db_connection, table()); } } @@ -370,57 +358,53 @@ void table_connection_t::stop(bool updateable, bool append) index.columns()); auto const sql = index.create_index( qualified_name(table().schema(), table().name())); - m_db_connection->exec(sql); + db_connection.exec(sql); } } if ((table().always_build_id_index() || updateable) && table().has_id_column()) { - create_id_index(); + 
create_id_index(db_connection); } log_info("Analyzing table '{}'...", table().name()); - analyze(); - - teardown(); + analyze(db_connection); } -void table_connection_t::prepare() +void table_connection_t::prepare(pg_conn_t const &db_connection) { - assert(m_db_connection); if (table().has_id_column() && table().has_columns_with_expire()) { - m_db_connection->exec(table().build_sql_prepare_get_wkb()); + db_connection.exec(table().build_sql_prepare_get_wkb()); } } -void table_connection_t::analyze() +void table_connection_t::analyze(pg_conn_t const &db_connection) { - analyze_table(*m_db_connection, table().schema(), table().name()); + analyze_table(db_connection, table().schema(), table().name()); } -void table_connection_t::create_id_index() +void table_connection_t::create_id_index(pg_conn_t const &db_connection) { if (m_id_index_created) { log_debug("Id index on table '{}' already created.", table().name()); } else { log_info("Creating id index on table '{}'...", table().name()); - m_db_connection->exec(table().build_sql_create_id_index()); + db_connection.exec(table().build_sql_create_id_index()); m_id_index_created = true; } } -pg_result_t table_connection_t::get_geoms_by_id(osmium::item_type type, +pg_result_t table_connection_t::get_geoms_by_id(pg_conn_t const &db_connection, + osmium::item_type type, osmid_t id) const { assert(table().has_geom_column()); - assert(m_db_connection); - std::string const stmt = fmt::format("get_wkb_{}", table().num()); if (table().has_multicolumn_id_index()) { - return m_db_connection->exec_prepared_as_binary(stmt.c_str(), - type_to_char(type), id); + return db_connection.exec_prepared_as_binary(stmt.c_str(), + type_to_char(type), id); } - return m_db_connection->exec_prepared_as_binary(stmt.c_str(), id); + return db_connection.exec_prepared_as_binary(stmt.c_str(), id); } void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id) diff --git a/src/flex-table.hpp b/src/flex-table.hpp index bfcfd3ccc..cc15febff 
100644 --- a/src/flex-table.hpp +++ b/src/flex-table.hpp @@ -260,31 +260,28 @@ class table_connection_t m_target(std::make_shared( table->schema(), table->name(), table->id_column_names(), table->build_sql_column_list())), - m_copy_mgr(copy_thread), m_db_connection(nullptr) + m_copy_mgr(copy_thread) { } - void connect(connection_params_t const &connection_params); + void start(pg_conn_t const &db_connection, bool append); - void start(bool append); - - void stop(bool updateable, bool append); + void stop(pg_conn_t const &db_connection, bool updateable, bool append); flex_table_t const &table() const noexcept { return *m_table; } - void teardown() { m_db_connection.reset(); } - - void prepare(); + void prepare(pg_conn_t const &db_connection); - void analyze(); + void analyze(pg_conn_t const &db_connection); - void create_id_index(); + void create_id_index(pg_conn_t const &db_connection); /** * Get all geometries that have at least one expire config defined * from the database and return the result set. */ - pg_result_t get_geoms_by_id(osmium::item_type type, osmid_t id) const; + pg_result_t get_geoms_by_id(pg_conn_t const &db_connection, + osmium::item_type type, osmid_t id) const; void flush() { m_copy_mgr.flush(); } @@ -332,9 +329,6 @@ class table_connection_t */ db_copy_mgr_t m_copy_mgr; - /// The connection to the database server. 
- std::unique_ptr m_db_connection; - task_result_t m_task_result; std::size_t m_count_insert = 0; diff --git a/src/output-flex.cpp b/src/output-flex.cpp index 633086222..7b6452294 100644 --- a/src/output-flex.cpp +++ b/src/output-flex.cpp @@ -1075,9 +1075,12 @@ void output_flex_t::after_ways() void output_flex_t::stop() { for (auto &table : m_table_connections) { - table.task_set(thread_pool().submit([&]() { - table.stop(get_options()->slim && !get_options()->droptemp, + auto *db_connection = &m_db_connections.at(table.table().num()); + table.task_set(thread_pool().submit([&, db_connection]() { + table.stop(*db_connection, + get_options()->slim && !get_options()->droptemp, get_options()->append); + db_connection->close(); })); } @@ -1151,13 +1154,15 @@ void output_flex_t::relation_add(osmium::Relation const &relation) } void output_flex_t::delete_from_table(table_connection_t *table_connection, + pg_conn_t const &db_connection, osmium::item_type type, osmid_t osm_id) { assert(table_connection); auto const id = table_connection->table().map_id(type, osm_id); if (table_connection->table().has_columns_with_expire()) { - auto const result = table_connection->get_geoms_by_id(type, id); + auto const result = + table_connection->get_geoms_by_id(db_connection, type, id); auto const num_tuples = result.num_tuples(); if (num_tuples > 0) { int col = 0; @@ -1180,7 +1185,8 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id) { for (auto &table : m_table_connections) { if (table.table().matches_type(type) && table.table().has_id_column()) { - delete_from_table(&table, type, osm_id); + delete_from_table(&table, m_db_connections.at(table.table().num()), + type, osm_id); } } } @@ -1224,9 +1230,12 @@ void output_flex_t::relation_modify(osmium::Relation const &rel) void output_flex_t::start() { + assert(m_db_connections.empty()); + for (auto &table : m_table_connections) { - table.connect(get_options()->connection_params); - 
table.start(get_options()->append); + m_db_connections.emplace_back(get_options()->connection_params, + "out.flex.table"); + table.start(m_db_connections.back(), get_options()->append); } } @@ -1260,10 +1269,13 @@ output_flex_t::output_flex_t(output_flex_t const *other, m_process_relation(other->m_process_relation), m_select_relation_members(other->m_select_relation_members) { + assert(m_db_connections.empty()); + for (auto &table : *m_tables) { auto &tc = m_table_connections.emplace_back(&table, m_copy_thread); - tc.connect(get_options()->connection_params); - tc.prepare(); + m_db_connections.emplace_back(get_options()->connection_params, + "out.flex.table"); + tc.prepare(m_db_connections.back()); } for (auto &expire_output : *m_expire_outputs) { @@ -1509,8 +1521,10 @@ void output_flex_t::reprocess_marked() for (auto &table : m_table_connections) { if (table.table().matches_type(osmium::item_type::way) && table.table().has_id_column()) { - table.analyze(); - table.create_id_index(); + auto const &db_connection = + m_db_connections.at(table.table().num()); + table.analyze(db_connection); + table.create_id_index(db_connection); } } diff --git a/src/output-flex.hpp b/src/output-flex.hpp index c3125125a..ae812502c 100644 --- a/src/output-flex.hpp +++ b/src/output-flex.hpp @@ -224,7 +224,9 @@ class output_flex_t : public output_t void add_row(table_connection_t *table_connection, OBJECT const &object); void delete_from_table(table_connection_t *table_connection, + pg_conn_t const &db_connection, osmium::item_type type, osmid_t osm_id); + void delete_from_tables(osmium::item_type type, osmid_t osm_id); lua_State *lua_state() noexcept { return m_lua_state.get(); } @@ -288,6 +290,9 @@ class output_flex_t : public output_t std::vector m_table_connections; + /// The connections to the database server for each table. + std::vector m_db_connections; + // This is shared between all clones of the output and must only be // accessed while protected using the lua_mutex. 
std::shared_ptr m_stage2_way_ids = std::make_shared(); From ef8f6e50f33b35635c226816c451bfd317300902 Mon Sep 17 00:00:00 2001 From: Jochen Topf Date: Sun, 4 Feb 2024 11:02:49 +0100 Subject: [PATCH 3/3] Use far fewer connections in flex output Most operations are not run in parallel anyway, they only need a single connection. For the parts running in parallel, we have one connection per thread. We don't have connections for each table any more. --- src/output-flex.cpp | 31 +++++++++++-------------------- src/output-flex.hpp | 4 ++-- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/src/output-flex.cpp b/src/output-flex.cpp index 7b6452294..daccd60fd 100644 --- a/src/output-flex.cpp +++ b/src/output-flex.cpp @@ -1075,12 +1075,12 @@ void output_flex_t::after_ways() void output_flex_t::stop() { for (auto &table : m_table_connections) { - auto *db_connection = &m_db_connections.at(table.table().num()); - table.task_set(thread_pool().submit([&, db_connection]() { - table.stop(*db_connection, + table.task_set(thread_pool().submit([&]() { + pg_conn_t const db_connection{get_options()->connection_params, + "out.flex.stop"}; + table.stop(db_connection, get_options()->slim && !get_options()->droptemp, get_options()->append); - db_connection->close(); })); } @@ -1185,8 +1185,7 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id) { for (auto &table : m_table_connections) { if (table.table().matches_type(type) && table.table().has_id_column()) { - delete_from_table(&table, m_db_connections.at(table.table().num()), - type, osm_id); + delete_from_table(&table, m_db_connection, type, osm_id); } } } @@ -1230,12 +1229,8 @@ void output_flex_t::relation_modify(osmium::Relation const &rel) void output_flex_t::start() { - assert(m_db_connections.empty()); - for (auto &table : m_table_connections) { - m_db_connections.emplace_back(get_options()->connection_params, - "out.flex.table"); - table.start(m_db_connections.back(), get_options()->append); 
+ table.start(m_db_connection, get_options()->append); } } @@ -1263,19 +1258,16 @@ output_flex_t::output_flex_t(output_flex_t const *other, std::shared_ptr copy_thread) : output_t(other, std::move(mid)), m_tables(other->m_tables), m_expire_outputs(other->m_expire_outputs), + m_db_connection(get_options()->connection_params, "out.flex.thread"), m_stage2_way_ids(other->m_stage2_way_ids), m_copy_thread(std::move(copy_thread)), m_lua_state(other->m_lua_state), m_process_node(other->m_process_node), m_process_way(other->m_process_way), m_process_relation(other->m_process_relation), m_select_relation_members(other->m_select_relation_members) { - assert(m_db_connections.empty()); - for (auto &table : *m_tables) { auto &tc = m_table_connections.emplace_back(&table, m_copy_thread); - m_db_connections.emplace_back(get_options()->connection_params, - "out.flex.table"); - tc.prepare(m_db_connections.back()); + tc.prepare(m_db_connection); } for (auto &expire_output : *m_expire_outputs) { @@ -1295,6 +1287,7 @@ output_flex_t::output_flex_t(std::shared_ptr const &mid, std::shared_ptr thread_pool, options_t const &options) : output_t(mid, std::move(thread_pool), options), + m_db_connection(get_options()->connection_params, "out.flex.main"), m_copy_thread(std::make_shared(options.connection_params)) { init_lua(options.style); @@ -1521,10 +1514,8 @@ void output_flex_t::reprocess_marked() for (auto &table : m_table_connections) { if (table.table().matches_type(osmium::item_type::way) && table.table().has_id_column()) { - auto const &db_connection = - m_db_connections.at(table.table().num()); - table.analyze(db_connection); - table.create_id_index(db_connection); + table.analyze(m_db_connection); + table.create_id_index(m_db_connection); } } diff --git a/src/output-flex.hpp b/src/output-flex.hpp index ae812502c..b64d15e13 100644 --- a/src/output-flex.hpp +++ b/src/output-flex.hpp @@ -290,8 +290,8 @@ class output_flex_t : public output_t std::vector m_table_connections; - /// The 
connections to the database server for each table. - std::vector m_db_connections; + /// The connection to the database server. + pg_conn_t m_db_connection; // This is shared between all clones of the output and must only be // accessed while protected using the lua_mutex.