Skip to content

Commit

Permalink
Merge pull request #2134 from joto/fewer-db-conns
Browse files Browse the repository at this point in the history
Significantly reduce number of db connection in flex output
  • Loading branch information
lonvia authored Feb 8, 2024
2 parents 68eae3f + ef8f6e5 commit e6a3096
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 86 deletions.
3 changes: 2 additions & 1 deletion src/flex-lua-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ static flex_table_t &create_flex_table(lua_State *lua_state,
throw fmt_error("Table with name '{}' already exists.", table_name);
}

auto &new_table = tables->emplace_back(default_schema, table_name);
auto &new_table =
tables->emplace_back(default_schema, table_name, tables->size());

lua_pop(lua_state, 1); // "name"

Expand Down
85 changes: 36 additions & 49 deletions src/flex-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ std::string flex_table_t::build_sql_prepare_get_wkb() const

if (has_multicolumn_id_index()) {
return fmt::format(
R"(PREPARE get_wkb(char(1), bigint) AS)"
R"(PREPARE get_wkb_{}(char(1), bigint) AS)"
R"( SELECT {} FROM {} WHERE "{}" = $1 AND "{}" = $2)",
columns, full_name(), m_columns[0].name(), m_columns[1].name());
m_table_num, columns, full_name(), m_columns[0].name(),
m_columns[1].name());
}

return fmt::format(R"(PREPARE get_wkb(bigint) AS)"
return fmt::format(R"(PREPARE get_wkb_{}(bigint) AS)"
R"( SELECT {} FROM {} WHERE "{}" = $1)",
columns, full_name(), id_column_names());
m_table_num, columns, full_name(), id_column_names());
}

std::string
Expand Down Expand Up @@ -241,14 +242,6 @@ bool flex_table_t::has_columns_with_expire() const noexcept
[](auto const &column) { return column.has_expire(); });
}

void table_connection_t::connect(connection_params_t const &connection_params)
{
assert(!m_db_connection);

m_db_connection =
std::make_unique<pg_conn_t>(connection_params, "out.flex.table");
}

static void enable_check_trigger(pg_conn_t const &db_connection,
flex_table_t const &table)
{
Expand All @@ -273,50 +266,46 @@ static void enable_check_trigger(pg_conn_t const &db_connection,
checks);
}

