Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Significantly reduce number of db connection in flex output #2134

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/flex-lua-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ static flex_table_t &create_flex_table(lua_State *lua_state,
throw fmt_error("Table with name '{}' already exists.", table_name);
}

auto &new_table = tables->emplace_back(default_schema, table_name);
auto &new_table =
tables->emplace_back(default_schema, table_name, tables->size());

lua_pop(lua_state, 1); // "name"

Expand Down
85 changes: 36 additions & 49 deletions src/flex-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ std::string flex_table_t::build_sql_prepare_get_wkb() const

if (has_multicolumn_id_index()) {
return fmt::format(
R"(PREPARE get_wkb(char(1), bigint) AS)"
R"(PREPARE get_wkb_{}(char(1), bigint) AS)"
R"( SELECT {} FROM {} WHERE "{}" = $1 AND "{}" = $2)",
columns, full_name(), m_columns[0].name(), m_columns[1].name());
m_table_num, columns, full_name(), m_columns[0].name(),
m_columns[1].name());
}

return fmt::format(R"(PREPARE get_wkb(bigint) AS)"
return fmt::format(R"(PREPARE get_wkb_{}(bigint) AS)"
R"( SELECT {} FROM {} WHERE "{}" = $1)",
columns, full_name(), id_column_names());
m_table_num, columns, full_name(), id_column_names());
}

std::string
Expand Down Expand Up @@ -241,14 +242,6 @@ bool flex_table_t::has_columns_with_expire() const noexcept
[](auto const &column) { return column.has_expire(); });
}

void table_connection_t::connect(connection_params_t const &connection_params)
{
assert(!m_db_connection);

m_db_connection =
std::make_unique<pg_conn_t>(connection_params, "out.flex.table");
}

static void enable_check_trigger(pg_conn_t const &db_connection,
flex_table_t const &table)
{
Expand All @@ -273,50 +266,46 @@ static void enable_check_trigger(pg_conn_t const &db_connection,
checks);
}

void table_connection_t::start(bool append)
void table_connection_t::start(pg_conn_t const &db_connection, bool append)
{
assert(m_db_connection);

if (!append) {
m_db_connection->exec("DROP TABLE IF EXISTS {} CASCADE",
table().full_name());
db_connection.exec("DROP TABLE IF EXISTS {} CASCADE",
table().full_name());
}

// These _tmp tables can be left behind if we run out of disk space.
m_db_connection->exec("DROP TABLE IF EXISTS {}", table().full_tmp_name());
db_connection.exec("DROP TABLE IF EXISTS {}", table().full_tmp_name());

if (!append) {
m_db_connection->exec(table().build_sql_create_table(
db_connection.exec(table().build_sql_create_table(
table().cluster_by_geom() ? flex_table_t::table_type::interim
: flex_table_t::table_type::permanent,
table().full_name()));

enable_check_trigger(*m_db_connection, table());
enable_check_trigger(db_connection, table());
}

prepare();
prepare(db_connection);
}

void table_connection_t::stop(bool updateable, bool append)
void table_connection_t::stop(pg_conn_t const &db_connection, bool updateable,
bool append)
{
assert(m_db_connection);

m_copy_mgr.sync();

if (append) {
teardown();
return;
}

if (table().cluster_by_geom()) {
if (table().geom_column().needs_isvalid()) {
drop_geom_check_trigger(*m_db_connection, table().schema(),
drop_geom_check_trigger(db_connection, table().schema(),
table().name());
}

log_info("Clustering table '{}' by geometry...", table().name());

m_db_connection->exec(table().build_sql_create_table(
db_connection.exec(table().build_sql_create_table(
flex_table_t::table_type::permanent, table().full_tmp_name()));

std::string const columns = table().build_sql_column_list();
Expand Down Expand Up @@ -349,15 +338,15 @@ void table_connection_t::stop(bool updateable, bool append)
sql += geom_column_name;
}

m_db_connection->exec(sql);
db_connection.exec(sql);

m_db_connection->exec("DROP TABLE {}", table().full_name());
m_db_connection->exec(R"(ALTER TABLE {} RENAME TO "{}")",
table().full_tmp_name(), table().name());
db_connection.exec("DROP TABLE {}", table().full_name());
db_connection.exec(R"(ALTER TABLE {} RENAME TO "{}")",
table().full_tmp_name(), table().name());
m_id_index_created = false;

if (updateable) {
enable_check_trigger(*m_db_connection, table());
enable_check_trigger(db_connection, table());
}
}

Expand All @@ -369,55 +358,53 @@ void table_connection_t::stop(bool updateable, bool append)
index.columns());
auto const sql = index.create_index(
qualified_name(table().schema(), table().name()));
m_db_connection->exec(sql);
db_connection.exec(sql);
}
}

if ((table().always_build_id_index() || updateable) &&
table().has_id_column()) {
create_id_index();
create_id_index(db_connection);
}

log_info("Analyzing table '{}'...", table().name());
analyze();

teardown();
analyze(db_connection);
}

void table_connection_t::prepare()
void table_connection_t::prepare(pg_conn_t const &db_connection)
{
assert(m_db_connection);
if (table().has_id_column() && table().has_columns_with_expire()) {
m_db_connection->exec(table().build_sql_prepare_get_wkb());
db_connection.exec(table().build_sql_prepare_get_wkb());
}
}

void table_connection_t::analyze()
void table_connection_t::analyze(pg_conn_t const &db_connection)
{
analyze_table(*m_db_connection, table().schema(), table().name());
analyze_table(db_connection, table().schema(), table().name());
}

void table_connection_t::create_id_index()
void table_connection_t::create_id_index(pg_conn_t const &db_connection)
{
if (m_id_index_created) {
log_debug("Id index on table '{}' already created.", table().name());
} else {
log_info("Creating id index on table '{}'...", table().name());
m_db_connection->exec(table().build_sql_create_id_index());
db_connection.exec(table().build_sql_create_id_index());
m_id_index_created = true;
}
}

pg_result_t table_connection_t::get_geoms_by_id(osmium::item_type type,
pg_result_t table_connection_t::get_geoms_by_id(pg_conn_t const &db_connection,
osmium::item_type type,
osmid_t id) const
{
assert(table().has_geom_column());
assert(m_db_connection);
std::string const stmt = fmt::format("get_wkb_{}", table().num());
if (table().has_multicolumn_id_index()) {
return m_db_connection->exec_prepared_as_binary("get_wkb",
type_to_char(type), id);
return db_connection.exec_prepared_as_binary(stmt.c_str(),
type_to_char(type), id);
}
return m_db_connection->exec_prepared_as_binary("get_wkb", id);
return db_connection.exec_prepared_as_binary(stmt.c_str(), id);
}

void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id)
Expand Down
31 changes: 15 additions & 16 deletions src/flex-table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class flex_table_t
permanent
};

