Does cstore_fdw Support Parallel COPY into the Same Table? #240

Open
frugardc opened this issue Sep 29, 2020 · 1 comment

Comments

@frugardc

I am moving a large amount of data via Spark/JDBC into a single cstore_fdw table and trying to run many concurrent COPY commands against it. It appears each COPY acquires a lock on the table, so the commands end up executing one at a time. Is this the intended behavior?
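
One way to see what is going on is to inspect pg_locks while the concurrent COPY commands are running. Below is a minimal sketch, assuming a psycopg2 connection with placeholder credentials and the placeholder table name cstore_table used later in this thread; if all but one session show granted = false on the same relation, the writes are being serialized by a table-level lock.

import psycopg2

# Hypothetical connection parameters -- substitute your own
conn = psycopg2.connect("dbname=your_database user=your_user password=your_password host=your_host")
cur = conn.cursor()

# List the locks held or awaited on the target table while the COPYs run
cur.execute("""
    SELECT l.pid, l.mode, l.granted, a.query
    FROM pg_locks l
    JOIN pg_stat_activity a ON a.pid = l.pid
    WHERE l.relation = 'cstore_table'::regclass
    ORDER BY l.granted DESC
""")
for pid, mode, granted, query in cur.fetchall():
    print(pid, mode, granted, query)

cur.close()
conn.close()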

@ljluestc


from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataLoadToCstore") \
    .getOrCreate()

# Assuming df is your DataFrame containing the data to load
df = spark.read.csv("path_to_your_large_csv_file.csv", header=True, inferSchema=True)

# Write DataFrame to PostgreSQL using JDBC with batched inserts
# (batchsize controls how many rows are sent per insert batch)
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://your_host:your_port/your_database") \
    .option("dbtable", "cstore_table") \
    .option("user", "your_user") \
    .option("password", "your_password") \
    .option("batchsize", "10000") \
    .mode("append") \
    .save()

# Alternative: issue COPY commands via psycopg2
import io
import pandas as pd
import psycopg2

# Connect to PostgreSQL
conn = psycopg2.connect("dbname=your_database user=your_user password=your_password host=your_host")
cur = conn.cursor()

# Function to COPY a pandas DataFrame chunk into the target table
def copy_to_cstore(pdf, table_name):
    buffer = io.StringIO()
    pdf.to_csv(buffer, header=False, index=False)
    buffer.seek(0)

    # Stream the buffer into the table with a single COPY
    cur.copy_from(buffer, table_name, sep=',', columns=list(pdf.columns))
    conn.commit()

# Pull rows from the Spark DataFrame to the driver and COPY them in chunks
chunk_size = 10000  # Set your chunk size
columns = df.columns
rows = []
for row in df.toLocalIterator():
    rows.append(row)
    if len(rows) >= chunk_size:
        copy_to_cstore(pd.DataFrame(rows, columns=columns), "cstore_table")
        rows = []
if rows:  # flush the final partial chunk
    copy_to_cstore(pd.DataFrame(rows, columns=columns), "cstore_table")

# Clean up
cur.close()
conn.close()
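
On the Spark side, the degree of write parallelism is set by the number of DataFrame partitions, and the JDBC writer's numPartitions option caps how many connections are used at once. Below is a rough sketch under the same hypothetical connection details as above; note that if cstore_fdw serializes writes with a table lock, as this issue suggests, the extra connections will simply wait on each other.

# Write with up to 8 concurrent JDBC connections (one per partition)
df.repartition(8).write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://your_host:your_port/your_database") \
    .option("dbtable", "cstore_table") \
    .option("user", "your_user") \
    .option("password", "your_password") \
    .option("numPartitions", "8") \
    .option("batchsize", "10000") \
    .mode("append") \
    .save()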
