Skip to content

Commit

Permalink
Move Python object finding logic to query_conn caller thread and pass…
Browse files Browse the repository at this point in the history
… it to worker pipeline
  • Loading branch information
auxten committed Dec 30, 2024
1 parent af485c4 commit 1d13523
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 6 deletions.
58 changes: 58 additions & 0 deletions programs/local/LocalChdb.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,69 @@
#include "LocalChdb.h"
#include <mutex>
#include "chdb.h"
#include "pybind11/gil.h"
#include "pybind11/pytypes.h"

#if USE_PYTHON

# include <Storages/StoragePython.h>
# include <TableFunctions/TableFunctionPython.h>
# include <Common/re2.h>


namespace py = pybind11;

// Definition of the `inside_main` flag, initialised to true.
// `extern` is intentionally omitted: the initializer already makes this a
// definition with external linkage, and combining `extern` with an initializer
// only triggers a compiler warning without changing the meaning.
bool inside_main = true;

// Global storage for the Python Table Engine queryable object.
// Declaration only. In this file it is assigned from findQueryableObjFromQuery()
// in connection_wrapper::query() and cursor_wrapper::execute(), before the GIL
// is released for the actual query.
// NOTE(review): py::handle is a non-owning pybind11 reference - confirm the
// referenced object is kept alive elsewhere for the duration of the query.
extern py::handle global_query_obj;

// Find the queryable Python object referenced by a query string.
//
// The object name is extracted from the first `Python(...)` / `python(...)` call
// found in the query. The argument may be a bare identifier or a name wrapped in
// single or double quotes, for example:
// - `SELECT * FROM Python('PyReader')`
// - `SELECT * FROM Python(PyReader_instance)`
// - `SELECT * FROM Python(some_var_with_type_pandas_DataFrame_or_pyarrow_Table)`
//
// The object can be anything the Python table function supports, such as a
// PyReader, a pandas DataFrame, or a PyArrow Table, and it should be in the
// global or local scope.
//
// Returns:
// - a null handle (nullptr) if no Python object is referenced in the query string;
// - otherwise whatever DB::findQueryableObj(name) returns: the Python object if
//   found, py::none if the referenced object was not found.
//
// NOTE(review): DB::findQueryableObj is declared to return py::object; converting
// it to py::handle here drops ownership, so the result is a borrowed reference -
// confirm the object stays alive in its Python scope while the handle is used.
// NOTE(review): the lookup calls into Python, so this presumably must run with the
// GIL held (the callers in this file invoke it before releasing the GIL).
py::handle findQueryableObjFromQuery(const std::string & query_str)
{
    // RE2 pattern to match Python()/python() with a single/double-quoted name (group 1)
    // or an unquoted identifier (group 2).
    static const RE2 pattern(R"([Pp]ython\s*\(\s*(?:['"]([^'"]+)['"]|([a-zA-Z_][a-zA-Z0-9_]*))\s*\))");

    std::string quoted_match;
    std::string unquoted_match;

    // No Python(...) reference in the query at all.
    if (!RE2::PartialMatch(query_str, pattern, &quoted_match, &unquoted_match))
        return nullptr;

    // Exactly one alternative of the pattern participates in a match; prefer the
    // quoted capture and fall back to the unquoted identifier.
    const std::string & var_name = quoted_match.empty() ? unquoted_match : quoted_match;
    if (var_name.empty())
        return nullptr;

    // Find the object in the Python environment.
    return DB::findQueryableObj(var_name);
}

local_result_v2 * queryToBuffer(
const std::string & queryStr,
Expand Down Expand Up @@ -258,11 +311,13 @@ connection_wrapper::connection_wrapper(const std::string & conn_str)

connection_wrapper::~connection_wrapper()
{
    // Drop the GIL while close_conn() runs; the guard re-acquires it when this
    // scope ends.
    // NOTE(review): close() also calls close_conn(conn) - confirm close_conn()
    // tolerates being invoked again for a connection that was already closed.
    py::gil_scoped_release gil_unlocked;
    close_conn(conn);
}

void connection_wrapper::close()
{
    // Closing the connection is done without holding the GIL; the guard
    // re-acquires it automatically on scope exit.
    py::gil_scoped_release gil_unlocked;
    close_conn(conn);
}

Expand All @@ -278,6 +333,8 @@ void connection_wrapper::commit()

query_result * connection_wrapper::query(const std::string & query_str, const std::string & format)
{
global_query_obj = findQueryableObjFromQuery(query_str);

py::gil_scoped_release release;
auto * result = query_conn(*conn, query_str.c_str(), format.c_str());
if (result->error_message)
Expand All @@ -290,6 +347,7 @@ query_result * connection_wrapper::query(const std::string & query_str, const st
void cursor_wrapper::execute(const std::string & query_str)
{
release_result();
global_query_obj = findQueryableObjFromQuery(query_str);

// Always use Arrow format internally
py::gil_scoped_release release;
Expand Down
17 changes: 11 additions & 6 deletions src/TableFunctions/TableFunctionPython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
#include <Common/PythonUtils.h>
#include <Common/logger_useful.h>


namespace py = pybind11;
// Global storage for the Python Table Engine queryable object.
// Starts out as a null handle. TableFunctionPython::parseArguments() reads it and
// treats both a null handle and Python None as "object not found".
py::handle global_query_obj = nullptr;

namespace DB
{

Expand All @@ -28,7 +33,7 @@ extern const int PY_EXCEPTION_OCCURED;
}

// Function to find instance of PyReader, pandas DataFrame, or PyArrow Table, filtered by variable name
py::object find_instances_of_pyreader(const std::string & var_name)
py::object findQueryableObj(const std::string & var_name)
{
py::module inspect = py::module_::import("inspect");
py::object current_frame = inspect.attr("currentframe")();
Expand Down Expand Up @@ -57,7 +62,7 @@ py::object find_instances_of_pyreader(const std::string & var_name)

void TableFunctionPython::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
py::gil_scoped_acquire acquire;
// py::gil_scoped_acquire acquire;
const auto & func_args = ast_function->as<ASTFunction &>();

if (!func_args.arguments)
Expand All @@ -81,8 +86,8 @@ void TableFunctionPython::parseArguments(const ASTPtr & ast_function, ContextPtr
std::remove_if(py_reader_arg_str.begin(), py_reader_arg_str.end(), [](char c) { return c == '\'' || c == '\"' || c == '`'; }),
py_reader_arg_str.end());

auto instance = find_instances_of_pyreader(py_reader_arg_str);
if (instance.is_none())
auto instance = global_query_obj;
if (instance == nullptr || instance.is_none())
throw Exception(
ErrorCodes::PY_OBJECT_NOT_FOUND,
"Python object not found in the Python environment\n"
Expand All @@ -93,8 +98,8 @@ void TableFunctionPython::parseArguments(const ASTPtr & ast_function, ContextPtr
"Python object found in Python environment with name: {} type: {}",
py_reader_arg_str,
py::str(instance.attr("__class__")).cast<std::string>());

reader = instance;
py::gil_scoped_acquire acquire;
reader = instance.cast<py::object>();
}
catch (py::error_already_set & e)
{
Expand Down
2 changes: 2 additions & 0 deletions src/TableFunctions/TableFunctionPython.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
namespace DB
{

py::object findQueryableObj(const std::string & var_name);

class TableFunctionPython : public ITableFunction
{
public:
Expand Down

0 comments on commit 1d13523

Please sign in to comment.