flex_table_t(std::string schema, std::string name)
: m_schema(std::move(schema)), m_name(std::move(name))
flex_table_t(std::string schema, std::string name, std::size_t num)
: m_schema(std::move(schema)), m_name(std::move(name)), m_table_num(num)
{
}

Expand Down Expand Up @@ -197,6 +197,8 @@ class flex_table_t

bool has_columns_with_expire() const noexcept;

std::size_t num() const noexcept { return m_table_num; }

private:
/// The schema this table is in
std::string m_schema;
Expand Down Expand Up @@ -230,6 +232,9 @@ class flex_table_t
*/
std::size_t m_geom_column = std::numeric_limits<std::size_t>::max();

/// Unique number for each table.
std::size_t m_table_num;

/**
* Type of id stored in this table.
*/
Expand All @@ -255,31 +260,28 @@ class table_connection_t
m_target(std::make_shared<db_target_descr_t>(
table->schema(), table->name(), table->id_column_names(),
table->build_sql_column_list())),
m_copy_mgr(copy_thread), m_db_connection(nullptr)
m_copy_mgr(copy_thread)
{
}

void connect(connection_params_t const &connection_params);

void start(bool append);
void start(pg_conn_t const &db_connection, bool append);

void stop(bool updateable, bool append);
void stop(pg_conn_t const &db_connection, bool updateable, bool append);

flex_table_t const &table() const noexcept { return *m_table; }

void teardown() { m_db_connection.reset(); }
void prepare(pg_conn_t const &db_connection);

void prepare();
void analyze(pg_conn_t const &db_connection);

void analyze();

void create_id_index();
void create_id_index(pg_conn_t const &db_connection);

/**
* Get all geometries that have at least one expire config defined
* from the database and return the result set.
*/
pg_result_t get_geoms_by_id(osmium::item_type type, osmid_t id) const;
pg_result_t get_geoms_by_id(pg_conn_t const &db_connection,
osmium::item_type type, osmid_t id) const;

void flush() { m_copy_mgr.flush(); }

Expand Down Expand Up @@ -327,9 +329,6 @@ class table_connection_t
*/
db_copy_mgr_t<db_deleter_by_type_and_id_t> m_copy_mgr;

/// The connection to the database server.
std::unique_ptr<pg_conn_t> m_db_connection;

