diff --git a/src/flex-lua-table.cpp b/src/flex-lua-table.cpp index d459e170d..a37d1d84e 100644 --- a/src/flex-lua-table.cpp +++ b/src/flex-lua-table.cpp @@ -41,7 +41,8 @@ static flex_table_t &create_flex_table(lua_State *lua_state, throw fmt_error("Table with name '{}' already exists.", table_name); } - auto &new_table = tables->emplace_back(default_schema, table_name); + auto &new_table = + tables->emplace_back(default_schema, table_name, tables->size()); lua_pop(lua_state, 1); // "name" diff --git a/src/flex-table.cpp b/src/flex-table.cpp index 36016d13f..9e9676c53 100644 --- a/src/flex-table.cpp +++ b/src/flex-table.cpp @@ -167,14 +167,15 @@ std::string flex_table_t::build_sql_prepare_get_wkb() const if (has_multicolumn_id_index()) { return fmt::format( - R"(PREPARE get_wkb(char(1), bigint) AS)" + R"(PREPARE get_wkb_{}(char(1), bigint) AS)" R"( SELECT {} FROM {} WHERE "{}" = $1 AND "{}" = $2)", - columns, full_name(), m_columns[0].name(), m_columns[1].name()); + m_table_num, columns, full_name(), m_columns[0].name(), + m_columns[1].name()); } - return fmt::format(R"(PREPARE get_wkb(bigint) AS)" + return fmt::format(R"(PREPARE get_wkb_{}(bigint) AS)" R"( SELECT {} FROM {} WHERE "{}" = $1)", - columns, full_name(), id_column_names()); + m_table_num, columns, full_name(), id_column_names()); } std::string @@ -241,14 +242,6 @@ bool flex_table_t::has_columns_with_expire() const noexcept [](auto const &column) { return column.has_expire(); }); } -void table_connection_t::connect(connection_params_t const &connection_params) -{ - assert(!m_db_connection); - - m_db_connection = - std::make_unique(connection_params, "out.flex.table"); -} - static void enable_check_trigger(pg_conn_t const &db_connection, flex_table_t const &table) { @@ -273,50 +266,46 @@ static void enable_check_trigger(pg_conn_t const &db_connection, checks); } -void table_connection_t::start(bool append) +void table_connection_t::start(pg_conn_t const &db_connection, bool append) { - assert(m_db_connection); - if (!append) { - m_db_connection->exec("DROP TABLE IF EXISTS {} CASCADE", - table().full_name()); + db_connection.exec("DROP TABLE IF EXISTS {} CASCADE", + table().full_name()); } // These _tmp tables can be left behind if we run out of disk space. - m_db_connection->exec("DROP TABLE IF EXISTS {}", table().full_tmp_name()); + db_connection.exec("DROP TABLE IF EXISTS {}", table().full_tmp_name()); if (!append) { - m_db_connection->exec(table().build_sql_create_table( + db_connection.exec(table().build_sql_create_table( table().cluster_by_geom() ? flex_table_t::table_type::interim : flex_table_t::table_type::permanent, table().full_name())); - enable_check_trigger(*m_db_connection, table()); + enable_check_trigger(db_connection, table()); } - prepare(); + prepare(db_connection); } -void table_connection_t::stop(bool updateable, bool append) +void table_connection_t::stop(pg_conn_t const &db_connection, bool updateable, + bool append) { - assert(m_db_connection); - m_copy_mgr.sync(); if (append) { - teardown(); return; } if (table().cluster_by_geom()) { if (table().geom_column().needs_isvalid()) { - drop_geom_check_trigger(*m_db_connection, table().schema(), + drop_geom_check_trigger(db_connection, table().schema(), table().name()); } log_info("Clustering table '{}' by geometry...", table().name()); - m_db_connection->exec(table().build_sql_create_table( + db_connection.exec(table().build_sql_create_table( flex_table_t::table_type::permanent, table().full_tmp_name())); std::string const columns = table().build_sql_column_list(); @@ -349,15 +338,15 @@ void table_connection_t::stop(bool updateable, bool append) sql += geom_column_name; } - m_db_connection->exec(sql); + db_connection.exec(sql); - m_db_connection->exec("DROP TABLE {}", table().full_name()); - m_db_connection->exec(R"(ALTER TABLE {} RENAME TO "{}")", - table().full_tmp_name(), table().name()); + db_connection.exec("DROP TABLE {}", table().full_name()); + db_connection.exec(R"(ALTER TABLE {} RENAME TO "{}")", + table().full_tmp_name(), table().name()); m_id_index_created = false; if (updateable) { - enable_check_trigger(*m_db_connection, table()); + enable_check_trigger(db_connection, table()); } } @@ -369,55 +358,53 @@ void table_connection_t::stop(bool updateable, bool append) index.columns()); auto const sql = index.create_index( qualified_name(table().schema(), table().name())); - m_db_connection->exec(sql); + db_connection.exec(sql); } } if ((table().always_build_id_index() || updateable) && table().has_id_column()) { - create_id_index(); + create_id_index(db_connection); } log_info("Analyzing table '{}'...", table().name()); - analyze(); - - teardown(); + analyze(db_connection); } -void table_connection_t::prepare() +void table_connection_t::prepare(pg_conn_t const &db_connection) { - assert(m_db_connection); if (table().has_id_column() && table().has_columns_with_expire()) { - m_db_connection->exec(table().build_sql_prepare_get_wkb()); + db_connection.exec(table().build_sql_prepare_get_wkb()); } } -void table_connection_t::analyze() +void table_connection_t::analyze(pg_conn_t const &db_connection) { - analyze_table(*m_db_connection, table().schema(), table().name()); + analyze_table(db_connection, table().schema(), table().name()); } -void table_connection_t::create_id_index() +void table_connection_t::create_id_index(pg_conn_t const &db_connection) { if (m_id_index_created) { log_debug("Id index on table '{}' already created.", table().name()); } else { log_info("Creating id index on table '{}'...", table().name()); - m_db_connection->exec(table().build_sql_create_id_index()); + db_connection.exec(table().build_sql_create_id_index()); m_id_index_created = true; } } -pg_result_t table_connection_t::get_geoms_by_id(osmium::item_type type, +pg_result_t table_connection_t::get_geoms_by_id(pg_conn_t const &db_connection, + osmium::item_type type, osmid_t id) const { assert(table().has_geom_column()); - assert(m_db_connection); + std::string const stmt = fmt::format("get_wkb_{}", table().num()); if (table().has_multicolumn_id_index()) { - return m_db_connection->exec_prepared_as_binary("get_wkb", - type_to_char(type), id); + return db_connection.exec_prepared_as_binary(stmt.c_str(), + type_to_char(type), id); } - return m_db_connection->exec_prepared_as_binary("get_wkb", id); + return db_connection.exec_prepared_as_binary(stmt.c_str(), id); } void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id) diff --git a/src/flex-table.hpp b/src/flex-table.hpp index efbd71122..cc15febff 100644 --- a/src/flex-table.hpp +++ b/src/flex-table.hpp @@ -59,8 +59,8 @@ class flex_table_t permanent }; - flex_table_t(std::string schema, std::string name) - : m_schema(std::move(schema)), m_name(std::move(name)) + flex_table_t(std::string schema, std::string name, std::size_t num) + : m_schema(std::move(schema)), m_name(std::move(name)), m_table_num(num) { } @@ -197,6 +197,8 @@ class flex_table_t bool has_columns_with_expire() const noexcept; + std::size_t num() const noexcept { return m_table_num; } + private: /// The schema this table is in std::string m_schema; @@ -230,6 +232,9 @@ class flex_table_t */ std::size_t m_geom_column = std::numeric_limits::max(); + /// Unique number for each table. + std::size_t m_table_num; + /** * Type of id stored in this table. */ @@ -255,31 +260,28 @@ class table_connection_t m_target(std::make_shared( table->schema(), table->name(), table->id_column_names(), table->build_sql_column_list())), - m_copy_mgr(copy_thread), m_db_connection(nullptr) + m_copy_mgr(copy_thread) { } - void connect(connection_params_t const &connection_params); - - void start(bool append); + void start(pg_conn_t const &db_connection, bool append); - void stop(bool updateable, bool append); + void stop(pg_conn_t const &db_connection, bool updateable, bool append); flex_table_t const &table() const noexcept { return *m_table; } - void teardown() { m_db_connection.reset(); } + void prepare(pg_conn_t const &db_connection); - void prepare(); + void analyze(pg_conn_t const &db_connection); - void analyze(); - - void create_id_index(); + void create_id_index(pg_conn_t const &db_connection); /** * Get all geometries that have at least one expire config defined * from the database and return the result set. */ - pg_result_t get_geoms_by_id(osmium::item_type type, osmid_t id) const; + pg_result_t get_geoms_by_id(pg_conn_t const &db_connection, + osmium::item_type type, osmid_t id) const; void flush() { m_copy_mgr.flush(); } @@ -327,9 +329,6 @@ class table_connection_t */ db_copy_mgr_t m_copy_mgr; - /// The connection to the database server. - std::unique_ptr m_db_connection; - task_result_t m_task_result; std::size_t m_count_insert = 0; diff --git a/src/output-flex.cpp b/src/output-flex.cpp index 633086222..daccd60fd 100644 --- a/src/output-flex.cpp +++ b/src/output-flex.cpp @@ -1076,7 +1076,10 @@ void output_flex_t::stop() { for (auto &table : m_table_connections) { table.task_set(thread_pool().submit([&]() { - table.stop(get_options()->slim && !get_options()->droptemp, + pg_conn_t const db_connection{get_options()->connection_params, + "out.flex.stop"}; + table.stop(db_connection, + get_options()->slim && !get_options()->droptemp, get_options()->append); })); } @@ -1151,13 +1154,15 @@ void output_flex_t::relation_add(osmium::Relation const &relation) } void output_flex_t::delete_from_table(table_connection_t *table_connection, + pg_conn_t const &db_connection, osmium::item_type type, osmid_t osm_id) { assert(table_connection); auto const id = table_connection->table().map_id(type, osm_id); if (table_connection->table().has_columns_with_expire()) { - auto const result = table_connection->get_geoms_by_id(type, id); + auto const result = + table_connection->get_geoms_by_id(db_connection, type, id); auto const num_tuples = result.num_tuples(); if (num_tuples > 0) { int col = 0; @@ -1180,7 +1185,7 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id) { for (auto &table : m_table_connections) { if (table.table().matches_type(type) && table.table().has_id_column()) { - delete_from_table(&table, type, osm_id); + delete_from_table(&table, m_db_connection, type, osm_id); } } } @@ -1225,8 +1230,7 @@ void output_flex_t::relation_modify(osmium::Relation const &rel) void output_flex_t::start() { for (auto &table : m_table_connections) { - table.connect(get_options()->connection_params); - table.start(get_options()->append); + table.start(m_db_connection, get_options()->append); } } @@ -1254,6 +1258,7 @@ output_flex_t::output_flex_t(output_flex_t const *other, std::shared_ptr copy_thread) : output_t(other, std::move(mid)), m_tables(other->m_tables), m_expire_outputs(other->m_expire_outputs), + m_db_connection(get_options()->connection_params, "out.flex.thread"), m_stage2_way_ids(other->m_stage2_way_ids), m_copy_thread(std::move(copy_thread)), m_lua_state(other->m_lua_state), m_process_node(other->m_process_node), m_process_way(other->m_process_way), @@ -1262,8 +1267,7 @@ output_flex_t::output_flex_t(output_flex_t const *other, { for (auto &table : *m_tables) { auto &tc = m_table_connections.emplace_back(&table, m_copy_thread); - tc.connect(get_options()->connection_params); - tc.prepare(); + tc.prepare(m_db_connection); } for (auto &expire_output : *m_expire_outputs) { @@ -1283,6 +1287,7 @@ output_flex_t::output_flex_t(std::shared_ptr const &mid, std::shared_ptr thread_pool, options_t const &options) : output_t(mid, std::move(thread_pool), options), + m_db_connection(get_options()->connection_params, "out.flex.main"), m_copy_thread(std::make_shared(options.connection_params)) { init_lua(options.style); @@ -1509,8 +1514,8 @@ void output_flex_t::reprocess_marked() for (auto &table : m_table_connections) { if (table.table().matches_type(osmium::item_type::way) && table.table().has_id_column()) { - table.analyze(); - table.create_id_index(); + table.analyze(m_db_connection); + table.create_id_index(m_db_connection); } } diff --git a/src/output-flex.hpp b/src/output-flex.hpp index c3125125a..b64d15e13 100644 --- a/src/output-flex.hpp +++ b/src/output-flex.hpp @@ -224,7 +224,9 @@ class output_flex_t : public output_t void add_row(table_connection_t *table_connection, OBJECT const &object); void delete_from_table(table_connection_t *table_connection, + pg_conn_t const &db_connection, osmium::item_type type, osmid_t osm_id); + void delete_from_tables(osmium::item_type type, osmid_t osm_id); lua_State *lua_state() noexcept { return m_lua_state.get(); } @@ -288,6 +290,9 @@ class output_flex_t : public output_t std::vector m_table_connections; + /// The connection to the database server. + pg_conn_t m_db_connection; + // This is shared between all clones of the output and must only be // accessed while protected using the lua_mutex. std::shared_ptr m_stage2_way_ids = std::make_shared(); diff --git a/tests/test-flex-indexes.cpp b/tests/test-flex-indexes.cpp index 609452af7..3bd401b09 100644 --- a/tests/test-flex-indexes.cpp +++ b/tests/test-flex-indexes.cpp @@ -48,7 +48,7 @@ TEST_CASE("check index with single column", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("geom", "geometry", ""); REQUIRE(table.indexes().empty()); @@ -71,7 +71,7 @@ TEST_CASE("check index with multiple columns", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("a", "int", ""); table.add_column("b", "int", ""); @@ -93,7 +93,7 @@ TEST_CASE("check unique index", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); REQUIRE(tf.run_lua( @@ -115,7 +115,7 @@ TEST_CASE("check index with tablespace from table", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.set_index_tablespace("foo"); table.add_column("col", "int", ""); @@ -137,7 +137,7 @@ TEST_CASE("check index with tablespace", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); REQUIRE(tf.run_lua("return { method = 'btree', column = 'col', tablespace " @@ -159,7 +159,7 @@ TEST_CASE("check index with expression and where clause", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "text", ""); REQUIRE(tf.run_lua("return { method = 'btree', expression = 'lower(col)'," @@ -181,7 +181,7 @@ TEST_CASE("check index with include", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); table.add_column("extra", "int", ""); @@ -204,7 +204,7 @@ TEST_CASE("check index with include as array", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); table.add_column("extra", "int", ""); @@ -227,7 +227,7 @@ TEST_CASE("check index with empty include array", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "int", ""); table.add_column("extra", "int", ""); @@ -250,7 +250,7 @@ TEST_CASE("check multiple indexes", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("a", "int", ""); table.add_column("b", "int", ""); @@ -273,7 +273,7 @@ TEST_CASE("check various broken index configs", "[NoDB]") { test_framework const tf; - flex_table_t table{"public", "test_table"}; + flex_table_t table{"public", "test_table", 0}; table.add_column("col", "text", ""); SECTION("empty index description") { REQUIRE(tf.run_lua("return {}")); }