Skip to content

Commit

Permalink
Fully implemented result handling with removing query when it's done
Browse files Browse the repository at this point in the history
  • Loading branch information
dankmolot committed Jan 14, 2025
1 parent 0318532 commit a60e658
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 26 deletions.
84 changes: 58 additions & 26 deletions source/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,32 @@ void query_failed(GLua::ILuaInterface* lua, Connection* state) {
}
}

inline bool bad_result(PGresult* result) {
if (!result) {
return true;
}

auto status = PQresultStatus(result);

return status == PGRES_BAD_RESPONSE || status == PGRES_NONFATAL_ERROR ||
status == PGRES_FATAL_ERROR;
}

void query_result(GLua::ILuaInterface* lua, pg::result&& result,
GLua::AutoReference& callback) {
if (callback.Push()) {
if (!bad_result(result.get())) {
lua->PushBool(true);
create_result_table(lua, result.get());
} else {
lua->PushBool(false);
lua->PushString(PQresultErrorMessage(result.get()));
}

pcall(lua, 2, 0);
}
}

// returns true if poll was successful
// returns false if there was an error
inline bool poll_query(PGconn* conn, Query& query) {
Expand All @@ -55,15 +81,39 @@ inline bool poll_query(PGconn* conn, Query& query) {
return true;
}

bool bad_result(PGresult* result) {
void process_result(GLua::ILuaInterface* lua, Connection* state,
pg::result&& result) {
// query is done
if (!result) {
return true;
state->query.reset();
return process_query(lua, state);
}

auto status = PQresultStatus(result);
// next result might be empty,
// that means that query is done
// and we need to remove query from the state
// so callback can add another query
if (!pg::isBusy(state->conn)) {
auto next_result = pg::getResult(state->conn);
if (!next_result) {
// query is done, we need to remove query from the state
Query query = std::move(*state->query);
state->query.reset();

return status == PGRES_BAD_RESPONSE || status == PGRES_NONFATAL_ERROR ||
status == PGRES_FATAL_ERROR;
query_result(lua, std::move(result), query.callback);

// callback might added another query, process it rightaway
process_query(lua, state);
} else {
// query is not done, but also since we own next result
// we need to call query callback and process next result
query_result(lua, std::move(result), state->query->callback);
process_result(lua, state, std::move(next_result));
}
} else {
// query is not done, but we don't need to process next result
query_result(lua, std::move(result), state->query->callback);
}
}

void async_postgres::process_query(GLua::ILuaInterface* lua,
Expand All @@ -90,26 +140,8 @@ void async_postgres::process_query(GLua::ILuaInterface* lua,
return process_query(lua, state);
}

while (PQisBusy(state->conn.get()) == 0) {
auto result = pg::getResult(state->conn);
if (!result) {
// query is done
// TODO: remove query if we have a final result
state->query.reset();
return process_query(lua, state);
}

if (query.callback) {
query.callback.Push();
if (!bad_result(result.get())) {
lua->PushBool(true);
create_result_table(lua, result.get());
} else {
lua->PushBool(false);
lua->PushString(PQresultErrorMessage(result.get()));
}

pcall(lua, 2, 0);
}
// ensure that getting result won't block
if (!pg::isBusy(state->conn)) {
return process_result(lua, state, pg::getResult(state->conn));
}
}
4 changes: 4 additions & 0 deletions source/safe_pg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ namespace async_postgres::pg {
inline notify getNotify(conn& conn) {
return notify(PQnotifies(conn.get()), &PQfreemem);
}

// misc

inline bool isBusy(conn& conn) { return PQisBusy(conn.get()) == 1; }
} // namespace async_postgres::pg

0 comments on commit a60e658

Please sign in to comment.