From ce27beebd965cbc55d0b7cceb7ede2bd824e711d Mon Sep 17 00:00:00 2001 From: Johannes Kasimir Date: Thu, 6 Feb 2025 08:44:19 +0100 Subject: [PATCH] fix: use cursor to avoid deadlock --- duckdb_flight_server.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/duckdb_flight_server.py b/duckdb_flight_server.py index 43b6477..626d738 100644 --- a/duckdb_flight_server.py +++ b/duckdb_flight_server.py @@ -11,19 +11,21 @@ def __init__(self, location="grpc://localhost:8815", db_path="duck_flight.db"): def do_get(self, context, ticket): """Handle 'GET' requests from clients to retrieve data.""" + cur = self.conn.cursor() query = ticket.ticket.decode("utf-8") - result_table = self.conn.execute(query).fetch_arrow_table() + result_table = cur.execute(query).fetch_arrow_table() # Convert to record batches with alignment batches = result_table.to_batches(max_chunksize=1024) # Use power of 2 for alignment return flight.RecordBatchStream(pa.Table.from_batches(batches)) def do_put(self, context, descriptor, reader, writer): """Handle 'PUT' requests to upload data to the DuckDB instance.""" + cur = self.conn.cursor() table = reader.read_all() table_name = descriptor.path[0].decode('utf-8') # Create table if it doesn't exist - self.conn.execute(f""" + cur.execute(f""" CREATE TABLE IF NOT EXISTS {table_name} ( batch_id BIGINT, timestamp VARCHAR, @@ -35,16 +37,17 @@ def do_put(self, context, descriptor, reader, writer): # Convert to record batches for better alignment batches = table.to_batches(max_chunksize=1024) aligned_table = pa.Table.from_batches(batches) - self.conn.register("temp_table", aligned_table) + cur.register("temp_table", aligned_table) # Insert new data - self.conn.execute(f"INSERT INTO {table_name} SELECT * FROM temp_table") + cur.execute(f"INSERT INTO {table_name} SELECT * FROM temp_table") def do_action(self, context, action): """Handle custom actions like executing SQL queries.""" + cur = self.conn.cursor() if action.type == "query": query = action.body.to_pybytes().decode("utf-8") - self.conn.execute(query) + cur.execute(query) return [] else: raise NotImplementedError(f"Unknown action type: {action.type}")