Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'COPY ... FROM STDIN' Support with Dynamic Fast Forward and Improved Error Handling - v3.0 #4762

Merged
merged 25 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3118c43
First data row should be skipped since it is part of PGresult, which …
rahim-kanji Oct 16, 2024
616a8e0
Implemented handling of COPY OUT
rahim-kanji Nov 6, 2024
72acc1b
Removed COPY check from TAP test
rahim-kanji Nov 6, 2024
c83fa6d
Added TAP test
rahim-kanji Nov 7, 2024
0590950
Fixed check count
rahim-kanji Nov 8, 2024
79c7f13
Clean up
rahim-kanji Nov 13, 2024
967386d
Added the SESSION_FORWARD_TYPE enum to define the type of session for…
rahim-kanji Nov 25, 2024
e64a434
Few fixes
rahim-kanji Nov 25, 2024
c4a8b0e
Added FAST FORWARD support for PostgreSQL
rahim-kanji Nov 25, 2024
551aeed
Improved COPY OUT format extraction
rahim-kanji Nov 25, 2024
7feaac3
Removed compression code
rahim-kanji Nov 25, 2024
ea059ea
Added SESSION_FORWARD_TYPE enum to define the type of session forwarding
rahim-kanji Nov 25, 2024
6679e59
Added START_REPLICATION command
rahim-kanji Nov 25, 2024
5a37a14
Added PGSQL_QUERY_RESULT_COPY_IN
rahim-kanji Nov 26, 2024
b906c08
Added dynamic fast forward support
rahim-kanji Nov 26, 2024
f33fd02
Add COPY ... FROM STDIN detection regex
rahim-kanji Nov 26, 2024
ba8dc5f
Removed the assertion. Instead, disconnect the client session and bac…
rahim-kanji Nov 26, 2024
395a977
Merge branch 'v3.0'
rahim-kanji Nov 26, 2024
18f9f45
Few fixes
rahim-kanji Nov 26, 2024
490da0c
Fixed admin-listen_on_unix-t TAP test
rahim-kanji Nov 26, 2024
2aa60e3
Added Session Type in logs
rahim-kanji Dec 2, 2024
58db40e
Added TAP test
rahim-kanji Dec 2, 2024
778d1aa
Renamed TAP test
rahim-kanji Dec 2, 2024
794ec0f
Merge remote-tracking branch 'Master/v3.0' into v3.0_dynamic_fast_for…
rahim-kanji Dec 3, 2024
7e20a59
Reordered OpenSSL library and include paths to the end to ensure that…
rahim-kanji Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pcre/pcre/.libs/libpcre.a:

pcre: pcre/pcre/.libs/libpcre.a

postgresql/postgresql/src/interfaces/libpq/libpq.a :
postgresql/postgresql/src/interfaces/libpq/libpq.a:
cd postgresql && rm -rf postgresql-*/ || true
cd postgresql && tar -zxf postgresql-*.tar.gz
cd postgresql/postgresql && patch -p0 < ../get_result_from_pgconn.patch
Expand Down
12 changes: 10 additions & 2 deletions include/Base_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ class StmtLongDataHandler;
class MySQL_Session;
class PgSQL_Session;

enum SESSION_FORWARD_TYPE : uint8_t {
SESSION_FORWARD_TYPE_NONE = 0x00,
SESSION_FORWARD_TYPE_PERMANENT = 0x01,
SESSION_FORWARD_TYPE_TEMPORARY = 0x02,
SESSION_FORWARD_TYPE_COPY_STDIN = 0x04 | SESSION_FORWARD_TYPE_TEMPORARY,
SESSION_FORWARD_TYPE_START_REPLICATION = 0x08 | SESSION_FORWARD_TYPE_TEMPORARY,
};