void table_connection_t::start(bool append)
void table_connection_t::start(pg_conn_t const &db_connection, bool append)
{
assert(m_db_connection);

if (!append) {
m_db_connection->exec("DROP TABLE IF EXISTS {} CASCADE",
table().full_name());
db_connection.exec("DROP TABLE IF EXISTS {} CASCADE",
table().full_name());
}

// These _tmp tables can be left behind if we run out of disk space.
m_db_connection->exec("DROP TABLE IF EXISTS {}", table().full_tmp_name());
db_connection.exec("DROP TABLE IF EXISTS {}", table().full_tmp_name());

if (!append) {
m_db_connection->exec(table().build_sql_create_table(
db_connection.exec(table().build_sql_create_table(
table().cluster_by_geom() ? flex_table_t::table_type::interim
: flex_table_t::table_type::permanent,
table().full_name()));

enable_check_trigger(*m_db_connection, table());
enable_check_trigger(db_connection, table());
}

prepare();
prepare(db_connection);
}

void table_connection_t::stop(bool updateable, bool append)
void table_connection_t::stop(pg_conn_t const &db_connection, bool updateable,
bool append)
{
assert(m_db_connection);

m_copy_mgr.sync();

if (append) {
teardown();
return;
}

if (table().cluster_by_geom()) {
if (table().geom_column().needs_isvalid()) {
drop_geom_check_trigger(*m_db_connection, table().schema(),
drop_geom_check_trigger(db_connection, table().schema(),
table().name());
}

log_info("Clustering table '{}' by geometry...", table().name());

m_db_connection->exec(table().build_sql_create_table(
db_connection.exec(table().build_sql_create_table(
flex_table_t::table_type::permanent, table().full_tmp_name()));

std::string const columns = table().build_sql_column_list();
Expand Down Expand Up @@ -349,15 +338,15 @@ void table_connection_t::stop(bool updateable, bool append)
sql += geom_column_name;
}

m_db_connection->exec(sql);
db_connection.exec(sql);

m_db_connection->exec("DROP TABLE {}", table().full_name());
m_db_connection->exec(R"(ALTER TABLE {} RENAME TO "{}")",
table().full_tmp_name(), table().name());
db_connection.exec("DROP TABLE {}", table().full_name());
db_connection.exec(R"(ALTER TABLE {} RENAME TO "{}")",
table().full_tmp_name(), table().name());
m_id_index_created = false;

if (updateable) {
enable_check_trigger(*m_db_connection, table());
enable_check_trigger(db_connection, table());
}
}

Expand All @@ -369,55 +358,53 @@ void table_connection_t::stop(bool updateable, bool append)
index.columns());
auto const sql = index.create_index(
qualified_name(table().schema(), table().name()));
m_db_connection->exec(sql);
db_connection.exec(sql);
}
}

if ((table().always_build_id_index() || updateable) &&
table().has_id_column()) {
create_id_index();
create_id_index(db_connection);
}

log_info("Analyzing table '{}'...", table().name());
analyze();

teardown();
analyze(db_connection);
}

void table_connection_t::prepare()
void table_connection_t::prepare(pg_conn_t const &db_connection)
{
assert(m_db_connection);
if (table().has_id_column() && table().has_columns_with_expire()) {
m_db_connection->exec(table().build_sql_prepare_get_wkb());
db_connection.exec(table().build_sql_prepare_get_wkb());
}
}

void table_connection_t::analyze()
void table_connection_t::analyze(pg_conn_t const &db_connection)
{
analyze_table(*m_db_connection, table().schema(), table().name());
analyze_table(db_connection, table().schema(), table().name());
}

void table_connection_t::create_id_index()
void table_connection_t::create_id_index(pg_conn_t const &db_connection)
{
if (m_id_index_created) {
log_debug("Id index on table '{}' already created.", table().name());
} else {
log_info("Creating id index on table '{}'...", table().name());
m_db_connection->exec(table().build_sql_create_id_index());
db_connection.exec(table().build_sql_create_id_index());
m_id_index_created = true;
}
}

pg_result_t table_connection_t::get_geoms_by_id(osmium::item_type type,
pg_result_t table_connection_t::get_geoms_by_id(pg_conn_t const &db_connection,
osmium::item_type type,
osmid_t id) const
{
assert(table().has_geom_column());
assert(m_db_connection);
std::string const stmt = fmt::format("get_wkb_{}", table().num());
if (table().has_multicolumn_id_index()) {
return m_db_connection->exec_prepared_as_binary("get_wkb",
type_to_char(type), id);
return db_connection.exec_prepared_as_binary(stmt.c_str(),
type_to_char(type), id);
}
return m_db_connection->exec_prepared_as_binary("get_wkb", id);
return db_connection.exec_prepared_as_binary(stmt.c_str(), id);
}

void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id)
Expand Down
31 changes: 15 additions & 16 deletions src/flex-table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class flex_table_t
permanent
};

flex_table_t(std::string schema, std::string name)
: m_schema(std::move(schema)), m_name(std::move(name))
flex_table_t(std::string schema, std::string name, std::size_t num)
: m_schema(std::move(schema)), m_name(std::move(name)), m_table_num(num)
{
}

Expand Down Expand Up @@ -197,6 +197,8 @@ class flex_table_t

bool has_columns_with_expire() const noexcept;

std::size_t num() const noexcept { return m_table_num; }

private:
/// The schema this table is in
std::string m_schema;
Expand Down Expand Up @@ -230,6 +232,9 @@ class flex_table_t
*/
std::size_t m_geom_column = std::numeric_limits<std::size_t>::max();

/// Unique number for each table.
std::size_t m_table_num;

/**
* Type of id stored in this table.
*/
Expand All @@ -255,31 +260,28 @@ class table_connection_t
m_target(std::make_shared<db_target_descr_t>(
table->schema(), table->name(), table->id_column_names(),
table->build_sql_column_list())),
m_copy_mgr(copy_thread), m_db_connection(nullptr)
m_copy_mgr(copy_thread)
{
}

void connect(connection_params_t const &connection_params);

void start(bool append);
void start(pg_conn_t const &db_connection, bool append);

void stop(bool updateable, bool append);
void stop(pg_conn_t const &db_connection, bool updateable, bool append);

flex_table_t const &table() const noexcept { return *m_table; }

void teardown() { m_db_connection.reset(); }
void prepare(pg_conn_t const &db_connection);

void prepare();
void analyze(pg_conn_t const &db_connection);

void analyze();

void create_id_index();
void create_id_index(pg_conn_t const &db_connection);

/**
* Get all geometries that have at least one expire config defined
* from the database and return the result set.
*/
pg_result_t get_geoms_by_id(osmium::item_type type, osmid_t id) const;
pg_result_t get_geoms_by_id(pg_conn_t const &db_connection,
osmium::item_type type, osmid_t id) const;

void flush() { m_copy_mgr.flush(); }

Expand Down Expand Up @@ -327,9 +329,6 @@ class table_connection_t
*/
db_copy_mgr_t<db_deleter_by_type_and_id_t> m_copy_mgr;