task_result_t m_task_result;

std::size_t m_count_insert = 0;
Expand Down
23 changes: 14 additions & 9 deletions src/output-flex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,10 @@ void output_flex_t::stop()
{
for (auto &table : m_table_connections) {
table.task_set(thread_pool().submit([&]() {
table.stop(get_options()->slim && !get_options()->droptemp,
pg_conn_t const db_connection{get_options()->connection_params,
"out.flex.stop"};
table.stop(db_connection,
get_options()->slim && !get_options()->droptemp,
get_options()->append);
}));
}
Expand Down Expand Up @@ -1151,13 +1154,15 @@ void output_flex_t::relation_add(osmium::Relation const &relation)
}

void output_flex_t::delete_from_table(table_connection_t *table_connection,
pg_conn_t const &db_connection,
osmium::item_type type, osmid_t osm_id)
{
assert(table_connection);
auto const id = table_connection->table().map_id(type, osm_id);

if (table_connection->table().has_columns_with_expire()) {
auto const result = table_connection->get_geoms_by_id(type, id);
auto const result =
table_connection->get_geoms_by_id(db_connection, type, id);
auto const num_tuples = result.num_tuples();
if (num_tuples > 0) {
int col = 0;
Expand All @@ -1180,7 +1185,7 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id)
{
for (auto &table : m_table_connections) {
if (table.table().matches_type(type) && table.table().has_id_column()) {
delete_from_table(&table, type, osm_id);
delete_from_table(&table, m_db_connection, type, osm_id);
}
}
}
Expand Down Expand Up @@ -1225,8 +1230,7 @@ void output_flex_t::relation_modify(osmium::Relation const &rel)
void output_flex_t::start()
{
for (auto &table : m_table_connections) {
table.connect(get_options()->connection_params);
table.start(get_options()->append);
table.start(m_db_connection, get_options()->append);
}
}

Expand Down Expand Up @@ -1254,6 +1258,7 @@ output_flex_t::output_flex_t(output_flex_t const *other,
std::shared_ptr<db_copy_thread_t> copy_thread)
: output_t(other, std::move(mid)), m_tables(other->m_tables),
m_expire_outputs(other->m_expire_outputs),
m_db_connection(get_options()->connection_params, "out.flex.thread"),
m_stage2_way_ids(other->m_stage2_way_ids),
m_copy_thread(std::move(copy_thread)), m_lua_state(other->m_lua_state),
m_process_node(other->m_process_node), m_process_way(other->m_process_way),
Expand All @@ -1262,8 +1267,7 @@ output_flex_t::output_flex_t(output_flex_t const *other,
{
for (auto &table : *m_tables) {
auto &tc = m_table_connections.emplace_back(&table, m_copy_thread);
tc.connect(get_options()->connection_params);
tc.prepare();
tc.prepare(m_db_connection);
}

for (auto &expire_output : *m_expire_outputs) {
Expand All @@ -1283,6 +1287,7 @@ output_flex_t::output_flex_t(std::shared_ptr<middle_query_t> const &mid,
std::shared_ptr<thread_pool_t> thread_pool,
options_t const &options)
: output_t(mid, std::move(thread_pool), options),
m_db_connection(get_options()->connection_params, "out.flex.main"),
m_copy_thread(std::make_shared<db_copy_thread_t>(options.connection_params))
{
init_lua(options.style);
Expand Down Expand Up @@ -1509,8 +1514,8 @@ void output_flex_t::reprocess_marked()
for (auto &table : m_table_connections) {
if (table.table().matches_type(osmium::item_type::way) &&
table.table().has_id_column()) {
table.analyze();
table.create_id_index();
table.analyze(m_db_connection);
table.create_id_index(m_db_connection);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/output-flex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ class output_flex_t : public output_t
void add_row(table_connection_t *table_connection, OBJECT const &object);

void delete_from_table(table_connection_t *table_connection,
pg_conn_t const &db_connection,
osmium::item_type type, osmid_t osm_id);

void delete_from_tables(osmium::item_type type, osmid_t osm_id);

lua_State *lua_state() noexcept { return m_lua_state.get(); }
Expand Down Expand Up @@ -288,6 +290,9 @@ class output_flex_t : public output_t

std::vector<table_connection_t> m_table_connections;

/// The connection to the database server.
pg_conn_t m_db_connection;

// This is shared between all clones of the output and must only be
// accessed while protected using the lua_mutex.
std::shared_ptr<idset_t> m_stage2_way_ids = std::make_shared<idset_t>();
Expand Down
Loading