template<typename S, typename DS, typename B, typename T>
class Base_Session {
public:
Expand Down Expand Up @@ -89,8 +97,8 @@ class Base_Session {
//bool stats;
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
SESSION_FORWARD_TYPE session_fast_forward;
//bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
MySQL_STMTs_meta *sess_STMTs_meta;
StmtLongDataHandler *SLDH;
Expand Down
4 changes: 2 additions & 2 deletions include/MySQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class MySQL_Data_Stream
//
// we have a similar code in MySQL_Connection
// in case of ASYNC_CONNECT_SUCCESSFUL
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// if frontend and backend connection use SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from P_MARIADB_TLS structure.
Expand Down Expand Up @@ -260,7 +260,7 @@ class MySQL_Data_Stream
myconn->myds=NULL;
myconn=NULL;
if (encrypted == true) {
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// it seems we are a connection with SSL on a fast_forward session.
// See attach_connection() for more details .
// We now disable SSL metadata from the Data Stream
Expand Down
2 changes: 1 addition & 1 deletion include/MySQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
//bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
#endif // 0
/**
Expand Down
8 changes: 7 additions & 1 deletion include/PgSQL_Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,10 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder {
inline int get_pg_is_nonblocking() { return PQisnonblocking(pgsql_conn); }
inline int get_pg_is_threadsafe() { return PQisthreadsafe(); }
inline const char* get_pg_error_message() { return PQerrorMessage(pgsql_conn); }
inline SSL* get_pg_ssl_object() { return (SSL*)PQsslStruct(pgsql_conn, "OpenSSL"); }
const char* get_pg_server_version_str(char* buff, int buff_size);
const char* get_pg_connection_status_str();
const char* get_pg_transaction_status_str();

unsigned int get_memory_usage() const;

//PgSQL_Conn_Param conn_params;
Expand All @@ -655,10 +655,16 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder {
PgSQL_Query_Result* query_result;
PgSQL_Query_Result* query_result_reuse;
bool new_result;
bool is_copy_out;
//PgSQL_SrvC* parent;
//PgSQL_Connection_userinfo* userinfo;
//PgSQL_Data_Stream* myds;
//int fd;

private:
// Handles the COPY OUT response from the server.
// Returns true if it consumes all buffer data, or false if the threshold for result size is reached
bool handle_copy_out(const PGresult* result, uint64_t* processed_bytes);
};

#endif /* __CLASS_PGSQL_CONNECTION_H */
27 changes: 12 additions & 15 deletions include/PgSQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class PgSQL_Data_Stream
private:
int array2buffer();
int buffer2array();
void generate_compressed_packet();
enum pgsql_sslstatus do_ssl_handshake();
void queue_encrypted_bytes(const char* buf, size_t len);
public:
Expand Down Expand Up @@ -217,39 +216,37 @@ class PgSQL_Data_Stream
//
// we have a similar code in MySQL_Connection
// in case of ASYNC_CONNECT_SUCCESSFUL
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// if frontend and backend connection use SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from P_MARIADB_TLS structure.
// directly from PGconn SSL structure.
//
// For futher details:
// - without ssl: we use the file descriptor from pgsql connection
// - with ssl: we use the SSL structure from pgsql connection
if (myconn->pgsql && myconn->ret_mysql) {
if (myconn->pgsql->options.use_ssl == 1) {
if (myconn->is_connected() && myconn->get_pg_ssl_in_use()) {
if (ssl == NULL) {
encrypted = true;
if (ssl == NULL) {
// check the definition of P_MARIADB_TLS
// P_MARIADB_TLS* matls = (P_MARIADB_TLS*)myconn->pgsql->net.pvio->ctls;
// ssl = (SSL*)matls->ssl;
// rbio_ssl = BIO_new(BIO_s_mem());
// wbio_ssl = BIO_new(BIO_s_mem());
// SSL_set_bio(ssl, rbio_ssl, wbio_ssl);
}
SSL* ssl_obj = myconn->get_pg_ssl_object();
if (ssl_obj == NULL) assert(0); // Should not be null
ssl = ssl_obj;
rbio_ssl = BIO_new(BIO_s_mem());
wbio_ssl = BIO_new(BIO_s_mem());
SSL_set_bio(ssl, rbio_ssl, wbio_ssl);
}
}
}
}

// safe way to detach a MySQL Connection
// safe way to detach a PgSQL Connection
void detach_connection() {
assert(myconn);
myconn->statuses.pgconnpoll_put++;
statuses.pgconnpoll_put++;
myconn->myds = NULL;
myconn = NULL;
if (encrypted == true) {
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// it seems we are a connection with SSL on a fast_forward session.
// See attach_connection() for more details .
// We now disable SSL metadata from the Data Stream
Expand Down
75 changes: 75 additions & 0 deletions include/PgSQL_Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ class PgSQL_Protocol;
#define PGSQL_QUERY_RESULT_READY 0x04
#define PGSQL_QUERY_RESULT_ERROR 0x08
#define PGSQL_QUERY_RESULT_EMPTY 0x10
#define PGSQL_QUERY_RESULT_COPY_OUT 0x20
#define PGSQL_QUERY_RESULT_COPY_IN 0x30

class PgSQL_Query_Result {
public:
Expand Down Expand Up @@ -435,6 +437,40 @@ class PgSQL_Query_Result {
*/
unsigned int add_ready_status(PGTransactionStatusType txn_status);

/**
* @brief Adds the start of a COPY OUT response to the packet.
*
* This function adds the initial part of a COPY OUT response to the packet.
* It uses the provided PGresult object to determine the necessary information
* to include in the response.
*
* @param result A pointer to the PGresult object containing the response data.
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_response_start(const PGresult* result);

/**
* @brief Adds a row of data to the COPY OUT response.
*
* This function adds a row of data to the ongoing COPY OUT response. The data
* is provided as a pointer to the row data and its length.
*
* @param data A pointer to the row data to be added.
* @param len The length of the row data in bytes.
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_row(const void* data, unsigned int len);

/**
* @brief Adds the end of a COPY OUT response to the packet.
*
* This function adds the final part of a COPY OUT response to the packet,
* indicating the end of the response.
*
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_response_end();

/**
* @brief Retrieves the query result set and copies it to a PtrSizeArray.
*
Expand Down Expand Up @@ -870,6 +906,45 @@ class PgSQL_Protocol : public MySQL_Protocol {
*/
unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result);

/**
* @brief Copies the start of a response to a PgSQL_Query_Result.
*
* This function copies the initial part of a response to the provided
* PgSQL_Query_Result object. It can optionally send the response.
*
* @param send Whether to send the response.
* @param pg_query_result The PgSQL_Query_Result object to copy the response to.
* @param result The PGresult object containing the response data.
* @return The number of bytes copied.
*/
unsigned int copy_out_response_start_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);

/**
* @brief Copies a row to a PgSQL_Query_Result.
*
* This function copies a single row of data to the provided PgSQL_Query_Result
* object. It can optionally send the row data.
*
* @param send Whether to send the row data.
* @param pg_query_result The PgSQL_Query_Result object to copy the row to.
* @param data The row data to copy.
* @param len The length of the row data.
* @return The number of bytes copied.
*/
unsigned int copy_out_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const unsigned char* data, unsigned int len);

/**
* @brief Copies the end of a response to a PgSQL_Query_Result.
*
* This function copies the final part of a response to the provided
* PgSQL_Query_Result object. It can optionally send the response.
*
* @param send Whether to send the response.
* @param pg_query_result The PgSQL_Query_Result object to copy the response to.
* @return The number of bytes copied.
*/
unsigned int copy_out_response_end_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result);

private:

/**
Expand Down
30 changes: 27 additions & 3 deletions include/PgSQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ class PgSQL_Query_Info {

PgSQL_Query_Info();
~PgSQL_Query_Info();
void init(unsigned char* _p, int len, bool mysql_header = false);
void init(unsigned char* _p, int len, bool header = false);
void query_parser_init();
enum PGSQL_QUERY_command query_parser_command_type();
void query_parser_free();
unsigned long long query_parser_update_counters();
void begin(unsigned char* _p, int len, bool mysql_header = false);
void begin(unsigned char* _p, int len, bool header = false);
void end();
char* get_digest_text();
bool is_select_NOT_for_update();
Expand Down Expand Up @@ -256,6 +256,29 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt);
void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt);

/**
* @brief Switches session from normal mode to fast forward mode.
*
* This method transitions the session to fast forward mode based on session type.
* (Currently only supports SESSION_FORWARD_TYPE_TEMPORARY and extended types)
*
* @param pkt Used solely to push the packet back to client_myds PSarrayIN,
* allowing it to be forwarded to the backend via the fast forward session
* @param command Command that causes the session to switch to fast forward mode.
* @param session_type SESSION_FORWARD_TYPE indicating the type of session.
*
* @return void.
*/
void switch_normal_to_fast_forward_mode(PtrSize_t& pkt, std::string_view command, SESSION_FORWARD_TYPE session_type);

/**
* @brief Switches session from fast forward mode to normal mode.
*
* This method is used to revert session from fast forward mode back to normal mode.
*
*/
void switch_fast_forward_to_normal_mode();

public:
bool handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, const char* var_name, const char* var_value, bool no_quote = false, bool set_transaction = false);
#if 0
Expand Down Expand Up @@ -341,7 +364,7 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
//bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
#endif // 0
/**
Expand All @@ -359,6 +382,7 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
// StmtLongDataHandler* SLDH;

Session_Regex** match_regexes;
CopyCmdMatcher* copy_cmd_matcher;

ProxySQL_Node_Address* proxysql_node_address; // this is used ONLY for Admin, and only if the other party is another proxysql instance part of a cluster
bool use_ldap_auth;
Expand Down
21 changes: 20 additions & 1 deletion include/PgSQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ enum PgSQL_Thread_status_variable {
PG_st_var_END = 42 // to avoid ASAN complaining. TO FIX
};


struct CopyCmdMatcher {
re2::RE2::Options options;
re2::RE2 pattern;

CopyCmdMatcher() :
options(RE2::Quiet),
pattern(
R"(((?is)(?:--.*?$|/\*[\s\S]*?\*/|\s)*\bCOPY\b\s+[^;]*?\bFROM\b\s+STDIN\b(?:\s+WITH\s*\([^)]*\))?))",
options) {
//((?is)(?:--.*?$|/\*[\s\S]*?\*/|\s)*\bCOPY\b\s+[^;]*?\bFROM\b\s+STDIN\b(?:\s+WITH\s*\([^)]*\))?)
}