/// The connection to the database server.
std::unique_ptr<pg_conn_t> m_db_connection;

task_result_t m_task_result;

std::size_t m_count_insert = 0;
Expand Down
23 changes: 14 additions & 9 deletions src/output-flex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,10 @@ void output_flex_t::stop()
{
for (auto &table : m_table_connections) {
table.task_set(thread_pool().submit([&]() {
table.stop(get_options()->slim && !get_options()->droptemp,
pg_conn_t const db_connection{get_options()->connection_params,
"out.flex.stop"};
table.stop(db_connection,
get_options()->slim && !get_options()->droptemp,
get_options()->append);
}));
}
Expand Down Expand Up @@ -1151,13 +1154,15 @@ void output_flex_t::relation_add(osmium::Relation const &relation)
}

void output_flex_t::delete_from_table(table_connection_t *table_connection,
pg_conn_t const &db_connection,
osmium::item_type type, osmid_t osm_id)
{
assert(table_connection);
auto const id = table_connection->table().map_id(type, osm_id);

if (table_connection->table().has_columns_with_expire()) {
auto const result = table_connection->get_geoms_by_id(type, id);
auto const result =
table_connection->get_geoms_by_id(db_connection, type, id);
auto const num_tuples = result.num_tuples();
if (num_tuples > 0) {
int col = 0;
Expand All @@ -1180,7 +1185,7 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id)
{
for (auto &table : m_table_connections) {
if (table.table().matches_type(type) && table.table().has_id_column()) {
delete_from_table(&table, type, osm_id);
delete_from_table(&table, m_db_connection, type, osm_id);
}
}
}
Expand Down Expand Up @@ -1225,8 +1230,7 @@ void output_flex_t::relation_modify(osmium::Relation const &rel)
void output_flex_t::start()
{
for (auto &table : m_table_connections) {
table.connect(get_options()->connection_params);
table.start(get_options()->append);
table.start(m_db_connection, get_options()->append);
}
}

Expand Down Expand Up @@ -1254,6 +1258,7 @@ output_flex_t::output_flex_t(output_flex_t const *other,
std::shared_ptr<db_copy_thread_t> copy_thread)
: output_t(other, std::move(mid)), m_tables(other->m_tables),
m_expire_outputs(other->m_expire_outputs),
m_db_connection(get_options()->connection_params, "out.flex.thread"),
m_stage2_way_ids(other->m_stage2_way_ids),
m_copy_thread(std::move(copy_thread)), m_lua_state(other->m_lua_state),
m_process_node(other->m_process_node), m_process_way(other->m_process_way),
Expand All @@ -1262,8 +1267,7 @@ output_flex_t::output_flex_t(output_flex_t const *other,
{
for (auto &table : *m_tables) {
auto &tc = m_table_connections.emplace_back(&table, m_copy_thread);
tc.connect(get_options()->connection_params);
tc.prepare();
tc.prepare(m_db_connection);
}

for (auto &expire_output : *m_expire_outputs) {
Expand All @@ -1283,6 +1287,7 @@ output_flex_t::output_flex_t(std::shared_ptr<middle_query_t> const &mid,
std::shared_ptr<thread_pool_t> thread_pool,
options_t const &options)
: output_t(mid, std::move(thread_pool), options),
m_db_connection(get_options()->connection_params, "out.flex.main"),
m_copy_thread(std::make_shared<db_copy_thread_t>(options.connection_params))
{
init_lua(options.style);
Expand Down Expand Up @@ -1509,8 +1514,8 @@ void output_flex_t::reprocess_marked()
for (auto &table : m_table_connections) {
if (table.table().matches_type(osmium::item_type::way) &&
table.table().has_id_column()) {
table.analyze();
table.create_id_index();
table.analyze(m_db_connection);
table.create_id_index(m_db_connection);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/output-flex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ class output_flex_t : public output_t
void add_row(table_connection_t *table_connection, OBJECT const &object);

void delete_from_table(table_connection_t *table_connection,
pg_conn_t const &db_connection,
osmium::item_type type, osmid_t osm_id);

void delete_from_tables(osmium::item_type type, osmid_t osm_id);

lua_State *lua_state() noexcept { return m_lua_state.get(); }
Expand Down Expand Up @@ -288,6 +290,9 @@ class output_flex_t : public output_t

std::vector<table_connection_t> m_table_connections;

/// The connection to the database server.
pg_conn_t m_db_connection;

// This is shared between all clones of the output and must only be
// accessed while protected using the lua_mutex.
std::shared_ptr<idset_t> m_stage2_way_ids = std::make_shared<idset_t>();
Expand Down
Loading

0 comments on commit e6a3096

Please sign in to comment.