inline
bool match(const char* query, re2::StringPiece* matched = nullptr) const {
return re2::RE2::PartialMatch(query, pattern, matched);
}
};

class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread
{
private:
Expand Down Expand Up @@ -196,7 +215,7 @@ class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread
#ifdef IDLE_THREADS
PtrArray* idle_mysql_sessions;
PtrArray* resume_mysql_sessions;

CopyCmdMatcher *copy_cmd_matcher;
pgsql_conn_exchange_t myexchange;
#endif // IDLE_THREADS

Expand Down
1 change: 1 addition & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ enum PGSQL_QUERY_command {
PGSQL_QUERY_ALTER_TABLESPACE,
PGSQL_QUERY_DROP_TABLESPACE,
PGSQL_QUERY_CLUSTER,
PGSQL_QUERY_START_REPLICATION,
PGSQL_QUERY_UNKNOWN,
PGSQL_QUERY__UNINITIALIZED,
PGSQL_QUERY___NONE // Special marker.
Expand Down
4 changes: 4 additions & 0 deletions lib/Base_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ void Base_Thread::register_session(T thr, S _sess, bool up_start) {
// assert(0);
// }
_sess->match_regexes=match_regexes;
if constexpr (std::is_same_v<T, PgSQL_Thread*>) {
_sess->copy_cmd_matcher = (static_cast<PgSQL_Thread*>(this))->copy_cmd_matcher;
}

if (up_start)
_sess->start_time=curtime;
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Registered new session\n", _sess->thread, _sess);
Expand Down
2 changes: 1 addition & 1 deletion lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ POSTGRES_LDIR=$(POSTGRES_IFACES)/libpq/

IDIR := ../include

IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(SSL_IDIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) -I$(POSTGRES_IFACE)
IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) -I$(POSTGRES_IFACE) -I$(SSL_IDIR)
ifeq ($(UNAME_S),Linux)
IDIRS += -I$(COREDUMPER_IDIR)
endif
Expand Down
Loading
Loading